Quickstart

Get a Flink example program up and running in a few simple steps.

Flink runs on Linux, Mac OS X, and Windows. The only requirement for running Flink is a working installation of Java 7.x (or higher). Windows users should see the Flink on Windows guide, which describes how to run Flink on Windows for local setups.

You can check that Java is installed correctly by running the following command:

java -version

If you have Java 8, the output will look something like this:

java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
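
If you would rather check the requirement from code, the version string can be parsed. The following is a minimal sketch, not part of Flink: the class name JavaVersionCheck and the helper majorVersion are made up for illustration, and it assumes the version string starts with a number, as in the sample output above.

```java
class JavaVersionCheck {

    /** Extracts the major version: "1.8.0_111" gives 8, "1.7.0_80" gives 7, "11.0.2" gives 11. */
    static int majorVersion(String version) {
        String[] parts = version.split("[._-]");
        int first = Integer.parseInt(parts[0]);
        // Up to Java 8 the scheme was "1.<major>"; from Java 9 on the major number comes first.
        if (first == 1 && parts.length > 1) {
            return Integer.parseInt(parts[1]);
        }
        return first;
    }

    public static void main(String[] args) {
        // "java.version" is a standard system property of the running JVM.
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        System.out.println("Java " + version + " (major " + major + "): "
                + (major >= 7 ? "OK for this quickstart" : "too old, Java 7 or higher is required"));
    }
}
```

Note that this reports the version of the JVM that runs the check, which may differ from the java binary on your PATH if several JDKs are installed.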

Download

Clone the source code from one of our repositories, e.g.:

$ git clone https://github.com/apache/flink.git
$ cd flink
$ mvn clean package -DskipTests # this will take up to 10 minutes
$ cd build-target               # this is where Flink is installed to
$ ./bin/start-local.sh  # Start Flink

Check the JobManager's web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

[Screenshot: JobManager overview]

You can also verify that the system is running by checking the log files in the log directory:

$ tail log/flink-*-jobmanager-*.log
INFO ... - Starting JobManager
INFO ... - Starting JobManager web frontend
INFO ... - Web frontend listening at 127.0.0.1:8081
INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)
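
A quick alternative to reading the logs is to probe the web frontend port. This is a minimal sketch, assuming the frontend listens on localhost:8081 as in the log lines above; PortProbe and isListening are hypothetical names. A successful TCP connect only shows that something is listening on the port, not that it is Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 8081 is the web frontend port used throughout this guide.
        boolean up = isListening("localhost", 8081, 2000);
        System.out.println(up
                ? "Something is listening on localhost:8081"
                : "Nothing is listening on localhost:8081; check the JobManager log");
    }
}
```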

Read the Code

You can find the complete source code for this SocketWindowWordCount example in Scala and Java on GitHub. The Scala version is shown first, followed by the Java version.

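import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {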

    def main(args: Array[String]) : Unit = {

        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5)) // tumbling window of 5 seconds
            .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}
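
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {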

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5)) // tumbling window of 5 seconds
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

Run the Example

Now we are going to run this Flink application. It reads text from a socket and, once every 5 seconds, prints the number of occurrences of each distinct word seen during the previous 5 seconds (a tumbling window in processing time), for as long as words keep flowing in.
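
To make the windowing behaviour concrete, here is a minimal sketch in plain Java that does not use the Flink API. It assigns each input line to a 5-second tumbling window by its arrival time and counts the words per window. The class TumblingWordCount, its methods, and the arrival times in main are assumptions made for illustration; Flink performs this assignment internally and uses the wall clock for processing time.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWordCount {

    /** Start of the tumbling window that contains a non-negative timestamp (windows aligned to 0). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /**
     * Counts words per window; timestamps[i] is the arrival time of lines[i].
     * Returns window start -> (word -> count), both sorted by key.
     */
    static Map<Long, Map<String, Long>> count(long[] timestamps, String[] lines, long windowSizeMillis) {
        Map<Long, Map<String, Long>> windows = new TreeMap<Long, Map<String, Long>>();
        for (int i = 0; i < lines.length; i++) {
            long start = windowStart(timestamps[i], windowSizeMillis);
            Map<String, Long> counts = windows.get(start);
            if (counts == null) {
                counts = new TreeMap<String, Long>();
                windows.put(start, counts);
            }
            // Same whitespace split as the example program.
            for (String word : lines[i].split("\\s")) {
                Long old = counts.get(word);
                counts.put(word, old == null ? 1L : old + 1L);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in milliseconds; the third line falls into the next window.
        long[] timestamps = {1000L, 2500L, 6000L};
        String[] lines = {"lorem ipsum", "ipsum ipsum ipsum", "bye"};
        for (Map.Entry<Long, Map<String, Long>> window : count(timestamps, lines, 5000L).entrySet()) {
            System.out.println("window starting at " + window.getKey() + " ms: " + window.getValue());
        }
        // Prints:
        // window starting at 0 ms: {ipsum=4, lorem=1}
        // window starting at 5000 ms: {bye=1}
    }
}
```

Because the windows do not overlap, each word occurrence is counted exactly once. A sliding window would instead assign one line to several overlapping windows.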

  • First of all, use netcat to start a local server:

    $ nc -l 9000

  • Submit the Flink program:

    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
    
    Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
    Using address 127.0.0.1:6123 to connect to JobManager.
    JobManager web interface address http://127.0.0.1:8081
    Starting execution of program
    Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#297388688]
    11/04/2016 14:04:50     Job execution switched to status RUNNING.
    11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
    11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
    11/04/2016 14:04:50     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED
    11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
    11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
    11/04/2016 14:04:51     Source: Socket Stream -> Flat Map(1/1) switched to RUNNING

    The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:

    [Screenshot: JobManager overview (cont'd)]
    [Screenshot: JobManager running jobs]
  • Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the JobManager's output file and type some text into nc (input is sent to Flink line by line after hitting <RETURN>):

    $ nc -l 9000
    lorem ipsum
    ipsum ipsum ipsum
    bye

    The .out file shows the counts at the end of each time window for as long as words keep flowing in, e.g.:

    $ tail -f log/flink-*-jobmanager-*.out
    lorem : 1
    bye : 1
    ipsum : 4

    To stop Flink when you are done, type:

    $ ./bin/stop-local.sh
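
The netcat step in the list above assumes that nc is installed, which is often not the case on Windows. As a stand-in, the following minimal sketch listens on a port, accepts a single client (here, the Flink job), and forwards each line typed on standard input, terminated by a newline. LineServer and serve are hypothetical names, and this is not a Flink or netcat component.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class LineServer {

    /** Accepts one client and sends it every line read from input, each terminated by '\n'. */
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = input.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush(); // send each line as soon as it is entered
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Default to port 9000, the port used in this guide.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port)) {
            System.err.println("Listening on port " + port + "; type lines once the job has connected");
            serve(server, new BufferedReader(new InputStreamReader(System.in)));
        }
    }
}
```

Start it before submitting the job, exactly as you would start nc, and end the input (Ctrl-D on Linux and Mac OS X, Ctrl-Z followed by Enter on Windows) to close the connection.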

Next Steps

Check out some more examples to get a better feel for Flink's programming APIs. When you are done with that, go ahead and read the streaming guide.
