Twitter Connector

The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in TwitterSource class for establishing a connection to this stream. このコネクタを使うには、以下の依存をプロジェクトに追加します:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらをリンクする方法はここを見てください。

認証

Twitterストリームに接続するために、ユーザはプログラムを登録し、認証のために必要な情報を取得する必要があります。その過程は以下で説明します。

認証情報の取得

まず最初に、Twitterアカウントが必要です。twitter.com/signup で無料でサインアップするか、Twitterのアプリケーション管理 にサインインし、“Create New App” ボタンをクリックすることでアプリケーションを登録します。プログラムについてのフォームを埋め、規約と条件を受諾します。アプリケーションを選択した後で、APIキーとAPI秘密鍵 (TwitterSourceの中では、それぞれtwitter-source.consumerKeytwitter-source.consumerSecret と呼ばれます)が “API Keys” タブの中にあります。必要な OAuth アクセストークン データ(TwitterSourceでは、twitter-source.tokentwitter-source.tokenSecret ) を生成し、“Keys and Access Tokens” タブで取得することができます。これらの情報の要素を秘密にし、それらを公開リポジトリにpushしないようにしてください。

使い方

他のコネクタとは対照的に、TwitterSource は他のサービスに依存しません。例えば、以下のコードは素直に動く筈です:

Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
val props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));

TwitterSource はTweetに相当するJSONオブジェクトを含む文字列を発行します。

flink-examples-streamingパッケージ内のTwitterExample クラスは TwitterSourceを使う方法の完全な例を示します。

デフォルトでは、TwitterSourceStatusesSampleEndpoint を使います。このエンドポイントはTweetのランダムな標本を返します。ユーザが独自のエンドポイントを提供することができるTwitterSource.EndpointInitializer インタフェースがあります。

TOP
inserted by FC2 system