重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

新しいオペレータの追加の仕方

Java APIの中のオペレータは複数の異なる方法で追加することができます:

  1. データセット上で、既存のオペレータの 特別な形/組み合わせとして
  2. 独自の拡張オペレータとして
  3. 新しい実行時のオペレータとして

最初の二つのやり方は一般的により軽く実装し易いです。Sometimes, new functionality does require a new runtime operator, or it is much more efficient to

データセット上の新しいオペレータの実装

Many operators can be implemented as a specialization of another operator, or by means of a UDF.

最も簡単な例はDataSet上のsum(), min() および max() 関数です。これらの関数は単純に他のオペレータをいくつかの事前定義されたパラメータを使って呼び出します:

public AggregateOperator<T> sum (int field) {
    return this.aggregate (Aggregations.SUM, field);
}

幾つかのオペレーションは複数の他のオペレータの組み合わせとして実装することができます。例はmap および aggregateの組み合わせを使ってcount()関数を実装します。

これを行う単純な方法は順番にmap(…) および reduce(…)を呼ぶ DataSet上で関数を定義することです:

public DataSet<Long> count() {
    return this.map(new MapFunction<T, Long>() {
                        public Long map(T value) {
                            return 1L;
                        }
                    })
               .reduce(new ReduceFunction<Long>() {
                        public Long reduce(Long val1, Long val1) {
                            return val1 + val2;
                        }
                    });
}

データセットクラスを変更せずに新しいオペレータを定義することは関数を静的なメンバとして他のクラスに入れることで可能です。count() オペレータの例は以下の方法で見えるでしょう:

public static <T>DataSet<Long> count(DataSet<T> data) {
    return data.map(...).reduce(...);
}

もっと複雑なオペレータ

特殊化を使ってオペレータのもっと複雑な例は、Java APIでの集約オペレーションです。GroupReduce UDF によって実装されます。

集約オペレータはJava APIの中で独自のオペレータに付属しますが、共通 APIの中でそれ自身をGroupReduceOperatorBaseに変換します。The Java API aggregation operator is only a builder that takes the types of aggregations and the field positions, and used that information to parameterize the GroupReduce UDF that performs the aggregations.

オペレーションはGroupReduceオペレーションに変換されるため、オプティマイザと実行時の中でGroupReduceOperatorのように見えます。

独自の拡張オペレータの実装

データセットは独自のオペレータのためのメソッドを提供します: DataSet<X> runOperation(CustomUnaryOperation<T, X> operation). CustomUnaryOperation インタフェースは以下の二つの関数を使ってオペレータを定義します:

void setInput(DataSet<IN> inputData);
	
DataSet<OUT> createResult();

VertexCentricIteration オペレータはそのように実装されます。以下はcount()オペレータをどのように実装するかの例です。

public class Counter<T> implements CustomUnaryOperation<T, Long> {

    private DataSet<T> input;

    public void setInput(DataSet<IN> inputData) { this.input = inputData; }

    public DataSet<Long> createResult() {
        return input.map(...).reduce(...);
    }
}

CountOperatorは以下の方法で呼び出すことができます:

DataSet<String> lines = ...;
DataSet<Long> count = lines.runOperation(new Counter<String>());

新しい実行時のオペレータの実装

新しい実行時のオペレータを追加するには、APIから実行時までの完全なスタックの全体にわたっての変更を必要とします:

  • Java API
  • 共通のAPI
  • オプティマイザ
  • ランタイム

We start the description bottom up, at the example of the mapPartition() function, which is like a map function, but invoked only once per parallel partition.

ランタイム

ランタイム オペレーションはドライバー インタフェースを使って実装されます。インタフェースはランタイムへのオペレータを記述するメソッドを定義します。MapDriver はそれらのオペレータがどのように動作するかの簡単な例として提供します。

The runtime works with the MutableObjectIterator, which describes data streams with the ability to reuse objects, to reduce pressure on the garbage collector.

mapPartitionオペレータについて中心的なrun() メソッドの実装は以下のように見えるでしょう:

public void run() throws Exception {
    final MutableObjectIterator<IN> input = this.taskContext.getInput(0);
    final MapPartitionFunction<IN, OUT> function = this.taskContext.getStub();
    final Collector<OUT> output = this.taskContext.getOutputCollector();
    final TypeSerializer<IN> serializer = this.taskContext.getInputSerializer(0);

    // we assume that the UDF takes a java.util.Iterator, so we wrap the MutableObjectIterator
    Iterator<IN> iterator = new MutableToRegularIteratorWrapper(input, serializer);

    function.mapPartition(iterator, output);
}

効率を上げるために、オペレータのchainedバージョンを実装することがしばしば有益です。chained オペレータは同じスレッド内で先行するオペレータとして実行し、入れ子になった関数呼び出しを協調します。これはシリアライズ化/デシリアライズ化のオーバーヘッドを省くため、とても効率的です。

chained オペレータを実装する方法を学ぶには、MapDriver (通常) および ChainedMapDriver (chained 変種)を見てください。

オプティマイザ/コンパイラ

この章はオペレータを追加する重要なステップの最小限の議論を行います。もっと詳しい作業を最適化する方法については最適化 を見てください。オプティマイザが計画の中に新しいオペレータを含むことができるようにするには、それについてのもう少しの情報が必要です; 特に、以下の情報です:

  • DriverStrategy: オプティマイザが利用可能なようにするために、オペレーションがEnumに追加される必要があります。The parameters to the Enum entry define which class implements the runtime operator, its chained version, whether the operator accumulates records (and needs memory for that), and whether it requires a comparator (works on keys). 私たちの例では、エントリーを追加することができます ~~~ java MAP_PARTITION(MapPartitionDriver.class, null /* or chained variant */, PIPELINED, false) ~~~

  • コスト関数: クラスCostEstimator はオペレーションがシステムにどれほど高価であるかを知る必要があります。ここでのコストはオペレータの非UDF部分を参照します。オペレータは本質的に何もしない(レコード ストリームをUDFに転送する)ため、コストはゼロです。コストが計上されないようにMAP_PARTITION定数をMAP定数に似たswitch文に追加することでcostOperator(...)メソッドを変更します。

  • OperatorDescriptor: オペレータ ディスクリプタはオペレータがオプティマイザによってどのように扱われる必要があるかを定義します。They describe how the operation requires the input data to be (e.g., sorted or partitioned) and that way allows the optimizer to optimize the data movement, sorting, grouping in a global fashion. They do that by describing which RequestedGlobalProperties (partitioning, replication, etc) and which RequestedLocalProperties (sorting, grouping, uniqueness) the operator has, as well as how the operator affects the existing GlobalProperties and LocalProperties. 例えばオペレータの候補をインスタンス化するための2,3のユーティリティメソッドを定義します。mapPartition() 関数はとても単純(パーティショニング/グルーピング時に要求されるものがない)なため、ディスクリプタはとても単純です。例えば Hash Join 1, Hash Join 2, SortMerge Joinなどの他のオペレータはもっと複雑な要求を持ちます。以下のコードの例は(コメントを使って)MapPartitionOperatorのためのディスクリプタを生成する方法を説明します。

      public DriverStrategy getStrategy() {
          return MAP_PARTITION;
      }
    
      // Instantiate the operator with the strategy over the input given in the form of the Channel
      public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
          return new SingleInputPlanNode(node, "MapPartition", in, MAP_PARTITION);
      }
    
      // The operation accepts data with default global properties (arbitrary distribution)
      protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
          return Collections.singletonList(new RequestedGlobalProperties());
      }
    
      // The operation can accept data with any local properties. No grouping/sorting is necessary
      protected List<RequestedLocalProperties> createPossibleLocalProperties() {
          return Collections.singletonList(new RequestedLocalProperties());
      }
    
      // the operation itself does not affect the existing global properties.
      // The effect of the UDF's semantics// are evaluated separately (by interpreting the
      // semantic assertions)
      public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
          return gProps;
      }
    
      // since the operation can mess up all order, grouping, uniqueness, we cannot make any statements
      // about how local properties are preserved
      public LocalProperties computeLocalProperties(LocalProperties lProps) {
          return LocalProperties.EMPTY;
      }
  • OptimizerNode: optimizer ノードは全てを1つにする場所です。OperatorDescriptorsのリストを生成し、結果のデータセットのサイズを予想し、オペレーションに名前を割り当てます。それは比較的小さなクラスでおおよそMapNodeからコピーされることができます。

共通のAPI

高レベルAPIでオペレーションを使えるようにするためには、一般的なAPIに追加される必要があります。もっとも簡単な方法は基本オペレータに追加することです。MapOperatorBaseのパターンの後で、クラスMapPartitionOperatorBaseを生成します。

In addition, the optimizer needs to know which OptimizerNode how to create an OptimizerNode from the OperatorBase. これは、Optimizerの中のクラスGraphCreatingVisitorの中で起きます。

Note: A pending idea is to allow to skip this step by unifying the OptimizerNode and the Common API operator. それらは本質的に同じ機能を満たします。The Common API operator exists only in order for the flink-java and flink-scala packages to not have a dependency on the optimizer.

Java API

同じやり方でMapOperatorとして構築されているJava API オペレータを生成します。中核のメソッドはtranslateToDataFlow(...) メソッドで、これはJava APIオペレータのための一般的なAPIオペレータを生成します。

The final step is to add a function to the DataSet class:

public <R> DataSet<R> mapPartition(MapPartitionFunction<T, R> function) {
    return new MapPartitionOperator<T, R>(this, function);
}

このドキュメントは個々のコンポーネントの貢献者によって維持されます。コンポーネントに追加および変更を加えた人にいつかはこれらのドキュメントも更新するパッチあるいはプルリクエストを提供してくれるようにお願いします。

TOP
inserted by FC2 system