Gearpump コネクタ

基本概念

DataSourceDataSink はGearpumpが外部の世界と接続するために使用する二つの主要な概念です。

データソース

データソース は入力無しでメッセージを出力するGearpumpでの概念です。つまり、基本的にデータソース はストリーミングプロセスフローの開始点です。

少なくとも1回のメッセージの配送および確実に1回のメッセージの配送を保証するために再生可能であるデータソースにGearpumpは依存するため、現在データソースのオフセット(進捗)を格納するためにio.gearpump.streaming.transaction.api.OffsetStorageFactory を必要とします。故に、再生が必要な時に、Gearpumpはあるオフセットから再生するためにデータソースを案内することができます。

現在のところGearpumpのデータソースは無限のストリームのみをサポートします。有限のストリームのサポートは近い将来のリリースで追加されるでしょう。

データシンク

データシンクは出力無しだがメッセージを消費する概念です。つまり、シンクはストリーミングプロセスフローの終点です。

実装されたコネクタ

実装されたデータオース

現在のところ、以下のデータソースがサポートされています。

名前 解説
CollectionDataSource コレクションを再帰的なデータソースに変換します。例えば、seq(1, 2, 3)1,2,3,1,2,3...を出力します。
KafkaSource Kafkaから読み込みます。

実装されたデータシンク

現在のところ、以下のデータシンク がサポートされています。

名前 解説
HBaseSink メッセージをHBaseに書き込みます。書き込むメッセージはHBaseのPutあるいは(rowKey, family, column, value)のタプルです。
KafkaSink Kafkaに書き込みます。

コネクタの使用

KafkaSourceの使用

アプリケーション内でkafkaSource を使うには、まずアプリケーション内に gearpump-external-Kafka ライブラリ依存性を追加する必要があります:

"com.github.intel-hadoop" %% "gearpump-external-kafka" % 0.8.0
<dependency>
  <groupId>com.github.intel-hadoop</groupId>
  <artifactId>gearpump-external-kafka</artifactId>
  <version>0.8.0</version>
</dependency>

Kafkaに接続するには、以下の情報を提供する必要があります: - Zookeeperのアドレス - Kafkaのトピック

そうすると、アプリケーション内でKafkaSourceを使うことができます:

   //Specify the offset storage.
   //Here we use the same zookeeper as the offset storage.
   //A set of corresponding topics will be created to store the offsets.
   //You are free to specify your own offset storage
   val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)

   //create the kafka data source
   val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)

   //create Gearpump Processor
   val reader = DataSourceProcessor(source, parallelism)
  //specify the offset storage
  //here we use the same zookeeper as the offset storage (a set of corresponding topics will be created to store the offsets)
  //you are free to specify your own offset storage
  val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)

  val source = KafkaDSLUtil.createStream(app, parallelism, "Kafka Source", topics, zookeepers, offsetStorageFactory)
  ...

HBaseSinkの使用

アプリケーション内でHBaseSink を使うには、まずアプリケーション内に gearpump-external-hbase ライブラリ依存性を追加する必要があります:

"com.github.intel-hadoop" %% "gearpump-external-hbase" % 0.8.0
<dependency>
  <groupId>com.github.intel-hadoop</groupId>
  <artifactId>gearpump-external-hbase</artifactId>
  <version>0.8.0</version>
</dependency>

HBaseに接続するには、以下の情報を提供する必要があります: - どのHBaseサービスに接続するかのHBase設定 - テーブル名 (自身でテーブルを作成する必要があります。HBase documentationを見てください)

そうすると、アプリケーション内でHbaseSinkを使うことができます:

   //create the HBase data sink
   val sink = HBaseSink(UserConfig.empty, tableName, HBaseConfiguration.create())

   //create Gearpump Processor
   val sinkProcessor = DataSinkProcessor(sink, parallelism)
  //assume stream is a normal `Stream` in DSL
  stream.writeToHbase(UserConfig.empty, tableName, parallelism, "write to HBase")

渡されたHBase設定を使ってHBaseへの接続を調整することができます。渡されない場合は、Gearpumpは有効なHBase設定(hbase-site.xml)を見つけるためにローカルのクラスパスをチェックしようとするでしょう。

注意してください。ここで議論されている問題のために、HBaseシンクのための追加の設定を生成する必要があるかも知れません。

   def hadoopConfig = {
     val conf = new Configuration()
     conf.set("hbase.zookeeper.quorum", "zookeeperHost")
     conf.set("hbase.zookeeper.property.clientPort", "2181")
     conf
   }
   val sink = HBaseSink(UserConfig.empty, tableName, hadoopConfig)

独自のデータソースを実装する方法

独自のデータソースを実装するには、以下の2つを実装する必要があります:

  1. データソース自身
  2. DSL内での扱いを簡単にするためのヘルパークラス

独自のデータソースの実装

io.gearpump.streaming.transaction.api.TimeReplayableSourceから派生したクラスを実装する必要があります。

DSLヘルパーの実装(任意)

手頃なDSLを持ちたい場合は、この独自のストリームから始めるかも知れません: 独自のDSLヘルパーを実装することができる場合はそれが良いでしょう。例としてGearpumpソース内のKafkaDSLUtilを参照することができます。

以下はKafkaDSLUtilからのコードの断片です:

object KafkaDSLUtil {
  //T is the message type
  def createStream[T: ClassTag](
      app: StreamApp,
      parallelism: Int,
      description: String,
      topics: String,
      zkConnect: String,
      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory)
        with TypedDataSource[T], parallelism, description)
  }
}

独自のデータシンクを実装する方法

独自のデータシンクを実装するには、以下の2つを実装する必要があります:

  1. データシンク自身
  2. DSL内で扱いを簡単にするためのヘルパークラス

独自のデータシンクの実装

io.gearpump.streaming.sink.DataSinkから派生したクラスを実装する必要があります。

DSLヘルパーの実装(任意)

手頃なDSLを持ちたい場合は、この独自のストリームから始めるかも知れません: 独自のDSLヘルパーを実装することができる場合はそれが良いでしょう。例としてGearpumpソース内のHBaseDSLSinkを参照することができます。

以下はHBaseDSLSinkからのコードの断片です:

class HBaseDSLSink[T](stream: Stream[T]) {
  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String): Stream[T] = {
    stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description)
  }
  
  def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, parallism: Int, description: String): Stream[T] = {
    stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description)
  }  
}

object HBaseDSLSink {
  implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
    new HBaseDSLSink[T](stream)
  }
}
TOP
inserted by FC2 system