Storm 互換
Gearpump はAapache Stormアプリケーションに関して バイナリ互換性を提供します。つまり、ユーザは既存のStorm jarを簡単に掴み取り、Gearpump上で実行できます。TこのドキュメントはGearpumpのStormとの互換性を説明します。
Gearpump上でサポートされるStormの機能
Storm 0.9.x
機能 | サポート |
---|---|
基本的なトポロジー | yes |
DRPC | yes |
multi-lang | yes |
storm-kafka | yes |
Trident | no |
Storm 0.10.x
機能 | サポート |
---|---|
基本的なトポロジー | yes |
DRPC | yes |
multi-lang | yes |
storm-kafka | yes |
storm-hdfs | yes |
storm-hbase | yes |
storm-hive | yes |
storm-jdbc | yes |
storm-redis | yes |
flux | yes |
storm-eventhubs | not verfied |
Trident | no |
少なくとも一度のサポート
Ackers を有効にすると、Storm 0.9.x と Storm 0.10.x の両方で少なくとも1度の2種類のサポートのがあります。
- 噴出口が活動中な限り、メッセージの喪失時に吹き出し愚痴がメッセージを再現するでしょう。
- もし
KafkaSpout
が使われた場合、吹き出し口が壊れたとしてもKafkaからメッセージが再現されます。
Gearpumpは両方のStormのバージョンについて二つ目をサポートします。
セキュリティサポート
Storm 0.10.x は以下のコネクタに関してセキュリティのサポートを追加します
このことはユーザがkerberosを有効にしたHDFS、Hiveおよびこれらのコネクタを使ったHBaseにアクセスできることを意味します。一般的に、Stormは2つのやり方を提供します (詳細な情報は上のリンクを参照してください)
- トポロジーサブミッターユーザの代わりに移譲トークンを自動的に取得するためにnimbusを設定します
- kerberosのkeytabはワーカーホスト上に既に分配されています; ユーザはkeytabのパスとprincipalを設定します
Gearpump は2つ目のやり方をサポートし、ユーザはHDFS/Hive/HBase のクラスパスを各ノード上のgear.conf
の中の gearpump.executor.extraClasspath
に追加する必要があります。例えば:
###################
### Executor argument configuration
### Executor JVM can contains multiple tasks
###################
executor {
vmargs = "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3 -Djava.rmi.server.hostname=localhost"
extraClasspath = "/etc/hadoop/conf"
}
Gearpump上でStormアプリケーションを実行する方法
この章ではローカルのGearpumpクラスタ内で既存のStorm jarを実行する方法を説明します。
-
ローカルクラスタを起動する
bin/local
-
Gearpump Nimbusサーバを開始する
ユーザは後でトポロジーをサブミットするためにサーバのアドレス(
nimbus.host
とnimbus.thrift.port
)を必要とします。アドレスは-output
オプションを使って設定されるyaml 設定ファイルに書き込まれます。ユーザはアドレスのみが上書きされる既存の設定ファイルを提供することができます。提供されない場合、新しいファイルapp.yaml
がその設定を使って生成されます。bin/storm nimbus -output [conf <custom yaml config>]
-
Stormアプリケーションのサブミット
ユーザはコマンドラインあるいはUIを使ってStormアプリケーションをサブミットすることができます。
a. コマンドラインを使ってStormアプリケーションをサブミットする
bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation
ユーザは以下のオプションを使ってアプリケーションを設定することができます
* `jar` - set the path of a Storm application jar * `config` - submit the custom configuration file generated when launching Nimbus
b. UIを使ってStormアプリケーションをサブミットする
- UI上のアプリケーションページの"Ceate"ボタンをクリックする。
- プルダウンメニュー内の"Submit Storm Application"項目をクリックする。
- ポップアップコンソールの中で、StormプリケーションのjarとNimbusを起動する時に生成された設定ファイルをアップロードし、引数として
storm.starter.ExclamationTopology exclamation
を与えます。 - "Submit"ボタンをクリックします
どのやり方でもダッシュボードをチェックするとトポロジーを経由してデータフローを見ることができるはずです。
それはStorm上で実行中のものとどれほど異なるか
トポロジーのサブミッション
クライアントがStormトポロジーをサブミットする場合、GearpumpはローカルでStormのNimbusサーバの簡略化バージョン GearpumpNimbus
を起動します。GearpumpNimbus
はトポロジーをGearpumpの有向非巡回グラフ (DAG)に変換します。これはGearpumpマスターにサブミットされ、Gearpumpアプリケーションとして配備されます。
GearpumpNimbus
は以下のメソッドをサポートします
submitTopology
/submitTopologyWithOpts
killTopology
/killTopologyWithOpts
getTopology
/getUserTopology
getClusterInfo
トポロジー 変換
以下はGearpump DAGに変換されたacker bolts (acker)を使った WordCountTopology
の例です。
Gearpump は同じ並行化を使って、各Storm spoutのための StormProducer
と、各Storm bolt (ackerを除く)のためのStormProcessor
を生成し、同じグループ戦略(Gearpump内のパーティショニング)を使ってStom内で関係付けます。
実行時には、spoutとboltはそれぞれStormProducer
タスクと StormProcessor
タスクの中で実行します。spoutによって発せられたメッセージは StormProducer
に渡され、StormProcessor
へ変換され、boltに渡されます。メッセージはStormシリアライザを使って、シリアライズ化/反シリアライズ化されます。
Gearpumpは異なるメッセージの追跡とフローの制御の仕組みを持つため、Stormのackerは捨てられます。
タスクの実行
各Stormタスクは専用のスレッドで実行され、一方でexecutorの全てのGearpumpタスクはスレッドプールを共有します。一般的に共有スレッドプールを使うことでより良いパフォーマンスを発揮することができます。しかし、幾つかのタスクはブロックし、全てのスレッドを利用することが有り得ます。そのような場合、gear.conf
内のgearpump.task-dispatcher
を "gaerpump.single-thread-dispatcher"
に設定することで、Stormのやり方に戻ることができます。
メッセージの追跡
Storm は少なくとも1回の配送を保証するために、ackerを使って各メッセージの系統を追跡します。失敗したメッセージはspoutから再送信されます。
Gearpump は効率的な方法で送信者と受信者の間でメッセージを追跡します。メッセージの喪失はアプリケーション全体をシステム内で待っている全てのメッセージの最小のタイムスタンプから再生させます。
フローの制御
Storm はspoutでのフローの度合を絞ります。これはもしackされていないメッセージの数がtopology.max.spout.pending
を超えた場合にメッセージの送信を停止します。
Gearpumpは送信者が受信者を満たせないようなタスク間でフローの制御をします。これは発生元まで圧力をかけます。
設定
全てのStorm設定は以下の優先順位で尊重されます
defaults.yaml < 独自のファイル設定 < アプリケーション設定 < コンポーネントの設定
ここで
- アプリケーションの設定は、トポロジーと一緒のStormアプリケーションからサブミットされます
- コンポーネントの設定は、
getComponentConfiguration
を使って、spout / bolt 内で設定されます - 独自のファイル設定は、コマンドラインからStormアプリケーションがサブミットされる時の
-config
オプションで指定されるか、UIからアップロードされます。
StreamCQL サポート
StreamCQL は Huawei によってオープンソース化されたリアルアイム計算システム上の連続的なクエリ言語です。StreamCQLはすでにStormをサポートするため、Gearpump上でのStreamCQLの実行は簡単です。
-
公式のREADMEのようにStreamCQLをインストール
-
前のようにGearpump Nimbusサーバを起動
-
インストールされたstream-cql-binaryに移動し、ステップ2のNimbus設定の出力を使って
conf/streaming-site.xml
の以下の設定を変更<property> <name>streaming.storm.nimbus.host</name> <value>${nimbus.host}</value> </property> <property> <name>streaming.storm.nimbus.port</name> <value>${nimbus.thrift.port}</value> </property>
-
CQLクライアントシェルを開き、簡単なcqlの例を実行
bin/cql
Streaming> CREATE INPUT STREAM s (id INT, name STRING, type INT) SOURCE randomgen PROPERTIES ( timeUnit = "SECONDS", period = "1", eventNumPerperiod = "1", isSchedule = "true" ); CREATE OUTPUT STREAM rs (type INT, cc INT) SINK consoleOutput; INSERT INTO STREAM rs SELECT type, COUNT(id) as cc FROM s[RANGE 20 SECONDS BATCH] WHERE id > 5 GROUP BY type; SUBMIT APPLICATION example;
-
ダッシュボードをチェックすると、3つのコンポーネントのトポロジを経由してデータのフローを見ることができるはずです。