Table API Tutorial
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Table APIチュートリアル #

Apache Flinkは、バッチとストリーム処理用の統合されたリレーショナルAPIとしてTable APIを提供します。つまり、クエリは、制限のないリアルタイムストリームまたは制限のあるバッチデータセットに対して同じセマンティクスで実行され、同じ結果を生成します。FlinkのTable APIは、データ分析、データパイプライン、ETLアプリケーションの定義を明確にするために一般的に使われます。

何を構築するつもりですか? #

このチュートリアルで、pure Python Flink Table APIパイプラインをビルドする方法を学習します。 パイプラインは入力csvファイルからデータを読み取り、単語の頻出頻度を計算し、結果を出力ファイルに書き込みます。

必要条件 #

このウォークスルーは、Pythonにある程度知識があることを前提としていますが、別のプログラミング言語を使っている場合でも理解できるはずです。 また、SELECT句やGROUP BY句のような基本的なリレーショナル概念を理解していることを仮定します。

助けてください。行き詰まりました! #

行き詰った場合は、コミュニティサポートリソースを調べてください。 特に、Apache Flinkのuser mailing listは、あらゆるApacheプロジェクトの中で最も活発なプロジェクトの1つとして常にランク付けされており、すばやく助けを受けるのに最適な方法です。

How To Follow Along #

この手順に従って進めたい場合は、次の機能を備えたコンピュータが必要です:

  • Java 11
  • Python 3.8, 3.9, 3.10 or 3.11

Python Table APIを使うには、PyFlinkをインストールする必要があります。PyFlinkはPyPIで利用可能で、pipを使って簡単にインストールできます。

$ python -m pip install apache-flink

PyFlinkがインストールされると、Python Table APIジョブの作成に進むことができます。

Table APIアプリケーションは、table環境を宣言することから始まります。 これは、Flinkランタイムと対話するためのメインエントリポイントとして機能します。 再起動戦略、デフォルトの並列度などの実行パラメータの設定に使えます。 table設定により、Table API固有の設定を設定することができます。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

これで、ソースとシンクtableを作成できます:

t_env.create_temporary_table(
    'source',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .build())
        .option('path', input_path)
        .format('csv')
        .build())
tab = t_env.from_path('source')

t_env.create_temporary_table(
    'sink',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .column('count', DataTypes.BIGINT())
                .build())
        .option('path', output_path)
        .format(FormatDescriptor.for_format('canal-json')
                .build())
        .build())

TableEnvironment.execute_sql()メソッドを使って、DDLで定義されるソース/シンクtableも登録できます。

my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format(input_path)

my_sink_ddl = """
    create table sink (
        word STRING,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'canal-json',
        'path' = '{}'
    )
""".format(output_path)

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

これは、sourceという名前のtableと、sinkという名前のtableを、table環境に登録します。 table sourceにはwordという1つの列のみがあり、input_pathで指定されるファイルから読み取られた文字列を消費します。 table sinkには、wordとcountの2つの列があり、output_pathで指定されたファイルへデータを書き込みます。

これで、table source入力を読み取り、いくつかの変換を実行し、結果をtable sinkに書き込みます。

最後に、実際のFlink Python Tableジョブを実行する必要があります。 ソース、変換、シンクの作成などの全てのオペレーションは遅延されます。 execute_insert(sink_name)が呼ばれた場合のみ、ジョブは実行のために送信されます。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    for s in line[0].split():
        yield Row(s)

# compute word count
tab.flat_map(split).alias('word') \
   .group_by(col('word')) \
   .select(col('word'), lit(1).count) \
   .execute_insert('sink') \
   .wait()

これまでの完全なコード:

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .build())
                .option('path', input_path)
                .format('csv')
                .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format(FormatDescriptor.for_format('canal-json')
                        .build())
                .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

この例はコマンドラインで実行できます:

$ python word_count.py

このコマンドはローカルのミニクラスタでPython Table APIプログラムをビルドして実行します。 Python Table APIプログラムをリモートクラスタに送信することもできます。詳細についてはジョブの送信の例を参照してください。

最後に、以下のような実行悔過kが表示されます:

+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
...

これにより、独自のFlink Python Table APIプログラムの作成を開始できるようになります。 その他の例については、 PyFlink Examples を参照することもできます。 Python Table APIについてより学習したい場合は、詳細については Flink Python API Docs を参照してください。

inserted by FC2 system