Elasticsearch コネクタ

このコネクタはElasticsearch インデックスへのドキュメントアクションをリクエストできるシンクを提供します。このコネクタを使うには、以下の依存のうちの一つをプロジェクトに追加します。Elasticsearch インストレーションのバージョンに依存します:

Maven 依存 以下からサポートされています Elasticsearch バージョン
flink-connector-elasticsearch_2.10 1.0.0 1.x
flink-connector-elasticsearch2_2.10 1.0.0 2.x
flink-connector-elasticsearch5_2.10 1.2.0 5.x

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにプログラムをライブラリと一緒にパッケージする方法についての情報はここを見てください。

Elasticsearchのインストール

Elasticsearch クラスタをセットアップするための説明はここで見つかります。クラスタ名を設定および記憶するようにしてください。これはクラスタに対してドキュメントアクションをリクエストするためにElasticsearchSink を作成する時に設定されるべきです。

Elasticsearch シンク

ElasticsearchSink はElasticsearchクラスタと通信するためにTransportClientを使います。

下の例はシンクをどうやって設定および生成するかを示します:

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<String>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)
    
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }
}))
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)
    
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }
}))

StringMapElasticsearchSinkを設定するためにどうやって使われるかに注意してください。設定のキーはここのElasticSearchのドキュメントの中で説明されます。特に重要なのはクラスタ名に対応するべきcluster.name パラメータです。

また、例はそれぞれやってくる要素に関して1つのインデックスリクエストの実行を説明しているだけということに注意してください。一般的に、ElasticsearchSinkFunctionは異なるタイプ(例えば DeleteRequest, UpdateRequestなど)の複数のリクエストを実行するために使うことができます。

内部的には、シンクはアクションリクエストをクラスタに送信するためにBulkProcessorを使います。これは要素をまとめてクラスタに送る前に要素をバッファするでしょう。BulkProcessor の挙動はMap 設定によって提供される設定キーを使って設定することができます: * bulk.flush.max.actions: バッファする要素の最大量 * bulk.flush.max.size.mb: バッファするデータの最大量 (メガバイト) * bulk.flush.interval.ms: 他の二つの設定に関係なくデータをフラッシュする間隔のミリ秒

組み込みノードを使った通信 (Elasticsearch 1.xのみ)

Elasticsearch バージョン 1.x について、組み込みのノードを使った通信もサポートされます。組み込みノードとTransportClient を使ったElasticsearchとの通信の違いについては、ここを見てください。

以下はTransportClientの代わりに組み込みのノードを使うElasticsearchSinkをどうやって生成するかの例です:

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>;
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)
    
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }
}))

違いはもうElasticsearchのノードのアドレスのリストを提供する必要が無いということです。

Elasticsearch についてのもっと詳しい情報はここで見つかります。

Elasticsearch コネクタを Uber-Jar にパッケージ化する

Flinkプログラムの実行に関して、全ての依存物を含む uber-jar (executable jar) と呼ばれるものをビルドすることがお勧めです (更に詳しい情報はここを見てください)。

しかし、Elasticsearch シンクを含む uber-jarを実行する場合、IllegalArgumentExceptionが起こるかも知れません。これはElasticsearchのファイルとMETA-INF/services内の依存が衝突することで起こります:

IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  このAPIをサポートする対応するJARファイルをクラスパスに追加する必要があります。現在のクラスパスは以下の名前をサポートします: [es090, completion090, XBloomFilter]]

Mavenを使ってuber-jarがビルドされた場合、プラグインのセクションのMaven POMファイルに以下を追加することでこの問題を避けることができます:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
TOP
inserted by FC2 system