Gearpump コネクタ
基本概念
DataSource
と DataSink
はGearpumpが外部の世界と接続するために使用する二つの主要な概念です。
データソース
データソース
は入力無しでメッセージを出力するGearpumpでの概念です。つまり、基本的にデータソース
はストリーミングプロセスフローの開始点です。
少なくとも1回のメッセージの配送および確実に1回のメッセージの配送を保証するために再生可能であるデータソース
にGearpumpは依存するため、現在データソース
のオフセット(進捗)を格納するためにio.gearpump.streaming.transaction.api.OffsetStorageFactory
を必要とします。故に、再生が必要な時に、Gearpumpはあるオフセットから再生するためにデータソース
を案内することができます。
現在のところGearpumpのデータソース
は無限のストリームのみをサポートします。有限のストリームのサポートは近い将来のリリースで追加されるでしょう。
データシンク
データシンク
は出力無しだがメッセージを消費する概念です。つまり、シンク
はストリーミングプロセスフローの終点です。
実装されたコネクタ
実装されたデータオース
現在のところ、以下のデータソース
がサポートされています。
名前 | 解説 |
---|---|
CollectionDataSource |
コレクションを再帰的なデータソースに変換します。例えば、seq(1, 2, 3) は 1,2,3,1,2,3... を出力します。 |
KafkaSource |
Kafkaから読み込みます。 |
実装されたデータシンク
現在のところ、以下のデータシンク
がサポートされています。
名前 | 解説 |
---|---|
HBaseSink |
メッセージをHBaseに書き込みます。書き込むメッセージはHBaseのPut あるいは(rowKey, family, column, value) のタプルです。 |
KafkaSink |
Kafkaに書き込みます。 |
コネクタの使用
KafkaSource
の使用
アプリケーション内でkafkaSource
を使うには、まずアプリケーション内に gearpump-external-Kafka
ライブラリ依存性を追加する必要があります:
"com.github.intel-hadoop" %% "gearpump-external-kafka" % 0.8.0
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-external-kafka</artifactId>
<version>0.8.0</version>
</dependency>
Kafkaに接続するには、以下の情報を提供する必要があります: - Zookeeperのアドレス - Kafkaのトピック
そうすると、アプリケーション内でKafkaSource
を使うことができます:
//Specify the offset storage.
//Here we use the same zookeeper as the offset storage.
//A set of corresponding topics will be created to store the offsets.
//You are free to specify your own offset storage
val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
//create the kafka data source
val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
//create Gearpump Processor
val reader = DataSourceProcessor(source, parallelism)
//specify the offset storage
//here we use the same zookeeper as the offset storage (a set of corresponding topics will be created to store the offsets)
//you are free to specify your own offset storage
val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
val source = KafkaDSLUtil.createStream(app, parallelism, "Kafka Source", topics, zookeepers, offsetStorageFactory)
...
HBaseSink
の使用
アプリケーション内でHBaseSink
を使うには、まずアプリケーション内に gearpump-external-hbase
ライブラリ依存性を追加する必要があります:
"com.github.intel-hadoop" %% "gearpump-external-hbase" % 0.8.0
<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-external-hbase</artifactId>
<version>0.8.0</version>
</dependency>
HBaseに接続するには、以下の情報を提供する必要があります: - どのHBaseサービスに接続するかのHBase設定 - テーブル名 (自身でテーブルを作成する必要があります。HBase documentationを見てください)
そうすると、アプリケーション内でHbaseSink
を使うことができます:
//create the HBase data sink
val sink = HBaseSink(UserConfig.empty, tableName, HBaseConfiguration.create())
//create Gearpump Processor
val sinkProcessor = DataSinkProcessor(sink, parallelism)
//assume stream is a normal `Stream` in DSL
stream.writeToHbase(UserConfig.empty, tableName, parallelism, "write to HBase")
渡されたHBase設定を使ってHBaseへの接続を調整することができます。渡されない場合は、Gearpumpは有効なHBase設定(hbase-site.xml
)を見つけるためにローカルのクラスパスをチェックしようとするでしょう。
注意してください。ここで議論されている問題のために、HBaseシンクのための追加の設定を生成する必要があるかも知れません。
def hadoopConfig = {
val conf = new Configuration()
conf.set("hbase.zookeeper.quorum", "zookeeperHost")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf
}
val sink = HBaseSink(UserConfig.empty, tableName, hadoopConfig)
独自のデータソース
を実装する方法
独自のデータソース
を実装するには、以下の2つを実装する必要があります:
- データソース自身
- DSL内での扱いを簡単にするためのヘルパークラス
独自のデータソース
の実装
io.gearpump.streaming.transaction.api.TimeReplayableSource
から派生したクラスを実装する必要があります。
DSLヘルパーの実装(任意)
手頃なDSLを持ちたい場合は、この独自のストリームから始めるかも知れません: 独自のDSLヘルパーを実装することができる場合はそれが良いでしょう。例としてGearpumpソース内のKafkaDSLUtil
を参照することができます。
以下はKafkaDSLUtil
からのコードの断片です:
object KafkaDSLUtil {
//T is the message type
def createStream[T: ClassTag](
app: StreamApp,
parallelism: Int,
description: String,
topics: String,
zkConnect: String,
offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory)
with TypedDataSource[T], parallelism, description)
}
}
独自のデータシンク
を実装する方法
独自のデータシンク
を実装するには、以下の2つを実装する必要があります:
- データシンク自身
- DSL内で扱いを簡単にするためのヘルパークラス
独自のデータシンク
の実装
io.gearpump.streaming.sink.DataSink
から派生したクラスを実装する必要があります。
DSLヘルパーの実装(任意)
手頃なDSLを持ちたい場合は、この独自のストリームから始めるかも知れません: 独自のDSLヘルパーを実装することができる場合はそれが良いでしょう。例としてGearpumpソース内のHBaseDSLSink
を参照することができます。
以下はHBaseDSLSink
からのコードの断片です:
class HBaseDSLSink[T](stream: Stream[T]) {
def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String): Stream[T] = {
stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description)
}
def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, parallism: Int, description: String): Stream[T] = {
stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description)
}
}
object HBaseDSLSink {
implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
new HBaseDSLSink[T](stream)
}
}