このコネクタはデータをCassandra データベースに書き込むシンクを提供します。
このコネクタを使うには、以下の依存をプロジェクトに追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます
Cassandra の開始ページの説明に従います。
Flinkの Cassandra シンクは static CassandraSink.addSink(DataStream
以下の設定メソッドを使うことができます:
setQuery() シンクが受け取る各値について実行されるクエリを設定する。setHost() 接続する cassandra のホスト/ポートを設定する。このメソッドは単一のユーザケースのためのものです。setClusterBuilder() cassandraへの接続を設定するために使われるクラスタ ビルダーを設定する。setHost() 機能はこのメソッドの中に含めることができます。enableWriteAheadLog() は任意のメソッドです。非決定論的なアルゴリズムが確実に一回の処理をすることができます。
チェックポイントのコミッターは幾つかのリソース中の完了したチェックポイントについての追加の情報を格納します。この情報は障害時に最後に完了したチェックポイントの完全な再生を避けるために使われます。cassandraの個々のテーブル内のこれらを格納するためにCassandraCommitter
を使うことができます。このテーブルはFlinkによって掃除されないだろうことに注意してください。
build() は設定を終了し、CassandraSinkを返します。
Flinkはクエリが等羃(結果を変更する事無しに複数回適用することができることを意味します)で、チェックポイントが有効な時に、正確に一回の保証を提供することができます。障害時には、失敗したチェックポイントが完全に再生されるでしょう。
更に、非決定論的なプログラムについて先行書き込みログは有効にされていなければなりません。そのようなプログラムに関しては、再生されたチェックポイントは以前の試行とは完全に異なるものかも知れません。最初の試行の部分がすでに書き込まれているかもしれないので、これはデータベースが一貫性の無い状態にするかも知れません。先行書き込みログは再生されたチェックポイントが最初の試行と全く同じであることを保証します。この機能を有効にすることはレイテンシに不利な影響があるだろうことに注意してください。
注意: 先行書き込みログ機能は現在のところ実験的なものです。多くの場合において、これを有効にせずにコネクタを使うことで十分です。問題は開発メーリングリストに報告してください。
CassandraSink.addSink(input)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
CassandraSink.addSink(input)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
CassandraのシンクはDataStaxアノテーションを使うタプルとPOJOをサポートします。Flinkは自動的に入力のどのタイプが使われるかを検知します。
Pojoの場合の例:
@Table(keyspace= "test", name = "mappersink")
public class Pojo implements Serializable {
private static final long serialVersionUID = 1038054554690916991L;
@Column(name = "id")
private long id;
@Column(name = "value")
private String value;
public Pojo(long id, String value){
this.id = id;
this.value = value;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}