Spark ストリーミング + Flume 統合ガイド
Apache Flume は大量のログデータの効率的な収集、集約および移動のための、分散した信頼のできる利用可能なサービスです。Flumeをどうやって設定するか、SparkストリーミングがFlumeからデータを受信するための方法を説明します。これには2つのやり方があります。
方法 1: Flumeスタイル プッシュに基づいた方法
Flume はFlumeエージェント間でデータをプッシュするように設計されています。このやり方で、Sparkストリーミングは本質的にFlumeのためのAvroエージェントとして振舞うレシーバーをセットアップします。Flumeはこれにデータをプッシュすることができます。以下は設定のステップです。
一般的な要求
クラスタ内にこのようなマシーンを選択します
-
Flume + Spark ストリーミングアプリケーションが起動された時に、Sparkのワーカーのうちの一つがそのマシーン上で起動しなければなりません。
-
Flume はそのマシーン上のポートにデータをプッシュするように設定されるでしょう。
プッシュモデルにより、Flumeがデータをプッシュできるように、レシーバーはスケジュールされ、選択されたポートでlistenしているストリーミングアプリケーションは起動している必要があります。
Flumeの設定
設定ファイルに以下の内容を入れることで、Avro sink へFlume エージェントがデータを送信するように設定します。
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>
Flumeエージェントの設定についての詳細な情報は Flumeのドキュメント を見てください。
Sparkストリーミングアプリケーションの設定
-
Linking: SBT/Maven プロジェクトの定義の中で、ストリーミングアプリケーションを以下のartifactにリンクします(更に詳しい情報はメインプログラムガイドのLinking section を見てください)。
groupId = org.apache.spark artifactId = spark-streaming-flume_2.11 version = 2.0.2
-
Programming: ストリーミングアプリケーションコードの中で、
FlumeUtils
をインポートし、以下のように入力DStreamを生成します。import org.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
API ドキュメント と例を見てください。
import org.apache.spark.streaming.flume.*; JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
API ドキュメント と例を見てください。
from pyspark.streaming.flume import FlumeUtils flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
デフォルトでは、Python APIはFlumeイベントボディをUTF8エンコード文字列としてデコードするでしょう。Flumeイベント内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメント と例を見てください。
ホスト名はクラスタ(Mesos, YARN あるいはSpark スタンドアローン)内のリソースマネージャーによって使われるものと同じでなければなりません。つまり、リソースの割り当ては名前と合致することができ、正しいマシーン内のレシーバーを起動することができます。
-
Deploying: Sparkアプリケーションと同様に、アプリケーションを起動するために
spark-submit
が使われます。しかし、Scala/JavaアプリケーションとPythonアプリケーションについては、詳細が少し異なります。Scala と Java アプリケーションについては、もしSBTあるいはMavenをプロジェクト管理に使っている場合、
spark-streaming-flume_2.11
とその依存物をアプリケーションJARにパッケージします。spark-core_2.11
とspark-streaming_2.11
は既にSparkインストレーションの中に存在するため、それらがprovided
依存物として印がつけられるようにしてください。そして、アプリケーションを起動するためにspark-submit
を使います (メインプログラムのDeploying section を見てください)。SBT/Mavenプロジェクト管理が欠けているPythonアプリケーションについては、
spark-streaming-flume_2.11
とその依存物は--packages
を使ってspark-submit
に直接追加することができます (アプリケーション submissionガイドを見てください)。つまり、./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.2 ...
もう一つの方法として、Maven repository からMaven artifact
spark-streaming-flume-assembly
のJARをダウンロードし、それを--jars
を使ってspark-submit
に追加することもできます。
方法 2: カスタムのSinkを使ったプルベースの方法
FlumeがSparkストリーミングにデータを直接プッシュする代わりに、この方法は以下を許可するカスタムのFlume sinkを実行します。
- Flume はデータをsink にプッシュし、データはバッファされたままになります。
- Spark ストリーミングはsinkからのデータをプルするために信頼できる Flume レシーバー とトランザクションを使います。データはSparkストリーミングによって受け取られリプリケートされた後でのみ成功します。
これにより以前の方法に比べて強い信頼性と耐障害性の保証が確実になります。しかし、これはFlumeがカスタムsinkを実行するように設定する必要があります。以下は設定のステップです。
一般的な要求
Flumeエージェントの中でカスタムsinkを実行するだろうマシーンを選択します。Flumeのパイプラインの残りはデータをエージェントに送信するように設定されます。Sparkクラスタ内のマシーンはカスタムsinkを実行している選択されたマシーンへアクセスできなければなりません。
Flumeの設定
選択されたマシーン上のFlumeの設定は以下の2つのステップを必要とします。
-
Sink JARs: 以下のJARを、カスタムsinkを実行する専用のマシーン内のFLumeのクラスパスに追加します(どうするかはFlumeのドキュメント を見てください)
(i) カスタム sink JAR: 以下のartifactに対応するJARをダウンロードします(あるいは direct link)。
groupId = org.apache.spark artifactId = spark-streaming-flume-sink_2.11 version = 2.0.2
(ii) Scala library JAR: Scala 2.11.7のためのScalaライブラリJARをダウンロードします。以下のartifact詳細を使って見つけることができます(あるいは、direct link)。
groupId = org.scala-lang artifactId = scala-library version = 2.11.7
(iii) Commons Lang 3 JAR: Commons Lang 3 JARをダウンロードします。以下のartifact詳細を使って見つけることができます(あるいは、direct link)。
groupId = org.apache.commons artifactId = commons-lang3 version = 3.3.2
-
設定ファイル: マシーン上で、設定ファイル内に以下を持たせることで、FlumeエージェントがデータをAvro sinkに送信するように設定します。
agent.sinks = spark agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink agent.sinks.spark.hostname = <hostname of the local machine> agent.sinks.spark.port = <port to listen on for connection from Spark> agent.sinks.spark.channel = memoryChannel
upstream Flumeパイプラインが確実にデータをこのsinkを実行しているFlumeエージェントに送信するように設定します。
Flumeエージェントの設定についての詳細な情報は Flumeのドキュメント を見てください。
Sparkストリーミングアプリケーションの設定
-
Linking: SBT/Maven プロジェクト定義の中で、ストリーミングアプリケーションを
spark-streaming-flume_2.11
にリンクします (メインプログラミングガイドのLinking sectionを見てください)。 -
Programming: ストリーミングアプリケーションコードの中で、
FlumeUtils
をインポートし、以下のように入力DStreamを生成します。import org.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
import org.apache.spark.streaming.flume.*; JavaReceiverInputDStream<SparkFlumeEvent>flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
from pyspark.streaming.flume import FlumeUtils addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])] flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)
デフォルトでは、Python APIはFlumeイベントボディをUTF8エンコード文字列としてデコードするでしょう。Flumeイベント内のボディバイトの配列を任意のデータ型にデコードするために、カスタムデコード関数を指定することができます。API ドキュメントを見てください。
Scalaの例FlumePollingEventCountを見てください。
各入力DStreamは複数のsinkからデータを受け取るように設定することができることに注意してください。
-
配備: これは最初のやり方と同じです。