RabbitMQ コネクタ

RabbitMQ コネクタのライセンス

Flinkの RabbitMQ コネクタはMozilla Public License v1.1 (MPL 1.1)の元にライセンスされる “RabbitMQ AMQP Java Client” にMaven依存性を定義します。

Flink 自身は “RabbitMQ AMQP Java Client” のソースコードや “RabbitMQ AMQP Java Client” のパッケージバイナリのどちらも再利用しません。

FlinkのRabbitMQコネクタ(その結果 “RabbitMQ AMQP Java Client” を再配布することになる) に基づいて派生した仕事を作成および公開するユーザは、これは Mozilla Public License v1.1 (MPL 1.1) で宣言される条件の対象になるかもしれないことを認識する必要があります。

RabbitMQ コネクタ

このコネクタはRabbitMQ からのデータストリームへのアクセスを提供します。このコネクタを使うには、以下の依存をプロジェクトに追加します:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらをリンクする方法はここを見てください。

RabbitMQのインストール

RabbitMQ ダウンロードページの説明に従います。インストールの後でサーバは自動的に開始し、RabbitMQに接続しているアプリケーションを起動することができます。

RabbitMQ ソース

このコネクタはRabbitMQキューからのメッセージを消費するための RMQSource クラスを提供します。このソースはFlinkでどうやって設定されているかによって3つの異なるレベルの保証を提供します。

  1. Exactly-once: RabbitMQソースを使って確実に1回の保証を達成するためには、以下が必須です -
    • チェックポイントを有効: チェックポイントを有効にすると、メッセージはチェックポイントが完了した時にのみ通知されます(そのため、RabbitMQのキューから削除されます)。
    • 相関idを使用: 相関idはRabbitMQアプリケーションの機能です。メッセージをMabbitMQに入れる時には、メッセージプロパティ内にそれを入れる必要があります。相関idはチェックポイントから回復する時に再処理されるメッセージを重複しないようにするためにソースによって使われます。
    • 非並行ソース: ソースは確実に1回を達成するために、非並行(並行度が1に設定される)でなければなりません。この制限は主にRabbitMQが1つのキューから複数のコンシューマにメッセージを発送するための方法によるものです。
  2. At-least-once: チェックポイントが有効だが相関idが使われていないかソースが並行な場合、そのソースは少なくとも1回の保証のみを提供します。

  3. No guarantee: チェックポイントが有効ではない場合、ソースはあまり強くは配送の保証を持ちません。この設定では、Flinkのチェックポイントとの協調の代わりに、一度ソースがメッセージを受信しそれらを処理すると、メッセージが自動的に通知されるでしょう。

以下は確実に1回のRabbitMQソースのセットアップのコード例です。インラインのコメントは、保証を緩めるために設定のどの部分を無視することができるかを説明します。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();
    
final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once
val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...)

val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build
    
val stream = env
    .addSource(new RMQSource[String](
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
    .setParallelism(1)               // non-parallel source is only required for exactly-once

RabbitMQ シンク

このコネクタはメッセージをRabbitMQキューへ送信するための RMQSinkクラスを提供します。以下はRabbitMQ シンクのセットアップのコード例です。

final DataStream<String> stream = ...

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();
    
stream.addSink(new RMQSink<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
val stream: DataStream[String] = ...

val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build
    
stream.addSink(new RMQSink[String](
    connectionConfig,         // config for the RabbitMQ connection
    "queueName",              // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema))  // serialization schema to turn Java objects to messages

RabbitMQについての詳細はここで見つけることができます。

TOP
inserted by FC2 system