Apache Cassandra コネクタ

このコネクタはデータをCassandra データベースに書き込むシンクを提供します。

このコネクタを使うには、以下の依存をプロジェクトに追加します:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにそれらとどうやってリンクするかをここで見ます

Apache Cassandraのインストール

Cassandra の開始ページの説明に従います。

Cassandra Sink

Flinkの Cassandra シンクは static CassandraSink.addSink(DataStream input) メソッドを使って生成されます。このメソッドはCassandraSinkBuilderを返します。これはシンクをさらに設定するためのメソッドを提供します

以下の設定メソッドを使うことができます:

  1. setQuery(String query)
  2. setHost(String host[, int port])
  3. setClusterBuilder(ClusterBuilder builder)
  4. enableWriteAheadLog([CheckpointCommitter committer])
  5. build()

setQuery() シンクが受け取る各値について実行されるクエリを設定する。setHost() 接続する cassandra のホスト/ポートを設定する。このメソッドは単一のユーザケースのためのものです。setClusterBuilder() cassandraへの接続を設定するために使われるクラスタ ビルダーを設定する。setHost() 機能はこのメソッドの中に含めることができます。enableWriteAheadLog() は任意のメソッドです。非決定論的なアルゴリズムが確実に一回の処理をすることができます。

チェックポイントのコミッターは幾つかのリソース中の完了したチェックポイントについての追加の情報を格納します。この情報は障害時に最後に完了したチェックポイントの完全な再生を避けるために使われます。cassandraの個々のテーブル内のこれらを格納するためにCassandraCommitter を使うことができます。このテーブルはFlinkによって掃除されないだろうことに注意してください。

build() は設定を終了し、CassandraSinkを返します。

Flinkはクエリが等羃(結果を変更する事無しに複数回適用することができることを意味します)で、チェックポイントが有効な時に、正確に一回の保証を提供することができます。障害時には、失敗したチェックポイントが完全に再生されるでしょう。

更に、非決定論的なプログラムについて先行書き込みログは有効にされていなければなりません。For such a program the replayed checkpoint may be completely different than the previous attempt, which may leave the database in an inconsitent state since part of the first attempt may already be written. 先行書き込みログは再生されたチェックポイントが最初の試行と全く同じであることを保証します。この機能を有効にすることはレイテンシに不利な影響があるだろうことに注意してください。

注意: 先行書き込みログ機能は現在のところ実験的なものです。多くの場合において、これを有効にせずにコネクタを使うことで十分です。問題は開発メーリングリストに報告してください。

CassandraSink.addSink(input)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
      return builder.addContactPoint("127.0.0.1").build();
    }
  })
  .build();
CassandraSink.addSink(input)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
  .setClusterBuilder(new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
      return builder.addContactPoint("127.0.0.1").build();
    }
  })
  .build();

CassandraのシンクはDataStaxアノテーションを使うタプルとPOJOをサポートします。Flinkは自動的に入力のどのタイプが使われるかを検知します。

Pojoの場合の例:

@Table(keyspace= "test", name = "mappersink")
public class Pojo implements Serializable {

	private static final long serialVersionUID = 1038054554690916991L;

	@Column(name = "id")
	private long id;
	@Column(name = "value")
	private String value;

	public Pojo(long id, String value){
		this.id = id;
		this.value = value;
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}
}
TOP
inserted by FC2 system