Testing
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

テスト #

テストはあらゆるソフトウェア開発プロセスに不可欠な部分であり、Apache Flinkにはテストピラミッドの複数のレベルでアプリケーションコードをテストするためのツールが付属しています。

ユーザ定義関数のテスト #

通常、Flinkはユーザ定義関数の外側で正しい結果を生成すると想定できます。従って、主要なビジネスロジックを含むクラスを単体テストでできるだけテストすることをお勧めします。

ステートレスでタイムレスなUDFの単体テスト #

例えば、次のステートレスなMapFunctionを考えてみましょう。

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
class IncrementMapFunction extends MapFunction[Long, Long] {

    override def map(record: Long): Long = {
        record + 1
    }
}

適した引数を渡して出力を検証することで、お気に入りのテストフレームワークを使ってそのような関数を単体テストすることは非常に簡単です。

public class IncrementMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementMapFunction incrementer = new IncrementMapFunction();

        // call the methods that you have implemented
        assertEquals(3L, incrementer.map(2L));
    }
}
class IncrementMapFunctionTest extends FlatSpec with Matchers {

    "IncrementMapFunction" should "increment values" in {
        // instantiate your function
        val incrementer: IncrementMapFunction = new IncrementMapFunction()

        // call the methods that you have implemented
        incremeter.map(2) should be (3)
    }
}

同様に、org.apache.flink.util.Collectorを使うユーザ定義関数(例えば、FlatMapFunctionProcessFunction)は実際のコレクタの代わりにモックオブジェクトを提供することで簡単にテストできます。IncrementMapFunctionと同じ機能を持つFlatMapFunctionは次のようにテストできます。

public class IncrementFlatMapFunctionTest {

    @Test
    public void testIncrement() throws Exception {
        // instantiate your function
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();

        Collector<Integer> collector = mock(Collector.class);

        // call the methods that you have implemented
        incrementer.flatMap(2L, collector);

        //verify collector was called with the right output
        Mockito.verify(collector, times(1)).collect(3L);
    }
}
class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {

    "IncrementFlatMapFunction" should "increment values" in {
       // instantiate your function
      val incrementer : IncrementFlatMapFunction = new IncrementFlatMapFunction()

      val collector = mock[Collector[Integer]]

      //verify collector was called with the right output
      (collector.collect _).expects(3)

      // call the methods that you have implemented
      flattenFunction.flatMap(2, collector)
  }
}

ステートフルまたはタイムリーなUDFと独自のオペレータの単体テスト #

管理された状態やタイマーを利用するユーザ定義関数の機能のテストは、ユーザコードとFlinkのランタイムの間のテストが含まれるため、さらに困難になります。 このため、Flinkにはいわゆるテストハーネスのコレクションが付属していて、これを使ってユーザ定義関数やカスタムオペレータをテストできます:

  • OneInputStreamOperatorTestHarness (DataStreamのオペレータ用)
  • KeyedOneInputStreamOperatorTestHarness (KeyedStreamのオペレータ用)
  • TwoInputStreamOperatorTestHarness (2つのDataStreamConnectedStreamsオペレータ用)
  • KeyedTwoInputStreamOperatorTestHarness (2つのKeyedStreamConnectedStreamsオペレータ用)

テストハーネスを使うには、追加の依存関係のセットが必要です。詳細については、設定セクションを参照してください。

現在、テストハーネスを使って、レコードとウォーターマークをユーザ定義関数または独自のオペレータにプッシュし、処理時間を制御し、最後にオペレータの出力(サイド出力を含む)であさーとできるようになりました。

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMap statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        //instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into a the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configured the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {

        //push (timestamped) elements into the operator (and hence user defined function)
        testHarness.processElement(2L, 100L);

        //trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        //trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        //retrieve list of emitted records for assertions
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}
class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {

  private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
  private var statefulFlatMap: StatefulFlatMapFunction = null

  before {
    //instantiate user-defined function
    statefulFlatMap = new StatefulFlatMap

    // wrap user defined function into a the corresponding operator
    testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))

    // optionally configured the execution environment
    testHarness.getExecutionConfig().setAutoWatermarkInterval(50)

    // open the test harness (will also call open() on RichFunctions)
    testHarness.open()
  }

  "StatefulFlatMap" should "do some fancy stuff with timers and state" in {


    //push (timestamped) elements into the operator (and hence user defined function)
    testHarness.processElement(2, 100)

    //trigger event time timers by advancing the event time of the operator with a watermark
    testHarness.processWatermark(100)

    //trigger proccesign time timers by advancing the processing time of the operator directly
    testHarness.setProcessingTime(100)

    //retrieve list of emitted records for assertions
    testHarness.getOutput should contain (3)

    //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
    //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
  }
}

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector including TypeInformation for the class of the key.

public class StatefulFlatMapFunctionTest {
    private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
    private StatefulFlatMap statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        //instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into a the corresponding operator
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    //tests

}
class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {

  private var testHarness: OneInputStreamOperatorTestHarness[String, Long, Long] = null
  private var statefulFlatMapFunction: FlattenFunction = null

  before {
    //instantiate user-defined function
    statefulFlatMapFunction = new StateFulFlatMap

    // wrap user defined function into a the corresponding operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction),new MyStringKeySelector(), Types.STRING())

    // open the test harness (will also call open() on RichFunctions)
    testHarness.open()
  }

  //tests

}

これらのテストハーネスの使用例は、Flinkコードベースにさらに多くあります。例えば:

  • org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTestは、処理時間またはイベント時間に依存するオペレータとユーザ定義関数をテストするための良い例です。

注意 AbstractStreamOperatorTestHarnessとその派生クラスは現在public APIの一部ではなく、変更される可能性があることに注意してください。

ProcessFunctionの単体テスト #

その重要性を考慮して、ProcessFunctionのテストに直接使える以前のテストハーネスに加えて、Flinkはテストハーネスのインスタンス化を容易にするProcessFunctionTestHarnessesという名前のテストハーネスファクトリを提供します。この例を考えてみます:

注意 このテストハーネスを使うには、前のセクションで説明した依存関係も導入する必要があることに注意してください。

public static class PassThroughProcessFunction extends ProcessFunction<Integer, Integer> {

	@Override
	public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value);
	}
}
class PassThroughProcessFunction extends ProcessFunction[Integer, Integer] {

    @throws[Exception]
    override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {
      out.collect(value)
    }
}

ProcessFunctionTestHarnessesを使って適切な引数を渡し、出力を検証することで、このような関数の探偵テストを行うのは非常に簡単です。

public class PassThroughProcessFunctionTest {

    @Test
    public void testPassThrough() throws Exception {

        //instantiate user-defined function
        PassThroughProcessFunction processFunction = new PassThroughProcessFunction();

        // wrap user defined function into a the corresponding operator
        OneInputStreamOperatorTestHarness<Integer, Integer> harness = ProcessFunctionTestHarnesses
        	.forProcessFunction(processFunction);

        //push (timestamped) elements into the operator (and hence user defined function)
        harness.processElement(1, 10);

        //retrieve list of emitted records for assertions
        assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
    }
}
class PassThroughProcessFunctionTest extends FlatSpec with Matchers {

  "PassThroughProcessFunction" should "forward values" in {

    //instantiate user-defined function
    val processFunction = new PassThroughProcessFunction

    // wrap user defined function into a the corresponding operator
    val harness = ProcessFunctionTestHarnesses.forProcessFunction(processFunction)

    //push (timestamped) elements into the operator (and hence user defined function)
    harness.processElement(1, 10)

    //retrieve list of emitted records for assertions
    harness.extractOutputValues() should contain (1)
  }
}

ProcessFunctionの様々なフレーバーをテストするためにProcessFunctionTestHarnessesを使う方法の詳細な例については、例えば、KeyedProcessFunctionKeyedCoProcessFunctionBroadcastProcessFunctionなど、ユーザはProcessFunctionTestHarnessesTestを確認することをお勧めします。

Flinkジョブのテスト #

JUnit Rule MiniClusterWithClientResource #

Apache Flinkは、ローカルの組み込みのミニクラスタに対して完全なジョブをテストするためのMiniClusterWithClientResourceと呼ばれるJUnitルールを提供します。 MiniClusterWithClientResourceと呼ばれます。

MiniClusterWithClientResourceを使うには、追加の依存関係(テストスコープ)が1つ必要です。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.19-SNAPSHOT</version>    
    <scope>test</scope>
</dependency>

前のセクションと同じ単純なMapFunctionを考えてみましょう。

public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
class IncrementMapFunction extends MapFunction[Long, Long] {

    override def map(record: Long): Long = {
        record + 1
    }
}

このMapFunctionを使う単純なパイプラインは、次のようなローカルFlinkクラスタでテストできるようになりました。

public class ExampleIntegrationTest {

     @ClassRule
     public static MiniClusterWithClientResource flinkCluster =
         new MiniClusterWithClientResource(
             new MiniClusterResourceConfiguration.Builder()
                 .setNumberSlotsPerTaskManager(2)
                 .setNumberTaskManagers(1)
                 .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new IncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}
class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }


  "IncrementFlatMapFunction pipeline" should "incrementValues" in {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1L, 21L, 22L)
       .map(new IncrementMapFunction())
       .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf (2, 22, 23)
    }
}

// create a testing sink
class CollectSink extends SinkFunction[Long] {

  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    CollectSink.values.add(value)
  }
}

object CollectSink {
    // must be static
    val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList())
}

MiniClusterWithClientResourceとの統合テストについて幾つか説明します:

  • パイプラインコード全体をプロダクション環境からテスト環境にコピーしないようにするには、ソースとシンクをプロダクションコードにプラグイン可能にし、特別なテストソースとテストシンクをテストに挿入します。

  • Flinkは全てのオペレーターをクラスタ全体に分散する前にシリアライズ化するため、ここではCollectSinkの静的変数が使われます。 静的変数を介してローカルFlinkミニクラスタによってインスタンス化されたオペレータと通信することは、この問題を回避する1つの方法です。 あるいは、テストシンクを使って一時ディレクトリ内のファイルにデータを書き込むこともできます。

  • ジョブがイベント時間タイマーを使う場合は、ウォーターマークを発行するための独自のparallelソース関数を実装できます。

  • 並列実行されたパイプラインでのみ表面化するバグを特定するには、常に並列処理 > 1 でパイプラインをローカルでテストすることをお勧めします。

  • 複数のテストが同じFlinkクラスタを共有できるように、@Ruleよりも@ClassRuleを優先します。通常、Flinkclusterの起動とシャットダウンが実際のテストの実行時間の大部分を占めるため、そうすることで時間を大幅に節約できます。

  • パイプラインに独自の状態処理が含まれている場合は、チェックポイントを有効にしてミニクラスタ内でジョブを再開することで、その正確さをテストできます。このためには、パイプライン内の(テスト専用の)ユーザ定義関数から例外を投げることで、失敗をトリガーする必要があります。

Back to top

inserted by FC2 system