Spark ストリーミング カスタム レシーバー
Spark Streamingは組み込みのサポート(つまり Flume, Kafka, Kinesis, files, sockets など)の1つ以上の任意のデータソースからのストリーミングデータを受け取ることができます。これには開発者が関係したデータソースからデータを受け取るためにカスタマイズされたreceiverを実装する必要があります。このガイドはカスタムレシーバーを実装とそれをSparkストリーミングアプリケーション内で使用する方法をざっと見ます。カスタムレシーバーはScalaあるいはJavaで実装できることに注意してください。
カスタムレシーバーの実装
Receiver (Scala doc, Java doc)の実装から始まります。カスタムレシーバー2つのメソッドを実装することでこの抽象クラスを拡張しなければなりません
onStart()
: データの受信を開始するためにしなければならないこと。onStop()
: データの受信を停止するためにしなければならないこと。
onStart()
と onStop()
の両方は永久にブロックしてはなりません。一般的に onStart()
はデータの受信に責任があるスレッドを開始し、onStop()
はデータを受信するそれらのスレッドを確実に停止しなければなりません。受信スレッドはisStopped()
も使うことができ、Receiver
はそれらがデータの受信を停止しなければならないかどうかを調べます。
一度データが受信されると、データはstore(data)
を呼び出すことでSparkの内部に格納することができます。これはReceiverクラスによって提供されるメソッドです。受信したデータを一度に記録、あるいはオブジェクト/シリアライズ化されたバイトの全体のコレクションとして保存することができる、store()
の数多くの特徴があります。 レシーバーを実装するために使用されるstore()
の特徴が、その信頼性と耐障害性の意味に影響を与えることに注意してください。 詳細については後で議論されます。
受信スレッド内でのどのような例外も、レシーバーの沈黙の失敗を避けるために適切に捉えられ処理されなければなりません。restart(<exception>)
は非同期にonStop()
を呼び出し、遅延の後でonStart()
を呼び出すことでレシーバーを再起動するでしょう。stop(<exception>)
はl onStop()
を呼び出しレシーバーを終了します。また、reportError(<error>)
はレシーバーの停止/再起動無しに、ドライバーにエラーメッセージを報告します(logとUIで見ることができます)。
以下はソケット上のテキストのストリームを受け取るカスタムレシーバーです。テキストストリーム内の'\n'で区切られた行をレコードとして扱い、それらをSparkに格納します。受信スレッドに接続あるいは受信のなんらかのエラーがあると、レシーバーは別の接続をするために再起動されます。
class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
public class JavaCustomReceiver extends Receiver<String> {
String host = null;
int port = -1;
public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
receive();
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
Socket socket = null;
String userInput = null;
try {
// connect to the server
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
}
reader.close();
socket.close();
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(ConnectException ce) {
// restart if could not connect to server
restart("Could not connect", ce);
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
Sparkストリーミングアプリケーションでのカスタムレシーバーの使用
カスタムレシーバーは streamingContext.receiverStream(<instance of custom receiver>)
を使うことでSparkストリーミングアプリケーションの中で使うことができます。これは以下で示すようにカスタムレシーバーのインスタンスによって受け取られたデータを使って入力のDStreamを生成するでしょう:
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...
完全なソースコードは、例CustomReceiver.scalaにあります。
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
...
完全なソースコードは、例JavaCustomReceiver.javaにあります。
receiver の信頼性
Spark ストリーミングプログラミングガイドの中で簡単に議論されたように、それらの信頼性と耐障害性に基づいて2つの種類のレシーバーがあります。
- 信頼できるレシーバー - データの送信を通知することができる信頼できるソースに関して、信頼できるレシーバーはソースにデータが受信されSparkに格納されたことを正確に通知します(つまり、リプリケートに成功した)。通常、このレシーバーの実装はソースの通知のやり方を注意深く考慮する必要があります。
- 信頼できないレシーバー - 信頼できないレシーバーはソースへの通知の送信を行いません。これは通知をサポートしないソース、あるいは信頼できるソースの場合でも通知の複雑さに分け入りたくないか必要としない場合に、使うことができます。
信頼できるレシーバーを実装するには、データを格納するためにstore(multiple-records)
を使う必要があります。このstore
の特徴は、全ての与えられた行がSpark内に格納された後でのみ返るという呼び出しのブロッキングです。レシーバーの設定されたストレージレベルがリプリケーションを使う場合(デフォルトでは有効)、この呼び出しはリプリケーションが完了した後で返されます。従って、データが確実に格納され、レシーバーはソースに適切に通知することができます。これは、レシーバーがデータのリプリケートの最中に故障した場合にデータが消失しないようにします - バッファされたデータは通知されず、従ってソースによって後で再送されるでしょう。
信頼できないレシーバー はこのようなロジックを一切実装する必要はありません。単純にソースからレコードを受け取り、store(single-record)
を使って一つずつそれらを挿入します。store(multiple-records)
の信頼の保証がありませんが、以下の利点があります:
- システムはデータを適切なサイズのブロックにチャンクするようにします(Sparkストリーミング プログラミング ガイドのブロック間隔を探してください)。
- レートの制限が指定されていた場合には、システムは受信レートの制御を引き受けます。
- これら2つのために、信頼できないレシーバーは信頼できるレシーバーより実装が簡単です。
以下の表はレシーバの両方のタイプの特徴を要約します
レシーバーの種類 | 特徴 |
---|---|
信頼できないレシーバー |
実装が単純。 システムはブロックの生成とレートの制御を引き受けます。耐障害性の保証無しで、レシーバの故障によりデータを損失するかも知れません。 |
信頼できるレシーバー |
耐障害性の保証が高く、データの損失ゼロを確実にすることができます。 ブロックの生成とレートの制御はレシーバの実装によって処理されます。 実装の複雑さはソースの通知の仕組みに依存します。 |