The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in TwitterSource
class for establishing a connection to this stream. このコネクタを使うには、以下の依存をプロジェクトに追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらをリンクする方法はここを見てください。
Twitterストリームに接続するために、ユーザはプログラムを登録し、認証のために必要な情報を取得する必要があります。その過程は以下で説明します。
まず最初に、Twitterアカウントが必要です。twitter.com/signup で無料でサインアップするか、Twitterのアプリケーション管理 にサインインし、“Create New App” ボタンをクリックすることでアプリケーションを登録します。プログラムについてのフォームを埋め、規約と条件を受諾します。アプリケーションを選択した後で、APIキーとAPI秘密鍵 (TwitterSource
の中では、それぞれtwitter-source.consumerKey
と twitter-source.consumerSecret
と呼ばれます)が “API Keys” タブの中にあります。必要な OAuth アクセストークン データ(TwitterSource
では、twitter-source.token
と twitter-source.tokenSecret
) を生成し、“Keys and Access Tokens” タブで取得することができます。これらの情報の要素を秘密にし、それらを公開リポジトリにpushしないようにしてください。
他のコネクタとは対照的に、TwitterSource
は他のサービスに依存しません。例えば、以下のコードは素直に動く筈です:
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
val props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
TwitterSource
はTweetに相当するJSONオブジェクトを含む文字列を発行します。
flink-examples-streaming
パッケージ内のTwitterExample
クラスは TwitterSource
を使う方法の完全な例を示します。
デフォルトでは、TwitterSource
は StatusesSampleEndpoint
を使います。このエンドポイントはTweetのランダムな標本を返します。ユーザが独自のエンドポイントを提供することができるTwitterSource.EndpointInitializer
インタフェースがあります。