Connectors
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

コネクタ #

このページでは、PyFlinkでコネクタを使う方法を説明し、FlinkコネクタをPythonプログラムで使う時に注意すべき詳細を強調します。

注意 一般的なコネクタの情報と一般的な設定については、対応するJava/Scalaドキュメントを参照してください。

コネクタとフォーマットjarをダウンロードする #

FlinkはJava/Scalaベースのプロジェクトであるため、コネクタとフォーマットの両方について、実装はジョブ依存関係として指定する必要があるjarとして利用できます。

table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

コネクタの使い方 #

PyFlinkのTable APIでは、DDLがソースとシンクを定義する推奨方法で、TableEnvironmentexecute_sql()メソッドを介して実行されます。 これにより、アプリケーションがtableを使えるようになります。

source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'source_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'properties.group.id' = 'test_3',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

sink_ddl = """
        CREATE TABLE sink_table(
            a VARCHAR
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'sink_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'format' = 'json'
        )
        """

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

t_env.sql_query("SELECT a FROM source_table") \
    .execute_insert("sink_table").wait()

以下は、Kafkaソース/シンクとJSONフォーマットをPyFlinkで使う方法の完全な例です。

from pyflink.table import TableEnvironment, EnvironmentSettings

def log_processing():
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    # specify connector and format jars
    t_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
    
    source_ddl = """
            CREATE TABLE source_table(
                a VARCHAR,
                b INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'source_topic',
              'properties.bootstrap.servers' = 'kafka:9092',
              'properties.group.id' = 'test_3',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    sink_ddl = """
            CREATE TABLE sink_table(
                a VARCHAR
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'sink_topic',
              'properties.bootstrap.servers' = 'kafka:9092',
              'format' = 'json'
            )
            """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    t_env.sql_query("SELECT a FROM source_table") \
        .execute_insert("sink_table").wait()


if __name__ == '__main__':
    log_processing()

事前定義されたソースとシンク #

一部のデータソースとシンクはFlinkに組み込まれており、すぐに使えます。 これらの事前定義されたデータ祖素にはPandas DataFrameからの読み込み、またはコレクションからのデータの取り込みが含まれます。 事前定義されたデータシンクは、Pandas DataFrameへの書き込みをサポートします。

Pandasとのやり取り #

PyFlink TablesはPandas DataFrameとの間の変換をサポートします。

from pyflink.table.expressions import col

import pandas as pd
import numpy as np

# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()

from_elements() #

from_elements()を使って要素のコレクションからtableを作成できます。要素の型はT、許容されるアトミックな型か、許容される複合型である必要があります。

from pyflink.table import DataTypes

table_env.from_elements([(1, 'Hi'), (2, 'Hello')])

# use the second parameter to specify custom field names
table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# use the second parameter to specify a custom table schema
table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                        DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                                       DataTypes.FIELD("b", DataTypes.STRING())]))

上記のクエリは以下のようなTableを返します:

+----+-------+
| a  |   b   |
+====+=======+
| 1  |  Hi   |
+----+-------+
| 2  | Hello |
+----+-------+

ユーザ定義のソースとシンク #

場合によっては、独自のソースとシンクを定義することが必要になる場合があります。現在、ソースとシンクはJava/Scalaで実装する必要がありますが、TableFactoryを地祇してDDL経由での使用をサポートできます。 詳細については、Java/Scala documentationを参照してください。

inserted by FC2 system