Spark ストリーミング プログラミング ガイド
概要
Spark ストリーミングはライブデータストリームのスケーラブル、高スループット、耐障害性ストリーム処理を可能にするコアSpark APIの拡張です。データはKafka, Flume, Kinesis あるいはTCPソケットのような多くのソースから取り込まれ、map
, reduce
, join
および window
のような高レベルの機能によって表現される複雑なアルゴリズムを使って処理することができます。最後に、処理されたデータはファイルシステム、データベース、およびライブダッシュボードに出力することができます。その上、Sparkの機械学習 および グラフ処理 アルゴリズムをデータストリームに適用することができます。
内部的には以下のように動作します。Spark ストリーミングはライブ入力データストリームを受け取りデータをバッチに分割します。その後これはバッチ内の最終結果ストリームを生成するためにSparkエンジンによって処理されます。
Spark ストリーミングはdiscretized stream あるいは DStreamと呼ばれる高レベルの抽象化を提供します。これはデータの連続するストリームを表現します。DStreams は、Kafka, Flume および Kinesis のようなソースからの入力データストリーム、または他のDStream上の高レベルオペレーションを適用することによって、生成することができます。内部的には、DStreamは一続きのRDDsとして表現されます。
このガイドはDStreamを使ってSparkストリーミングプログラムを始める方法を説明します。SparkストリーミングプログラムをScala, Java あるいは Python(Spark1.2から導入されました)で書くことができます。これら全てはこのガイドで紹介されます。このガイドを通して異なる言語のコードの断片間を選択させるとっかかりを見つけることができるでしょう。
注意: Pythonでは異なる、あるいは利用できない2、3のAPIがあります。 このガイドの至るところで、これらの違いを強調するタグPython APIを見つけるでしょう。
クイック例
独自のSparkストリーミングプログラムを書く方法の詳細に入る前に、単純なSparkストリーミングプログラムがどのようなものかぱっと見てみましょう。 TCPソケットでlistenしているデータサーバから受け取ったテキストデータ内の単語の数をカウントしたいとします。しなければならないことは以下のとおりです。
まず、Sparkストリーミングクラスの名前と(DStreamのような)必要とする他のクラスに便利な幾つかのメソッドを追加するためにStreamContextから環境への明示的な変換をインポートします。StreamingContext は全てのストリーミング機能のメインのエントリーポイントです。2つの実行スレッド持つローカルのStreamingContext、および1秒間隔のバッチを生成します。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
このコンテキストを使って、ホスト名(たとえば localhost
) およびポート (例えば 9999
)で指定されるTCPソースからストリーミングデータを表すDStreamを生成することができます。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
このlines
DStream はデータサーバから受け取るデータのストリームを表します。このDStreamの各レコードはテキストの行です。次に、空白文字によって行を単語に分割しようと思います。
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap
はソースのDStreamの各レコードから複数の新しい単語を生成するために新しいDStreamを生成する one-to-manyの DStream オペレータです。この場合、各行は複数の単語に分割され、単語のストリームはwords
DStreamとして表現されます。次に、これらの単語を数えようと思います。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
words
DStream は更に(1対1変換で) (word, 1)
ペアのDStreamにマップされ、それからデータの各バッチ内の単語の頻度を取得するためにreduceされます。最後に、wordCounts.print()
は各秒ごとに生成されるカウントを出力します。
これらの行が実行される場合に、Sparkストリーミングは開始時に実行しようとする計算のセットアップのみを行い、実際の処理はまだ開始されていないことに注意してください。全ての変換がセットアップされた後で処理を開始するために、最終的に以下を呼び出します。
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完全なコードは Spark ストリーミングの例NetworkWordCountで見つけることができます。
まず、JavaStreamingContext オブジェクト、全てのストリーミング機能の主要なエントリーポイント、を生成します。2つの実行スレッド持つローカルのStreamingContext、および1秒間隔のバッチを生成します。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
このコンテキストを使って、ホスト名(たとえば localhost
) およびポート (例えば 9999
)で指定されるTCPソースからストリーミングデータを表すDStreamを生成することができます。
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
このlines
DStream はデータサーバから受け取るデータのストリームを表します。このストリーム内の各レコードはテキストの行です。それから、空白で行を単語に分割しようと思います。
// Split each line into words
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
});
flatMap
はソースのDStreamの中の各レコードから複数の新しいレコードを生成することで、新しいDStreamを生成するDStreamのオペレーションです。この場合、各行は複数の単語に分割され、単語のストリームはwords
DStreamとして表現されます。FlatMapFunction オブジェクトを使って変形を定義したことに注意してください。この先気づくかも知れないように、DStreamの変形の定義を手助けするJavaAPIの便利なクラスがたくさんあります。
次に、これらの単語を数えようと思います。
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
words
DStream はPairFunctionオブジェクトを使って、更に(1対1変換で) (word, 1)
ペアのDStreamにマップされます。そして、Function2 オブジェクトを使って、データの各バッチの中の単語の頻度を取得するためにredeceされます。最後に、wordCounts.print()
は各秒ごとに生成されるカウントを出力するでしょう。
これらの行が実行される場合に、Sparkストリーミングは開始後に実行しようとする計算のセットアップのみを行い、実際の処理はまだ開始されていないことに注意してください。全ての変換がセットアップされた後で処理を開始するために、最終的にstart
メソッドを呼び出します。
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
完全なコードは Spark ストリーミングの例JavaNetworkWordCountで見つけることができます。
まず、StreamingContext オブジェクト、全てのストリーミング機能の主要なエントリーポイント、を生成します。2つの実行スレッド持つローカルのStreamingContext、および1秒間隔のバッチを生成します。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
このコンテキストを使って、ホスト名(たとえば localhost
) およびポート (例えば 9999
)で指定されるTCPソースからストリーミングデータを表すDStreamを生成することができます。
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
このlines
DStream はデータサーバから受け取るデータのストリームを表します。このDStreamの各レコードはテキストの行です。次に、空白によって行を単語に分割しようと思います。
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
flatMap
はソースのDStreamの各レコードから複数の新しい単語を生成するために新しいDStreamを生成する one-to-manyの DStream オペレータです。この場合、各行は複数の単語に分割され、単語のストリームはwords
DStreamとして表現されます。次に、これらの単語を数えようと思います。
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
words
DStream は更に(1対1変換で) (word, 1)
ペアのDStreamにマップされ、それからデータの各バッチ内の単語の頻度を取得するためにreduceされます。最後に、wordCounts.pprint()
は各秒ごとに生成されるカウントを出力するでしょう。
これらの行が実行される場合に、Sparkストリーミングは開始時に実行しようとする計算のセットアップのみを行い、実際の処理はまだ開始されていないことに注意してください。全ての変換がセットアップされた後で処理を開始するために、最終的に以下を呼び出します。
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
完全なコードは Spark ストリーミングの例NetworkWordCountで見つけることができます。
既にSparkをダウンロード および ビルドした場合は、以下のようにしてこの例を実行することができます。以下のようにして最初にNetcat(ほとんどのUnix系のシステムで見つけることができる小さなユーティリティ)をデータサーバとして実行する必要があるでしょう。
$ nc -lk 9999
それから、違うターミナルで、以下のようにして例を実行することができます。
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
それから、netcatサーバを実行しているターミナルの中で入力される全ての行がカウントされ、毎秒ごとに画面に出力されるでしょう。それは以下のように見えるでしょう。
|
|
基本概念
次に、単純な例の域を超えて、Sparkストリーミングの基本について詳しく述べます。
リンク
Sparkと似て、SparkストリーミングはMaven Centralを使って利用可能です。独自のSparkストリーミングプログラムを書くには、以下の依存性をSBTあるいはMavenプロジェクトに追加しなければならないでしょう。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
Kafka, Flume および KinesisのようなSparkストリーミング コアAPIにないソースからデータを統合するには、対応するartifactspark-streaming-xyz_2.11
を依存性に追加しなければならないでしょう。例えば、一般的なものとして以下のものがあります。
ソース | 加工物 |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
最新のリストについては、サポートされるソースとartifactの完全なリストのためにMaven repositoryを参照してください。
StreamingContextの初期化
Spark ストリーミング プログラムを初期化するためには、全てのSparkストリーミング機能の主要なエントリーポイントとなるStreamingContext オブジェクトが生成されなければなりません。
StreamingContext オブジェクトはSparkConfオブジェクトから生成することができます。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
パラメータはクラスタUI上で見るアプリケーションの名前です。master
は Spark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local[*]"文字列です。実地では、クラスタ上で実行する場合は、プログラム内にmaster
をハードコードしたくなく、spark-submit
を使ってアプリケーションを起動 し、そこでそれを受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkストリーミングを実行するために"local[*]"を渡すことができます(ローカルシステム内のコアの数を検知します)。これは内部的にssc.sparkContext
としてアクセスされるSparkContext (全てのSpark機能の開始ポイント)を生成することに注意してください。
バッチの間隔は、アプリケーションのlatency要求と利用可能なクラスタのリソースに基づいて設定されなければなりません。詳細は Performance Tuning の章を見てください。
StreamingContext
オブジェクトも既存のSparkContext
オブジェクトから生成することができます。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
JavaStreamingContext オブジェクトは SparkConfオブジェクトから生成することができます。
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration(1000));
appName
パラメータはクラスタUI上で見るアプリケーションの名前です。master
は Spark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local[*]"文字列です。実地では、クラスタ上で実行する場合は、プログラム内にmaster
をハードコードしたくなく、spark-submit
を使ってアプリケーションを起動 し、そこでそれを受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkストリーミングを実行するために "local[*]" を渡すことができます。これは内部的にssc.sparkContext
としてアクセスされるJavaSparkContext (全てのSpark機能の開始ポイント)を生成することに注意してください。
バッチの間隔は、アプリケーションのlatency要求と利用可能なクラスタのリソースに基づいて設定されなければなりません。詳細は Performance Tuning の章を見てください。
JavaStreamingContext
オブジェクトも既存のJavaSparkContext
から生成することができます。
import org.apache.spark.streaming.api.java.*;
JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
StreamingContext オブジェクトはSparkContext オブジェクトから生成することができます。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
appName
パラメータはクラスタUI上で見るアプリケーションの名前です。master
は Spark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local[*]"文字列です。実地では、クラスタ上で実行する場合は、プログラム内にmaster
をハードコードしたくなく、spark-submit
を使ってアプリケーションを起動 し、そこでそれを受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkストリーミングを実行するために"local[*]"を渡すことができます(ローカルシステム内のコアの数を検知します)。
バッチの間隔は、アプリケーションのlatency要求と利用可能なクラスタのリソースに基づいて設定されなければなりません。詳細は Performance Tuning の章を見てください。
コンテキストが定義された後で、以下を実行する必要があります。
- 入力DStreamを生成することで入力ソースを定義します。
- DStreamへの変換と出力オペレータを適用することでストリーミングの計算を定義します。
streamingContext.start()
を使って、データの取得とそれの処理の開始します。streamingContext.awaitTermination()
を使って処理を(手動あるいは何かのエラーにより)停止するために待ちます。- 処理は
streamingContext.stop()
を使って停止することができます。
覚えておくべきこと:
- 一旦コンテキストが開始される、新しいストリーミングの計算はセットアップあるいは追加することができません。
- 一旦コンテキストを停止すると、再開することはできません。
- 同時に一つのストリーミング コンテキストだけが一つのJVMで実行することができます。
- ストリーミングテキストでstop()すると、SparkContextも停止します。ストリーミングコンテキストだけを停止するには、
stopSparkContext
に呼ばれるstop()
の任意のパラメータをfalse に設定します。 - SparkContext は、次のStreamingContextが生成される前に(SparkContextを停止すること無しに)以前のStreamingContextが停止している限り、複数のStreamingContextsを生成するために再利用することができます。
離散ストリーム (DStreams)
Discretized Stream あるいは DStream は、Spark ストリーミングによって提供される基本抽象です。それは、ソースから受け取られる入力データストリームあるいは入力ストリームの変換によって生成されたデータのストリームの処理されたもの、のどちらかの連続するストリームを表します。内部的には、DSteramはRDDの連続する配列によって表されます。それはSparkの不変、分散データセットの抽象概念です。(詳細は Spark プログラミング ガイド を見てください)。DStream内の各RDDは以下の図で示されるように、特定の間隔からのデータを含みます。
DStreamに適用されるどのようなオペレーションも、下に存在するRDD上の操作として変換されます。例えば、行から単語へのストリームの変換の以前の例では、words
DStreamのRDDを生成するために、flatMap
オペレーションがlines
DStream の各RDDに適用されます。以下の図でこれが示されます。
これらの裏に潜むRDD変換はSparkエンジンによって計算されます。DStreamオペレータはほとんどのこれらの詳細を隠し、利便性のために高レベルのAPIを開発者に提供します。これらのオペレータの詳細は後の章で議論されます。
入力DStreamとReceiver
入力DStreamはストリーミングソースから受け取る入力データのストリームを表します。quick exampleの中で、lines
はnetcatサーバから受け取るデータストリームを表していました。各入力DStream(ファイルシステムは除く。後の章で議論します)は、ソースからデータを受け取り、処理のためにSparkのメモリにそれを格納するReceiver (Scala doc, Java doc)オブジェクトと関係します。
Sparkストリーミングは組み込みのストリーミングソースの2つのカテゴリーを提供します。
- 基本的なソース: StreamContext APIの中で直接利用可能なソース。例: ファイルシステム、およびソケット接続。
- 上級のソース: Kafka, Flume, Kinesisなどのように特別なユーティリティクラスを使って利用可能なソース。これらはlinking の章で議論される特別な依存性に対してのリンクを必要とします。
この章で各カテゴリに存在するいくつかのソースについて議論しようと思います。
ストリーミングアプリケーションの中で並行して複数のデータのストリームを受け取りたい場合は、複数の入力DStreamを生成することができます(更に詳しくはパフォーマンスチューニングの章で議論されます)。これは同時に複数のデータストリームを受け取る複数のreceiverを生成するでしょう。しかし、Sparkのworker/executorは長く実行するタスクであり、従ってSparkストリーミングアプリケーションに割り当てられたコアの一つを占有することに注意してください。従って、受け取ったデータを処理、およびreceiverを実行するためには、Sparkストリーミングアプリケーションは十分なコア(あるいは、ローカルで動作する場合はスレッド)が割り当てられる必要があることを忘れないでいることが重要です。
覚えておくべきこと
-
Sparkストリーミングプログラムをローカルで実行する場合は、マスターURLとして"local"または"local[1]"を使用しないでください。これらの両方とも一つのスレッドだけがローカルでタスクを実行するために使われるでしょう。receiverに基づいた入力DStreamを使っている場合(例えば、ソケット、Kafka, Flumeなど)は、receiverを実行するために1つのスレッドが使われ、受け取ったデータを処理するためのスレッドを残さないでしょう。従って、ローカルで事項する場合は、常にマスターURLとして "local[n]" を使います。ここで、n >は実行するreceiverの数です (マスターを設定する方法については、Spark プロパティ を見てください)。
-
クラスター上で実行するようにロジックを拡張するには、Sparkストリーミングアプリケーションに割り当てるコアの数がreceiverの数よりも多くなければなりません。 そうでなければ、システムはデータを受け取りますが処理することができないでしょう。
基本ソース
quick exampleの中で、TCPソケット接続によって受け取ったテキストデータからDStreamを生成するssc.socketTextStream(...)
を既に見てきました。ソケットは別として、StreamingContext APIは入力ソースとしてファイルからDStreamを生成するメソッドを提供します。
-
File Streams: HDFS API (すなわち、HDFS, S3, NFS など) と互換性があるどのようなファイルシステム上のファイルからでもデータを読み込むため、DStreamは以下のようにして生成することができます:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
streamingContext.textFileStream(dataDirectory)
Spark ストリーミングは
dataDirectory
ディレクトリを監視し、そのディレクトリの中に生成される全てのファイルを処理します (入れ子になったディレクトリ内のファイルはサポートされません)。以下に注意してください- ファイルは同じフォーマットでなければなりません。
- ファイルはアトミックにデータディレクトリに移動 あるいは リネームすることで
dataDirectory
に生成されなければなりません。 - 一旦移動されると、ファイルは変更されてはいけません。ファイルが連続して追加される場合は、新しいデータは読み込まれないでしょう。
単純なテキストファイルについては、簡単なメソッド
streamingContext.textFileStream(dataDirectory)
があります。そして、ファイルストリームはコアの割り当てを必要としないため、receiverを実行する必要はありません。Python APIでは、Python API
fileStream
は利用できません。textFileStream
だけが利用可能です。 -
カスタムレシーバー ベースのストリーム: DStreams はカスタムレシーバーを経由して受け取ったデータストリームを使って生成することができます。詳細はカスタムレシーバーガイド および DStream Akka を見てください。
-
ストリームとしてのRDDのキュー: テストデータを持つSparkストリーミングアプリケーションのテストのために、
streamingContext.queueStream(queueOfRDDs)
を使ってRDDのキューに基づいたDStreamも生成するかも知れません。キューに入れられた各RDDはDStreamの中のデータの塊として扱われ、ストリームのように処理されるでしょう。
ソケットおよびファイルからのストリームについての詳細は、ScalaについてはStreamingContext、JavaについてはJavaStreamingContext、そしてPythonについては StreamingContext の関連する関数のAPIドキュメントを見てください。
一歩進んだソース
Python API Spark 2.0.0 では、これらのソース、Kafka, Kinesis および Flume はPython APIから利用可能です。
ソースのこのカテゴリは外部の非Sparkライブラリとの調和を必要とします。それらのいくつかと複雑な依存性を持ちます(例えば、KafkaとFlume)。従って、依存性のバージョンの衝突に関する問題を最小化するために、これらのソースからDStreamを生成する機能が別個の必要になったら明示的にリンクされるライブラリに移動されました。
これらの上級のソースはSpark シェルでは利用できません。そのためこれらの上級のソースに基づいたアプリケーションはシェル内でテストすることができません。本当にそれらをSparkシェルの中で使用したい場合は、対応するMaven artifactのJARを依存するものと一緒にダウンロードしクラスパスに追加する必要があります。
これらの幾つかの上級ソースは以下のものです。
-
Kafka: Spark ストリーミング 2.0.0 はKafka 0.8.2.1と互換性があります。詳細はKafka 統合ガイドを見てください。
-
Flume: Spark ストリーミング 2.0.0 はFlume 1.6.0と互換性があります。詳細は Flume 統合ガイド を見てください。
-
Kinesis: Spark Streaming 2.0.0 はKinesisクライアントライブラリ 1.2.1と互換性があります。詳細は Kinesis 統合ガイド を見てください。
独自のソース
Python API これはPythonではまだサポートされません。
入力 DStream も独自のデータソースから生成することができます。しなければいけないことは、独自のソースからデータを取得しそれをSparkに入れることができるユーザ定義のreceiverを実装することです (それが何であるかを理解するには次の章を見てください)。詳細は 独自の receiver のガイド 。
receiver の信頼性
信頼性に基づいたデータソースの2つの種類がありえます。(Kafka および Flumeのような) データの到着を知らせることができるソース。それらの信頼できる ソースからデータを受け取るシステムが受け取ったデータを正しく受け入れた場合、どのような障害によってもデータが紛失していないことを保証することができます。これは結果として2つの種類のreceiverとなります:
- 信頼できる Receiver - 信頼できるreceiverはデータが受信され、レプリケーションによって格納された時に、信頼できるソースに通知が正しく送られます。
- 信頼できない Receiver - 信頼できないreceiver はソースに通知を送信しません。これは通知をサポートしないソース、あるいは信頼できるソースの場合でも通知の複雑さに分け入りたくないか必要としない場合に、使うことができます。
信頼できるreceiverを書く方法の詳細は独自のreceiverガイドで議論されます。
DStream上の変換
RDDのそれと似ていて、変換によりデータは入力DStreamから修正することができます。DStreamは通常のSpark RDD上で利用可能な多くの変換をサポートします。以下は一般的なそれらです。
変換 | 意味 |
---|---|
map(func) | ソースDStreamの各要素を関数 funcを使って渡すことで新しいDStreamを返します。 |
flatMap(func) | mapに似ていますが、各入力アイテムは0以上の出力アイテムにマップすることができます。 |
filter(func) | func がtrueを返すソースDStreamのレコードのみを選択することで新しいDStreamを返します。 |
repartition(numPartitions) | パーティションを増減することでこのDStream内の並行レベルを変更します。 |
union(otherStream) | ソースDStreamとotherDStreamの要素の結合を含む新しいDStreamを返します。 |
count() | ソースDStreamの各RDD内の要素の数を数えることで一つの要素のRDDの新しいDStreamを返します。 |
reduce(func) | (2つの引数を取り、一つを返す)func 関数を使ってソースDStreamの各RDD内の要素を集約することで、一つの要素のRDDの新しいDStreamを返します。並行して計算できるように、その関数は結合性および可換性がなければなりません。 |
countByValue() | タイプKの要素のDStreamで呼ばれると、各キーの値がソースのDStreamの各RDDでの頻度になる(K, Long)のペアの新しいDStreamを返します。 |
reduceByKey(func, [numTasks]) | (K, V)ペアのDStreamで呼ばれると、指定されたreduce関数を使って集約された各キーの値からなる(K, V)の新しいDStreamを返します。注意: グルーピングを行うために、デフォルトではSparkの並行タスクのデフォルトの数(ローカルモードでは2、クラスターモードでは spark.default.parallelism で決定される数)を使用します。タスクの異なる数を設定するために任意の numTasks 引数を渡すことができます。 |
join(otherStream, [numTasks]) | (K, V) と (K, W)のペアの2つのDStreamで呼ばれると、各キーの要素の全てのペアからなる (K, (V, W)) のペアの新しいDStreamを返します。 |
cogroup(otherStream, [numTasks]) | (K, V)と(K, W)ペアのDStreamで呼ばれると、(k, Seq[V], Seq[W]) の組の新しいDStreamを返します。 |
transform(func) | ソースDStreamの各RDDに RDD-to-RDD関数を適用することで、新しいDStreamを返します。これはDStreamに任意のRDD操作を行うために使用することができます。 |
updateStateByKey(func) | 指定された関数を以前のキーの状態とキーのための新しい値に適用することで更新された各キーのための状態を持つ、新しい"state"DStreamを返します。これは各キーのための任意のstateデータを維持するために使用することができます。 |
これらの変換のいくつかは、もっと詳細に議論する価値があります。
UpdateStateByKey オペレーション
updateStateByKey
オペレーションは新しい情報で絶え間なく更新される間任意のstateを維持することができます。コレを使うには、2つのステップを行う必要があるでしょう。
- stateを定義する - stateは任意のデータタイプでも構いません。
- stateの更新関数を定義する - 入力ストリームから以前のstateと新しい値を使ってstateをどうやって更新するかの関数を使って指定します。
各バッチにおいて、バッチ内に新しいデータがあるかどうかに関係なく、Sparkは全ての既存のキーに対して状態を更新する関数を提供するでしょう。更新関数がNone
を返すと、キー-値ペアは削除されるでしょう。
これを例を使って説明しましょう。テキストデータストリームの中で各単語のカウントを実行し続けたいとします。ここで、カウントの実行はstateで、それは整数です。更新関数を以下のように定義します:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
これが単語を含むDStreamに適用されます。(つまり、pairs
DStreamは以前の例の(word, 1)
ペアを含みます)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
((word, 1)
ペアから)1秒の順列を持つnewValues
と、以前のカウントを持つrunningCount
を使って、各単語ごとに update 関数が呼ばれるでしょう。
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = ... // add the new values with the previous running count to get the new count
return Optional.of(newSum);
}
};
これが単語を含むDStreamに適用されます。(つまり、pairs
DStreamはquick exampleの(word, 1)
ペアを含みます)。
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
((word, 1)
ペアから)1秒の順列を持つnewValues
と、以前のカウントを持つrunningCount
を使って、各単語ごとに update 関数が呼ばれるでしょう。完全なJavaコードについては、JavaStatefulNetworkWordCount.scalaの例を見てください。
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
これが単語を含むDStreamに適用されます。(つまり、pairs
DStreamは以前の例の(word, 1)
ペアを含みます)。
runningCounts = pairs.updateStateByKey(updateFunction)
((word, 1)
ペアから)1秒の順列を持つnewValues
と、以前のカウントを持つrunningCount
を使って、各単語ごとに update 関数が呼ばれるでしょう。完全なPythonコードについては、stateful_network_wordcount.pyの例を見てください。
updateStateByKey
の使用は設定するためにチェックポイントディレクトリを必要とすることに注意してください。これはチェックポイント の章で詳細に議論されます。
変換オペレーション
(transformWith
のような変形物も一緒に)変換
オペレーションにより、任意のRDD-to-RDD関数はDStreamに適用することができます。DStream APIに公開されていない任意のRDDオペレーションを適用するために使用することができます。例えば、データストリーム中の各バッチに他のデータセットをjoinする機能はDStream API には直接公開されていません。しかし、これを行うためにtransform
を簡単に使うことができます。これにより将来性がとても強くなります。例えば、入力データストリームをあらかじめ計算されたspam情報(これもSparkで生成されるかも知れません)とjoinし、それに基づいてフィルタすることで、リアルタイムのデータの整理を行うことができます。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
提供された関数は各バッチの間隔ごとに呼ばれることに注意してください。これにより時間で変動するRDDオペレーションを行うことができます。つまり、RDDオペレーション、パーティションの数、ブロードキャスト変数などがバッチ間で変わるかも知れません。
ウィンドウ オペレーション
Spark ストリーミングはウィンドウ オペレーションも提供します。これによりデータのスライドするウィンドウに変換を適用することができます。以下の図はこのスライドするウィンドウを図示しています。
図で示されるように、ウィンドウがソースのDStream上をスライドするたびに、ウィンドウ内のDStreamのRDDを生成するために、ウィンドウ内のソースRDDは結合され操作されます。この特定の場合において、操作は最後の3回のデータの単位に対して適用され、2回の単位に対してスライドします。この事はどのウィンドウオペレーションも2つのパラメータを指定しなければならないことを示します。
- ウィンドウの長さ - ウィンドウの持続時間 (図での 3)。
- スライドの間隔 - ウィンドウのオペレーションが実施される間隔 (図での2)。
これら2つのパラメータはソースDStreamのバッチ間隔の倍数でなければなりません(図での1)。
例を使ってウィンドウオペレーションを説明しましょう。10秒ごとに直近30秒のデータの単語のカウントを生成することで前の例を拡張したいとします。これをするには直近30秒のデータに (word, 1)
ペアのpairs
DStreamに reduceByKey
オペレーションを適用する必要があります。reduceByKeyAndWindow
オペレーションを使うことでこれをすることができます。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
};
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
以下は一般的なウィンドウ オペレーションのいくつかです。これらすべてのオペレーションは上に述べたように2つのパラメータ - windowLength と slideInterval を取ります。
変換 | 意味 |
---|---|
window(windowLength, slideInterval) | ソースのDStreamのウィンドウで切り取られたバッチに基づいて計算された新しいDStreamを返します。 |
countByWindow(windowLength, slideInterval) | ストリーム内の要素のスライドするウィンドウによって切り取られた数を返します。 |
reduceByWindow(func, windowLength, slideInterval) | funcを使ってストリーム内のスライドの間隔で要素を集約することで生成された、新しい一つの要素のストリームを返します。並行して正確に計算できるように、その関数は結合性および可換性がなければなりません。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | (K, V)ペアのDStreamで呼ばれると、指定されたreduce関数funcをスライドするウィンドウ内でバッチに使って集約された各キーの値からなる(K, V)の新しいDStreamを返します。注意: グルーピングを行うために、デフォルトではSparkの並行タスクのデフォルトの数(ローカルモードでは2、クラスターモードでは spark.default.parallelism で決定される数)を使用します。タスクの異なる数を設定するために任意の numTasks 引数を渡すことができます。
|
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) |
以前のウィンドウのreduce値を使って各ウィンドウのreduce値が逐次計算される、上の |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | (K, V) ペアのDStreamで呼ばれると、各キーの値がスライディングウィンドウ内での頻度になる(K, Long)のペアの新しいDStreamを返します。reduceByKeyAndWindow のように、任意の引数を使ってreduceタスクの数が設定可能です。
|
join オペレーション
最後に、Sparkストリーミングで異なる種類のjoinをどれだけ簡単に行うことができるかについて、強調する価値があります。
Stream-stream joins
ストリームは他のストリームととても簡単にjoinすることができます。
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
ここで、各バッチの間隔の中で、stream1
で生成されたRDDはstream2
で生成されたRDDとjoinされるでしょう。leftOuterJoin
, rightOuterJoin
, fullOuterJoin
を行うこともできます。更に、ストリームのウィンドウ上でjoinすることもしばしば有用です。それも同じくとても簡単です。
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
これは DStream.transform
オペレーションの説明の中で既に以前に説明されました。これは、ウィンドウで切り取られたストリームとデータセットのjoinの更に別の例です
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(
new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
@Override
public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
return rdd.join(dataset);
}
}
);
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
実際は、joinしたいデータセットの変更を動的に行うこともできます。transform
を提供する関数が各バッチの間隔ごとに評価され、従ってdataset
リファレンスが示す現在のデータセットを使用するでしょう。
DStream変換の完全なリストはAPIドキュメントの中で利用可能です。Scala APIについては、DStream と PairDStreamFunctionsを見てください。Java APIについては、JavaDStream と JavaPairDStream を見てください。Python API については、DStream を見てください。
DStream上の出力オペレーション
出力オペレーションによってDStreamのデータはデータベースあるいはファイルシステムのような外部システムに出力することができます。出力オペレーションによって実際に変換されたデータが外部システムによって使うことができるため、出力オペレーションは全てのDStream変換の実際の実行を引き起こします (RDDでのアクションに似ています)。現在のところ、以下の出力オペレーションが定義されています:
出力オペレーション | 意味 |
---|---|
print() | ストリーミングアプリケーションを実行しているドライバノード上のDStreamの中のデータの各バッチの最初の10個要素を出力します。これは開発とデバッグの時に便利です。 Python API Python APIの中で pprint() が呼ばれます。 |
saveAsTextFiles(prefix, [suffix]) | このDStreamの内容をテキストとして保存します。各バッチの間隔におけるファイル名は、prefix および suffixに基づいて生成されます : "prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | このDStreamの内容をシリアライズ化されたJavaオブジェクトのSequenceFiles として保存します。各バッチの間隔におけるファイル名は、prefix および suffixに基づいて生成されます : "prefix-TIME_IN_MS[.suffix]". Python API これはPython APIの中で利用できません。 |
saveAsHadoopFiles(prefix, [suffix]) | このDStreamの内容をHadoopファイルとして保存します。各バッチの間隔におけるファイル名は、prefix および suffixに基づいて生成されます : "prefix-TIME_IN_MS[.suffix]". Python API これはPython APIの中で利用できません。 |
foreachRDD(func) | 関数 func をストリームから生成された各RDDへ適用する、最も一般的な出力オペレータ。この関数は各RDD内のデータを、RDDをファイルへ保存、あるいはネットワーク越しにデータベースに書き込むような外部システムに送信します。関数 func はストリーミングアプリケーションを実行しているドライバプログラムの中で実行され、通常はストリーミングRDDの計算を強制するRDDアクションを持つだろうということに注意してください。 |
foreachRDDの使用のデザインパターン
dstream.foreachRDD
はデータを外部システムに送信することができる強力なprimitiveです。しかし、このprimitiveを正しくそして効果的に使う方法を理解することが重要です。以下は避けるべきよくある間違いです。
外部システムにデータを書き込むのに接続オブジェクト(例えば、リモートサーバへのTCP接続)の生成とそれを使ってデータをリモートシステムに送信することが必要になることが度々あります。この目的のため、開発者はSparkドライバに接続オブジェクトを不注意にも生成しようとし、RDDにレコードを保存するためにSparkワーカーの中で使用しようとするかも知れません。例えば (Scalaでは)、
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
これは接続オブジェクトがシリアライズされ、ドライバーからワーカーに送信されることを必要とするため、正しくありません。このような接続オブジェクトはマシーン間でめったに転送されません。このエラーはシリアライズエラー(接続オブジェクトがシリアライズ可能ではない)、初期化エラー(接続オブジェクトはワーカーで初期化される必要がある)など、として明らかにされるかも知れません。正しい解決方法はワーカーで接続オブジェクトを生成することです。
しかし、これは他のよくある間違いに繋がります - 各レコードごとに新しい接続を生成する。例えば:
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
一般的に、接続オブジェクトを生成することは、時間とリソースのオーバーヘッドがあります。従って、各レコードごとに接続オブジェクトを生成および破壊することは、不必要な高負荷を起こし、システムの全体のスループットをかなり下げることになりえます。より良い方法は rdd.foreachPartition
を使うことです - 一つの接続オブジェクトを生成し、その接続を使ってRDD内の全てのレコードを送信します。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
これは多くのレコード上の接続生成のオーバーヘッドを償却します。
結果的に、これは複数のRDD/バッチに渡って接続オブジェクトを再利用することでより最適化されるかも知れません。One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
プール内の接続は要求に応じてlazilyに生成され、しばらくの間使われなければタイムアウトしなければなりません。これは外部のシステムへのデータの送信を最も効率よく行います。
覚えておくべき他のポイント:
-
RDDがRDDアクションによってlazilyに実行されるのと同じ様に、DStreamsは出力操作によってlazilyに実行されます。特に、DStream出力オペレーション内のRDDアクションは受け取ったデータの処理を強制します。従って、アプリケ-ションがどのような出力オペレーションも持たないか、あるいは出力オペレーション内にどのようなRDDアクションも持たない
dstream.foreachRDD()
のような出力オペレーションを持つ場合は、何も実施されないでしょう。システムは単純にデータを受け取りそれを破棄するでしょう。 -
デフォルトでは、出力オペレーションは一度に一つずつ実行されます。そして、それらはアプリケーション内で定義された順番で実行されます。
Accumulators および Broadcast 変数
Accumulators および Broadcast variables はSparkストリーミングのチェックポイントから回復することができません。チェックポイントを有効にし、またAccumulators あるいは Broadcast variables を使う場合、障害時にドライバーが再起動した後で再インスタンス化できるように、Accumulators と Broadcast variables のための穏やかにインスタンス化されたシングルトン インスタンスを生成しなければならないでしょう。以下の例で説明されます。
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
完全なソースコードを見てください。
class JavaWordBlacklist {
private static volatile Broadcast<List<String>> instance = null;
public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordBlacklist.class) {
if (instance == null) {
List<String> wordBlacklist = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordBlacklist);
}
}
}
return instance;
}
}
class JavaDroppedWordsCounter {
private static volatile LongAccumulator instance = null;
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
}
}
}
return instance;
}
}
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
// Get or register the blacklist Broadcast
final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
if (blacklist.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
}
}
完全なソースコードを見てください。
def getWordBlacklist(sparkContext):
if ('wordBlacklist' not in globals()):
globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
return globals()['wordBlacklist']
def getDroppedWordsCounter(sparkContext):
if ('droppedWordsCounter' not in globals()):
globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
return globals()['droppedWordsCounter']
def echo(time, rdd):
# Get or register the blacklist Broadcast
blacklist = getWordBlacklist(rdd.context)
# Get or register the droppedWordsCounter Accumulator
droppedWordsCounter = getDroppedWordsCounter(rdd.context)
# Use blacklist to drop words and use droppedWordsCounter to count them
def filterFunc(wordCount):
if wordCount[0] in blacklist.value:
droppedWordsCounter.add(wordCount[1])
False
else:
True
counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
wordCounts.foreachRDD(echo)
完全なソースコードを見てください。
DataFrame および SQL オペレーション
ストリーミングデータ上でDataFrames および SQLオペレーションを簡単に使用することができます。StreamingContextが使用しているSparkContextを使って、SparkSessionを生成しなければなりません。更にコレはドライバの故障時に再起動することができるようにしなければなりません。これは怠惰なインスタンス化されたSparkSessionのシングルトン インスタンスの生成によって行われなければなりません。以下の例で説明されます。DataFrameとSQLを使ったワードカウントを生成するために、以前の ワード カウントの例を修正します。各RDDは一時テーブルとして登録され、SQLを使ってクエリを実行して、データフレームに変換されます。
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
完全なソースコードを見てください。
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
private String word;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
...
/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ...
words.foreachRDD(
new Function2<JavaRDD<String>, Time, Void>() {
@Override
public Void call(JavaRDD<String> rdd, Time time) {
// Get the singleton instance of SparkSession
SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
public JavaRow call(String word) {
JavaRow record = new JavaRow();
record.setWord(word);
return record;
}
});
DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");
// Do word count on table using SQL and print it
DataFrame wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
wordCountsDataFrame.show();
return null;
}
}
);
完全なソースコードを見てください。
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
完全なソースコードを見てください。
異なるスレッドからのストリーミングデータで定義されたテーブルにSQLクエリを実行することもできます(つまり、StreamingContextの実行の非同期)。単に、クエリが実行できるだけの十分なストリーミングデータをStreamingContextに設定するようにしてください。そうでなければ、非同期SQLクエリに気づかないStreamingContextはクエリが完了する前に古いストリーミングデータを削除するでしょう。例えば、もし最後のバッチにクエリしたいが、クエリの実行に5分かかるかも知れない場合は、streamingContext.remember(Minutes(5))
を呼びます。(Scala、あるいは他の言語での同等のもの)。
データストリームについてもっと知りたい場合は、データフレームとSQLガイドを見てください。
MLlib オペレーション
MLlibによって提供される機械学習アルゴリズムも簡単に使うことができます。まず最初に、同時にストリーミングから学習できストリーミングにモデルを適用する、ストリーミング機械学習アルゴリズム(例えば、ストリーミング線形回帰, ストリーミングK平均法など)があります。これらより優れた、もっと大きな機械学習アルゴリズムのクラスについては、オフライン(つまり過去データ)で学習モデルを学習し、それからオンラインでストリーミングデータ上にそのモデルを適用します。詳細はMLlib ガイドを見てください。
キャッシング / 永続化
RDDと似て、DStreamは開発者がメモリ内にストリームのデータを永続化することもできます。つまり、DStream上のpersist()
メソッドの使用は自動的にメモリ内のDStreamの各RDDを永続化するでしょう。DStream内のデータが複数回計算される場合には有用でしょう(例えば、同じデータに複数のオペレーション)。reduceByWindow
と reduceByKeyAndWindow
、および updateStateByKey
のような状態に基づくオペレーションに関して、これは暗黙のうちにtrueです。従ってウィンドウに基づくオペレーションによって生成されたDStreamは、開発者のpersist()
の呼び出し無しに自動的にメモリ内に永続化されます。
(Kafka, Flume, ソケットなどのような)ネットワーク越しにデータを受け取る入力ストリームについては、耐障害性のために2つのノードにデータをリプリケートするようにデフォルトの永続性レベルが設定されます。
RDDと違い、DStreamのデフォルトの永続性レベルはメモリ内にデータをシリアライズ化します。これはパフォーマンスチューニングの章で更に議論されます。異なる永続化レベルの更なる情報はSpark プログラミングガイドの中で見つけることができます。
チェックポイント
ストリーミングアプリケーションは24/7で運用しなければならず、従ってアプリケーションロジックに関係の無い障害(例えば、システム障害、JVMクラッシュなど)には弾力性がなければなりません。これを可能にするために、Sparkストリーミングは障害から復帰できるように耐障害性のあるストレージシステムについての十分な情報のチェックポイントを必要とします。チェックポイントとされるデータには2つの種類があります。
- メタデータ チェックポイント - HDFSのような耐障害性のあるストレージへのストリーミングの計算を定義している情報の保存。これはストリーミングアプリケーションのドライバーが動作しているノードの障害からの復帰に使われます(詳細は後で議論されます)。メタデータには以下が含まれます:
- 設定 - ストリーミングアプリケーションを生成するために使われた設定。
- DStream オペレーション - ストリーミングアプリケーションを定義するDStreamオペレーションのセット。
- 不完全なバッチ - ジョブがキューされているがまだ完了していないバッチ。
- データ チェックポイント - 信頼できるストレージへ生成されたRDDの保存。これは複数のバッチにまたがった結合データのステートフルな変換に必要となります。そのような変換において、生成されたRDDは以前のバッチのRDDに依存します。これにより時間が経つにつれて依存のチェーンの長さが増加します。そのようなリカバリー時間の際限の無さ(依存チェーンに比例します)を防ぐために、依存チェーンの切断のために信頼できるストレージ(例えば HDFS)に定期的にステートフルな変換の中間RDDがチェックポイントされます。
要約すると、もしストートフル変換が使われる場合はデータあるいはRDDチェックポイントは基本的な機能に必要とされるのに対し、メタデータのチェックポイントはドライバー障害からの復帰に主に必要とされます。
チェックポイントを有効にする場合とは
チェックポイントは以下の要求のいずれかが必要なプリケーションで有効にされるべきです:
- ステートフル変換の使用 - もし
updateStateByKey
あるいはreduceByKeyAndWindow
(逆関数と一緒に)がアプリケーションで使われる場合、チェックポイントディレクトリは定期的なRDDチェックポイントのために提供されなければなりません。 - アプリケーションを実行しているドライバの障害からの復帰 - メタデータチェックポイントが進捗の情報と共にリカバリーに使われます。
前述のステートフル変換が無い単純なストリーミングアプリケーションは、チェックポイント無しに実行することができることに注意してください。ドライバー障害からの復帰もそのような場合は部分的になるでしょう(受け取ったが処理されていないデータは喪失されるでしょう)。これはしばしば許容され、多くのSparkストリーミングアプリケーションはこのようにされています。将来には、非Hadoop環境へのサポートがこれを改善すると期待されています。
チェックポイントの設定方法
チェックポイントは耐障害性、信頼できるファイルシステム(例えば、HDFS, S3など)のディレクトリをチェックポイント情報が保存される場所として設定することで有効にされます。これはstreamingContext.checkpoint(checkpointDirectory)
の使用によって行われます。これにより前述したステートフル変換を行うことができるでしょう。更に、ドライバーの障害からアプリケーションをリカバーしたい場合は、ストリーミングアプリケーションを以下の挙動を持つように書き直す必要があります。
- プログラムが初めて開始される時に、プログラムは新しいStreamingContextを生成し、全てのストリームをセットアップし、それからstart()を呼ぶでしょう。
- 障害の後でプログラムが再起動する時に、チェックポイントディレクトリのチェックポイントデータからStreamingContextを再生成するでしょう。
この挙動はStreamingContext.getOrCreate
を使うことで簡単になります。以下のように使われます。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
checkpointDirectory
が存在すると、チェックポイントデータからコンテキストが再生成されます。ディレクトリが存在しない場合(つまり、初めて実行)、新しいコンテキストを生成およびDStreamをセットアップするために関数functionToCreateContext
が呼ばれるでしょう。Scalaの例 RecoverableNetworkWordCount を見てください。この例はネットワークデータのワードのカウントをファイルに追記します。
この挙動はJavaStreamingContext.getOrCreate
を使うことで簡単になります。以下のように使われます。
// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...
jssc.checkpoint(checkpointDirectory); // set checkpoint directory
return jssc;
}
};
// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start();
context.awaitTermination();
checkpointDirectory
が存在すると、チェックポイントデータからコンテキストが再生成されます。ディレクトリが存在しない場合(つまり、初めて実行)、新しいコンテキストを生成およびDStreamをセットアップするために関数contextFactory
が呼ばれるでしょう。Javaの例 JavaRecoverableNetworkWordCount を見ます。この例はネットワークデータのワードのカウントをファイルに追記します。
この挙動はStreamingContext.getOrCreate
を使うことで簡単になります。以下のように使われます。
# Function to create and setup a new StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = new StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
return ssc
# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...
# Start the context
context.start()
context.awaitTermination()
checkpointDirectory
が存在すると、チェックポイントデータからコンテキストが再生成されます。ディレクトリが存在しない場合(つまり、初めて実行)、新しいコンテキストを生成およびDStreamをセットアップするために関数functionToCreateContext
が呼ばれるでしょう。Pythonの例 recoverable_network_wordcount.py を見てください。この例はネットワークデータのワードのカウントをファイルに追記します。
StreamingContext.getOrCreate(checkpointDirectory, None)
を使って、チェックポイントデータから StreamingContext
を明示的に生成し、計算を開始することもできます。
getOrCreate
の使用に加えて、障害時に自動的にドライバープロセスを再開するようにする必要があります。これはアプリケーションを実行するために使われるデプロイメント インフラによってのみ行うことができます。これはデプロイメント の章で更に議論されます。
RDDのチェックポイントは信頼できるストレージへの保存のコストがあることに注意してください。RDDのチェックポイントが作られるバッチの処理時間を増やすかも知れません。従って、チェックポイントの間隔は注意深く設定される必要があります。小さなバッチサイズ(1秒とします)において、各バッチのチェックポイントはオペレーションのスループットを著しく下げるかも知れません。逆に、あまりに頻度が少ないチェックポイントは、系統およびタスクサイズの増大を起こし、有害な結果をもたらすかも知れません。RDDのチェックポイントを必要とするステートフルな変換については、デフォルトの間隔は少なくとも10秒のバッチの倍数です。dstream.checkpoint(checkpointInterval)
を使って設定することができます。一般的に、DStreamのスライドする間隔の5 - 10倍のチェックポイントの間隔が試してみるのに良い設定です。
アプリケーションのデプロイ
この章では、Sparkストリーミングアプリケーションをデプロイするステップについて議論します。
必要条件
Sparkストリーミングアプリケーションを実行するには、以下のものを持つ必要があります。
-
クラスタマネージャーを持つクラスタ - これは全てのSparkアプリケーションでの一般的な要求で、詳細はデプロイメント ガイドの中で議論されます。
-
アプリケーションJARのパッケージ - ストリームアプリケーションをJARにコンパイルする必要があります。アプリケーションを開始するために
spark-submit
を使っている場合は、JARの中にSparkおよびSparkストリーミング を提供する必要は無いでしょう。しかし、アプリケーションがadvanced sources (例えば、Kafka, Flume)を使っている場合は、それらの依存に応じてJARの中にアプリケーションをデプロイするために使われる、それらがリンクする特別なartifactをパッケージする必要があるでしょう。例えば、KafkaUtils
を使用するアプリケーションは、アプリケーションJARの中にspark-streaming-kafka-0-8_2.11
およびその移行の依存の全てを含む必要があるでしょ。 -
executorのための十分なメモリの設定 - 受け取ったデータはメモリ内に保存する必要があるため、executorは受け取ったデータを保持するための十分なメモリの設定をされていなければなりません。10分のウィンドウオペレーションをする場合は、システムはメモリ内に直近10分のデータを維持しなければならないことに注意してください。つまり、アプリケーションのためのメモリの要求はその中で使われるオペレーションに依存します。
-
チェックポイントの設定 - もしストリームアプリケーションが必要とすれば、Hadoop APIと互換性のある耐障害性のあるストレージ(例えば、HDFS, S3など)のディレクトリはチェックポイントディレクトリとして設定されなければならず、そのように書かれたストリーミングアプリケーションはチェックポイント情報を障害からの回復に使うことができます。詳細は チェックポイントの章を見てください。
- アプリケーションドライバーの自動再起動の設定 - ドライバーの障害からの自動的な回復のために、ストリーミングアプリケーションを実行するために使われるデプロイメント インフラストラクチャーは、ドライバープロセスを監視し障害が起きた場合にはドライバーを再起動しなければなりません。異なる クラスターマネージャー はこれを実施するために異なるツールを持っています。
- Spark スタンドアローン - SparkアプリケーションドライバーはSparkスタンドアローンクラスタ(クラスター デプロイ モードを見てください)内で実行するためにサブミットすることができます。つまり、アプリケーションドライバそれ自身がワーカーノードのうちの一つの上で実行します。更に、スタンドアローン クラスター マネージャーはドライバーを監督するように指示をすることができ、非0の終了コードあるいはドライバーを実行しているノードの傷害によってドライバーが故障した場合には、再起動します。詳細はSpark スタンドアローン ガイドの中のクラスターモード および supervise を見てください。
- YARN - Yarnはアプリケーションの自動的な再起動のための似たような仕組みをサポートします。詳細はYARNドキュメントを参照してください。
- Mesos - Marathon はMesosと一緒にこれを行うために使われます。
-
先行書き込みログの設定 - Spark 1.2 から、強力な耐障害性の達成のために先行書き込みログが導入されました。有効にされた場合は、receiverから取得された全てのデータは設定のチェックポイントディレクトリ内の先行書き込みログに書き込まれます。これによりドライバのリカバリー時のデータ喪失が避けられ、従ってデータの非喪失が保証されます (詳細は耐障害性 意味論 の章で議論されます)。これは設定パラメータ
spark.streaming.receiver.writeAheadLog.enable
をtrue
に設定することで有効にすることができます。しかし、これらの強力な意味論は各receiverの受け取りスループットの犠牲によって達成されるでしょう。集約スループットを高くするために もっと多くのreceiverを並列で実行することによって、これは修正することができます。更に、先行書き込みログが有効でログが既にレプリケートされているストレージシステムに格納されている場合、Spark内の受信データのレプリケーションを無効にすることをお勧めします。これは入力ストリームのストレージレベルをStorageLevel.MEMORY_AND_DISK_SER
に設定することで行うことができます。書き込み先行ログのためにS3 (あるいはフラッシュをサポートしないファイルシステム)を使う場合は、spark.streaming.driver.writeAheadLog.closeFileAfterWrite
とspark.streaming.receiver.writeAheadLog.closeFileAfterWrite
を有効にしてください。詳細はSpark ストリーミング設定 を見てください。 - 最大受信レートの設定 - ストリーミングアプリケーションがデータ受信の速度ほど速くデータを処理するための十分なクラスターリソースが無い場合、receiverは records/ sec という形で最大のレートの制限を設定することでレートを制限することができます。receiverについては configuration parameters
spark.streaming.receiver.maxRate
を、Direct Kafka の方法についてはspark.streaming.kafka.maxRatePerPartition
を参照してください。Spark 1.5では、このレートの制限を設定する必要が無くなるbackpressureと呼ばれる機能を導入しました。Sparkストリーミングはレートの制限を自動的に計算し、処理の状況が変わると動的にそれらに順応します。このbackpressureは configuration parameterspark.streaming.backpressure.enabled
をtrue
に設定することで有効にすることができます。
アプリケーションコードのアップグレード
実行中のSparkストリーミングアプリケーションが新しいアプリケーションコードにアップグレードされなければならない場合、二つの仕組みがあります。
-
アップグレードされたSparkストリーミングアプリケーションが開始され既存のアプリケーションと並列して実行されます。新しいアプリケーション(古いものと同じデータを受け取る)が作動できる状態になり、忙しい時間帯に対しても準備ができると、古いアプリケーションは減少させることができます。これはデータを2つの宛先(つまり、以前およびアップグレード後のアプリケーション)へのデータの送信をサポートするデータソースの対して行えるということに注意してください。
-
既存のアプリケーションはgracefulにシャットダウンされます (graceful シャットダウンのオプションについては
StreamingContext.stop(...)
あるいはJavaStreamingContext.stop(...) を見てください
)。これは受信されたデータがシャットダウンの前に完全に処理されることを保証します。次にアップグレードされたアプリケーションを開始することができます。これは以前のアプリケーションが停止したのと同じ場所から処理を開始するでしょう。以前のアプリケーションが停止しアップグレードしたアプリケーションがまだ起動していない間にデータがバッファされる必要があるため、これは(Kafka、およびFlumeのような)ソース側のバッファリングをサポートする入力ソースを使ってのみ行えることに注意してください。そして、アップグレード前のコードの以前のチェックポイントの情報から再起動することはできません。チェックポイントの情報は基本的にシリアライズ化された Scala/Java/Pythonオブジェクトを含んでおり、新しいコードでオブジェクトをデシリアライズしようとするので、修正されたクラスは結果的にエラーになるかもしれません。この場合、異なるチェックポイントディレクトリを使ってアップグレードしたアプリケーションを開始するか、以前のチェックポイントディレクトリを削除するかのどちらかになります。
アプリケーションの監視
Sparkの 監視能力を超えて、Sparkストリーミングに固有の追加の能力があります。StreamingContextが使われる場合、Spark web UI は実行中のreceiverに関する統計(receiverがアクティブかどうか、受信したレコードの数、receiverのエラーなど)、および完了したバッチ(バッチの処理時間、キューの遅延など)を表示する Streaming
タブを表示します。これはストリーミングアプリケーションの進捗を監視するために使用することができます。
web UI内の以下の2つのマトリックスは特に重要です:
- Processing Time - データの各バッチの処理に掛かる時間。
- Scheduling Delay - 以前のバッチの処理が終了するのをキューで待つバッチの時間。
バッチの処理時間がバッチの間隔よりも一貫して多い、かつ/あるいは、キューの遅延が増加し続けている場合は、バッチが生成されるのが早く、遅延しているために、システムはバッチの処理をすることができないことを示します。この場合、バッチの処理時間をreducing することを考慮します。
Sparkストリーミング アプリケーションの進捗は、StreamingListener インターフェースを使って監視することもできます。これによりreceiverのステータスと処理時間を取得することができます。これは開発者のAPIであり、将来おそらく改良(つまり、もっと情報が報告されます)されるだろうことに注意してください。
パフォーマンス チューニング
クラスタ上でSparkストリーミングアプリケーションから最高のパフォーマンスを得るには、ちょっとした調整が必要です。この章では、アプリケーションのパフォーマンスを改善するために調整可能な幾つかのパラメータと設定について説明します。高次元では、以下の2つのことを考慮する必要があります:
-
クラスタのリソースを効率的に使うことでデータの各バッチの処理時間を削減する。
-
データのバッチを受け取ったらできるだけ早く処理できるような正しいデータのバッチサイズに設定する。
バッチ処理時間の削減
Sparkで各バッチの処理時間を最小にするための行える多くの最適化があります。詳細はチューニングガイドの中で議論されます。この章では最も重要な幾つかについて際立たせます。
データ受信時の並行レベル
(Kafka, Flume, ソケットなどの)ネットワーク越しのデータの受信はデータがデシリアライズされSparkに格納されることを必要とします。データの受信がシステム内でボトルネックになった場合は、データの受信の並行化を考慮します。各入力DStreamは単一の一つのデータのストリームを受信する(ワーカーマシーン上で動作する)receiverを生成することに注意してください。従って複数データストリームの受信は複数の入力DStreamの生成およびそれらがソースからデータのストリームの異なるパーティションを受け取るように設定することで、達成されます。例えば、データの2つのトピックを受け取る一つのKafka入力DStreamは、それぞれは一つのトピックを受け取る2つのKafka入力ストリームに分割することができます。これは並行してデータを受け取ることができる二つのreceiverを実行し、従って全体のスループットを増加させます。これらの複数のDstreamは一つのDStreamを生成するために一つに結合することができます。そして、一つの入力DStreamに適用される変換は、統一されたストリームに適用することができます。以下のようにして行います。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
考慮しなければならない他のパラメータは、receiverのブロッキング間隔です。これはconfiguration parameter spark.streaming.blockInterval
によって決定されます。ほとんどのreceiverにとって、受け取ったデータはSparkのメモリ内に格納される前にデータのブロックの中に一緒に合体されます。各バッチ内のブロックの数はmapのような変換の中で受け取ったデータの処理に使われるだろうタスクの数を決定します。バッチあたりのreceiverごとのタスクの数はおよそ (batch間隔 / ブロック間隔) です。例えば、200msのブロック間隔は2秒のバッチごとに10のタスクを生成するでしょう。タスクの数があまりに低い(つまり、マシーンあたりのコアの数より少ない)場合、全ての利用可能なコアがデータを処理するために使われないことになるので非効率です。指定されたバッチの間隔でタスクの数を増やすには、ブロック間隔を減らします。しかし、推奨されるブロックの間隔の最小値は約50msで、それ以下ではタスクを起動するオーバーヘッドが問題になるかも知れません。
複数の入力ストリーム/receiver を使ってデータを受け取る代替方法は、入力データストリームを(inputStream.repartition(<number of partitions>)
を使って)明示的にパーティションすることです。 これはさらに処理をする前にクラスタ内で指定された数のマシーンにわたって受け取ったデータのバッチを分散します。
データ処理での並行レベル
計算の全てのステージ内で使用される並行タスクの数が十分多くない場合、クラスターのリソースは十分使われません。例えば、reduceByKey
および reduceByKeyAndWindow
のような分散reduceオペレーションのために、デフォルトの並行タスクはspark.default.parallelism
configuration propertyによって制御されます。並行レベルを引数として渡す(PairDStreamFunctions
ドキュメントを見てください)documentation)ことができます。あるいはデフォルトを変更するためにspark.default.parallelism
configuration property を設定することができます。
データのシリアライズ
データのシリアライズのオーバーヘッドは、シリアライズのフォーマットを調整することで減らすことができます。ストリーミングの場合は、シリアライズには2つのデータのタイプがあります。
-
Input data: デフォルトでは、receiverを使って受信される入力データはStorageLevel.MEMORY_AND_DISK_SER_2を使ってexecutorのメモリに格納されます。つまり、データはGCのオーバーヘッドを減らすためにバイトにシリアライズされ、executorの障害の耐性のためにリプリケートされます。また、データは最初にメモリに保持され、ストリーミング計算に必要な入力データの全てを保持するのにメモリが十分では無い場合に限りディスクに流し込まれます。このシリアライズ化は明らかにオーバーヘッドがあります- receiverは受け取ったデータをデシリアライズしなければならず、Sparkのシリアライズほーマットを使ってそれを再シリアライズしなければなりません。
-
ストリーミング 操作によって生成された永続的なRDD: ストリーミング計算によって生成されたRDDはメモリ内に永続化するかも知れません。例えば、ウィンドウオペレーションは複数回処理されるかも知れないのでメモリ内にデータを永続化します。しかし、SparkコアのデフォルトのStorageLevel.MEMORY_ONLYとは違い、ストリーミング計算で生成された永続化されたRDDはデフォルトでGCのオーバーヘッドを最小化するStorageLevel.MEMORY_ONLY_SER(つまり、シリアライズ) を使って永続化されます。
どちらの場合でも、Kryoシリアライズを使うことでCPUおよびメモリのオーバーヘッドを減らすことができます。詳細はSpark チューニングガイド を見てください。Kryoに関しては、独自クラスの登録およびオブジェクトのリファレンスの追跡の無効化を考慮してください (設定ガイドのKryoに関係する設定を見てください)。
ストリーミングアプリケーションのために維持する必要があるデータの量が多く無い特定の場合において、過度なGCのオーバーヘッド無しにデシリアライズされたオブジェクトとしてデータ(タイプも)残すことが実現可能かも知れません。例えば、バッチ間隔が数秒で、ウィンドウオペレーションが無い場合、明示的にストレージレベルを相応に設定することで永続化データのシリアライズの無効化を試すことができます。これはシリアライズによるCPUのオーバーヘッドを減らし、あまりGCのオーバーヘッド無しにパフォーマンスを改善するかもしれません。
タスクの起動のオーバーヘッド
秒間あたりのタスクの数が高い場合(例えば秒間あたり50以上)、タスクをスレーブにスレーブに送信するオーバーヘッドが大きくなり、1秒以内のレイテンシを達成するのが難しくなるでしょう。そのオーバーヘッドは以下の変更で少なくすることができます:
- 実行モード: Sparkをスタンドアローンモードあるいはcoarse-grained Mesosモードでの実行することは、fine-grained Mesosモードよりもタスクの起動時間が良い結果になります。詳細はMesos上での実行ガイド を参照してください。
これらの変更はバッチの処理時間を100ミリ秒の単位で削減するでしょう。このように秒未満のバッチサイズが実現可能になります。
正しいバッチの間隔の設定
クラスタ上で動作するSparkストリーミングアプリケーションが安定するために、システムはデータを受け取るとできるだけ早くデータを処理できなければなりません。言い換えると、生成されたデータのバッチはできるだけ早く処理されなければなりません。これがアプリケーションにとって真かどうかはストリーミングweb UIの処理時間の監視 によって知ることができます。バッチの処理時間はバッチの間隔よりも少なくなければなりません。
ストリーミング計算の性質に依存して、使用されるバッチの間隔はクラスタのリソースの固定セット上のアプリケーションによって継続されるデータのレートに大きな影響を与えるかも知れません。例えば、以前の WordCountNetwork の例を考えてみましょう。特定のデータレートにおいて、システムは2秒ごとにワードカウントをレポートし続けることが可能かも知れません(つまり、バッチの間隔は2秒)が、各500ミリ秒ごとではありません。つまりバッチの間隔は、プロダクションで維持できるそのようなデータレートに設定される必要があります。
アプリケーションのための正しいバッチサイズを見つけ出す良い方法は、保守的なバッチ間隔(つまり、5-10秒)と低いデータレートを使ってそれをテストすることです。システムがそのデータレートで維持できるかどうかを検証するために、処理された各バッチによって実験された end-to-endの遅延の値をチェックすることができます(Sparkドライバのlog4jのログの "Total delay"、あるいは StreamingListener インタフェースのどちらかを使って調べます)。遅延がバッチサイズと同程度に維持される場合は、システムは安定しています。そうでなければ、もし遅延が常に増加する場合、それはシステムが遅れないで付いていくことができず、不安定であることを意味します。一度安定した設定の見解が見つかれば、データレートの増加 および/あるいは バッチサイズの削減を試すことができます。一時的なデータレートの増加による瞬間的な遅延の増加は、遅延が低い値(つまり、バッチサイズ未満)に戻る限りは、それで良いことに注意してください。
メモリー チューニング
メモリの使用の調整とSparkアプリケーションの挙動は、チューニングガイドの中でとても詳細に議論されています。それを読むことを強くお勧めします。この章では、Sparkストリーミングアプリケーションのコンテキストの中での2,3のパラメータを具体的に議論します。
Sparkストリーミングアプリケーションによって必要とされるクラスタの総メモリは、使用される変換の種類にとても依存します。例えば、もし直近の10秒のデータへのウィンドウ操作を使用したい場合、クラスタはメモリ内に10秒分のデータを保持するのに十分なメモリを持つ必要があります。あるいは、多くのキーを使ってupdateStateByKey
を使いたい場合、必要なメモリは多くなるでしょう。逆に、単純な map-filter-storeオペレーションを行いたい場合は、必要なメモリは少なくなるでしょう。
一般的に、receiverを使って受信されるデータはStorageLevel.MEMORY_AND_DISK_SER_2を使って格納されるため、メモリに入りきらないデータはディスクにこぼれ出るでしょう。これはストリーミングアプリケーションのパフォーマンスを下げるかも知れないため、ストリーミングアプリケーションに必要とされる十分なメモリを提供するように勧められます。小さなサイズでメモリの使用量を試してみて、それに応じて推測するのが一番良いです。
メモリチューニングの他の視点に、ガベージコレクションがあります。低いレイテンシを必要とするストリーミングアプリケーションのために、JVMガベージコレクションによって引き起こされる大きな中断があることは望ましいことではありません。
メモリ利用料とGCオーバーヘッドを調整するのに役に立つかも知れない2,3のパラメータがあります。
-
DStreamの永続性レベル: 以前 データのシリアライズ化の章で述べたように、入力データおよびRDDはデフォルトではシリアライズ化されたバイトとして永続化されます。これは、メモリの使用量およびGCのオーバーヘッドを、永続化のデシリアライズ化に比べて少なくします。Kryoシリアライズ化を有効にすることで、更にシリアライズ化のサイズとメモリの使用量を少なくします。更なるメモリの使用量の削減は、CPU時間を犠牲にして圧縮(Spark設定の
spark.rdd.compress
を見てください)によって達成することができます。 -
古いデータの掃除: デフォルトでは、全ての入力データおよびDstream変換によって永続化されたRDDは、自動的に掃除されます。Sparkストリーミングは、使用した変換に基づいていつデータを掃除するかを決定します。例えば、10分のウィンドウオペレーションを使用している場合、Sparkストリーミングは直近のおよそ10分のデータを保持し、積極的に古いデータを捨てるでしょう。データは
streamingContext.remember
を設定することで、より長い期間(例えば、対話的に古いデータをクエリする)保持することができます。 -
CMS Garbage Collector: GCに関係する休止を一貫して低くし続けるためには、concurrent mark-and-sweep GC がとてもお勧めです。concurrent GCはシステムの全体の処理スループットを下げると知られていますが、もっと一貫性のあるバッチ処理時間を達成するためにもその使用がお勧めです。ドライバー(
spark-submit
の中で--driver-java-options
を使用する)およびexecutor(Spark 設定のspark.executor.extraJavaOptions
を使用)の両方でCMS GCを設定するのを忘れないでください。 -
その他のtips: GCのオーバーヘッドをもっと減らすために、やってみるべき幾つかの更なるtipsがあります。
OFF_HEAP
ストレージレベルを使用するPersist RDD。詳細はSpark プログラミング ガイドを見てください。- 少ないheapサイズでもっと多くのexecutorを使用する。これは各JVM heap内のGCの圧力をさげるでしょう。
覚えておくべき重要な点:
-
DStreamは1つのレシーバーに関連付けされます。並行読み込みを行うには、複数のレシーバー、つまり複数のDStreamの生成が必要です。レシーバーはexecutor内で実行されます。それは1つのコアを占有します。レシーバーのスロットが予約された後で処理のためのコアが十分にあるようにします。つまり
spark.cores.max
はレシーバーのスロットを勘定に入れる必要があります。レシーバーはラウンドロビンの形式でexecutorに割り当てられます。 -
データがストリームのソースから受け取られた場合、レシーバはデータのブロックを生成します。データの新しいブロックが各 blockInterval ミリ秒ごとに生成されます。データのNブロックは batchInterval の間に生成されます。N = batchInterval/blockInterval。これらのブロックは現在のexecutorのBlockManagerによって他のexecutorのブロックマネージャーに配送されます。その後で、ドライバー上のネットワーク入力トラッカーがその後の処理のためにブロックの場所について知らされます。
-
RDDは batchIntervalの間に生成されたブロックのためにドライバー上で生成されます。batchInterval の間に生成されたブロックはRDDのパーティションです。各パーティションはsparkでのタスクです。blockInterval== batchinterval は、1つのパーティションが生成され、おそらくそれはローカルで実行されるだろうことを意味します。
-
The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in. blockinterval を大きくすることはブロックが大きくなることを意味します。
spark.locality.wait
の高い値は、ローカルノード上のブロックの処理の機会を増やします。大きなブロックがローカルで処理されることを保証するためには、これらの二つのパラメータの間でバランスが見つけられなければなりません。 -
batchInterval と blockIntervalに頼る代わりに、
inputDstream.repartition(n)
を呼ぶことでパーティションの数を定義することができます。これは、パーティションのn番目を作成するためにRDD内でランダムにデータを再シャッフルします。もちろん、並行度を大きくするためです。しかし、シャッフルという代償があります。RDDの処理はジョブとしてドライバーのジョブスケジューラによってスケジュールされます。ある時間において1つのジョブだけがアクティブです。つまり、1つのジョブが実行中の場合、他のジョブはキューされます。 -
もし二つのdstreamがある場合、形成された二つのRDDがあるでしょう。そして互い違いにスケジュールされるであろう2つのジョブが生成されるでしょう。これを避けるために、二つのdstreamを1つにすることができます。これは、1つのunionRDDがdstreamの2つのRDDのために形成されることを保証するでしょう。このunionRDDはその後1つのジョブと見なされます。しかし、RDDのパーティション化は影響を受けません。
-
もしバッチ処理時間がbatchintervalより大きい場合、レシーバーのメモリは明らかに一杯になりはじめ、例外(ほとんどの場合おそらくBlockNotFoundException)を投げることになるでしょう。現在のところレシーバーを止める方法はありません。SparkConf 設定
spark.streaming.receiver.maxRate
を使って、レシーバーのレートを制限することができます。
耐障害性semantics
この章では、障害時のSparkストリーミングアプリケーションの挙動について議論するつもりです。
背景
Spark Streamingによって提供される意味を理解するために、SparkのRDDの基本的な耐障害性の意味を思い出してみましょう。
- RDDは不変、決定論的に再計算可能、分散されたデータセットです。各RDDは耐障害性のある入力データセット上でそれを作るために使われた決定的な操作の系図を記憶します。
- RDDのいずれかのパーティションがワーカーノードの障害により喪失した場合、そのパーティションはオペレーションの系譜を使って元の対障害性のデータセットから再計算することができます。
- 全てのRDDの変換が決定的だと仮定した場合、最終的に変換されたRDD内のデータはSparkクラスター内の障害に関係なく常に同じものになるでしょう。
SparkはHDFSあるいはS3のような耐障害性のあるファイルシステム内で操作されます。従って、耐障害性のあるデータから生成された全てのRDDも耐障害性があります。しかしSparkストリーミングの場合はほとんどの場合において(fileStream
が使われる場合を除いて)ネットワークを超えてデータを受け取るため、そうではありません。全ての生成されたRDDが同じ耐障害性の特性を達成するために、受け取ったデータはクラスタ内のワーカーノードで複数のSpark executor間でリプリケートされます(デフォルトのリプリケーションの要素は2です)。これにより障害発生時に復元されなければならないシステム内のデータは二種類になります。
- 受信およびリプリケートされたデータ - このデータは、他のノードの一つの上でデータのコピーとして、一つのワーカーノードの障害を生き延びます。
- 受信されたがリプリケーションのためにバッファされたデータ - これはリプリケートされていないため、このデータを回復する唯一の方法はソースから再び取得することです。
更に、考慮しなければならない2つの種類の障害があります:
- ワーカーノードの障害 - executorを実行しているワーカーノードのどれかが故障することがありえ、それらのノード上のメモリ内の全データが喪失されるでしょう。故障したノード上でreceiverが実行していた場合、それらのバッファされたデータは喪失されるでしょう。
- ドライバーノードの障害 - Sparkストリーミングアプリケーションを実行中のドライバーノードが故障した場合、明らかにSparkContextが喪失され、全てのexecutorのそれらのメモリ内のデータが喪失されます。
この基本的な知識を使って、Sparkストリーミングの耐障害性のsemanticsを理解してみましょう。
定義
ストリーミングシステムの意味は、各レコードがシステムによって処理されるかも知れない回数の点でしばしば捉えられます。システムが全てのありえる操作の状況下(障害などにも関わらず)で提供することができる3つの種類の保証があります。
- 最大1回: 各レコードは1回処理されるかあるいは全く処理されないでしょう。
- 少なくとも1回: 各レコードは1回以上処理されるでしょう。これはデータが喪失されないことを保証するので 最大1回よりも強いです。しかし、重複するかも知れません。
- 正確に1回: 各レコードは正確に1回処理されるでしょう - データが喪失することなく、複数回処理されるデータは無いでしょう。これは明らかに3つの中で最も強い保証です。
基本的な意味
ストリーム処理システムの中で、大まかに言うとデータ処理の中に3つのステップがあります。
-
データの受信: データはrecieverあるいは別のものを使ってソースから受信されます。
-
データの変換: 受信されたデータはDStreamおよびRDD変換を使って変換されます。
-
データの出力: 最終的な変換されたデータはファイルシステム、データベース、ダッシュボードなどのような外部システムに出力されます。
ストリーミングアプリケーションがend-to-end の確実に1回だけの保証を達成しなければならない場合、各ステップは確実に1回だけの保証を提供しなければなりません。つまり、各レコードは確実に1回受信され、確実に1回変換され、システムの下流へ確実に1回出力されなければなりません。. Spark Streamingの状況内でのこれらのステップの意味を理解しましょう。
-
データの受信: 異なる入力ソースは異なる保証を提供します。これは次のサブセクションで詳細に議論されます。
-
データの変換: 受信された全てのデータはRDDの提供する保証のおかげで確実に1回処理されます。障害があったとしても、受信した入力データがアクセス可能な限り、最終的な変換後RDDは常に同じ内容を持つでしょう。
-
Pushing out the data: Output operations by default ensure at-least once semantics because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). しかし、ユーザはexactly-once セマンティクスを行うために独自のトランザクションの仕組みを実装することができます。これはこの章で後で詳細に議論されます。
受信されたデータのセマンティクス
異なる入力ソースは、少なくとも1回 から 確実に1回の範囲の異なる保証を提供します。もっと詳細に読む
ファイルの使用
入力データの全てが既にHDFSのような耐障害性のあるファイルシステムの中にある場合、Sparkストリーミングは常に障害から復旧でき、データの全てを処理できます。これはexactly-once セマンティクスを与え、何が失敗しようとも確実に1回全てのデータの全てが処理されることを意味します。
rerceiverベースのソースを使用
For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. 以前に議論したように、receiverには2つの種類があります:
- 信頼できるreceiver - これらのreceiverは受信したデータがリプリケートされたことを確認した後でのみ到着を知らせるでしょう。もしそのようなreceiverが故障した場合、ソースはバッファされた(リプリケートされていない)データの到着の知らせを受け取らないでしょう。従って、もしreceiverが再起動すると、ソースはデータを再送信し、データは障害により喪失されないでしょう。
- 信頼できないreceiver - そのようなreceiverは到着を送信しません。従ってワーカーあるいはドライバーの障害により失敗した場合、recieverはデータを喪失するかも知れません。
どの種類のreceiverが使われるかによって、以下のsemanticsを達成することができます。ワーカーノードが故障した場合、信頼できるreceiverによってデータは喪失しません。信頼できないreceiverを使うと、受信されたがリプリケートされていないデータは喪失するかも知れません。ドライバーノードが故障した場合、それらの喪失に加えて、受信されてメモリ内にリプリケートされた全ての過去のデータは喪失されるでしょう。. これはステートフル変換の結果に影響を与えるでしょう。
この過去の受信データの喪失を避けるために、Spark 1.2は受信データを耐障害性のあるストレージに保存する 先行書き込みログが導入されました。先行書き込みログの有効と信頼できるreceiverを使って、データの喪失がなくなります。semanticsの点では、それは少なくとも1回の保証を提供します。
以下の表は障害時のsemanticsを要約します:
デプロイ シナリオ | ワーカーの障害 | ドライバーの障害 |
---|---|---|
Spark 1.1 以前、 あるいは 先行書き込みログの無いSpark 1.2以前 |
信頼できないレシーバーを使ってバッファされたデータが損失する 信頼できるレシーバーを使ってデータの損失がゼロ 少なくとも1回のsemantics |
信頼できないレシーバーを使ってバッファされたデータが損失する 全てのレシーバーを使って過去のデータが損失する 未定義のsemantics |
先行書き込みログを持つSpark1.2以降 |
信頼できるレシーバーを使ってデータの損失がゼロ 少なくとも1回のsemantics |
信頼できるレシーバーとファイルを使ってデータの損失がゼロ 少なくとも1回のsemantics |
Kafka ダイレクト APIの使用
Spark 1.3では、新しいKafa Direct APIを導入しました。これにより全てのKafkaのデータはSparkストリーミングによって確実に1回受け取られます。これに伴い、確実に1回の出力オペレーションを実装する場合、end-to-endで確実に1回の保証を達成することができます。このやり方(Spark 2.0.0 の時点では実験的)はKafka 統合ガイドの中で更に議論されます。
出力オペレータのsemantics
(foreachRDD
のような)出力オペレーションは少なくとも1回のsemanticsを持ちます。つまり、ワーカーの障害の時に変換されたデータは外部エンティティに1回以上書き込まれます。これはファイルシステムに書き込むためにsaveAs***Files
オペレーションを使うことで条件を満たします(ファイルは単純に同じデータを使って書き込むでしょう)が、確実に1回のsemanticsを達成するには更なる努力が必要でしょう。2つのやりかたがあります。
-
等冪アップデート : 複数回の書き込みは常に同じデータを書き込みます。例えば、
saveAs***Files
は常に同じデータを生成したファイルに書き込みます。 -
トランザクショナル アップデート: 更新が正確に一度だけ自動的に行われるように、全ての更新がトランザクション的に行われます。これを行う一つの方法は以下のようになるでしょう。
- 識別子を生成するために、バッチ時間 (
foreachRDD
で利用可能です) と、RDDのパーティションインデックスを使います。この識別子はストリーミングアプリケーション内のblobデータを一意に識別します。 -
その識別子を使って外部システムをこのblobでトランザクション的に更新します (つまり、確実に一回、アトミック的に)。つまり、もし識別子がまだコミットされていなければ、パーティションデータと識別子をアトミック的にコミットします。そうでなければ、これは既にコミットされていて、更新をスキップします。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
- 識別子を生成するために、バッチ時間 (
0.9.1あるいは1.x以下からの移行ガイド
Spark 0.9.1 と Spark 1.0の相田で、APIの安定性を更に確実にするために2,3のAPIの変更がありました。この章は既存のコードを1.0に移行するために必要なステップを詳しく述べます。
Input DStreams: 入力ストリーム(つまり、StreamingContext.socketStream
, FlumeUtils.createStream
など)を生成する全てのオペレータは、今はScalaでは(Dstreamの代わりに)InputDStream / ReceiverInputDStream を、Javaでは(JavaDStreamの代わりに)JavaInputDStream / JavaPairInputDStream / JavaReceiverInputDStream / JavaPairReceiverInputDStream を返します。これは入力ストリームに固有の機能を将来バイナリの互換性を壊すことなくこれらのクラスに追加することができることを保証します。既存のSparkストリーミングアプリケーションは何も変更を必要としませんが、(これらの新しいクラスはDStream/JavaDStreamのサブクラスのため)Spark 1.0での再コンパイルが必要かも知れないことに注意してください。
独自のネットワーク receiver: Spark ストリーミングのリリースから、独自のネットワークreceiverがNetowrkReceiverクラスを使ってScalaで定義できるようになりました。しかし、APIはエラーハンドリングとレポートの点で制限があり、Javaから使うことはできませんでした。Spark 1.0から、このクラスは以下の利点を持つReceiver に置き換えられました。
stop
およびrestart
のようなメソッドが、receiverのライフサイクルのよりよい制御のために追加されました。詳細は独自のreceiverガイドを見てください。- 独自のreceiverはScalaおよびJavaの両方を使って実装することができます。
既存の独自のreceiverを以前のNetworkReceiverから新しいReceiverに移行するには、以下を行う必要があります。
- 独自のreceiverクラスが
org.apache.spark.streaming.dstream.NetworkReceiver
の代わりにorg.apache.spark.streaming.receiver.Receiver
を継承するようにします。 - 以前は、BlockGeneratorオブジェクトが独自のreceiverによって生成されなければなりませんでした。受信されたデータはSparkに格納されるためにこれに追加されました。これは
onStart()
およびonStop()
メソッドから明示的に開始および停止されました。新しいReceiverクラスはstore(<data>)
という名前のSparkでデータを格納するために使うことができる一連のメソッドを追加し、これを不要にします。つまり、既存のネットワーク receiverを移行するためには、全てのBlockGeneratorオブジェクトを削除し(いずれにしてもSpark1.0ではもはや存在しません)、受信したデータにstore(...)
メソッドを使用します。
Actor-based Receivers: The Actor-based Receiver APIs have been moved to DStream Akka. 詳細はプロジェクトを参照してください。