バッチの例

以下のプログラムの例は単純なワードカウントからグラフアルゴリズムまでのFlinkの異なるアプリケーションを紹介します。コードの見本はFlinkのデータセット APIを説明します。

以下の完全なソースコードともっと多くの例はFlinkソースレポジトリのflink-examples-batch あるいは flink-examples-streamingモジュールで見つかります。

例の実行

Flinkの例を実行するために、実行中のFlinkインスタンスが利用可能であると仮定します。ナビゲーション内の“クイックスタート” および “セットアップ” タブはFlinkを開始する様々な方法を説明します。

もっとも簡単な方法は、./bin/start-local.sh スクリプトを実行することで、これはローカルでジョブマネージャーを開始するでしょう。

Flinkの各バイナリリリースはこのページのそれぞれの例についてのjarファイルを持つexamples ディレクトリを含んでいます。

WordCount の例を実行するには、以下のコマンドを発行します:

./bin/flink run ./examples/batch/WordCount.jar

他の例も似たような方法で開始することができます。

組み込みのデータを使う事で、多くの例は引数を渡すこと無しに動きます。現実のデータを使ってWordCountを動かすには、データへのパスを渡す必要があります:

./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

ローカルでは無いファイルシステムはhdfs://のようなスキーマのプリフィックスを必要とすることに注意してください。

ワードカウント

WordCount はビッグデータ処理システムの “Hello World” です。テキストのコレクション内の単語の頻度を計算します。アルゴリズムは二つのステップで動作します: 最初に、テキストは個々の単語に分割されます。次に、ワードはグループ化され数えられます。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }   
        }
    }
}

WordCount の例 は入力パラメータを使って上で説明されたアルゴリズムを実装します: --input <path> --output <path>。テストデータとして、どのようなテキストファイルでも動くでしょう。

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

WordCount の例 は入力パラメータを使って上で説明されたアルゴリズムを実装します: --input <path> --output <path>。テストデータとして、どのようなテキストファイルでも動くでしょう。

ページランク

ページランク アルゴリズムはリンクによって定義されるグラフ内でのページの “重要度” を計算します。リンクは1つのページから他のページを示します。それは繰り返しのグラフアルゴリズムで、そのことは同じ計算を繰り返し適用することを意味します。各繰り返しの中で、各ページは現在のランクを全ての隣に分配し、新しいランクを隣から受け取ったランクの重み付き合計として計算します。 ページランクのアルゴリズムはページの重要さを検索クエリの結果のランク付けに使うグーグル検索エンジンによって一般的になりました。

この簡単な例では、ページランクはbulk iterationと固定数の繰り返しを使って実装されます。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
						   .types(Long.class, Double.class)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition
        .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class JoinVertexWithEdgesMatch
                    implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                            Tuple2<Long, Double>> {

    @Override
    public void join(<Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                        Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        double rankToDistribute = rank / ((double) neigbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
                implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}

ページランク プログラム は上の例を実装します。実行には以下のパラメータを必要とします: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
  (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
        for (targetId <- adjacent.targetIds) {
          out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
        }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")

ページランク プログラム は上の例を実装します。実行には以下のパラメータを必要とします: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>

入力ファイルは平文のテキストファイルで以下のように整形されていなければなりません: 改行文字で分割された(long) IDで表されるページ* 例えば"1\n2\n12\n42\n63\n" はID 1, 2, 12, 42 および 63 を持つ5つのページを与えます。- リンクは空白文字で分割されたページIDのペアとして表されます。リンクは改行文字で分割されます: * 例えば "1 2\n2 12\n1 12\n42 63\n" は4つの(有向の)リンク (1)->(2), (2)->(12), (1)->(12) および (42)->(63) を与えます。

この簡単な実装について、各ページは少なくとも1つの入力と1つの出力のリンクを持ちます (ぺージは自分自身を指すかも知れません)。

連結コンポーネント

連結コンポーネントアルゴリズムは、同じく接続された部分の全ての頂点を同じコンポーネントIDに割り当てることで、接続された大きなグラフの部分を識別します。ページランクと似て、連結コンポーネントは繰り返しのアルゴリズムです。各ステップで、各頂点は現在のコンポーネントIDを全ての隣に広めます。頂点は、もしそれが自身のコンポーネントIDよりも小さければ、隣からコンポーネントIDを受け取ります。

この実装はデルタ 繰り返しを使います: コンポーネントIDが変更されなかった頂点は次のステップには参加しません。後の繰り返しは一般的に小数の飛び値のみを扱うため、これはとても良いパフォーマンスをもたらします。

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                            Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}

連結コンポーネント プログラム は上の例を実装します。実行には以下のパラメータを必要とします: --vertices <path> --edges <path> --output <path> --iterations <n>

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

連結コンポーネント プログラム は上の例を実装します。実行には以下のパラメータを必要とします: --vertices <path> --edges <path> --output <path> --iterations <n>

入力ファイルは平文のテキストファイルで以下のように整形されていなければなりません: IDで表され、改行文字で分割された頂点。* 例えば "1\n2\n12\n42\n63\n" は (1), (2), (12), (42) および (63) を持つ5つの頂点を与えます。- 辺は空白文字で区切られた頂点IDについてのペアとして表されます。辺は改行文字で分割されます: * 例えば "1 2\n2 12\n1 12\n42 63\n" は4つの(無向の)リンク (1)->(2), (2)->(12), (1)->(12) および (42)->(63) を与えます。

関係のクエリ

Relational Query の例は2つのテーブルを仮定します。1つはordersを持ち、もう1つはTPC-H decision support benchmarkで指定されたlineitems を持ちます。TPC-H はデータベース業界の標準的なベンチマークです。入力データを生成する方法は以下を見てください。

例は以下のSQLクエリを実装します。

SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
    FROM orders, lineitem
WHERE l_orderkey = o_orderkey
    AND o_orderstatus = "F"
    AND YEAR(o_orderdate) > 1993
    AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;

上のクエリを実装するFlink プログラムは以下のように見えます。

// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if(!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0,1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);

関連クエリプログラム は上のクエリを実装します。実行には以下のパラメータを必要とします: --orders <path> --lineitem <path> --output <path>.

近日公開…

関連クエリプログラム は上のクエリを実装します。実行には以下のパラメータを必要とします: --orders <path> --lineitem <path> --output <path>.

順番とlineitemのファイルはTPC-H benchmark スイートのデータ生成ツール (DBGEN)を使って生成することができます。提供されたFlinkプログラムのために任意のサイズの入力ファイルを生成するには、以下の手順に従ってください:

  1. DBGENをダウンロードし解凍します
  2. Makefileと呼ばれるmakefile.suite のコピーを生成し、以下の変更を実施します:
DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc
  1. makeを使ってDBGENをビルドします
  2. dbgenを使ってlineitemと順番を生成します。スケール要素 1 (-s) は約1GBのサイズを持つデータセットを生成します。
./dbgen -T o -s 1
TOP
inserted by FC2 system