このコネクタはElasticsearch インデックスへのドキュメントアクションをリクエストできるシンクを提供します。このコネクタを使うには、以下の依存のうちの一つをプロジェクトに追加します。Elasticsearch インストレーションのバージョンに依存します:
Maven 依存 | 以下からサポートされています | Elasticsearch バージョン |
---|---|---|
flink-connector-elasticsearch_2.10 | 1.0.0 | 1.x |
flink-connector-elasticsearch2_2.10 | 1.0.0 | 2.x |
flink-connector-elasticsearch5_2.10 | 1.2.0 | 5.x |
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにプログラムをライブラリと一緒にパッケージする方法についての情報はここを見てください。
Elasticsearch クラスタをセットアップするための説明はここで見つかります。クラスタ名を設定および記憶するようにしてください。これはクラスタに対してドキュメントアクションをリクエストするためにElasticsearchSink
を作成する時に設定されるべきです。
ElasticsearchSink
はElasticsearchクラスタと通信するためにTransportClient
を使います。
下の例はシンクをどうやって設定および生成するかを示します:
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
List<TransportAddress> transportAddresses = new ArrayList<String>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))
input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}))
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}))
String
のMap
がElasticsearchSink
を設定するためにどうやって使われるかに注意してください。設定のキーはここのElasticSearchのドキュメントの中で説明されます。特に重要なのはクラスタ名に対応するべきcluster.name
パラメータです。
また、例はそれぞれやってくる要素に関して1つのインデックスリクエストの実行を説明しているだけということに注意してください。一般的に、ElasticsearchSinkFunction
は異なるタイプ(例えば DeleteRequest
, UpdateRequest
など)の複数のリクエストを実行するために使うことができます。
内部的には、シンクはアクションリクエストをクラスタに送信するためにBulkProcessor
を使います。これは要素をまとめてクラスタに送る前に要素をバッファするでしょう。BulkProcessor
の挙動はMap
設定によって提供される設定キーを使って設定することができます: * bulk.flush.max.actions: バッファする要素の最大量 * bulk.flush.max.size.mb: バッファするデータの最大量 (メガバイト) * bulk.flush.interval.ms: 他の二つの設定に関係なくデータをフラッシュする間隔のミリ秒
Elasticsearch バージョン 1.x について、組み込みのノードを使った通信もサポートされます。組み込みノードとTransportClient
を使ったElasticsearchとの通信の違いについては、ここを見てください。
以下はTransportClient
の代わりに組み込みのノードを使うElasticsearchSink
をどうやって生成するかの例です:
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>;
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");
input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")
input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}))
違いはもうElasticsearchのノードのアドレスのリストを提供する必要が無いということです。
Elasticsearch についてのもっと詳しい情報はここで見つかります。
Flinkプログラムの実行に関して、全ての依存物を含む uber-jar (executable jar) と呼ばれるものをビルドすることがお勧めです (更に詳しい情報はここを見てください)。
しかし、Elasticsearch シンクを含む uber-jarを実行する場合、IllegalArgumentException
が起こるかも知れません。これはElasticsearchのファイルとMETA-INF/services
内の依存が衝突することで起こります:
IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. このAPIをサポートする対応するJARファイルをクラスパスに追加する必要があります。現在のクラスパスは以下の名前をサポートします: [es090, completion090, XBloomFilter]]
Mavenを使ってuber-jarがビルドされた場合、プラグインのセクションのMaven POMファイルに以下を追加することでこの問題を避けることができます:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>