Spark ストリーミング プログラミング ガイド

概要

Spark ストリーミングはライブデータストリームのスケーラブル、高スループット、耐障害性ストリーム処理を可能にするコアSpark APIの拡張です。データはKafka, Flume, Kinesis あるいはTCPソケットのような多くのソースから取り込まれ、map, reduce, join および window のような高レベルの機能によって表現される複雑なアルゴリズムを使って処理することができます。最後に、処理されたデータはファイルシステム、データベース、およびライブダッシュボードに出力することができます。その上、Sparkの機械学習 および グラフ処理 アルゴリズムをデータストリームに適用することができます。

Spark ストリーミング

内部的には以下のように動作します。Spark ストリーミングはライブ入力データストリームを受け取りデータをバッチに分割します。その後これはバッチ内の最終結果ストリームを生成するためにSparkエンジンによって処理されます。

Spark ストリーミング

Spark ストリーミングはdiscretized stream あるいは DStreamと呼ばれる高レベルの抽象化を提供します。これはデータの連続するストリームを表現します。DStreams は、Kafka, Flume および Kinesis のようなソースからの入力データストリーム、または他のDStream上の高レベルオペレーションを適用することによって、生成することができます。内部的には、DStreamは一続きのRDDsとして表現されます。

このガイドはDStreamを使ってSparkストリーミングプログラムを始める方法を説明します。SparkストリーミングプログラムをScala, Java あるいは Python(Spark1.2から導入されました)で書くことができます。これら全てはこのガイドで紹介されます。このガイドを通して異なる言語のコードの断片間を選択させるとっかかりを見つけることができるでしょう。

注意: Pythonでは異なる、あるいは利用できない2、3のAPIがあります。 このガイドの至るところで、これらの違いを強調するタグPython APIを見つけるでしょう。


クイック例

独自のSparkストリーミングプログラムを書く方法の詳細に入る前に、単純なSparkストリーミングプログラムがどのようなものかぱっと見てみましょう。 TCPソケットでlistenしているデータサーバから受け取ったテキストデータ内の単語の数をカウントしたいとします。しなければならないことは以下のとおりです。

まず、Sparkストリーミングクラスの名前と(DStreamのような)必要とする他のクラスに便利な幾つかのメソッドを追加するためにStreamContextから環境への明示的な変換をインポートします。StreamingContext は全てのストリーミング機能のメインのエントリーポイントです。2つの実行スレッド持つローカルのStreamingContext、および1秒間隔のバッチを生成します。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

このコンテキストを使って、ホスト名(たとえば localhost) およびポート (例えば 9999)で指定されるTCPソースからストリーミングデータを表すDStreamを生成することができます。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

このlines DStream はデータサーバから受け取るデータのストリームを表します。このDStreamの各レコードはテキストの行です。次に、空白文字によって行を単語に分割しようと思います。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMapはソースのDStreamの各レコードから複数の新しい単語を生成するために新しいDStreamを生成する one-to-manyの DStream オペレータです。この場合、各行は複数の単語に分割され、単語のストリームはwords DStreamとして表現されます。次に、これらの単語を数えようと思います。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words DStream は更に(1対1変換で) (word, 1)ペアのDStreamにマップされ、それからデータの各バッチ内の単語の頻度を取得するためにreduceされます。最後に、wordCounts.print() は各秒ごとに生成されるカウントを出力します。

これらの行が実行される場合に、Sparkストリーミングは開始時に実行しようとする計算のセットアップのみを行い、実際の処理はまだ開始されていないことに注意してください。全ての変換がセットアップされた後で処理を開始するために、最終的に以下を呼び出します。

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完全なコードは Spark ストリーミングの例NetworkWordCountで見つけることができます。

まず、JavaStreamingContext オブジェクト、全てのストリーミング機能の主要なエントリーポイント、を生成します。2つの実行スレッド持つローカルのStreamingContext、および1秒間隔のバッチを生成します。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

このコンテキストを使って、ホスト名(たとえば localhost) およびポート (例えば 9999)で指定されるTCPソースからストリーミングデータを表すDStreamを生成することができます。

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

このlines DStream はデータサーバから受け取るデータのストリームを表します。このストリーム内の各レコードはテキストの行です。それから、空白で行を単語に分割しようと思います。

// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterator<String> call(String x) {
      return Arrays.asList(x.split(" ")).iterator();
    }
  });

flatMapはソースのDStreamの中の各レコードから複数の新しいレコードを生成することで、新しいDStreamを生成するDStreamのオペレーションです。この場合、各行は複数の単語に分割され、単語のストリームはwords DStreamとして表現されます。FlatMapFunction オブジェクトを使って変形を定義したことに注意してください。この先気づくかも知れないように、DStreamの変形の定義を手助けするJavaAPIの便利なクラスがたくさんあります。

次に、これらの単語を数えようと思います。

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

words DStream はPairFunctionオブジェクトを使って、更に(1対1変換で) (word, 1)ペアのDStreamにマップされます。そして、Function2 オブジェクトを使って、データの各バッチの中の単語の頻度を取得するためにredeceされます。最後に、wordCounts.print() は各秒ごとに生成されるカウントを出力するでしょう。

これらの行が実行される場合に、Sparkストリーミングは開始後に実行しようとする計算のセットアップのみを行い、実際の処理はまだ開始されていないことに注意してください。全ての変換がセットアップされた後で処理を開始するために、最終的にstartメソッドを呼び出します。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完全なコードは Spark ストリーミングの例JavaNetworkWordCountで見つけることができます。

まず、StreamingContext オブジェクト、全てのストリーミング機能の主要なエントリーポイント、を生成します。2つの実行スレッド持つローカルのStreamingContext、および1秒間隔のバッチを生成します。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

このコンテキストを使って、ホスト名(たとえば localhost) およびポート (例えば 9999)で指定されるTCPソースからストリーミングデータを表すDStreamを生成することができます。

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

このlines DStream はデータサーバから受け取るデータのストリームを表します。このDStreamの各レコードはテキストの行です。次に、空白によって行を単語に分割しようと思います。

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMapはソースのDStreamの各レコードから複数の新しい単語を生成するために新しいDStreamを生成する one-to-manyの DStream オペレータです。この場合、各行は複数の単語に分割され、単語のストリームはwords DStreamとして表現されます。次に、これらの単語を数えようと思います。

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

words DStream は更に(1対1変換で) (word, 1)ペアのDStreamにマップされ、それからデータの各バッチ内の単語の頻度を取得するためにreduceされます。最後に、wordCounts.pprint() は各秒ごとに生成されるカウントを出力するでしょう。

これらの行が実行される場合に、Sparkストリーミングは開始時に実行しようとする計算のセットアップのみを行い、実際の処理はまだ開始されていないことに注意してください。全ての変換がセットアップされた後で処理を開始するために、最終的に以下を呼び出します。

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

完全なコードは Spark ストリーミングの例NetworkWordCountで見つけることができます。

既にSparkをダウンロード および ビルドした場合は、以下のようにしてこの例を実行することができます。以下のようにして最初にNetcat(ほとんどのUnix系のシステムで見つけることができる小さなユーティリティ)をデータサーバとして実行する必要があるでしょう。

$ nc -lk 9999

それから、違うターミナルで、以下のようにして例を実行することができます。

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999

それから、netcatサーバを実行しているターミナルの中で入力される全ての行がカウントされ、毎秒ごとに画面に出力されるでしょう。それは以下のように見えるでしょう。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...


基本概念

次に、単純な例の域を超えて、Sparkストリーミングの基本について詳しく述べます。

リンク

Sparkと似て、SparkストリーミングはMaven Centralを使って利用可能です。独自のSparkストリーミングプログラムを書くには、以下の依存性をSBTあるいはMavenプロジェクトに追加しなければならないでしょう。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"

Kafka, Flume および KinesisのようなSparkストリーミング コアAPIにないソースからデータを統合するには、対応するartifactspark-streaming-xyz_2.11を依存性に追加しなければならないでしょう。例えば、一般的なものとして以下のものがあります。

ソース加工物
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis
spark-streaming-kinesis-asl_2.11 [Amazon Software License]

最新のリストについては、サポートされるソースとartifactの完全なリストのためにMaven repositoryを参照してください。


StreamingContextの初期化

Spark ストリーミング プログラムを初期化するためには、全てのSparkストリーミング機能の主要なエントリーポイントとなるStreamingContext オブジェクトが生成されなければなりません。

StreamingContext オブジェクトはSparkConfオブジェクトから生成することができます。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName パラメータはクラスタUI上で見るアプリケーションの名前です。masterSpark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local[*]"文字列です。実地では、クラスタ上で実行する場合は、プログラム内にmaster をハードコードしたくなく、spark-submitを使ってアプリケーションを起動 し、そこでそれを受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkストリーミングを実行するために"local[*]"を渡すことができます(ローカルシステム内のコアの数を検知します)。これは内部的にssc.sparkContextとしてアクセスされるSparkContext (全てのSpark機能の開始ポイント)を生成することに注意してください。

バッチの間隔は、アプリケーションのlatency要求と利用可能なクラスタのリソースに基づいて設定されなければなりません。詳細は Performance Tuning の章を見てください。

StreamingContextオブジェクトも既存のSparkContext オブジェクトから生成することができます。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

JavaStreamingContext オブジェクトは SparkConfオブジェクトから生成することができます。

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration(1000));

appName パラメータはクラスタUI上で見るアプリケーションの名前です。masterSpark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local[*]"文字列です。実地では、クラスタ上で実行する場合は、プログラム内にmaster をハードコードしたくなく、spark-submitを使ってアプリケーションを起動 し、そこでそれを受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkストリーミングを実行するために "local[*]" を渡すことができます。これは内部的にssc.sparkContextとしてアクセスされるJavaSparkContext (全てのSpark機能の開始ポイント)を生成することに注意してください。

バッチの間隔は、アプリケーションのlatency要求と利用可能なクラスタのリソースに基づいて設定されなければなりません。詳細は Performance Tuning の章を見てください。

JavaStreamingContext オブジェクトも既存のJavaSparkContextから生成することができます。

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

StreamingContext オブジェクトはSparkContext オブジェクトから生成することができます。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)

appName パラメータはクラスタUI上で見るアプリケーションの名前です。masterSpark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local[*]"文字列です。実地では、クラスタ上で実行する場合は、プログラム内にmaster をハードコードしたくなく、spark-submitを使ってアプリケーションを起動 し、そこでそれを受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkストリーミングを実行するために"local[*]"を渡すことができます(ローカルシステム内のコアの数を検知します)。

バッチの間隔は、アプリケーションのlatency要求と利用可能なクラスタのリソースに基づいて設定されなければなりません。詳細は Performance Tuning の章を見てください。

コンテキストが定義された後で、以下を実行する必要があります。

  1. 入力DStreamを生成することで入力ソースを定義します。
  2. DStreamへの変換と出力オペレータを適用することでストリーミングの計算を定義します。
  3. streamingContext.start()を使って、データの取得とそれの処理の開始します。
  4. streamingContext.awaitTermination()を使って処理を(手動あるいは何かのエラーにより)停止するために待ちます。
  5. 処理はstreamingContext.stop()を使って停止することができます。
覚えておくべきこと:

離散ストリーム (DStreams)

Discretized Stream あるいは DStream は、Spark ストリーミングによって提供される基本抽象です。それは、ソースから受け取られる入力データストリームあるいは入力ストリームの変換によって生成されたデータのストリームの処理されたもの、のどちらかの連続するストリームを表します。内部的には、DSteramはRDDの連続する配列によって表されます。それはSparkの不変、分散データセットの抽象概念です。(詳細は Spark プログラミング ガイド を見てください)。DStream内の各RDDは以下の図で示されるように、特定の間隔からのデータを含みます。

Spark ストリーミング

DStreamに適用されるどのようなオペレーションも、下に存在するRDD上の操作として変換されます。例えば、行から単語へのストリームの変換の以前の例では、wordsDStreamのRDDを生成するために、flatMap オペレーションがlines DStream の各RDDに適用されます。以下の図でこれが示されます。

Spark ストリーミング

これらの裏に潜むRDD変換はSparkエンジンによって計算されます。DStreamオペレータはほとんどのこれらの詳細を隠し、利便性のために高レベルのAPIを開発者に提供します。これらのオペレータの詳細は後の章で議論されます。


入力DStreamとReceiver

入力DStreamはストリーミングソースから受け取る入力データのストリームを表します。quick exampleの中で、linesはnetcatサーバから受け取るデータストリームを表していました。各入力DStream(ファイルシステムは除く。後の章で議論します)は、ソースからデータを受け取り、処理のためにSparkのメモリにそれを格納するReceiver (Scala doc, Java doc)オブジェクトと関係します。

Sparkストリーミングは組み込みのストリーミングソースの2つのカテゴリーを提供します。

この章で各カテゴリに存在するいくつかのソースについて議論しようと思います。

ストリーミングアプリケーションの中で並行して複数のデータのストリームを受け取りたい場合は、複数の入力DStreamを生成することができます(更に詳しくはパフォーマンスチューニングの章で議論されます)。これは同時に複数のデータストリームを受け取る複数のreceiverを生成するでしょう。しかし、Sparkのworker/executorは長く実行するタスクであり、従ってSparkストリーミングアプリケーションに割り当てられたコアの一つを占有することに注意してください。従って、受け取ったデータを処理、およびreceiverを実行するためには、Sparkストリーミングアプリケーションは十分なコア(あるいは、ローカルで動作する場合はスレッド)が割り当てられる必要があることを忘れないでいることが重要です。

覚えておくべきこと

基本ソース

quick exampleの中で、TCPソケット接続によって受け取ったテキストデータからDStreamを生成するssc.socketTextStream(...)を既に見てきました。ソケットは別として、StreamingContext APIは入力ソースとしてファイルからDStreamを生成するメソッドを提供します。

ソケットおよびファイルからのストリームについての詳細は、ScalaについてはStreamingContext、JavaについてはJavaStreamingContext、そしてPythonについては StreamingContext の関連する関数のAPIドキュメントを見てください。

一歩進んだソース

Python API Spark 2.0.0 では、これらのソース、Kafka, Kinesis および Flume はPython APIから利用可能です。

ソースのこのカテゴリは外部の非Sparkライブラリとの調和を必要とします。それらのいくつかと複雑な依存性を持ちます(例えば、KafkaとFlume)。従って、依存性のバージョンの衝突に関する問題を最小化するために、これらのソースからDStreamを生成する機能が別個の必要になったら明示的にリンクされるライブラリに移動されました。

これらの上級のソースはSpark シェルでは利用できません。そのためこれらの上級のソースに基づいたアプリケーションはシェル内でテストすることができません。本当にそれらをSparkシェルの中で使用したい場合は、対応するMaven artifactのJARを依存するものと一緒にダウンロードしクラスパスに追加する必要があります。

これらの幾つかの上級ソースは以下のものです。

独自のソース

Python API これはPythonではまだサポートされません。

入力 DStream も独自のデータソースから生成することができます。しなければいけないことは、独自のソースからデータを取得しそれをSparkに入れることができるユーザ定義のreceiverを実装することです (それが何であるかを理解するには次の章を見てください)。詳細は 独自の receiver のガイド

receiver の信頼性

信頼性に基づいたデータソースの2つの種類がありえます。(Kafka および Flumeのような) データの到着を知らせることができるソース。それらの信頼できる ソースからデータを受け取るシステムが受け取ったデータを正しく受け入れた場合、どのような障害によってもデータが紛失していないことを保証することができます。これは結果として2つの種類のreceiverとなります:

  1. 信頼できる Receiver - 信頼できるreceiverはデータが受信され、レプリケーションによって格納された時に、信頼できるソースに通知が正しく送られます。
  2. 信頼できない Receiver - 信頼できないreceiver はソースに通知を送信しません。これは通知をサポートしないソース、あるいは信頼できるソースの場合でも通知の複雑さに分け入りたくないか必要としない場合に、使うことができます。

信頼できるreceiverを書く方法の詳細は独自のreceiverガイドで議論されます。


DStream上の変換

RDDのそれと似ていて、変換によりデータは入力DStreamから修正することができます。DStreamは通常のSpark RDD上で利用可能な多くの変換をサポートします。以下は一般的なそれらです。

変換意味
map(func) ソースDStreamの各要素を関数 funcを使って渡すことで新しいDStreamを返します。
flatMap(func) mapに似ていますが、各入力アイテムは0以上の出力アイテムにマップすることができます。
filter(func) func がtrueを返すソースDStreamのレコードのみを選択することで新しいDStreamを返します。
repartition(numPartitions) パーティションを増減することでこのDStream内の並行レベルを変更します。
union(otherStream) ソースDStreamとotherDStreamの要素の結合を含む新しいDStreamを返します。
count() ソースDStreamの各RDD内の要素の数を数えることで一つの要素のRDDの新しいDStreamを返します。
reduce(func) (2つの引数を取り、一つを返す)func 関数を使ってソースDStreamの各RDD内の要素を集約することで、一つの要素のRDDの新しいDStreamを返します。並行して計算できるように、その関数は結合性および可換性がなければなりません。
countByValue() タイプKの要素のDStreamで呼ばれると、各キーの値がソースのDStreamの各RDDでの頻度になる(K, Long)のペアの新しいDStreamを返します。
reduceByKey(func, [numTasks]) (K, V)ペアのDStreamで呼ばれると、指定されたreduce関数を使って集約された各キーの値からなる(K, V)の新しいDStreamを返します。注意: グルーピングを行うために、デフォルトではSparkの並行タスクのデフォルトの数(ローカルモードでは2、クラスターモードでは spark.default.parallelismで決定される数)を使用します。タスクの異なる数を設定するために任意の numTasks 引数を渡すことができます。
join(otherStream, [numTasks]) (K, V) と (K, W)のペアの2つのDStreamで呼ばれると、各キーの要素の全てのペアからなる (K, (V, W)) のペアの新しいDStreamを返します。
cogroup(otherStream, [numTasks]) (K, V)と(K, W)ペアのDStreamで呼ばれると、(k, Seq[V], Seq[W]) の組の新しいDStreamを返します。
transform(func) ソースDStreamの各RDDに RDD-to-RDD関数を適用することで、新しいDStreamを返します。これはDStreamに任意のRDD操作を行うために使用することができます。
updateStateByKey(func) 指定された関数を以前のキーの状態とキーのための新しい値に適用することで更新された各キーのための状態を持つ、新しい"state"DStreamを返します。これは各キーのための任意のstateデータを維持するために使用することができます。

これらの変換のいくつかは、もっと詳細に議論する価値があります。

UpdateStateByKey オペレーション

updateStateByKeyオペレーションは新しい情報で絶え間なく更新される間任意のstateを維持することができます。コレを使うには、2つのステップを行う必要があるでしょう。

  1. stateを定義する - stateは任意のデータタイプでも構いません。
  2. stateの更新関数を定義する - 入力ストリームから以前のstateと新しい値を使ってstateをどうやって更新するかの関数を使って指定します。

各バッチにおいて、バッチ内に新しいデータがあるかどうかに関係なく、Sparkは全ての既存のキーに対して状態を更新する関数を提供するでしょう。更新関数がNone を返すと、キー-値ペアは削除されるでしょう。

これを例を使って説明しましょう。テキストデータストリームの中で各単語のカウントを実行し続けたいとします。ここで、カウントの実行はstateで、それは整数です。更新関数を以下のように定義します:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

これが単語を含むDStreamに適用されます。(つまり、pairs DStreamは以前の例(word, 1) ペアを含みます)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

((word, 1)ペアから)1秒の順列を持つnewValuesと、以前のカウントを持つrunningCount を使って、各単語ごとに update 関数が呼ばれるでしょう。

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
      Integer newSum = ...  // add the new values with the previous running count to get the new count
      return Optional.of(newSum);
    }
  };

これが単語を含むDStreamに適用されます。(つまり、pairs DStreamはquick example(word, 1) ペアを含みます)。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

((word, 1)ペアから)1秒の順列を持つnewValuesと、以前のカウントを持つrunningCount を使って、各単語ごとに update 関数が呼ばれるでしょう。完全なJavaコードについては、JavaStatefulNetworkWordCount.scalaの例を見てください。

def updateFunction(newValues, runningCount):
    if runningCount is None:
       runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

これが単語を含むDStreamに適用されます。(つまり、pairs DStreamは以前の例(word, 1) ペアを含みます)。

runningCounts = pairs.updateStateByKey(updateFunction)

((word, 1)ペアから)1秒の順列を持つnewValuesと、以前のカウントを持つrunningCount を使って、各単語ごとに update 関数が呼ばれるでしょう。完全なPythonコードについては、stateful_network_wordcount.pyの例を見てください。

updateStateByKeyの使用は設定するためにチェックポイントディレクトリを必要とすることに注意してください。これはチェックポイント の章で詳細に議論されます。

変換オペレーション

(transformWithのような変形物も一緒に)変換 オペレーションにより、任意のRDD-to-RDD関数はDStreamに適用することができます。DStream APIに公開されていない任意のRDDオペレーションを適用するために使用することができます。例えば、データストリーム中の各バッチに他のデータセットをjoinする機能はDStream API には直接公開されていません。しかし、これを行うためにtransform を簡単に使うことができます。これにより将来性がとても強くなります。例えば、入力データストリームをあらかじめ計算されたspam情報(これもSparkで生成されるかも知れません)とjoinし、それに基づいてフィルタすることで、リアルタイムのデータの整理を行うことができます。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
  new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
    @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
      ...
    }
  });
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))

提供された関数は各バッチの間隔ごとに呼ばれることに注意してください。これにより時間で変動するRDDオペレーションを行うことができます。つまり、RDDオペレーション、パーティションの数、ブロードキャスト変数などがバッチ間で変わるかも知れません。

ウィンドウ オペレーション

Spark ストリーミングはウィンドウ オペレーションも提供します。これによりデータのスライドするウィンドウに変換を適用することができます。以下の図はこのスライドするウィンドウを図示しています。

Spark ストリーミング

図で示されるように、ウィンドウがソースのDStream上をスライドするたびに、ウィンドウ内のDStreamのRDDを生成するために、ウィンドウ内のソースRDDは結合され操作されます。この特定の場合において、操作は最後の3回のデータの単位に対して適用され、2回の単位に対してスライドします。この事はどのウィンドウオペレーションも2つのパラメータを指定しなければならないことを示します。

これら2つのパラメータはソースDStreamのバッチ間隔の倍数でなければなりません(図での1)。

例を使ってウィンドウオペレーションを説明しましょう。10秒ごとに直近30秒のデータの単語のカウントを生成することで前の例を拡張したいとします。これをするには直近30秒のデータに (word, 1)ペアのpairs DStreamに reduceByKeyオペレーションを適用する必要があります。reduceByKeyAndWindowオペレーションを使うことでこれをすることができます。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
  @Override public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

以下は一般的なウィンドウ オペレーションのいくつかです。これらすべてのオペレーションは上に述べたように2つのパラメータ - windowLengthslideInterval を取ります。

変換意味
window(windowLength, slideInterval) ソースのDStreamのウィンドウで切り取られたバッチに基づいて計算された新しいDStreamを返します。
countByWindow(windowLength, slideInterval) ストリーム内の要素のスライドするウィンドウによって切り取られた数を返します。
reduceByWindow(func, windowLength, slideInterval) funcを使ってストリーム内のスライドの間隔で要素を集約することで生成された、新しい一つの要素のストリームを返します。並行して正確に計算できるように、その関数は結合性および可換性がなければなりません。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) (K, V)ペアのDStreamで呼ばれると、指定されたreduce関数funcをスライドするウィンドウ内でバッチに使って集約された各キーの値からなる(K, V)の新しいDStreamを返します。注意: グルーピングを行うために、デフォルトではSparkの並行タスクのデフォルトの数(ローカルモードでは2、クラスターモードでは spark.default.parallelismで決定される数)を使用します。タスクの異なる数を設定するために任意の numTasks 引数を渡すことができます。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

以前のウィンドウのreduce値を使って各ウィンドウのreduce値が逐次計算される、上のreduceByKeyAndWindow()のもっと効率的なバージョンです。これはスライドウィンドウに入った新しいデータをreduceすることで行われ、古いデータを"逆reduce"することでウィンドウを離れます。例としては、ウィンドウスライドとしてキーのカウントの"足し算"および"引き算"があります。しかし、"逆reduce可能な関数"、つまり対応する"逆reduce"関数(パラメータとして invFuncをとる)を持つreduce関数のみに適用可能です。reduceByKeyAndWindowのように、任意の引数を使ってreduceタスクの数が設定可能です。このオペレーションを使用するにはチェックポイントが有効になっていなければならないことに注意してください。

countByValueAndWindow(windowLength, slideInterval, [numTasks]) (K, V) ペアのDStreamで呼ばれると、各キーの値がスライディングウィンドウ内での頻度になる(K, Long)のペアの新しいDStreamを返します。reduceByKeyAndWindowのように、任意の引数を使ってreduceタスクの数が設定可能です。

join オペレーション

最後に、Sparkストリーミングで異なる種類のjoinをどれだけ簡単に行うことができるかについて、強調する価値があります。

Stream-stream joins

ストリームは他のストリームととても簡単にjoinすることができます。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)

ここで、各バッチの間隔の中で、stream1で生成されたRDDはstream2で生成されたRDDとjoinされるでしょう。leftOuterJoin, rightOuterJoin, fullOuterJoinを行うこともできます。更に、ストリームのウィンドウ上でjoinすることもしばしば有用です。それも同じくとても簡単です。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins

これは DStream.transform オペレーションの説明の中で既に以前に説明されました。これは、ウィンドウで切り取られたストリームとデータセットのjoinの更に別の例です

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(
    new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
        @Override 
        public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
            return rdd.join(dataset);
        }
    }
);
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))

実際は、joinしたいデータセットの変更を動的に行うこともできます。transformを提供する関数が各バッチの間隔ごとに評価され、従ってdataset リファレンスが示す現在のデータセットを使用するでしょう。

DStream変換の完全なリストはAPIドキュメントの中で利用可能です。Scala APIについては、DStreamPairDStreamFunctionsを見てください。Java APIについては、JavaDStreamJavaPairDStream を見てください。Python API については、DStream を見てください。


DStream上の出力オペレーション

出力オペレーションによってDStreamのデータはデータベースあるいはファイルシステムのような外部システムに出力することができます。出力オペレーションによって実際に変換されたデータが外部システムによって使うことができるため、出力オペレーションは全てのDStream変換の実際の実行を引き起こします (RDDでのアクションに似ています)。現在のところ、以下の出力オペレーションが定義されています:

出力オペレーション意味
print() ストリーミングアプリケーションを実行しているドライバノード上のDStreamの中のデータの各バッチの最初の10個要素を出力します。これは開発とデバッグの時に便利です。
Python API Python APIの中で pprint() が呼ばれます。
saveAsTextFiles(prefix, [suffix]) このDStreamの内容をテキストとして保存します。各バッチの間隔におけるファイル名は、prefix および suffixに基づいて生成されます : "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) このDStreamの内容をシリアライズ化されたJavaオブジェクトのSequenceFilesとして保存します。各バッチの間隔におけるファイル名は、prefix および suffixに基づいて生成されます : "prefix-TIME_IN_MS[.suffix]".
Python API これはPython APIの中で利用できません。
saveAsHadoopFiles(prefix, [suffix]) このDStreamの内容をHadoopファイルとして保存します。各バッチの間隔におけるファイル名は、prefix および suffixに基づいて生成されます : "prefix-TIME_IN_MS[.suffix]".
Python API これはPython APIの中で利用できません。
foreachRDD(func) 関数 func をストリームから生成された各RDDへ適用する、最も一般的な出力オペレータ。この関数は各RDD内のデータを、RDDをファイルへ保存、あるいはネットワーク越しにデータベースに書き込むような外部システムに送信します。関数 func はストリーミングアプリケーションを実行しているドライバプログラムの中で実行され、通常はストリーミングRDDの計算を強制するRDDアクションを持つだろうということに注意してください。

foreachRDDの使用のデザインパターン

dstream.foreachRDDはデータを外部システムに送信することができる強力なprimitiveです。しかし、このprimitiveを正しくそして効果的に使う方法を理解することが重要です。以下は避けるべきよくある間違いです。

外部システムにデータを書き込むのに接続オブジェクト(例えば、リモートサーバへのTCP接続)の生成とそれを使ってデータをリモートシステムに送信することが必要になることが度々あります。この目的のため、開発者はSparkドライバに接続オブジェクトを不注意にも生成しようとし、RDDにレコードを保存するためにSparkワーカーの中で使用しようとするかも知れません。例えば (Scalaでは)、

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)

これは接続オブジェクトがシリアライズされ、ドライバーからワーカーに送信されることを必要とするため、正しくありません。このような接続オブジェクトはマシーン間でめったに転送されません。このエラーはシリアライズエラー(接続オブジェクトがシリアライズ可能ではない)、初期化エラー(接続オブジェクトはワーカーで初期化される必要がある)など、として明らかにされるかも知れません。正しい解決方法はワーカーで接続オブジェクトを生成することです。

しかし、これは他のよくある間違いに繋がります - 各レコードごとに新しい接続を生成する。例えば:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

一般的に、接続オブジェクトを生成することは、時間とリソースのオーバーヘッドがあります。従って、各レコードごとに接続オブジェクトを生成および破壊することは、不必要な高負荷を起こし、システムの全体のスループットをかなり下げることになりえます。より良い方法は rdd.foreachPartition を使うことです - 一つの接続オブジェクトを生成し、その接続を使ってRDD内の全てのレコードを送信します。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

これは多くのレコード上の接続生成のオーバーヘッドを償却します。

結果的に、これは複数のRDD/バッチに渡って接続オブジェクトを再利用することでより最適化されるかも知れません。One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

プール内の接続は要求に応じてlazilyに生成され、しばらくの間使われなければタイムアウトしなければなりません。これは外部のシステムへのデータの送信を最も効率よく行います。

覚えておくべき他のポイント:

Accumulators および Broadcast 変数

Accumulators および Broadcast variables はSparkストリーミングのチェックポイントから回復することができません。チェックポイントを有効にし、またAccumulators あるいは Broadcast variables を使う場合、障害時にドライバーが再起動した後で再インスタンス化できるように、AccumulatorsBroadcast variables のための穏やかにインスタンス化されたシングルトン インスタンスを生成しなければならないでしょう。以下の例で説明されます。

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

完全なソースコードを見てください。

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
    // Get or register the blacklist Broadcast
    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
        if (blacklist.value().contains(wordCount._1())) {
          droppedWordsCounter.add(wordCount._2());
          return false;
        } else {
          return true;
        }
      }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
  }
}

完全なソースコードを見てください。

def getWordBlacklist(sparkContext):
    if ('wordBlacklist' not in globals()):
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']

def getDroppedWordsCounter(sparkContext):
    if ('droppedWordsCounter' not in globals()):
        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
    return globals()['droppedWordsCounter']

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            False
        else:
            True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

完全なソースコードを見てください。


DataFrame および SQL オペレーション

ストリーミングデータ上でDataFrames および SQLオペレーションを簡単に使用することができます。StreamingContextが使用しているSparkContextを使って、SparkSessionを生成しなければなりません。更にコレはドライバの故障時に再起動することができるようにしなければなりません。これは怠惰なインスタンス化されたSparkSessionのシングルトン インスタンスの生成によって行われなければなりません。以下の例で説明されます。DataFrameとSQLを使ったワードカウントを生成するために、以前の ワード カウントの例を修正します。各RDDは一時テーブルとして登録され、SQLを使ってクエリを実行して、データフレームに変換されます。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

完全なソースコードを見てください。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD(
  new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {

      // Get the singleton instance of SparkSession
      SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

      // Convert RDD[String] to RDD[case class] to DataFrame
      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
        public JavaRow call(String word) {
          JavaRow record = new JavaRow();
          record.setWord(word);
          return record;
        }
      });
      DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words");

      // Do word count on table using SQL and print it
      DataFrame wordCountsDataFrame =
          spark.sql("select word, count(*) as total from words group by word");
      wordCountsDataFrame.show();
      return null;
    }
  }
);

完全なソースコードを見てください。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

完全なソースコードを見てください。

異なるスレッドからのストリーミングデータで定義されたテーブルにSQLクエリを実行することもできます(つまり、StreamingContextの実行の非同期)。単に、クエリが実行できるだけの十分なストリーミングデータをStreamingContextに設定するようにしてください。そうでなければ、非同期SQLクエリに気づかないStreamingContextはクエリが完了する前に古いストリーミングデータを削除するでしょう。例えば、もし最後のバッチにクエリしたいが、クエリの実行に5分かかるかも知れない場合は、streamingContext.remember(Minutes(5))を呼びます。(Scala、あるいは他の言語での同等のもの)。

データストリームについてもっと知りたい場合は、データフレームとSQLガイドを見てください。


MLlib オペレーション

MLlibによって提供される機械学習アルゴリズムも簡単に使うことができます。まず最初に、同時にストリーミングから学習できストリーミングにモデルを適用する、ストリーミング機械学習アルゴリズム(例えば、ストリーミング線形回帰, ストリーミングK平均法など)があります。これらより優れた、もっと大きな機械学習アルゴリズムのクラスについては、オフライン(つまり過去データ)で学習モデルを学習し、それからオンラインでストリーミングデータ上にそのモデルを適用します。詳細はMLlib ガイドを見てください。


キャッシング / 永続化

RDDと似て、DStreamは開発者がメモリ内にストリームのデータを永続化することもできます。つまり、DStream上のpersist()メソッドの使用は自動的にメモリ内のDStreamの各RDDを永続化するでしょう。DStream内のデータが複数回計算される場合には有用でしょう(例えば、同じデータに複数のオペレーション)。reduceByWindowreduceByKeyAndWindow、および updateStateByKeyのような状態に基づくオペレーションに関して、これは暗黙のうちにtrueです。従ってウィンドウに基づくオペレーションによって生成されたDStreamは、開発者のpersist()の呼び出し無しに自動的にメモリ内に永続化されます。

(Kafka, Flume, ソケットなどのような)ネットワーク越しにデータを受け取る入力ストリームについては、耐障害性のために2つのノードにデータをリプリケートするようにデフォルトの永続性レベルが設定されます。

RDDと違い、DStreamのデフォルトの永続性レベルはメモリ内にデータをシリアライズ化します。これはパフォーマンスチューニングの章で更に議論されます。異なる永続化レベルの更なる情報はSpark プログラミングガイドの中で見つけることができます。


チェックポイント

ストリーミングアプリケーションは24/7で運用しなければならず、従ってアプリケーションロジックに関係の無い障害(例えば、システム障害、JVMクラッシュなど)には弾力性がなければなりません。これを可能にするために、Sparkストリーミングは障害から復帰できるように耐障害性のあるストレージシステムについての十分な情報のチェックポイントを必要とします。チェックポイントとされるデータには2つの種類があります。

要約すると、もしストートフル変換が使われる場合はデータあるいはRDDチェックポイントは基本的な機能に必要とされるのに対し、メタデータのチェックポイントはドライバー障害からの復帰に主に必要とされます。

チェックポイントを有効にする場合とは

チェックポイントは以下の要求のいずれかが必要なプリケーションで有効にされるべきです:

前述のステートフル変換が無い単純なストリーミングアプリケーションは、チェックポイント無しに実行することができることに注意してください。ドライバー障害からの復帰もそのような場合は部分的になるでしょう(受け取ったが処理されていないデータは喪失されるでしょう)。これはしばしば許容され、多くのSparkストリーミングアプリケーションはこのようにされています。将来には、非Hadoop環境へのサポートがこれを改善すると期待されています。

チェックポイントの設定方法

チェックポイントは耐障害性、信頼できるファイルシステム(例えば、HDFS, S3など)のディレクトリをチェックポイント情報が保存される場所として設定することで有効にされます。これはstreamingContext.checkpoint(checkpointDirectory)の使用によって行われます。これにより前述したステートフル変換を行うことができるでしょう。更に、ドライバーの障害からアプリケーションをリカバーしたい場合は、ストリーミングアプリケーションを以下の挙動を持つように書き直す必要があります。

この挙動はStreamingContext.getOrCreateを使うことで簡単になります。以下のように使われます。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

checkpointDirectoryが存在すると、チェックポイントデータからコンテキストが再生成されます。ディレクトリが存在しない場合(つまり、初めて実行)、新しいコンテキストを生成およびDStreamをセットアップするために関数functionToCreateContextが呼ばれるでしょう。Scalaの例 RecoverableNetworkWordCount を見てください。この例はネットワークデータのワードのカウントをファイルに追記します。

この挙動はJavaStreamingContext.getOrCreateを使うことで簡単になります。以下のように使われます。

// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

checkpointDirectoryが存在すると、チェックポイントデータからコンテキストが再生成されます。ディレクトリが存在しない場合(つまり、初めて実行)、新しいコンテキストを生成およびDStreamをセットアップするために関数contextFactoryが呼ばれるでしょう。Javaの例 JavaRecoverableNetworkWordCount を見ます。この例はネットワークデータのワードのカウントをファイルに追記します。

この挙動はStreamingContext.getOrCreateを使うことで簡単になります。以下のように使われます。

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)   # new context
    ssc = new StreamingContext(...)
    lines = ssc.socketTextStream(...) # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()

checkpointDirectoryが存在すると、チェックポイントデータからコンテキストが再生成されます。ディレクトリが存在しない場合(つまり、初めて実行)、新しいコンテキストを生成およびDStreamをセットアップするために関数functionToCreateContextが呼ばれるでしょう。Pythonの例 recoverable_network_wordcount.py を見てください。この例はネットワークデータのワードのカウントをファイルに追記します。

StreamingContext.getOrCreate(checkpointDirectory, None)を使って、チェックポイントデータから StreamingContextを明示的に生成し、計算を開始することもできます。

getOrCreateの使用に加えて、障害時に自動的にドライバープロセスを再開するようにする必要があります。これはアプリケーションを実行するために使われるデプロイメント インフラによってのみ行うことができます。これはデプロイメント の章で更に議論されます。

RDDのチェックポイントは信頼できるストレージへの保存のコストがあることに注意してください。RDDのチェックポイントが作られるバッチの処理時間を増やすかも知れません。従って、チェックポイントの間隔は注意深く設定される必要があります。小さなバッチサイズ(1秒とします)において、各バッチのチェックポイントはオペレーションのスループットを著しく下げるかも知れません。逆に、あまりに頻度が少ないチェックポイントは、系統およびタスクサイズの増大を起こし、有害な結果をもたらすかも知れません。RDDのチェックポイントを必要とするステートフルな変換については、デフォルトの間隔は少なくとも10秒のバッチの倍数です。dstream.checkpoint(checkpointInterval)を使って設定することができます。一般的に、DStreamのスライドする間隔の5 - 10倍のチェックポイントの間隔が試してみるのに良い設定です。


アプリケーションのデプロイ

この章では、Sparkストリーミングアプリケーションをデプロイするステップについて議論します。

必要条件

Sparkストリーミングアプリケーションを実行するには、以下のものを持つ必要があります。

アプリケーションコードのアップグレード

実行中のSparkストリーミングアプリケーションが新しいアプリケーションコードにアップグレードされなければならない場合、二つの仕組みがあります。


アプリケーションの監視

Sparkの 監視能力を超えて、Sparkストリーミングに固有の追加の能力があります。StreamingContextが使われる場合、Spark web UI は実行中のreceiverに関する統計(receiverがアクティブかどうか、受信したレコードの数、receiverのエラーなど)、および完了したバッチ(バッチの処理時間、キューの遅延など)を表示する Streamingタブを表示します。これはストリーミングアプリケーションの進捗を監視するために使用することができます。

web UI内の以下の2つのマトリックスは特に重要です:

バッチの処理時間がバッチの間隔よりも一貫して多い、かつ/あるいは、キューの遅延が増加し続けている場合は、バッチが生成されるのが早く、遅延しているために、システムはバッチの処理をすることができないことを示します。この場合、バッチの処理時間をreducing することを考慮します。

Sparkストリーミング アプリケーションの進捗は、StreamingListener インターフェースを使って監視することもできます。これによりreceiverのステータスと処理時間を取得することができます。これは開発者のAPIであり、将来おそらく改良(つまり、もっと情報が報告されます)されるだろうことに注意してください。



パフォーマンス チューニング

クラスタ上でSparkストリーミングアプリケーションから最高のパフォーマンスを得るには、ちょっとした調整が必要です。この章では、アプリケーションのパフォーマンスを改善するために調整可能な幾つかのパラメータと設定について説明します。高次元では、以下の2つのことを考慮する必要があります:

  1. クラスタのリソースを効率的に使うことでデータの各バッチの処理時間を削減する。

  2. データのバッチを受け取ったらできるだけ早く処理できるような正しいデータのバッチサイズに設定する。

バッチ処理時間の削減

Sparkで各バッチの処理時間を最小にするための行える多くの最適化があります。詳細はチューニングガイドの中で議論されます。この章では最も重要な幾つかについて際立たせます。

データ受信時の並行レベル

(Kafka, Flume, ソケットなどの)ネットワーク越しのデータの受信はデータがデシリアライズされSparkに格納されることを必要とします。データの受信がシステム内でボトルネックになった場合は、データの受信の並行化を考慮します。各入力DStreamは単一の一つのデータのストリームを受信する(ワーカーマシーン上で動作する)receiverを生成することに注意してください。従って複数データストリームの受信は複数の入力DStreamの生成およびそれらがソースからデータのストリームの異なるパーティションを受け取るように設定することで、達成されます。例えば、データの2つのトピックを受け取る一つのKafka入力DStreamは、それぞれは一つのトピックを受け取る2つのKafka入力ストリームに分割することができます。これは並行してデータを受け取ることができる二つのreceiverを実行し、従って全体のスループットを増加させます。これらの複数のDstreamは一つのDStreamを生成するために一つに結合することができます。そして、一つの入力DStreamに適用される変換は、統一されたストリームに適用することができます。以下のようにして行います。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()

考慮しなければならない他のパラメータは、receiverのブロッキング間隔です。これはconfiguration parameter spark.streaming.blockIntervalによって決定されます。ほとんどのreceiverにとって、受け取ったデータはSparkのメモリ内に格納される前にデータのブロックの中に一緒に合体されます。各バッチ内のブロックの数はmapのような変換の中で受け取ったデータの処理に使われるだろうタスクの数を決定します。バッチあたりのreceiverごとのタスクの数はおよそ (batch間隔 / ブロック間隔) です。例えば、200msのブロック間隔は2秒のバッチごとに10のタスクを生成するでしょう。タスクの数があまりに低い(つまり、マシーンあたりのコアの数より少ない)場合、全ての利用可能なコアがデータを処理するために使われないことになるので非効率です。指定されたバッチの間隔でタスクの数を増やすには、ブロック間隔を減らします。しかし、推奨されるブロックの間隔の最小値は約50msで、それ以下ではタスクを起動するオーバーヘッドが問題になるかも知れません。

複数の入力ストリーム/receiver を使ってデータを受け取る代替方法は、入力データストリームを(inputStream.repartition(<number of partitions>)を使って)明示的にパーティションすることです。 これはさらに処理をする前にクラスタ内で指定された数のマシーンにわたって受け取ったデータのバッチを分散します。

データ処理での並行レベル

計算の全てのステージ内で使用される並行タスクの数が十分多くない場合、クラスターのリソースは十分使われません。例えば、reduceByKey および reduceByKeyAndWindowのような分散reduceオペレーションのために、デフォルトの並行タスクはspark.default.parallelism configuration propertyによって制御されます。並行レベルを引数として渡す(PairDStreamFunctions ドキュメントを見てください)documentation)ことができます。あるいはデフォルトを変更するためにspark.default.parallelism configuration property を設定することができます。

データのシリアライズ

データのシリアライズのオーバーヘッドは、シリアライズのフォーマットを調整することで減らすことができます。ストリーミングの場合は、シリアライズには2つのデータのタイプがあります。

どちらの場合でも、Kryoシリアライズを使うことでCPUおよびメモリのオーバーヘッドを減らすことができます。詳細はSpark チューニングガイド を見てください。Kryoに関しては、独自クラスの登録およびオブジェクトのリファレンスの追跡の無効化を考慮してください (設定ガイドのKryoに関係する設定を見てください)。

ストリーミングアプリケーションのために維持する必要があるデータの量が多く無い特定の場合において、過度なGCのオーバーヘッド無しにデシリアライズされたオブジェクトとしてデータ(タイプも)残すことが実現可能かも知れません。例えば、バッチ間隔が数秒で、ウィンドウオペレーションが無い場合、明示的にストレージレベルを相応に設定することで永続化データのシリアライズの無効化を試すことができます。これはシリアライズによるCPUのオーバーヘッドを減らし、あまりGCのオーバーヘッド無しにパフォーマンスを改善するかもしれません。

タスクの起動のオーバーヘッド

秒間あたりのタスクの数が高い場合(例えば秒間あたり50以上)、タスクをスレーブにスレーブに送信するオーバーヘッドが大きくなり、1秒以内のレイテンシを達成するのが難しくなるでしょう。そのオーバーヘッドは以下の変更で少なくすることができます:

これらの変更はバッチの処理時間を100ミリ秒の単位で削減するでしょう。このように秒未満のバッチサイズが実現可能になります。


正しいバッチの間隔の設定

クラスタ上で動作するSparkストリーミングアプリケーションが安定するために、システムはデータを受け取るとできるだけ早くデータを処理できなければなりません。言い換えると、生成されたデータのバッチはできるだけ早く処理されなければなりません。これがアプリケーションにとって真かどうかはストリーミングweb UIの処理時間の監視 によって知ることができます。バッチの処理時間はバッチの間隔よりも少なくなければなりません。

ストリーミング計算の性質に依存して、使用されるバッチの間隔はクラスタのリソースの固定セット上のアプリケーションによって継続されるデータのレートに大きな影響を与えるかも知れません。例えば、以前の WordCountNetwork の例を考えてみましょう。特定のデータレートにおいて、システムは2秒ごとにワードカウントをレポートし続けることが可能かも知れません(つまり、バッチの間隔は2秒)が、各500ミリ秒ごとではありません。つまりバッチの間隔は、プロダクションで維持できるそのようなデータレートに設定される必要があります。

アプリケーションのための正しいバッチサイズを見つけ出す良い方法は、保守的なバッチ間隔(つまり、5-10秒)と低いデータレートを使ってそれをテストすることです。システムがそのデータレートで維持できるかどうかを検証するために、処理された各バッチによって実験された end-to-endの遅延の値をチェックすることができます(Sparkドライバのlog4jのログの "Total delay"、あるいは StreamingListener インタフェースのどちらかを使って調べます)。遅延がバッチサイズと同程度に維持される場合は、システムは安定しています。そうでなければ、もし遅延が常に増加する場合、それはシステムが遅れないで付いていくことができず、不安定であることを意味します。一度安定した設定の見解が見つかれば、データレートの増加 および/あるいは バッチサイズの削減を試すことができます。一時的なデータレートの増加による瞬間的な遅延の増加は、遅延が低い値(つまり、バッチサイズ未満)に戻る限りは、それで良いことに注意してください。


メモリー チューニング

メモリの使用の調整とSparkアプリケーションの挙動は、チューニングガイドの中でとても詳細に議論されています。それを読むことを強くお勧めします。この章では、Sparkストリーミングアプリケーションのコンテキストの中での2,3のパラメータを具体的に議論します。

Sparkストリーミングアプリケーションによって必要とされるクラスタの総メモリは、使用される変換の種類にとても依存します。例えば、もし直近の10秒のデータへのウィンドウ操作を使用したい場合、クラスタはメモリ内に10秒分のデータを保持するのに十分なメモリを持つ必要があります。あるいは、多くのキーを使ってupdateStateByKeyを使いたい場合、必要なメモリは多くなるでしょう。逆に、単純な map-filter-storeオペレーションを行いたい場合は、必要なメモリは少なくなるでしょう。

一般的に、receiverを使って受信されるデータはStorageLevel.MEMORY_AND_DISK_SER_2を使って格納されるため、メモリに入りきらないデータはディスクにこぼれ出るでしょう。これはストリーミングアプリケーションのパフォーマンスを下げるかも知れないため、ストリーミングアプリケーションに必要とされる十分なメモリを提供するように勧められます。小さなサイズでメモリの使用量を試してみて、それに応じて推測するのが一番良いです。

メモリチューニングの他の視点に、ガベージコレクションがあります。低いレイテンシを必要とするストリーミングアプリケーションのために、JVMガベージコレクションによって引き起こされる大きな中断があることは望ましいことではありません。

メモリ利用料とGCオーバーヘッドを調整するのに役に立つかも知れない2,3のパラメータがあります。


覚えておくべき重要な点:


耐障害性semantics

この章では、障害時のSparkストリーミングアプリケーションの挙動について議論するつもりです。

背景

Spark Streamingによって提供される意味を理解するために、SparkのRDDの基本的な耐障害性の意味を思い出してみましょう。

  1. RDDは不変、決定論的に再計算可能、分散されたデータセットです。各RDDは耐障害性のある入力データセット上でそれを作るために使われた決定的な操作の系図を記憶します。
  2. RDDのいずれかのパーティションがワーカーノードの障害により喪失した場合、そのパーティションはオペレーションの系譜を使って元の対障害性のデータセットから再計算することができます。
  3. 全てのRDDの変換が決定的だと仮定した場合、最終的に変換されたRDD内のデータはSparkクラスター内の障害に関係なく常に同じものになるでしょう。

SparkはHDFSあるいはS3のような耐障害性のあるファイルシステム内で操作されます。従って、耐障害性のあるデータから生成された全てのRDDも耐障害性があります。しかしSparkストリーミングの場合はほとんどの場合において(fileStreamが使われる場合を除いて)ネットワークを超えてデータを受け取るため、そうではありません。全ての生成されたRDDが同じ耐障害性の特性を達成するために、受け取ったデータはクラスタ内のワーカーノードで複数のSpark executor間でリプリケートされます(デフォルトのリプリケーションの要素は2です)。これにより障害発生時に復元されなければならないシステム内のデータは二種類になります。

  1. 受信およびリプリケートされたデータ - このデータは、他のノードの一つの上でデータのコピーとして、一つのワーカーノードの障害を生き延びます。
  2. 受信されたがリプリケーションのためにバッファされたデータ - これはリプリケートされていないため、このデータを回復する唯一の方法はソースから再び取得することです。

更に、考慮しなければならない2つの種類の障害があります:

  1. ワーカーノードの障害 - executorを実行しているワーカーノードのどれかが故障することがありえ、それらのノード上のメモリ内の全データが喪失されるでしょう。故障したノード上でreceiverが実行していた場合、それらのバッファされたデータは喪失されるでしょう。
  2. ドライバーノードの障害 - Sparkストリーミングアプリケーションを実行中のドライバーノードが故障した場合、明らかにSparkContextが喪失され、全てのexecutorのそれらのメモリ内のデータが喪失されます。

この基本的な知識を使って、Sparkストリーミングの耐障害性のsemanticsを理解してみましょう。

定義

ストリーミングシステムの意味は、各レコードがシステムによって処理されるかも知れない回数の点でしばしば捉えられます。システムが全てのありえる操作の状況下(障害などにも関わらず)で提供することができる3つの種類の保証があります。

  1. 最大1回: 各レコードは1回処理されるかあるいは全く処理されないでしょう。
  2. 少なくとも1回: 各レコードは1回以上処理されるでしょう。これはデータが喪失されないことを保証するので 最大1回よりも強いです。しかし、重複するかも知れません。
  3. 正確に1回: 各レコードは正確に1回処理されるでしょう - データが喪失することなく、複数回処理されるデータは無いでしょう。これは明らかに3つの中で最も強い保証です。

基本的な意味

ストリーム処理システムの中で、大まかに言うとデータ処理の中に3つのステップがあります。

  1. データの受信: データはrecieverあるいは別のものを使ってソースから受信されます。

  2. データの変換: 受信されたデータはDStreamおよびRDD変換を使って変換されます。

  3. データの出力: 最終的な変換されたデータはファイルシステム、データベース、ダッシュボードなどのような外部システムに出力されます。

ストリーミングアプリケーションがend-to-end の確実に1回だけの保証を達成しなければならない場合、各ステップは確実に1回だけの保証を提供しなければなりません。つまり、各レコードは確実に1回受信され、確実に1回変換され、システムの下流へ確実に1回出力されなければなりません。. Spark Streamingの状況内でのこれらのステップの意味を理解しましょう。

  1. データの受信: 異なる入力ソースは異なる保証を提供します。これは次のサブセクションで詳細に議論されます。

  2. データの変換: 受信された全てのデータはRDDの提供する保証のおかげで確実に1回処理されます。障害があったとしても、受信した入力データがアクセス可能な限り、最終的な変換後RDDは常に同じ内容を持つでしょう。

  3. Pushing out the data: Output operations by default ensure at-least once semantics because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). しかし、ユーザはexactly-once セマンティクスを行うために独自のトランザクションの仕組みを実装することができます。これはこの章で後で詳細に議論されます。

受信されたデータのセマンティクス

異なる入力ソースは、少なくとも1回 から 確実に1回の範囲の異なる保証を提供します。もっと詳細に読む

ファイルの使用

入力データの全てが既にHDFSのような耐障害性のあるファイルシステムの中にある場合、Sparkストリーミングは常に障害から復旧でき、データの全てを処理できます。これはexactly-once セマンティクスを与え、何が失敗しようとも確実に1回全てのデータの全てが処理されることを意味します。

rerceiverベースのソースを使用

For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. 以前に議論したように、receiverには2つの種類があります:

  1. 信頼できるreceiver - これらのreceiverは受信したデータがリプリケートされたことを確認した後でのみ到着を知らせるでしょう。もしそのようなreceiverが故障した場合、ソースはバッファされた(リプリケートされていない)データの到着の知らせを受け取らないでしょう。従って、もしreceiverが再起動すると、ソースはデータを再送信し、データは障害により喪失されないでしょう。
  2. 信頼できないreceiver - そのようなreceiverは到着を送信しません。従ってワーカーあるいはドライバーの障害により失敗した場合、recieverはデータを喪失するかも知れません

どの種類のreceiverが使われるかによって、以下のsemanticsを達成することができます。ワーカーノードが故障した場合、信頼できるreceiverによってデータは喪失しません。信頼できないreceiverを使うと、受信されたがリプリケートされていないデータは喪失するかも知れません。ドライバーノードが故障した場合、それらの喪失に加えて、受信されてメモリ内にリプリケートされた全ての過去のデータは喪失されるでしょう。. これはステートフル変換の結果に影響を与えるでしょう。

この過去の受信データの喪失を避けるために、Spark 1.2は受信データを耐障害性のあるストレージに保存する 先行書き込みログが導入されました。先行書き込みログの有効と信頼できるreceiverを使って、データの喪失がなくなります。semanticsの点では、それは少なくとも1回の保証を提供します。

以下の表は障害時のsemanticsを要約します:

デプロイ シナリオ ワーカーの障害 ドライバーの障害
Spark 1.1 以前、 あるいは
先行書き込みログの無いSpark 1.2以前
信頼できないレシーバーを使ってバッファされたデータが損失する
信頼できるレシーバーを使ってデータの損失がゼロ
少なくとも1回のsemantics
信頼できないレシーバーを使ってバッファされたデータが損失する
全てのレシーバーを使って過去のデータが損失する
未定義のsemantics
先行書き込みログを持つSpark1.2以降 信頼できるレシーバーを使ってデータの損失がゼロ
少なくとも1回のsemantics
信頼できるレシーバーとファイルを使ってデータの損失がゼロ
少なくとも1回のsemantics

Kafka ダイレクト APIの使用

Spark 1.3では、新しいKafa Direct APIを導入しました。これにより全てのKafkaのデータはSparkストリーミングによって確実に1回受け取られます。これに伴い、確実に1回の出力オペレーションを実装する場合、end-to-endで確実に1回の保証を達成することができます。このやり方(Spark 2.0.0 の時点では実験的)はKafka 統合ガイドの中で更に議論されます。

出力オペレータのsemantics

(foreachRDDのような)出力オペレーションは少なくとも1回のsemanticsを持ちます。つまり、ワーカーの障害の時に変換されたデータは外部エンティティに1回以上書き込まれます。これはファイルシステムに書き込むためにsaveAs***Files オペレーションを使うことで条件を満たします(ファイルは単純に同じデータを使って書き込むでしょう)が、確実に1回のsemanticsを達成するには更なる努力が必要でしょう。2つのやりかたがあります。



0.9.1あるいは1.x以下からの移行ガイド

Spark 0.9.1 と Spark 1.0の相田で、APIの安定性を更に確実にするために2,3のAPIの変更がありました。この章は既存のコードを1.0に移行するために必要なステップを詳しく述べます。

Input DStreams: 入力ストリーム(つまり、StreamingContext.socketStream, FlumeUtils.createStreamなど)を生成する全てのオペレータは、今はScalaでは(Dstreamの代わりに)InputDStream / ReceiverInputDStream を、Javaでは(JavaDStreamの代わりに)JavaInputDStream / JavaPairInputDStream / JavaReceiverInputDStream / JavaPairReceiverInputDStream を返します。これは入力ストリームに固有の機能を将来バイナリの互換性を壊すことなくこれらのクラスに追加することができることを保証します。既存のSparkストリーミングアプリケーションは何も変更を必要としませんが、(これらの新しいクラスはDStream/JavaDStreamのサブクラスのため)Spark 1.0での再コンパイルが必要かも知れないことに注意してください。

独自のネットワーク receiver: Spark ストリーミングのリリースから、独自のネットワークreceiverがNetowrkReceiverクラスを使ってScalaで定義できるようになりました。しかし、APIはエラーハンドリングとレポートの点で制限があり、Javaから使うことはできませんでした。Spark 1.0から、このクラスは以下の利点を持つReceiver に置き換えられました。

既存の独自のreceiverを以前のNetworkReceiverから新しいReceiverに移行するには、以下を行う必要があります。

Actor-based Receivers: The Actor-based Receiver APIs have been moved to DStream Akka. 詳細はプロジェクトを参照してください。



この後どうすればいいか

TOP
inserted by FC2 system