Apache Cassandra コネクタ

このコネクタはデータをApache Cassandra データベースに書き込むシンクを提供します。

このコネクタを使うには、以下の依存をプロジェクトに追加します:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.11</artifactId>
  <version>1.5-SNAPSHOT</version>
</dependency>

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます。

Apache Cassandraのインストール

ローカルマシーン上でCassandraインスタンスを起動する多数の方法があります:

  1. Cassandra の開始のページの説明に従ってください。
  2. 公式の Docker リポジトリからCassandraの実行コンテナを起動します。

Cassandra Sinks

設定

Flinkの Cassandra シンクは static CassandraSink.addSink(DataStream input) メソッドを使って生成されます。このメソッドはCassandraSinkBuilderを返します。これはシンクをさらに設定するためのメソッドを提供し、シンク インスタンスを最後に`build()`します。

以下の設定メソッドを使うことができます:

  1. setQuery(String query)
    • シンクが受け取る各レコードについて実行されるupsertクエリを設定します。
    • クエリは内部的にCQL文として扱われます。
    • Tupleデータ型を処理するためのupsertクエリを設定してください。
    • POJOデータ型を処理するためのクエリを設定しないでください。
  2. setClusterBuilder()
    • 一貫性レベル、再試行ポリシーなどのようなより洗練された設定を使ってcassandraとの接続を設定するために使われるクラスタビルダーを設定します
  3. setHost(String host[, int port])
    • Cassandraインスタンスと接続売るためのhost/port情報を持つ setClusterBuilder() の単純なバージョン
  4. setMapperOptions(MapperOptions options)
    • DataStax ObjectMapper を設定するために使われるマッパーオプションを設定します
    • POJO データ型を処理する時のみ適用されます。
  5. enableWriteAheadLog([CheckpointCommitter committer])
    • 任意の設定
    • 非決定論的なアルゴリズムのために確実に一回の処理ができます
  6. build()
    • 設定を完成させ、CassandraSinkインスタンスを構築します。

先行書き込みログ

チェックポイントのコミッターは幾つかのリソース中の完了したチェックポイントについての追加の情報を格納します。この情報は障害時に最後に完了したチェックポイントの完全な再生を避けるために使われます。cassandraの個々のテーブル内のこれらを格納するためにCassandraCommitter を使うことができます。このテーブルはFlinkによって掃除されないだろうことに注意してください。

Flinkはクエリが等羃(結果を変更する事無しに複数回適用することができることを意味します)で、チェックポイントが有効な時に、正確に一回の保証を提供することができます。障害時には、失敗したチェックポイントが完全に再生されるでしょう。

更に、非決定論的なプログラムについて先行書き込みログは有効にされていなければなりません。そのようなプログラムに関しては、再生されたチェックポイントは以前の試行とは完全に異なるものかも知れません。最初の試行の部分がすでに書き込まれているかもしれないので、これはデータベースが一貫性の無い状態にするかも知れません。先行書き込みログは再生されたチェックポイントが最初の試行と全く同じであることを保証します。この機能を有効にすることはレイテンシに不利な影響があるだろうことに注意してください。

注意: 先行書き込みログ機能は現在のところ実験的なものです。多くの場合において、これを有効にせずにコネクタを使うことで十分です。問題は開発メーリングリストに報告してください。

チェックポイントと耐障害性

チェックポイントを有効にして、CassandraシンクはC*インスタンスへのアクションリクエストの少なくとも一回の配送を保証します。

詳細はチェックポイントのドキュメント耐障害性の保証のドキュメントにあります

Cassandra シンクは現在のところ Tuple と POJO データ型の両方をサポートし、Flinkは自動的に入力の型にどちらが使われているかを検知します。これらのストリーミングデータ型の一般的な使用法については、サポートされるデータ型を参照してください。PojoおよびTupleデータ型のそれぞれについて、SocketWindowWordCountに基づいた2つの実装を示します。

これらの例では、相関キー空間e example とテーブル wordcount が生成されていると仮定します。

CREATE KEYSPACE IF NOT EXISTS example
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS example.wordcount (
    word text,
    count bigint,
    PRIMARY KEY(word)
    );

ストリーミング タプル データ型のためのCassandraシンクの例

Java/Scalaのタプルデータ型を持つ結果をCassandraシンクに格納する間、各レコードをデータベースに維持するために(setQuery(‘stmt’)を使って)CQL upsert文を設定する必要があります。PreparedStatementとしてキャッシュされるupsertクエリを使って、各タプルの要素はステートメントのパラメータに変換されます。

PreparedStatementBoundStatementについての詳細は、DataStax Java ドライバ マニュアルを見てください

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<Tuple2<String, Long>> result = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    //Do not accept empty word, since word is defined as primary key in C* table
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<String, Long>(word, 1L));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .build();
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// get input data by connecting to the socket
val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

// parse the data, group it, window it, and aggregate the counts
val result: DataStream[(String, Long)] = text
  // split up the lines in pairs (2-tuples) containing: (word,1)
  .flatMap(_.toLowerCase.split("\\s"))
  .filter(_.nonEmpty)
  .map((_, 1L))
  // group by the tuple field "0" and sum up tuple field "1"
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

CassandraSink.addSink(result)
  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
  .setHost("127.0.0.1")
  .build()

result.print().setParallelism(1)

POJOデータ型のためのCassandraシンクの例

POJOデータ型をストリーミングする例と、同じPOJOエンティティをCassandraに格納します。更に、このエンティティはDataStax Java Driver com.datastax.driver.mapping.Mapper クラスを使って専用のテーブルの関連するカラムへマップされるため、このPOJO実装はDataStax Java Driver Manualに従う必要があります。

Pojoクラス内のフィールド宣言に置かれたアノテーションを使って、各テーブルのカラムのマッピングを定義することができます。マッピングの詳細については、マップされたクラスの定義CQL データ型のCQLドキュメントを参照してください

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text
        .flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String value, Collector<WordCount> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    if (!word.isEmpty()) {
                        //Do not accept empty word, since word is defined as primary key in C* table
                        out.collect(new WordCount(word, 1L));
                    }
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))

        .reduce(new ReduceFunction<WordCount>() {
            @Override
            public WordCount reduce(WordCount a, WordCount b) {
                return new WordCount(a.getWord(), a.getCount() + b.getCount());
            }
        });

CassandraSink.addSink(result)
        .setHost("127.0.0.1")
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();


@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {}

    public WordCount(String word, long count) {
        this.setWord(word);
        this.setCount(count);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return getWord() + " : " + getCount();
    }
}

上に戻る

TOP
inserted by FC2 system