Storm 互換

Gearpump はAapache Stormアプリケーションに関して バイナリ互換性を提供します。つまり、ユーザは既存のStorm jarを簡単に掴み取り、Gearpump上で実行できます。TこのドキュメントはGearpumpのStormとの互換性を説明します。

Gearpump上でサポートされるStormの機能

Storm 0.9.x

機能 サポート
基本的なトポロジー yes
DRPC yes
multi-lang yes
storm-kafka yes
Trident no

Storm 0.10.x

機能 サポート
基本的なトポロジー yes
DRPC yes
multi-lang yes
storm-kafka yes
storm-hdfs yes
storm-hbase yes
storm-hive yes
storm-jdbc yes
storm-redis yes
flux yes
storm-eventhubs not verfied
Trident no

少なくとも一度のサポート

Ackers を有効にすると、Storm 0.9.x と Storm 0.10.x の両方で少なくとも1度の2種類のサポートのがあります。

  1. 噴出口が活動中な限り、メッセージの喪失時に吹き出し愚痴がメッセージを再現するでしょう。
  2. もしKafkaSpout が使われた場合、吹き出し口が壊れたとしてもKafkaからメッセージが再現されます。

Gearpumpは両方のStormのバージョンについて二つ目をサポートします。

セキュリティサポート

Storm 0.10.x は以下のコネクタに関してセキュリティのサポートを追加します

このことはユーザがkerberosを有効にしたHDFS、Hiveおよびこれらのコネクタを使ったHBaseにアクセスできることを意味します。一般的に、Stormは2つのやり方を提供します (詳細な情報は上のリンクを参照してください)

  1. トポロジーサブミッターユーザの代わりに移譲トークンを自動的に取得するためにnimbusを設定します
  2. kerberosのkeytabはワーカーホスト上に既に分配されています; ユーザはkeytabのパスとprincipalを設定します

Gearpump は2つ目のやり方をサポートし、ユーザはHDFS/Hive/HBase のクラスパスを各ノード上のgear.confの中の gearpump.executor.extraClasspathに追加する必要があります。例えば:

  ###################
  ### Executor argument configuration
  ### Executor JVM can contains multiple tasks
  ###################
  executor {
    vmargs = "-server -Xms512M -Xmx1024M -Xss1M -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 -XX:+UseParNewGC -XX:NewRatio=3  -Djava.rmi.server.hostname=localhost"
    extraClasspath = "/etc/hadoop/conf"
  }

Gearpump上でStormアプリケーションを実行する方法

この章ではローカルのGearpumpクラスタ内で既存のStorm jarを実行する方法を説明します。

  1. ローカルクラスタを起動する

    bin/local
    
  2. Gearpump Nimbusサーバを開始する

    ユーザは後でトポロジーをサブミットするためにサーバのアドレス(nimbus.hostnimbus.thrift.port)を必要とします。アドレスは -output オプションを使って設定されるyaml 設定ファイルに書き込まれます。ユーザはアドレスのみが上書きされる既存の設定ファイルを提供することができます。提供されない場合、新しいファイル app.yaml がその設定を使って生成されます。

    bin/storm nimbus -output [conf <custom yaml config>]
    
  3. Stormアプリケーションのサブミット

    ユーザはコマンドラインあるいはUIを使ってStormアプリケーションをサブミットすることができます。

    a. コマンドラインを使ってStormアプリケーションをサブミットする

    bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation

    ユーザは以下のオプションを使ってアプリケーションを設定することができます

    * `jar` - set the path of a Storm application jar
    * `config` - submit the custom configuration file generated when launching Nimbus
    

    b. UIを使ってStormアプリケーションをサブミットする

    1. UI上のアプリケーションページの"Ceate"ボタンをクリックする。
    2. プルダウンメニュー内の"Submit Storm Application"項目をクリックする。
    3. ポップアップコンソールの中で、StormプリケーションのjarとNimbusを起動する時に生成された設定ファイルをアップロードし、引数としてstorm.starter.ExclamationTopology exclamationを与えます。
    4. "Submit"ボタンをクリックします

    どのやり方でもダッシュボードをチェックするとトポロジーを経由してデータフローを見ることができるはずです。

それはStorm上で実行中のものとどれほど異なるか

トポロジーのサブミッション

クライアントがStormトポロジーをサブミットする場合、GearpumpはローカルでStormのNimbusサーバの簡略化バージョン GearpumpNimbusを起動します。GearpumpNimbus はトポロジーをGearpumpの有向非巡回グラフ (DAG)に変換します。これはGearpumpマスターにサブミットされ、Gearpumpアプリケーションとして配備されます。

storm_gearpump_cluster

GearpumpNimbus は以下のメソッドをサポートします

トポロジー 変換

以下はGearpump DAGに変換されたacker bolts (acker)を使った WordCountTopology の例です。

storm_gearpump_dag

Gearpump は同じ並行化を使って、各Storm spoutのための StormProducer と、各Storm bolt (ackerを除く)のためのStormProcessor を生成し、同じグループ戦略(Gearpump内のパーティショニング)を使ってStom内で関係付けます。

実行時には、spoutとboltはそれぞれStormProducer タスクと StormProcessor タスクの中で実行します。spoutによって発せられたメッセージは StormProducerに渡され、StormProcessorへ変換され、boltに渡されます。メッセージはStormシリアライザを使って、シリアライズ化/反シリアライズ化されます。

Gearpumpは異なるメッセージの追跡とフローの制御の仕組みを持つため、Stormのackerは捨てられます。

タスクの実行

各Stormタスクは専用のスレッドで実行され、一方でexecutorの全てのGearpumpタスクはスレッドプールを共有します。一般的に共有スレッドプールを使うことでより良いパフォーマンスを発揮することができます。しかし、幾つかのタスクはブロックし、全てのスレッドを利用することが有り得ます。そのような場合、gear.conf内のgearpump.task-dispatcher"gaerpump.single-thread-dispatcher"に設定することで、Stormのやり方に戻ることができます。

メッセージの追跡

Storm は少なくとも1回の配送を保証するために、ackerを使って各メッセージの系統を追跡します。失敗したメッセージはspoutから再送信されます。

Gearpump は効率的な方法で送信者と受信者の間でメッセージを追跡します。メッセージの喪失はアプリケーション全体をシステム内で待っている全てのメッセージの最小のタイムスタンプから再生させます。

フローの制御

Storm はspoutでのフローの度合を絞ります。これはもしackされていないメッセージの数がtopology.max.spout.pendingを超えた場合にメッセージの送信を停止します。

Gearpumpは送信者が受信者を満たせないようなタスク間でフローの制御をします。これは発生元まで圧力をかけます。

設定

全てのStorm設定は以下の優先順位で尊重されます

defaults.yaml < 独自のファイル設定 < アプリケーション設定 < コンポーネントの設定

ここで

StreamCQL サポート

StreamCQL は Huawei によってオープンソース化されたリアルアイム計算システム上の連続的なクエリ言語です。StreamCQLはすでにStormをサポートするため、Gearpump上でのStreamCQLの実行は簡単です。

  1. 公式のREADMEのようにStreamCQLをインストール

  2. 前のようにGearpump Nimbusサーバを起動

  3. インストールされたstream-cql-binaryに移動し、ステップ2のNimbus設定の出力を使ってconf/streaming-site.xml の以下の設定を変更

     <property>
       <name>streaming.storm.nimbus.host</name>
       <value>${nimbus.host}</value>
     </property>
     <property>
       <name>streaming.storm.nimbus.port</name>
       <value>${nimbus.thrift.port}</value>
     </property>
  4. CQLクライアントシェルを開き、簡単なcqlの例を実行

    bin/cql
    
    Streaming> CREATE INPUT STREAM s
        (id INT, name STRING, type INT)
    SOURCE randomgen
        PROPERTIES ( timeUnit = "SECONDS", period = "1",
            eventNumPerperiod = "1", isSchedule = "true" );
       
    CREATE OUTPUT STREAM rs
        (type INT, cc INT)
    SINK consoleOutput;
       
    INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
        FROM s[RANGE 20 SECONDS BATCH]
        WHERE id > 5 GROUP BY type;
       
    SUBMIT APPLICATION example;
  5. ダッシュボードをチェックすると、3つのコンポーネントのトポロジを経由してデータのフローを見ることができるはずです。

TOP
inserted by FC2 system