このコネクタはデータをApache Cassandra データベースに書き込むシンクを提供します。
このコネクタを使うには、以下の依存をプロジェクトに追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます。
ローカルマシーン上でCassandraインスタンスを起動する多数の方法があります:
Flinkの Cassandra シンクは static CassandraSink.addSink(DataStream
以下の設定メソッドを使うことができます:
チェックポイントのコミッターは幾つかのリソース中の完了したチェックポイントについての追加の情報を格納します。この情報は障害時に最後に完了したチェックポイントの完全な再生を避けるために使われます。cassandraの個々のテーブル内のこれらを格納するためにCassandraCommitter
を使うことができます。このテーブルはFlinkによって掃除されないだろうことに注意してください。
Flinkはクエリが等羃(結果を変更する事無しに複数回適用することができることを意味します)で、チェックポイントが有効な時に、正確に一回の保証を提供することができます。障害時には、失敗したチェックポイントが完全に再生されるでしょう。
更に、非決定論的なプログラムについて先行書き込みログは有効にされていなければなりません。そのようなプログラムに関しては、再生されたチェックポイントは以前の試行とは完全に異なるものかも知れません。最初の試行の部分がすでに書き込まれているかもしれないので、これはデータベースが一貫性の無い状態にするかも知れません。先行書き込みログは再生されたチェックポイントが最初の試行と全く同じであることを保証します。この機能を有効にすることはレイテンシに不利な影響があるだろうことに注意してください。
注意: 先行書き込みログ機能は現在のところ実験的なものです。多くの場合において、これを有効にせずにコネクタを使うことで十分です。問題は開発メーリングリストに報告してください。
チェックポイントを有効にして、CassandraシンクはC*インスタンスへのアクションリクエストの少なくとも一回の配送を保証します。
詳細はチェックポイントのドキュメント と 耐障害性の保証のドキュメントにあります
Cassandra シンクは現在のところ Tuple と POJO データ型の両方をサポートし、Flinkは自動的に入力の型にどちらが使われているかを検知します。これらのストリーミングデータ型の一般的な使用法については、サポートされるデータ型を参照してください。PojoおよびTupleデータ型のそれぞれについて、SocketWindowWordCountに基づいた2つの実装を示します。
これらの例では、相関キー空間e example
とテーブル wordcount
が生成されていると仮定します。
CREATE KEYSPACE IF NOT EXISTS example
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS example.wordcount (
word text,
count bigint,
PRIMARY KEY(word)
);
Java/Scalaのタプルデータ型を持つ結果をCassandraシンクに格納する間、各レコードをデータベースに維持するために(setQuery(‘stmt’)を使って)CQL upsert文を設定する必要があります。PreparedStatement
としてキャッシュされるupsertクエリを使って、各タプルの要素はステートメントのパラメータに変換されます。
PreparedStatement
とBoundStatement
についての詳細は、DataStax Java ドライバ マニュアルを見てください
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<Tuple2<String, Long>> result = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
// normalize and split the line
String[] words = value.toLowerCase().split("\\s");
// emit the pairs
for (String word : words) {
//Do not accept empty word, since word is defined as primary key in C* table
if (!word.isEmpty()) {
out.collect(new Tuple2<String, Long>(word, 1L));
}
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
CassandraSink.addSink(result)
.setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
.setHost("127.0.0.1")
.build();
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
// parse the data, group it, window it, and aggregate the counts
val result: DataStream[(String, Long)] = text
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap(_.toLowerCase.split("\\s"))
.filter(_.nonEmpty)
.map((_, 1L))
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
CassandraSink.addSink(result)
.setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
.setHost("127.0.0.1")
.build()
result.print().setParallelism(1)
POJOデータ型をストリーミングする例と、同じPOJOエンティティをCassandraに格納します。更に、このエンティティはDataStax Java Driver com.datastax.driver.mapping.Mapper
クラスを使って専用のテーブルの関連するカラムへマップされるため、このPOJO実装はDataStax Java Driver Manualに従う必要があります。
Pojoクラス内のフィールド宣言に置かれたアノテーションを使って、各テーブルのカラムのマッピングを定義することができます。マッピングの詳細については、マップされたクラスの定義 と CQL データ型のCQLドキュメントを参照してください
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text
.flatMap(new FlatMapFunction<String, WordCount>() {
public void flatMap(String value, Collector<WordCount> out) {
// normalize and split the line
String[] words = value.toLowerCase().split("\\s");
// emit the pairs
for (String word : words) {
if (!word.isEmpty()) {
//Do not accept empty word, since word is defined as primary key in C* table
out.collect(new WordCount(word, 1L));
}
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<WordCount>() {
@Override
public WordCount reduce(WordCount a, WordCount b) {
return new WordCount(a.getWord(), a.getCount() + b.getCount());
}
});
CassandraSink.addSink(result)
.setHost("127.0.0.1")
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
.build();
@Table(keyspace = "example", name = "wordcount")
public class WordCount {
@Column(name = "word")
private String word = "";
@Column(name = "count")
private long count = 0;
public WordCount() {}
public WordCount(String word, long count) {
this.setWord(word);
this.setCount(count);
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
@Override
public String toString() {
return getWord() + " : " + getCount();
}
}