Overview
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

オペレータ #

オペレータは、1つ以上のDataStreamを新しいDataStreamに変換します。プログラムは、複数の変換を組み合わせて、洗練されたデータフロートポロジを作成できます。

データストリームの変換 #

FlinkのDataStreamプログラムはデータストリーム上の変換を実装する通常のプログラムです(例えば、マッピング、フィルタリング、リダクション)。利用可能なPython DataStream APIの概要については、operatorsを参照してください。

機能 #

変換は変換の機能を定義するための入力としてユーザ定義関数を受け入れます。 以下のセクションでは、Python DataStream APIでPythonユーザ定義関数を定義する様々な方法について説明します。

Functionインタフェースの実装 #

Python DataStream APIの様々な変換には、様々なFuntionインタフェースが提供されています。例えば、MapFunctionmap変換のために提供され、FilterFunctionfilter変換のために提供されています。 ユーザは変換の型に応じて、対応するFunctionインタフェースを実装できます。MapFunctionを例に取り上げてみましょう:

# Implementing MapFunction
class MyMapFunction(MapFunction):
    
    def map(self, value):
        return value + 1
        
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())

Lambda Function #

以下の例で示すように、変換は、変換の機能を定義するためにlambda関数を受け付けることもできます:

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

注意 ConnectedStream.map()ConnectedStream.flat_map()はlamda関数をサポートしていないため、別々にCoMapFunctionCoFlatMapFunctionを受け付ける必要があります。

Python Function #

ユーザはPython関数を使って変換の機能を定義することもできます:

def my_map_func(value):
    return value + 1

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())

出力の型 #

ユーザはPython DataStream APIで変換の型情報を明示的に指定することができます。指定しない場合は、出力はデフォルトでTypes.PICKLED_BYTE_ARRAYになり、結果データはpickleシリアライザを使ってシリアライズ化されます。 pickleシリアライザの詳細については、Pickle Serialization.を参照してください。

一般的に、出力の型は次のシナリオで指定される必要があります。

DataStreamからTableへの変換 #

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def data_stream_api_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)

    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))

    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp

    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: (i[0] + j[0], i[1]))

    t_env.execute_sql("""
            CREATE TABLE my_sink (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'print'
            )
        """)

    table = t_env.from_data_stream(ds)
    table_result = table.execute_insert("my_sink")

    # 1)ジョブが終了するまで待機し、ローカル実行でのみ使われます。それ以外の場合はジョブが実行されたままスクリプトが終了する可能性があります
    # 2)ジョブをYARN、スタンドアローン、K8sなどのリモートクラスタにでタッチモードで送信する場合、削除する必要があります
    table_result.wait()


if __name__ == '__main__':
    data_stream_api_demo()

出力の型は、上記の例のflap mapオペレーションに指定する必要があります。これはreduceオペレーションの出力型として暗黙的に使われます。t_env.from_data_stream(ds)では、dsの出力タイプが複合型である必要があるからです。

DataStreamのシンクへの書き込み #

from pyflink.common.typeinfo import Types

def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
  .sink_to(...)

一般的に、シンクが特殊な種類のデータ、例えばRowなど、のみを受け付ける場合、上記のmapオペレーションに対して出力の型を指定する必要があります。

オペレータのチェーン #

デフォルトでは、シリアライズ化と逆シリアライズ化を回避してパフォーマンスを向上させるために、複数の非シャッフルPython関数がチェーンされます。チェーンを無効にしたい場合もあります。例えば、入力要素ごとに多数の要素を生成するflatmap関数があり、チェーンを無効にすると異なる並列度で出力を処理できるようになります。

オペレータのチェーンは、以下のいずれかの方法で無効にすることができます:

  • 現在のオペレータの後に、key_byオペレーション、shuffleオペレーション、rescaleオペレーション、rebalanceオペレーション、partition_customオペレーションを追加することで、直後のオペレータとのチェーンを無効にします。
  • 現在のオペレータにstart_new_chainオペレーションを適用して前のオペレータとのチェーンを無効にします。
  • 現在のオペレータにdisable_chainingオペレーションを適用して、前後のオペレータとのチェーンを無効にします。
  • 異なる並列度または異なるスロット共有グループを設定して、2つのオペレータのチェーンを無効にします。
  • 設定python.operator-chaining.enabledを介して、全てのオペレータを無効にすることもできます。

Python Functionsのバンドル #

Python functionsがmain()関数が定義されているファイル外にある場合は、非ローカルモードでPython functionsを実行するには、設定オプションpython-filesを使ってPython functionsをバンドルすることを強くお勧めします。 そうしなければ、my_function.pyというファイル内のPython functions関数を定義すると、ModuleNotFoundError: No module named 'my_function'が発生する可能性があります。

Python Functionsでのリソースのロード #

最初にPython functions内にいくつかのリソースをロードし、その後リソースをリロードすることなく計算を繰り返し実行するシナリオがあります。 例えば、大規模な深層学習モデルを1回だけロードし、そのモデルに対してバッチ予測を複数回実行したい場合があります。

基本クラスFunctionから継承されたopenメソッドを上書きすることが、まさに必要なことです。

class Predict(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)
inserted by FC2 system