このコネクタはElasticsearch インデックスへのドキュメントアクションをリクエストできるシンクを提供します。このコネクタを使うには、以下の依存のうちの一つをプロジェクトに追加します。Elasticsearch インストレーションのバージョンに依存します:
Maven 依存 | 以下からサポートされています | Elasticsearch バージョン |
---|---|---|
flink-connector-elasticsearch_2.11 | 1.0.0 | 1.x |
flink-connector-elasticsearch2_2.11 | 1.0.0 | 2.x |
flink-connector-elasticsearch5_2.11 | 1.3.0 | 5.x |
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにプログラムをライブラリと一緒にパッケージする方法についての情報はここを見てください。
Elasticsearch クラスタをセットアップするための説明はここで見つかります。クラスタ名を設定および記憶するようにしてください。これはクラスタに対してドキュメントアクションをリクエストするためにElasticsearchSink
を作成する時に設定されるべきです。
ElasticsearchSink
はElasticsearchクラスタと通信するためにTransportClient
を使います。
下の例はシンクをどうやって設定および生成するかを示します:
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
List<TransportAddress> transportAddresses = new ArrayList<String>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))
input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}))
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}))
String
のMap
がElasticsearchSink
を設定するためにどうやって使われるかに注意してください。設定のキーはここのElasticSearchのドキュメントの中で説明されます。特に重要なのはクラスタ名に対応するべきcluster.name
パラメータです。
また、例はそれぞれやってくる要素に関して1つのインデックスリクエストの実行を説明しているだけということに注意してください。一般的に、ElasticsearchSinkFunction
は異なるタイプ(例えば DeleteRequest
, UpdateRequest
など)の複数のリクエストを実行するために使うことができます。
内部的にはFlink Elasticsearch シンクの各並行インスタンスはアクションリクエストをクラスタに送信するためにBulkProcessor
を使います。これは要素をまとめてクラスタに送る前に要素をバッファするでしょう。BulkProcessor
は一度に1つのバルク リクエストを実行します。つまり、進行中のバッファされたアクションは2つ同時にフラッシュしないでしょう。
Flinkのチェックポイントを有効にして、Flink CassandraシンクはElasticsearchクラスタへのアクションリクエストの少なくとも一回の配送を保証します。チェックポイントの時点のBulkProcessor
内の全ての延期されているアクションリクエストを待つことで行われます。シンクに送信されたもっと多くのレコードの処理を進める前に、これはチェックポイントが起動される前の全てのリクエストがElasticsearchによって無事通知されたことを効果的に保障します。
チェックポイント耐障害性の詳細は耐障害性のドキュメントにあります。
耐障害性Elasticsearchシンクを使うために、トポロジのチェックポイントは実行環境で有効にされている必要があります:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
注意: もしユーザがしたければ、生成されたElasticsearchSink上でdisableFlushOnCheckpoint() を呼ぶことで、フラッシュを無効にすることができます。これは、トポロジのチェックポイントを有効にしたとしても、本質的にシンクがもう配送の強い保証を提供しないだろうことに注意してください。
Elasticsearch バージョン 1.x について、組み込みのノードを使った通信もサポートされます。組み込みノードとTransportClient
を使ったElasticsearchとの通信の違いについては、ここを見てください。
以下はTransportClient
の代わりに組み込みのノードを使うElasticsearchSink
をどうやって生成するかの例です:
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>;
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");
input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")
input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}))
違いはもうElasticsearchのノードのアドレスのリストを提供する必要が無いということです。
Elasticsearch のアクションリクエストは、一時的なノードキューの容量の飽和あるいはインデックスされるドキュメントの不正な形式を含む様々な理由により、失敗するかもしれません。Flink Elasticsearch シンクにより、単純にActionRequestFailureHandler
を実装しそれをコンストラクタに提供することで、ユーザはリクエストの失敗がどうやって処理されるかを指定することができます。
以下は例です:
DataStream<String> input = ...;
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
// full queue; re-add document for indexing
indexer.add(action);
} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
// malformed document; simply drop request without failing sink
} else {
// for all other failures, fail the sink
// here the failure is simply rethrown, but users can also choose to throw custom exceptions
throw failure;
}
}
}));
val input: DataStream[String] = ...
input.addSink(new ElasticsearchSink(
config, transportAddresses,
new ElasticsearchSinkFunction[String] {...},
new ActionRequestFailureHandler {
@throws(classOf[Throwable])
override def onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) {
if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
// full queue; re-add document for indexing
indexer.add(action)
} else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
// malformed document; simply drop request without failing sink
} else {
// for all other failures, fail the sink
// here the failure is simply rethrown, but users can also choose to throw custom exceptions
throw failure
}
}
}))
上の例は、シンクを失敗することなしに、シンクにキューの容量の飽和により失敗したリクエストを再追加し、不正な形式のドキュメントを持つリクエストを落とすようにさせます。他の全ての障害については、シンクは失敗するでしょう。もしActionRequestFailureHandler
がコンストラクタに提供されない場合は、シンクは何らかの種類のエラーで失敗するでしょう。
BulkProcessor
が内部的に全ての後退再試行が完了した後でのみまだ発生する障害については、onFailure
が呼ばれることに注意してください。デフォルトでは、BulkProcessor
は指数関数的なバックオフを持つ最大8回の試行を行います。内部的なBulkProcessor
の挙動についての詳細と、それをどうやって設定するかについては、以下の章をみてください。
デフォルトでは、もし障害ハンドラが提供されない場合、シンクは全ての種類の例外について単純に失敗するNoOpFailureHandler
を使います。コネクタは、キューの容量の飽和によって失敗したリクエストを常に再追加する RetryRejectedExecutionFailureHandler
実装も提供します。
重要: 障害時の内部的なBulkProcessor へのリクエストの再追加は、シンクも再追加されたリクエストがチェックポイント時にフラッシュされるまで待つ必要があるため、長いチェックポイントに繋がるでしょう。例えば、RetryRejectedExecutionFailureHandlerを使う場合、チェックポイントはElasticsearchノードのリクエストが全ての待たされているリクエストのために十分な容量を持つまで待つ必要があるでしょう。これはもし再追加されたリクエストが成功しない場合、チェックポイントが完了しないだろうことも意味します。
Elasticsearch 1.xのための障害ハンドリング: Elasticsearch 1.x については、古いバージョンのJavaクライアントAPI(型は一般的な Exceptionで、障害メッセージのみが異なる)を使って正確な型を扱うことができないため、障害の型を一致させることは現実的ではありません。この場合、提供されたREST状態コードに適合することをお勧めします。
内部的なBulkProcessor
は、指定されたMap<String, String>
内で以下の値を設定することで、バッファされたアクションリクエストがどのようにフラッシュされるかの挙動について更に設定することが可能です:
バージョン 2.x 以上については、一時的なリクエストのエラーがどのように再試行されるかの設定もサポートされます:
EsRejectedExecutionException
によって1つ以上のアクションが失敗した場合、フラッシュのための速度を落とす遅延ありの再試行を実施するかどうか。CONSTANT
あるいは EXPONENTIAL
のどちらかElasticsearch についてのもっと詳しい情報はここで見つかります。
Flinkプログラムの実行に関して、全ての依存物を含む uber-jar (executable jar) と呼ばれるものをビルドすることがお勧めです (更に詳しい情報はここを見てください)。
しかし、Elasticsearch シンクを含む uber-jarを実行する場合、IllegalArgumentException
が起こるかも知れません。これはElasticsearchのファイルとMETA-INF/services
内の依存が衝突することで起こります:
IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. このAPIをサポートする対応するJARファイルをクラスパスに追加する必要があります。現在のクラスパスは以下の名前をサポートします: [es090, completion090, XBloomFilter]]
Mavenを使ってuber-jarがビルドされた場合、プラグインのセクションのMaven POMファイルに以下を追加することでこの問題を避けることができます:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>