This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
コネクタ #
このページでは、PyFlinkでコネクタを使う方法を説明し、FlinkコネクタをPythonプログラムで使う時に注意すべき詳細を強調します。
注意 一般的なコネクタの情報と一般的な設定については、対応するJava/Scalaドキュメントを参照してください。
コネクタとフォーマットjarをダウンロードする #
FlinkはJava/Scalaベースのプロジェクトであるため、コネクタとフォーマットの両方について、実装はジョブ依存関係として指定する必要があるjarとして利用できます。
table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
コネクタの使い方 #
PyFlinkのTable APIでは、DDLがソースとシンクを定義する推奨方法で、TableEnvironment
のexecute_sql()
メソッドを介して実行されます。
これにより、アプリケーションがtableを使えるようになります。
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
sink_ddl = """
CREATE TABLE sink_table(
a VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'sink_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_query("SELECT a FROM source_table") \
.execute_insert("sink_table").wait()
以下は、Kafkaソース/シンクとJSONフォーマットをPyFlinkで使う方法の完全な例です。
from pyflink.table import TableEnvironment, EnvironmentSettings
def log_processing():
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# specify connector and format jars
t_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
sink_ddl = """
CREATE TABLE sink_table(
a VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'sink_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_query("SELECT a FROM source_table") \
.execute_insert("sink_table").wait()
if __name__ == '__main__':
log_processing()
事前定義されたソースとシンク #
一部のデータソースとシンクはFlinkに組み込まれており、すぐに使えます。 これらの事前定義されたデータ祖素にはPandas DataFrameからの読み込み、またはコレクションからのデータの取り込みが含まれます。 事前定義されたデータシンクは、Pandas DataFrameへの書き込みをサポートします。
Pandasとのやり取り #
PyFlink TablesはPandas DataFrameとの間の変換をサポートします。
from pyflink.table.expressions import col
import pandas as pd
import numpy as np
# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
from_elements() #
from_elements()
を使って要素のコレクションからtableを作成できます。要素の型はT、許容されるアトミックな型か、許容される複合型である必要があります。
from pyflink.table import DataTypes
table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
# use the second parameter to specify custom field names
table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
# use the second parameter to specify a custom table schema
table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
DataTypes.FIELD("b", DataTypes.STRING())]))
上記のクエリは以下のようなTableを返します:
+----+-------+
| a | b |
+====+=======+
| 1 | Hi |
+----+-------+
| 2 | Hello |
+----+-------+
ユーザ定義のソースとシンク #
場合によっては、独自のソースとシンクを定義することが必要になる場合があります。現在、ソースとシンクはJava/Scalaで実装する必要がありますが、TableFactory
を地祇してDDL経由での使用をサポートできます。
詳細については、Java/Scala documentationを参照してください。