DataStream API Tutorial
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

DataStream APIチュートリアル #

Apache Flinkは、堅牢でステートフルなストリーミングアプリケーションを構築するためのDataStream APIを提供します。状態と時間をきめ細かく制御できるため、高度なイベント駆動型システムの実装が可能になります。このステップバイステップガイドでは、PyFlinkとDataStream APIを使って単純なストリーミングアプリケーションを構築する方法を学習します。

何を構築するつもりですか? #

このチュートリアルで、単純なPython DataStreamパイプラインを書く方法を学習します。 パイプラインはcsvファイルからデータを読み取り、単語の出現頻度を計算し、結果を出力ファイルへ書き込みます。

必要条件 #

このウォークスルーは、Pythonにある程度知識があることを前提としていますが、別のプログラミング言語を使っている場合でも理解できるはずです。

助けてください。行き詰まりました! #

行き詰った場合は、コミュニティサポートリソースを調べてください。 特に、Apache Flinkのuser mailing listは、あらゆるApacheプロジェクトの中で最も活発なプロジェクトの1つとして常にランク付けされており、すばやく助けを受けるのに最適な方法です。

How To Follow Along #

この手順に従って進めたい場合は、次の機能を備えたコンピュータが必要です:

  • Java 11
  • Python 3.8, 3.9, 3.10 or 3.11

Python DataStream APIを使うには、PyFlinkをインストールする必要があります。PyFlinkはPyPIで利用可能で、pipを使って簡単にインストールできます。

$ python -m pip install apache-flink

PyFlinkがインストールされると、Python DataStreamジョブの作成に進むことができます。

DataStream APIアプリケーションは、ストリーミングプログラムが実行されるコンテキストである実行環境(StreamExecutionEnvironment)を宣言することから始まります。これはジョブのプロパティ(例えば、デフォルトの並列度、再起動戦略)を設定し、ソースを作成し、最後にジョブの実行をトリガーするために使われます。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

StreamExecutionEnvironmentが作成されると、それを使って_source_を宣言できます。ソースは、Apache Kafka、Rabbit MQ、Apache Pulsarのような外部システムからデータをFlink Jobsに取り込みます。

物事を簡単にするために、このウォークスルーではファイルからデータを読み込むソースを使います。

ds = env.from_source(
    source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                               input_path)
                     .process_static_file_set().build(),
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name="file_source"
)

これで、このデータストリームに対して変換を実行したり、_sink_を使って外部システムにデータを書き込んだりできるようになります。このウォークスルーではFileSinkシンクコネクタを使ってデータをファイルに書き込みます。

ds.sink_to(
    sink=FileSink.for_row_format(
        base_path=output_path,
        encoder=Encoder.simple_string_encoder())
    .with_output_file_config(
        OutputFileConfig.builder()
        .with_part_prefix("prefix")
        .with_part_suffix(".ext")
        .build())
    .with_rolling_policy(RollingPolicy.default_rolling_policy())
    .build()
)

def split(line):
    yield from line.split()

# compute word count
ds = ds.flat_map(split) \
       .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
       .key_by(lambda i: i[0]) \
       .reduce(lambda i, j: (i[0], i[1] + j[1]))

最後のステップは、実際のPyFlink DataStream APIジョブを実行することです。PyFlinkアプリケーションは遅延ビルドされ、完全に形成された後でのみ実行するためにクラスタに送信されます。アプリケーションを実行するには、env.execute()を呼び出すだけです。

env.execute()

これまでの完全なコード:

import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy


word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()

    # compute word count
    ds = ds.flat_map(split) \
        .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
        .key_by(lambda i: i[0]) \
        .reduce(lambda i, j: (i[0], i[1] + j[1]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

PyFlinkプログラムを定義したので、作成したプログラムをコマンドラインで実行できます:

$ python word_count.py

このコマンドはPyFlinkプログラムをローカルminiクラスタでPyFlinkプログラムをビルドして実行します。あるいは、Job Submission Examplesで詳しく説明されている手順を使ってリモートクラスタに送信できます。

最後に、以下のような実行悔過kが表示されます:

(a,5)
(Be,1)
(Is,1)
(No,2)
...

このウォークスルーでは、独自のPyFlink DataStream APIプログラムの作成を開始するための基礎を提供します。 その他の例については、 PyFlink Examples を参照することもできます。 Python DataStream APIについてより学習したい場合は、詳細については Flink Python API Docs を参照してください。

inserted by FC2 system