繰り返しのアルゴリズムは機械学習 あるいは グラフ解析のようなデータ解析の多くのドメインで現れます。そのようなアルゴリズムは意味のある情報を出たから抽出するためにビッグデータの見どころを理解するために極めて重要です。とても大きなデータセット上でそのような種類のアルゴリズムを実行することへの興味の増大により、大規模に並行形式で繰り返しを実行する必要があります。
Flink プログラムはステップ関数の定義およびそれを特別な繰り返しオペレータに組み込むことで、繰り返しのアルゴリズムを実装します。このオペレータには2つの種類があります: Iterate とDelta Iterate。両方のオペレータはある終了条件に達するまで現在の繰り返し状態上でステップ関数を起動します。
ここで、両方のオペレータの変種の背景とそれらの使用法の概要について説明します。プログラミングガイド はScalaとJavaの両方でオペレータを実装する方法を説明します。Flinkのグラフ処理API Gelly を使って、vertex-centric および gather-sum-apply イテレーションの両方もサポートします。
以下の表は両方のオペレータの概要を提供します:
Iterate | Delta Iterate | |
---|---|---|
繰り返しの入力 | 部分解答 | 作業セット と 解答セット |
ステップ関数 | 任意のデータフロー | |
状態の更新 | 次の部分解答 |
|
繰り返しの結果 | 最後の部分解答 | 最後の繰り返しの後での解答セットの状態 |
終了 |
|
|
イテレート オペレータ は 単純な形式のイテレーションをカバーします: 各イテレーションでは、ステップ関数 が 入力全体 (以前の繰り返しの結果、あるいは初期データセット)を消費し、部分解答の次のバージョンを計算します ( map
, reduce
, join
など)。
map
, reduce
, join
などのようなオペレータから成る任意のデータフローで、すぐに使える特定のタスクに依存します。繰り返しのための終了条件を指定するために複数のオプションがあります:
疑似コードで繰り返しオペレータについて想像するかも知れません:
IterationState state = getInitialState();
while (!terminationCriterion()) {
state = step(state);
}
setFinalState(state);
以下の例で、セットの数を繰り返し増加します:
1
から 5
)からなります。map
オペレータです。これは整数値フィールドをi
から i+1
まで増やします。入力の各レコードに適用されるでしょう。11
から 15
になるでしょう。// 1st 2nd 10th
map(1) -> 2 map(2) -> 3 ... map(10) -> 11
map(2) -> 3 map(3) -> 4 ... map(11) -> 12
map(3) -> 4 map(4) -> 5 ... map(12) -> 13
map(4) -> 5 map(5) -> 6 ... map(13) -> 14
map(5) -> 6 map(6) -> 7 ... map(14) -> 15
1, 2 と 4 は任意のデータフローかもしれないことに注意してください。
delta イテレート オペレータ は増分イテレーションの場合をカバーします。増分イテレーションはそれらの解の要素を選択的に修正し、完全に再計算するのではなく、解を導き出します。
適用可能な場合、結果セットの各要素は各繰り返しの中で変更を設定しないため、より効率的なアルゴリズムに繋がります。これにより、解の重要な部分に注目し、重要では無い部分に触れないことができます。しばしば、解の大部分は比較的早くに落ち着き、後の繰り返しはデータの小さな部分セット上でのみ操作します。
map
, reduce
, join
などのようなオペレータから成る任意のデータフローで、すぐに使える特定のタスクに依存します。deltaイテレーションのデフォルトの終了条件は空の作業セットの収束条件 および繰り返しの最大数によって指定されます。繰り返しは生成された次の作業セットが空になった時か、あるいは繰り返しの最大数に達した時に終了するでしょう。独自のアグリゲータ および 収束条件を指定することも可能です。
疑似コードで繰り返しオペレータについて想像するかも知れません:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
(delta, workset) = step(workset, solution);
solution.update(delta)
}
setFinalState(solution);
以下の例では、各頂点はID とcoloringを持ちます。各頂点はその頂点IDを隣接する頂点に広めるでしょう。目的は最小のIDをサブグラフ内の各頂点に割り当てることです。もし受け取ったIDが現在のものより小さい場合は、受け取ったIDを使って頂点の色に変更します。これの1つのアプリケーションがcommunity analysis あるいは connected components 計算で見つかるでしょう。
初期入力 は 作業セットと結果セットの両方として設定されます。上の図では、色が結果セットの進化を図化します。各繰り返しとともに、最小IDの色はそれぞれのサブグラフの中で広がります。同時に、(頂点IDの交換および比較)の作業の量が各繰り返しとともに減ります。これは作業セットのサイズの減少に対応します。3つの繰り返し、これは繰り返しの終了条件です、の後で、7つ全ての頂点から0に変わります。The important observation is that the lower subgraph converges before the upper half does and the delta iteration is able to capture this with the workset abstraction.
上のサブグラフ ID 1 (オレンジ) は、最小 IDです。最初の繰り返しの中で、頂点 2 が広まり、その後、色をオレンジに変えるでしょう。頂点 3と4はID 2 (黄色) を現在の最小IDとして受け取り、黄色に変わるでしょう。現在の頂点 1 の色は最初の繰り返しで変わらなかったため、次の作業セットではスキップされるかも知れません。
下方のサブグラフでは、ID 5 (藍色) が 最小 IDです。下方のサブグラフの全ての頂点は最初の繰り返しの中でそれを受け取るでしょう。この場合もやはり、次の作業セットでは変更されなかった頂点 (頂点 5) はスキップされるかも知れません。
2回目の繰り返しの中で、作業セットのサイズはすでに7つから5つ(頂点 2, 3, 4, 6 と 7)の要素に減っています。These are part of the iteration and further propagate their current minimum IDs. この繰り返しの後で、下方のサブグラフは作業セットに要素が無いため、それはすでに収束しています(グラフのcold part)。一方で、上半分は2つの残っている作業要素(頂点3と4)のために、更に繰り返しが必要です(グラフのhot part)。
3回目の繰り返しの後で、作業セットが空になった時に、繰り返しが終了します。
イテレーション オペレータのステップ関数の各実行を1つのイテレーションとして参照しました。並行する構成の中で、イテレーション状態の異なるパーティション上でステップ関数の複数のインスタンスが並行して評価されます。In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. 従って、イテレーションの全ての並行タスクは次のスーパーステップが初期化される前にスーパーステップを完了する必要があります。終了条件 もスーパーステップ境界で評価されるでしょう。