Elasticsearch コネクタ

このコネクタはElasticsearch インデックスへのドキュメントアクションをリクエストできるシンクを提供します。このコネクタを使うには、以下の依存のうちの一つをプロジェクトに追加します。Elasticsearch インストレーションのバージョンに依存します:

Maven 依存 以下からサポートされています Elasticsearch バージョン
flink-connector-elasticsearch_2.11 1.0.0 1.x
flink-connector-elasticsearch2_2.11 1.0.0 2.x
flink-connector-elasticsearch5_2.11 1.3.0 5.x

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにプログラムをライブラリと一緒にパッケージする方法についての情報はここを見てください。

Elasticsearchのインストール

Elasticsearch クラスタをセットアップするための説明はここで見つかります。クラスタ名を設定および記憶するようにしてください。これはクラスタに対してドキュメントアクションをリクエストするためにElasticsearchSink を作成する時に設定されるべきです。

Elasticsearch シンク

ElasticsearchSink はElasticsearchクラスタと通信するためにTransportClientを使います。

下の例はシンクをどうやって設定および生成するかを示します:

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<String>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)
    
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }
}))
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)
    
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }
}))

StringMapElasticsearchSinkを設定するためにどうやって使われるかに注意してください。設定のキーはここのElasticSearchのドキュメントの中で説明されます。特に重要なのはクラスタ名に対応するべきcluster.name パラメータです。

また、例はそれぞれやってくる要素に関して1つのインデックスリクエストの実行を説明しているだけということに注意してください。一般的に、ElasticsearchSinkFunctionは異なるタイプ(例えば DeleteRequest, UpdateRequestなど)の複数のリクエストを実行するために使うことができます。

内部的にはFlink Elasticsearch シンクの各並行インスタンスはアクションリクエストをクラスタに送信するためにBulkProcessor を使います。これは要素をまとめてクラスタに送る前に要素をバッファするでしょう。BulkProcessor は一度に1つのバルク リクエストを実行します。つまり、進行中のバッファされたアクションは2つ同時にフラッシュしないでしょう。

Elasticsearch シンクと耐障害性

Flinkのチェックポイントを有効にして、Flink CassandraシンクはElasticsearchクラスタへのアクションリクエストの少なくとも一回の配送を保証します。チェックポイントの時点のBulkProcessor内の全ての延期されているアクションリクエストを待つことで行われます。シンクに送信されたもっと多くのレコードの処理を進める前に、これはチェックポイントが起動される前の全てのリクエストがElasticsearchによって無事通知されたことを効果的に保障します。

チェックポイント耐障害性の詳細は耐障害性のドキュメントにあります。

耐障害性Elasticsearchシンクを使うために、トポロジのチェックポイントは実行環境で有効にされている必要があります:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

注意: もしユーザがしたければ、生成されたElasticsearchSink上でdisableFlushOnCheckpoint() を呼ぶことで、フラッシュを無効にすることができます。これは、トポロジのチェックポイントを有効にしたとしても、本質的にシンクがもう配送の強い保証を提供しないだろうことに注意してください。

組み込みノードを使った通信 (Elasticsearch 1.xのみ)

Elasticsearch バージョン 1.x について、組み込みのノードを使った通信もサポートされます。組み込みノードとTransportClient を使ったElasticsearchとの通信の違いについては、ここを見てください。

以下はTransportClientの代わりに組み込みのノードを使うElasticsearchSinkをどうやって生成するかの例です:

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>;
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
    
        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
    
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)
    
    return Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .source(json);
  }
}))

違いはもうElasticsearchのノードのアドレスのリストを提供する必要が無いということです。

Elasticsearchのリクエストの障害の処理

Elasticsearch のアクションリクエストは、一時的なノードキューの容量の飽和あるいはインデックスされるドキュメントの不正な形式を含む様々な理由により、失敗するかもしれません。Flink Elasticsearch シンクにより、単純にActionRequestFailureHandler を実装しそれをコンストラクタに提供することで、ユーザはリクエストの失敗がどうやって処理されるかを指定することができます。

以下は例です:

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throw Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));
val input: DataStream[String] = ...

input.addSink(new ElasticsearchSink(
    config, transportAddresses,
    new ElasticsearchSinkFunction[String] {...},
    new ActionRequestFailureHandler {
        @throws(classOf[Throwable])
        override def onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action)
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure
            }
        }
}))

上の例は、シンクを失敗することなしに、シンクにキューの容量の飽和により失敗したリクエストを再追加し、不正な形式のドキュメントを持つリクエストを落とすようにさせます。他の全ての障害については、シンクは失敗するでしょう。もしActionRequestFailureHandler がコンストラクタに提供されない場合は、シンクは何らかの種類のエラーで失敗するでしょう。

BulkProcessorが内部的に全ての後退再試行が完了した後でのみまだ発生する障害については、onFailureが呼ばれることに注意してください。デフォルトでは、BulkProcessor は指数関数的なバックオフを持つ最大8回の試行を行います。内部的なBulkProcessorの挙動についての詳細と、それをどうやって設定するかについては、以下の章をみてください。

デフォルトでは、もし障害ハンドラが提供されない場合、シンクは全ての種類の例外について単純に失敗するNoOpFailureHandler を使います。コネクタは、キューの容量の飽和によって失敗したリクエストを常に再追加する RetryRejectedExecutionFailureHandler実装も提供します。

重要: 障害時の内部的なBulkProcessor へのリクエストの再追加は、シンクも再追加されたリクエストがチェックポイント時にフラッシュされるまで待つ必要があるため、長いチェックポイントに繋がるでしょう。例えば、RetryRejectedExecutionFailureHandlerを使う場合、チェックポイントはElasticsearchノードのリクエストが全ての待たされているリクエストのために十分な容量を持つまで待つ必要があるでしょう。これはもし再追加されたリクエストが成功しない場合、チェックポイントが完了しないだろうことも意味します。

Elasticsearch 1.xのための障害ハンドリング: Elasticsearch 1.x については、古いバージョンのJavaクライアントAPI(型は一般的な Exceptionで、障害メッセージのみが異なる)を使って正確な型を扱うことができないため、障害の型を一致させることは現実的ではありません。この場合、提供されたREST状態コードに適合することをお勧めします。

内部的なバルク プロセッサーの設定

内部的なBulkProcessor は、指定されたMap<String, String>内で以下の値を設定することで、バッファされたアクションリクエストがどのようにフラッシュされるかの挙動について更に設定することが可能です:

  • bulk.flush.max.actions: フラッシュ前にバッファするアクションの最大数。
  • bulk.flush.max.size.mb: フラッシュ前にバッファするデータの最大サイズ(メガバイト単位)。
  • bulk.flush.interval.ms: バッファされたアクションの数あるいはサイズに関係なく、フラッシュする間隔。

バージョン 2.x 以上については、一時的なリクエストのエラーがどのように再試行されるかの設定もサポートされます:

  • bulk.flush.backoff.enable: 一時的な EsRejectedExecutionExceptionによって1つ以上のアクションが失敗した場合、フラッシュのための速度を落とす遅延ありの再試行を実施するかどうか。
  • bulk.flush.backoff.type: 速度を落とす遅延の型。CONSTANT あるいは EXPONENTIALのどちらか
  • bulk.flush.backoff.delay: 速度を落とす遅延の量。一定に速度を落とす場合、これは単純に各際試行の間の遅延です。指数関数的に速度を落とす場合、これは初期に基づく遅延です。
  • bulk.flush.backoff.retries: 試行の速度を落とす再試行の量。

Elasticsearch についてのもっと詳しい情報はここで見つかります。

Elasticsearch コネクタを Uber-Jar にパッケージ化する

Flinkプログラムの実行に関して、全ての依存物を含む uber-jar (executable jar) と呼ばれるものをビルドすることがお勧めです (更に詳しい情報はここを見てください)。

しかし、Elasticsearch シンクを含む uber-jarを実行する場合、IllegalArgumentExceptionが起こるかも知れません。これはElasticsearchのファイルとMETA-INF/services内の依存が衝突することで起こります:

IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist.  このAPIをサポートする対応するJARファイルをクラスパスに追加する必要があります。現在のクラスパスは以下の名前をサポートします: [es090, completion090, XBloomFilter]]

Mavenを使ってuber-jarがビルドされた場合、プラグインのセクションのMaven POMファイルに以下を追加することでこの問題を避けることができます:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

上に戻る

TOP
inserted by FC2 system