This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
オペレータ #
オペレータは、1つ以上のDataStreamを新しいDataStreamに変換します。プログラムは、複数の変換を組み合わせて、洗練されたデータフロートポロジを作成できます。
データストリームの変換 #
FlinkのDataStreamプログラムはデータストリーム上の変換を実装する通常のプログラムです(例えば、マッピング、フィルタリング、リダクション)。利用可能なPython DataStream APIの概要については、operatorsを参照してください。
機能 #
変換は変換の機能を定義するための入力としてユーザ定義関数を受け入れます。 以下のセクションでは、Python DataStream APIでPythonユーザ定義関数を定義する様々な方法について説明します。
Functionインタフェースの実装 #
Python DataStream APIの様々な変換には、様々なFuntionインタフェースが提供されています。例えば、MapFunction
はmap
変換のために提供され、FilterFunction
はfilter
変換のために提供されています。
ユーザは変換の型に応じて、対応するFunctionインタフェースを実装できます。MapFunctionを例に取り上げてみましょう:
# Implementing MapFunction
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
Lambda Function #
以下の例で示すように、変換は、変換の機能を定義するためにlambda関数を受け付けることもできます:
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
注意 ConnectedStream.map()
とConnectedStream.flat_map()
はlamda関数をサポートしていないため、別々にCoMapFunction
とCoFlatMapFunction
を受け付ける必要があります。
Python Function #
ユーザはPython関数を使って変換の機能を定義することもできます:
def my_map_func(value):
return value + 1
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
出力の型 #
ユーザはPython DataStream APIで変換の型情報を明示的に指定することができます。指定しない場合は、出力はデフォルトでTypes.PICKLED_BYTE_ARRAY
になり、結果データはpickleシリアライザを使ってシリアライズ化されます。
pickleシリアライザの詳細については、Pickle Serialization.を参照してください。
一般的に、出力の型は次のシナリオで指定される必要があります。
DataStreamからTableへの変換 #
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
def data_stream_api_demo():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
t_env.execute_sql("""
CREATE TABLE my_sink (
a INT,
b VARCHAR
) WITH (
'connector' = 'print'
)
""")
table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")
# 1)ジョブが終了するまで待機し、ローカル実行でのみ使われます。それ以外の場合はジョブが実行されたままスクリプトが終了する可能性があります
# 2)ジョブをYARN、スタンドアローン、K8sなどのリモートクラスタにでタッチモードで送信する場合、削除する必要があります
table_result.wait()
if __name__ == '__main__':
data_stream_api_demo()
出力の型は、上記の例のflap mapオペレーションに指定する必要があります。これはreduceオペレーションの出力型として暗黙的に使われます。t_env.from_data_stream(ds)
では、ds
の出力タイプが複合型である必要があるからです。
DataStreamのシンクへの書き込み #
from pyflink.common.typeinfo import Types
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
.sink_to(...)
一般的に、シンクが特殊な種類のデータ、例えばRowなど、のみを受け付ける場合、上記のmapオペレーションに対して出力の型を指定する必要があります。
オペレータのチェーン #
デフォルトでは、シリアライズ化と逆シリアライズ化を回避してパフォーマンスを向上させるために、複数の非シャッフルPython関数がチェーンされます。チェーンを無効にしたい場合もあります。例えば、入力要素ごとに多数の要素を生成するflatmap
関数があり、チェーンを無効にすると異なる並列度で出力を処理できるようになります。
オペレータのチェーンは、以下のいずれかの方法で無効にすることができます:
- 現在のオペレータの後に、
key_by
オペレーション、shuffle
オペレーション、rescale
オペレーション、rebalance
オペレーション、partition_custom
オペレーションを追加することで、直後のオペレータとのチェーンを無効にします。 - 現在のオペレータに
start_new_chain
オペレーションを適用して前のオペレータとのチェーンを無効にします。 - 現在のオペレータに
disable_chaining
オペレーションを適用して、前後のオペレータとのチェーンを無効にします。 - 異なる並列度または異なるスロット共有グループを設定して、2つのオペレータのチェーンを無効にします。
- 設定
python.operator-chaining.enabled
を介して、全てのオペレータを無効にすることもできます。
Python Functionsのバンドル #
Python functionsがmain()
関数が定義されているファイル外にある場合は、非ローカルモードでPython functionsを実行するには、設定オプションpython-files
を使ってPython functionsをバンドルすることを強くお勧めします。
そうしなければ、my_function.py
というファイル内のPython functions関数を定義すると、ModuleNotFoundError: No module named 'my_function'
が発生する可能性があります。
Python Functionsでのリソースのロード #
最初にPython functions内にいくつかのリソースをロードし、その後リソースをリロードすることなく計算を繰り返し実行するシナリオがあります。 例えば、大規模な深層学習モデルを1回だけロードし、そのモデルに対してバッチ予測を複数回実行したい場合があります。
基本クラスFunction
から継承されたopen
メソッドを上書きすることが、まさに必要なことです。
class Predict(MapFunction):
def open(self, runtime_context: RuntimeContext):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)