Spark ストリーミング + Flume 統合ガイド

Apache Flume は大量のログデータの効率的な収集、集約および移動のための、分散した信頼のできる利用可能なサービスです。Flumeをどうやって設定するか、SparkストリーミングがFlumeからデータを受信するための方法を説明します。これには2つのやり方があります。

方法 1: Flumeスタイル プッシュに基づいた方法

Flume はFlumeエージェント間でデータをプッシュするように設計されています。このやり方で、Sparkストリーミングは本質的にFlumeのためのAvroエージェントとして振舞うレシーバーをセットアップします。Flumeはこれにデータをプッシュすることができます。以下は設定のステップです。

一般的な要求

クラスタ内にこのようなマシーンを選択します

プッシュモデルにより、Flumeがデータをプッシュできるように、レシーバーはスケジュールされ、選択されたポートでlistenしているストリーミングアプリケーションは起動している必要があります。

Flumeの設定

設定ファイルに以下の内容を入れることで、Avro sink へFlume エージェントがデータを送信するように設定します。

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

Flumeエージェントの設定についての詳細な情報は Flumeのドキュメント を見てください。

Sparkストリーミングアプリケーションの設定

  1. Linking: SBT/Maven プロジェクトの定義の中で、ストリーミングアプリケーションを以下のartifactにリンクします(更に詳しい情報はメインプログラムガイドのLinking section を見てください)。

     groupId = org.apache.spark
     artifactId = spark-streaming-flume_2.11
     version = 2.0.2
    
  2. Programming: ストリーミングアプリケーションコードの中で、FlumeUtilsをインポートし、以下のように入力DStreamを生成します。

     import org.apache.spark.streaming.flume._
    
     val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
    

    API ドキュメントを見てください。

     import org.apache.spark.streaming.flume.*;
    
     JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
     	FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
    

    API ドキュメントを見てください。

     from pyspark.streaming.flume import FlumeUtils
    
     flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
    

    デフォルトでは、Python APIはFlumeイベントボディをUTF8エンコード文字列としてデコードするでしょう。Flumeイベント内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメントを見てください。

    ホスト名はクラスタ(Mesos, YARN あるいはSpark スタンドアローン)内のリソースマネージャーによって使われるものと同じでなければなりません。つまり、リソースの割り当ては名前と合致することができ、正しいマシーン内のレシーバーを起動することができます。

  3. Deploying: Sparkアプリケーションと同様に、アプリケーションを起動するためにspark-submit が使われます。しかし、Scala/JavaアプリケーションとPythonアプリケーションについては、詳細が少し異なります。

    Scala と Java アプリケーションについては、もしSBTあるいはMavenをプロジェクト管理に使っている場合、spark-streaming-flume_2.11 とその依存物をアプリケーションJARにパッケージします。spark-core_2.11spark-streaming_2.11 は既にSparkインストレーションの中に存在するため、それらが provided 依存物として印がつけられるようにしてください。そして、アプリケーションを起動するためにspark-submit を使います (メインプログラムのDeploying section を見てください)。

    SBT/Mavenプロジェクト管理が欠けているPythonアプリケーションについては、spark-streaming-flume_2.11とその依存物は--packagesを使ってspark-submitに直接追加することができます (アプリケーション submissionガイドを見てください)。つまり、

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.2 ...
    

    もう一つの方法として、Maven repository からMaven artifactspark-streaming-flume-assemblyのJARをダウンロードし、それを--jarsを使ってspark-submitに追加することもできます。

方法 2: カスタムのSinkを使ったプルベースの方法

FlumeがSparkストリーミングにデータを直接プッシュする代わりに、この方法は以下を許可するカスタムのFlume sinkを実行します。

これにより以前の方法に比べて強い信頼性と耐障害性の保証が確実になります。しかし、これはFlumeがカスタムsinkを実行するように設定する必要があります。以下は設定のステップです。

一般的な要求

Flumeエージェントの中でカスタムsinkを実行するだろうマシーンを選択します。Flumeのパイプラインの残りはデータをエージェントに送信するように設定されます。Sparkクラスタ内のマシーンはカスタムsinkを実行している選択されたマシーンへアクセスできなければなりません。

Flumeの設定

選択されたマシーン上のFlumeの設定は以下の2つのステップを必要とします。

  1. Sink JARs: 以下のJARを、カスタムsinkを実行する専用のマシーン内のFLumeのクラスパスに追加します(どうするかはFlumeのドキュメント を見てください)

    (i) カスタム sink JAR: 以下のartifactに対応するJARをダウンロードします(あるいは direct link)。

     groupId = org.apache.spark
     artifactId = spark-streaming-flume-sink_2.11
     version = 2.0.2
    

    (ii) Scala library JAR: Scala 2.11.7のためのScalaライブラリJARをダウンロードします。以下のartifact詳細を使って見つけることができます(あるいは、direct link)。

     groupId = org.scala-lang
     artifactId = scala-library
     version = 2.11.7
    

    (iii) Commons Lang 3 JAR: Commons Lang 3 JARをダウンロードします。以下のartifact詳細を使って見つけることができます(あるいは、direct link)。

     groupId = org.apache.commons
     artifactId = commons-lang3
     version = 3.3.2
    
  2. 設定ファイル: マシーン上で、設定ファイル内に以下を持たせることで、FlumeエージェントがデータをAvro sinkに送信するように設定します。

     agent.sinks = spark
     agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
     agent.sinks.spark.hostname = <hostname of the local machine>
     agent.sinks.spark.port = <port to listen on for connection from Spark>
     agent.sinks.spark.channel = memoryChannel
    

    upstream Flumeパイプラインが確実にデータをこのsinkを実行しているFlumeエージェントに送信するように設定します。

Flumeエージェントの設定についての詳細な情報は Flumeのドキュメント を見てください。

Sparkストリーミングアプリケーションの設定

  1. Linking: SBT/Maven プロジェクト定義の中で、ストリーミングアプリケーションをspark-streaming-flume_2.11にリンクします (メインプログラミングガイドのLinking sectionを見てください)。

  2. Programming: ストリーミングアプリケーションコードの中で、FlumeUtilsをインポートし、以下のように入力DStreamを生成します。

     import org.apache.spark.streaming.flume._
    
     val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
    
     import org.apache.spark.streaming.flume.*;
    
     JavaReceiverInputDStream<SparkFlumeEvent>flumeStream =
         FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
    
     from pyspark.streaming.flume import FlumeUtils
    
     addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
     flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
    

    デフォルトでは、Python APIはFlumeイベントボディをUTF8エンコード文字列としてデコードするでしょう。Flumeイベント内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメントを見てください。

    Scalaの例FlumePollingEventCountを見てください。

    各入力DStreamは複数のsinkからデータを受け取るように設定することができることに注意してください。

  3. 配備: これは最初のやり方と同じです。

TOP
inserted by FC2 system