このコネクタはApache NiFi から読み込みおよび書き込みすることができるソースおよびシンクを提供します。このコネクタを使うには、以下の依存をプロジェクトに追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-nifi_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにプログラムをライブラリと一緒にパッケージする方法についての情報はここを見てください。
Apache Nifi クラスタをセットアップするための説明は ここで見つけることができます。
コネクタはApache Nifi から Apache Flink へのデータを読み込むためのソースを提供します。
クラス NiFiSource(…)
はNifiからのデータ読み込みのための2つのコンストラクタを提供します。
NiFiSource(SiteToSiteConfig config)
- クライアントのSiteToSiteConfigとデフォルト1000msの待ち時間を指定される NiFiSource(…)
を構築します。
NiFiSource(SiteToSiteConfig config, long waitTimeMs)
- クライアントのSiteToSiteConfigと特定の待ち時間(ミリ秒)を指定される NiFiSource(…)
を構築します。
例:
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
.requestBatchCount(5)
.buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Flink")
.requestBatchCount(5)
.buildConfig()
val nifiSource = new NiFiSource(clientConfig)
ここで、データはApache Nifi サイト-to-サイト プロトコル設定の一部である “Data for Flink” と呼ばれる Apache Nifiの出力ポートから読み込まれます。
コネクタはApache FlinkからApache Nifi へのデータを書き込むためのシンクを提供します。
クラスNiFiSink(…)
は NiFiSink
をインスタンス化するためのコンストラクタを提供します。
NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)
constructs a NiFiSink(…)
given the client’s SiteToSiteConfig
and a NiFiDataPacketBuilder
that converts data from Flink to NiFiDataPacket
to be ingested by NiFi.例:
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig();
SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
streamExecEnv.addSink(nifiSink);
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig()
val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
streamExecEnv.addSink(nifiSink)
Apache NiFi サイト-to-サイト プロトコルについての詳細な情報は ここで見つけることができます