This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
テスト #
テストはあらゆるソフトウェア開発プロセスに不可欠な部分であり、Apache Flinkにはテストピラミッドの複数のレベルでアプリケーションコードをテストするためのツールが付属しています。
ユーザ定義関数のテスト #
通常、Flinkはユーザ定義関数の外側で正しい結果を生成すると想定できます。従って、主要なビジネスロジックを含むクラスを単体テストでできるだけテストすることをお勧めします。
ステートレスでタイムレスなUDFの単体テスト #
例えば、次のステートレスなMapFunction
を考えてみましょう。
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1;
}
}
class IncrementMapFunction extends MapFunction[Long, Long] {
override def map(record: Long): Long = {
record + 1
}
}
適した引数を渡して出力を検証することで、お気に入りのテストフレームワークを使ってそのような関数を単体テストすることは非常に簡単です。
public class IncrementMapFunctionTest {
@Test
public void testIncrement() throws Exception {
// instantiate your function
IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
assertEquals(3L, incrementer.map(2L));
}
}
class IncrementMapFunctionTest extends FlatSpec with Matchers {
"IncrementMapFunction" should "increment values" in {
// instantiate your function
val incrementer: IncrementMapFunction = new IncrementMapFunction()
// call the methods that you have implemented
incremeter.map(2) should be (3)
}
}
同様に、org.apache.flink.util.Collector
を使うユーザ定義関数(例えば、FlatMapFunction
やProcessFunction
)は実際のコレクタの代わりにモックオブジェクトを提供することで簡単にテストできます。IncrementMapFunction
と同じ機能を持つFlatMapFunction
は次のようにテストできます。
public class IncrementFlatMapFunctionTest {
@Test
public void testIncrement() throws Exception {
// instantiate your function
IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
Collector<Integer> collector = mock(Collector.class);
// call the methods that you have implemented
incrementer.flatMap(2L, collector);
//verify collector was called with the right output
Mockito.verify(collector, times(1)).collect(3L);
}
}
class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
"IncrementFlatMapFunction" should "increment values" in {
// instantiate your function
val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction()
val collector = mock[Collector[Integer]]
//verify collector was called with the right output
(collector.collect _).expects(3)
// call the methods that you have implemented
flattenFunction.flatMap(2, collector)
}
}
ステートフルまたはタイムリーなUDFと独自のオペレータの単体テスト #
管理された状態やタイマーを利用するユーザ定義関数の機能のテストは、ユーザコードとFlinkのランタイムの間のテストが含まれるため、さらに困難になります。 このため、Flinkにはいわゆるテストハーネスのコレクションが付属していて、これを使ってユーザ定義関数やカスタムオペレータをテストできます:
OneInputStreamOperatorTestHarness
(DataStream
のオペレータ用)KeyedOneInputStreamOperatorTestHarness
(KeyedStream
のオペレータ用)TwoInputStreamOperatorTestHarness
(2つのDataStream
のConnectedStreams
オペレータ用)KeyedTwoInputStreamOperatorTestHarness
(2つのKeyedStream
のConnectedStreams
オペレータ用)
テストハーネスを使うには、追加の依存関係のセットが必要です。詳細については、設定セクションを参照してください。
現在、テストハーネスを使って、レコードとウォーターマークをユーザ定義関数または独自のオペレータにプッシュし、処理時間を制御し、最後にオペレータの出力(サイド出力を含む)であさーとできるようになりました。
public class StatefulFlatMapTest {
private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
public void setupTestHarness() throws Exception {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
// wrap user defined function into a the corresponding operator
testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
// optionally configured the execution environment
testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
// open the test harness (will also call open() on RichFunctions)
testHarness.open();
}
@Test
public void testingStatefulFlatMapFunction() throws Exception {
//push (timestamped) elements into the operator (and hence user defined function)
testHarness.processElement(2L, 100L);
//trigger event time timers by advancing the event time of the operator with a watermark
testHarness.processWatermark(100L);
//trigger processing time timers by advancing the processing time of the operator directly
testHarness.setProcessingTime(100L);
//retrieve list of emitted records for assertions
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
//retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
//assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
}
}
class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {
private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
private var statefulFlatMap: StatefulFlatMapFunction = null
before {
//instantiate user-defined function
statefulFlatMap = new StatefulFlatMap
// wrap user defined function into a the corresponding operator
testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
// optionally configured the execution environment
testHarness.getExecutionConfig().setAutoWatermarkInterval(50)
// open the test harness (will also call open() on RichFunctions)
testHarness.open()
}
"StatefulFlatMap" should "do some fancy stuff with timers and state" in {
//push (timestamped) elements into the operator (and hence user defined function)
testHarness.processElement(2, 100)
//trigger event time timers by advancing the event time of the operator with a watermark
testHarness.processWatermark(100)
//trigger proccesign time timers by advancing the processing time of the operator directly
testHarness.setProcessingTime(100)
//retrieve list of emitted records for assertions
testHarness.getOutput should contain (3)
//retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
//testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
}
}
KeyedOneInputStreamOperatorTestHarness
and KeyedTwoInputStreamOperatorTestHarness
are instantiated by additionally providing a KeySelector
including TypeInformation
for the class of the key.
public class StatefulFlatMapFunctionTest {
private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
public void setupTestHarness() throws Exception {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
// wrap user defined function into a the corresponding operator
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
// open the test harness (will also call open() on RichFunctions)
testHarness.open();
}
//tests
}
class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {
private var testHarness: OneInputStreamOperatorTestHarness[String, Long, Long] = null
private var statefulFlatMapFunction: FlattenFunction = null
before {
//instantiate user-defined function
statefulFlatMapFunction = new StateFulFlatMap
// wrap user defined function into a the corresponding operator
testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction),new MyStringKeySelector(), Types.STRING())
// open the test harness (will also call open() on RichFunctions)
testHarness.open()
}
//tests
}
これらのテストハーネスの使用例は、Flinkコードベースにさらに多くあります。例えば:
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest
は、処理時間またはイベント時間に依存するオペレータとユーザ定義関数をテストするための良い例です。
注意 AbstractStreamOperatorTestHarness
とその派生クラスは現在public APIの一部ではなく、変更される可能性があることに注意してください。
ProcessFunctionの単体テスト #
その重要性を考慮して、ProcessFunction
のテストに直接使える以前のテストハーネスに加えて、Flinkはテストハーネスのインスタンス化を容易にするProcessFunctionTestHarnesses
という名前のテストハーネスファクトリを提供します。この例を考えてみます:
注意 このテストハーネスを使うには、前のセクションで説明した依存関係も導入する必要があることに注意してください。
public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value);
}
}
class PassThroughProcessFunction extends ProcessFunction[Integer, Integer] {
@throws[Exception]
override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {
out.collect(value)
}
}
ProcessFunctionTestHarnesses
を使って適切な引数を渡し、出力を検証することで、このような関数の探偵テストを行うのは非常に簡単です。
public class PassThroughProcessFunctionTest {
@Test
public void testPassThrough() throws Exception {
//instantiate user-defined function
PassThroughProcessFunction processFunction = new PassThroughProcessFunction();
// wrap user defined function into a the corresponding operator
OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
.forProcessFunction(processFunction);
//push (timestamped) elements into the operator (and hence user defined function)
harness.processElement(1, 10);
//retrieve list of emitted records for assertions
assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
}
}
class PassThroughProcessFunctionTest extends FlatSpec with Matchers {
"PassThroughProcessFunction" should "forward values" in {
//instantiate user-defined function
val processFunction = new PassThroughProcessFunction
// wrap user defined function into a the corresponding operator
val harness = ProcessFunctionTestHarnesses.forProcessFunction(processFunction)
//push (timestamped) elements into the operator (and hence user defined function)
harness.processElement(1, 10)
//retrieve list of emitted records for assertions
harness.extractOutputValues() should contain (1)
}
}
ProcessFunction
の様々なフレーバーをテストするためにProcessFunctionTestHarnesses
を使う方法の詳細な例については、例えば、KeyedProcessFunction
、KeyedCoProcessFunction
、BroadcastProcessFunction
など、ユーザはProcessFunctionTestHarnessesTest
を確認することをお勧めします。
Flinkジョブのテスト #
JUnit Rule MiniClusterWithClientResource
#
Apache Flinkは、ローカルの組み込みのミニクラスタに対して完全なジョブをテストするためのMiniClusterWithClientResource
と呼ばれるJUnitルールを提供します。
MiniClusterWithClientResource
と呼ばれます。
MiniClusterWithClientResource
を使うには、追加の依存関係(テストスコープ)が1つ必要です。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.19-SNAPSHOT</version>
<scope>test</scope>
</dependency>
前のセクションと同じ単純なMapFunction
を考えてみましょう。
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1;
}
}
class IncrementMapFunction extends MapFunction[Long, Long] {
override def map(record: Long): Long = {
record + 1
}
}
このMapFunction
を使う単純なパイプラインは、次のようなローカルFlinkクラスタでテストできるようになりました。
public class ExampleIntegrationTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testIncrementPipeline() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure your test environment
env.setParallelism(2);
// values are collected in a static variable
CollectSink.values.clear();
// create a stream of custom elements and apply transformations
env.fromElements(1L, 21L, 22L)
.map(new IncrementMapFunction())
.addSink(new CollectSink());
// execute
env.execute();
// verify your results
assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
}
// create a testing sink
private static class CollectSink implements SinkFunction<Long> {
// must be static
public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
@Override
public void invoke(Long value, SinkFunction.Context context) throws Exception {
values.add(value);
}
}
}
class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build)
before {
flinkCluster.before()
}
after {
flinkCluster.after()
}
"IncrementFlatMapFunction pipeline" should "incrementValues" in {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// configure your test environment
env.setParallelism(2)
// values are collected in a static variable
CollectSink.values.clear()
// create a stream of custom elements and apply transformations
env.fromElements(1L, 21L, 22L)
.map(new IncrementMapFunction())
.addSink(new CollectSink())
// execute
env.execute()
// verify your results
CollectSink.values should contain allOf (2, 22, 23)
}
}
// create a testing sink
class CollectSink extends SinkFunction[Long] {
override def invoke(value: Long, context: SinkFunction.Context): Unit = {
CollectSink.values.add(value)
}
}
object CollectSink {
// must be static
val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList())
}
MiniClusterWithClientResource
との統合テストについて幾つか説明します:
-
パイプラインコード全体をプロダクション環境からテスト環境にコピーしないようにするには、ソースとシンクをプロダクションコードにプラグイン可能にし、特別なテストソースとテストシンクをテストに挿入します。
-
Flinkは全てのオペレーターをクラスタ全体に分散する前にシリアライズ化するため、ここでは
CollectSink
の静的変数が使われます。 静的変数を介してローカルFlinkミニクラスタによってインスタンス化されたオペレータと通信することは、この問題を回避する1つの方法です。 あるいは、テストシンクを使って一時ディレクトリ内のファイルにデータを書き込むこともできます。 -
ジョブがイベント時間タイマーを使う場合は、ウォーターマークを発行するための独自のparallelソース関数を実装できます。
-
並列実行されたパイプラインでのみ表面化するバグを特定するには、常に並列処理 > 1 でパイプラインをローカルでテストすることをお勧めします。
-
複数のテストが同じFlinkクラスタを共有できるように、
@Rule
よりも@ClassRule
を優先します。通常、Flinkclusterの起動とシャットダウンが実際のテストの実行時間の大部分を占めるため、そうすることで時間を大幅に節約できます。 -
パイプラインに独自の状態処理が含まれている場合は、チェックポイントを有効にしてミニクラスタ内でジョブを再開することで、その正確さをテストできます。このためには、パイプライン内の(テスト専用の)ユーザ定義関数から例外を投げることで、失敗をトリガーする必要があります。