Flinkの RabbitMQ コネクタはMozilla Public License v1.1 (MPL 1.1)の元にライセンスされる “RabbitMQ AMQP Java Client” にMaven依存性を定義します。
Flink 自身は “RabbitMQ AMQP Java Client” のソースコードや “RabbitMQ AMQP Java Client” のパッケージバイナリのどちらも再利用しません。
FlinkのRabbitMQコネクタ(その結果 “RabbitMQ AMQP Java Client” を再配布することになる) に基づいて派生した仕事を作成および公開するユーザは、これは Mozilla Public License v1.1 (MPL 1.1) で宣言される条件の対象になるかもしれないことを認識する必要があります。
このコネクタはRabbitMQ からのデータストリームへのアクセスを提供します。このコネクタを使うには、以下の依存をプロジェクトに追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらをリンクする方法はここを見てください。
RabbitMQ ダウンロードページの説明に従います。インストールの後でサーバは自動的に開始し、RabbitMQに接続しているアプリケーションを起動することができます。
このコネクタはRabbitMQキューからのメッセージを消費するための RMQSource
クラスを提供します。このソースはFlinkでどうやって設定されているかによって3つの異なるレベルの保証を提供します。
At-least-once: チェックポイントが有効だが相関idが使われていないかソースが並行な場合、そのソースは少なくとも1回の保証のみを提供します。
以下は確実に1回のRabbitMQソースのセットアップのコード例です。インラインのコメントは、保証を緩めるために設定のどの部分を無視することができるかを説明します。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5000)
...
.build();
final DataStream<String> stream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"queueName", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // non-parallel source is only required for exactly-once
val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...)
val connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5000)
...
.build
val stream = env
.addSource(new RMQSource[String](
connectionConfig, // config for the RabbitMQ connection
"queueName", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema)) // deserialization schema to turn messages into Java objects
.setParallelism(1) // non-parallel source is only required for exactly-once
このコネクタはメッセージをRabbitMQキューへ送信するための RMQSink
クラスを提供します。以下はRabbitMQ シンクのセットアップのコード例です。
final DataStream<String> stream = ...
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5000)
...
.build();
stream.addSink(new RMQSink<String>(
connectionConfig, // config for the RabbitMQ connection
"queueName", // name of the RabbitMQ queue to send messages to
new SimpleStringSchema())); // serialization schema to turn Java objects to messages
val stream: DataStream[String] = ...
val connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5000)
...
.build
stream.addSink(new RMQSink[String](
connectionConfig, // config for the RabbitMQ connection
"queueName", // name of the RabbitMQ queue to send messages to
new SimpleStringSchema)) // serialization schema to turn Java objects to messages
RabbitMQについての詳細はここで見つけることができます。