Vectorized User-defined Functions
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ベクトル化ユーザ定義関数 #

ベクトル化Pythonユーザ定義関数は、JVMとPython VM間で要素のバッチをArrow形式で転送することによって実行される関数です。 ベクトル化Pythonユーザ定義関数のパフォーマンスは、シリアル化/逆シリアル化のオーバーヘッドと呼び出しのオーバーヘッドが大幅に削減されるため、通常、非ベクトル化Pythonユーザ定義関s縫うよりも遥かに高くなります。さらに、Pandas、Numpyなどの一般的なPythonライブラリを利用して、ベクトル化Pythonユーザ定義関数を実装できます。 これらのPythonライブラリは高度に最適化されており、高性能のデータ構造と関数を提供します。ベクトル化ユーザ定義関数を定義する方法については、非ベクトル化ユーザ定義関数と同様の方法を提供します。 デコレータudfまたはudafに追加された特別なパラメータfunc_type="pandas"を追加して、ベクトル化ユーザ定義関数としてマークするだけで済みます。

注意: Python UDFの実行には、PyFlink がインストールされたPythonバージョン(3.8、3.9、3.10)が必要です。これはクライアント側とクラスタ側の両方で必要です。

ベクトル化スカラー関数 #

ベクトル化Pythonスカラー関数は、入力としてpandas.Seriesを受け付け、出力として同じ長さのpandas.Seriesを返します。 内部的には、Flinkは入力要素をバッチに分割し、入力要素のバッチをPandas.Seriesに変換してから、にゅりょく要素の各バッチごとにユーザ定義のベクトル化スカラー関数を呼び出します。バッチサイズを設定する方法についての詳細は、設定オプションのpython.fn-execution.arrow.batch.sizeを参照してください。

ベクトル化Pythonスカラー関数は、非ベクトル化Pythonスカラー関数が使える場所であればどこでも使えます。

以下の例は、2つのカラムの合計を計算し、それをクエリ内で使う独自のベクトル化Pythonスカラー関数を定義する方法を示します:

from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.udf import udf

@udf(result_type='BIGINT', func_type="pandas")
def add(i, j):
  return i + j

settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)

# use the vectorized Python scalar function in Python Table API
my_table.select(add(col("bigint"), col("bigint")))

# use the vectorized Python scalar function in SQL API
table_env.create_temporary_function("add", add)
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")

ベクトル化集計関数 #

ベクトル化Python集計関数は入力として1つ以上のpandas.Seriesを受け付け、出力として1つのスカラー値を返します。

注意 返り値の型は、現時点では、RowTypeMapTypeをサポートしていません。

ベクトル化Python集計関数は、GroupBy集計(Batch)、GroupByウィンドウ集計(バッチとストリーム)、Overウィンドウ集計(バッチとウィンドウを超えて境界付けられたストリーム)で使えます。集計の使用法の詳細については、関連ドキュメントを参照してください。

注意 Pandas UDAFは部分的な集計をサポートしていません。さらに、グループまたはウィンドウの全てのデータは実行時に同時にメモリにロードされるため、グループまたはウィンドウがメモリ内に収まることを確認する必要があります。

以下の例は、平均を計算する独自のベクトル化集計関数を定義し、それをGroupBy集計GroupByウィンドウ集計Overウィンドウ集計で使う方法を示しています:

from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udaf
from pyflink.table.window import Tumble

@udaf(result_type='FLOAT', func_type="pandas")
def mean_udaf(v):
    return v.mean()

settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)

my_table = ...  # type: Table, table schema: [a: String, b: BigInt, c: BigInt]

# use the vectorized Python aggregate function in GroupBy Aggregation
my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b')))


# use the vectorized Python aggregate function in GroupBy Window Aggregation
tumble_window = Tumble.over(lit(1).hours) \
            .on(col("rowtime")) \
            .alias("w")

my_table.window(tumble_window) \
    .group_by(col("w")) \
    .select(col('w').start, col('w').end, mean_udaf(col('b')))

# use the vectorized Python aggregate function in Over Window Aggregation
table_env.create_temporary_function("mean_udaf", mean_udaf)
table_env.sql_query("""
    SELECT a,
        mean_udaf(b)
        over (PARTITION BY a ORDER BY rowtime
        ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)
    FROM MyTable""")

ベクトル化Python集計関数を定義するには、様々な方法があります。 以下の例は、入力パラメータとしてbigintの2つのカラムを受け取り、結果としてそれらの最大値の合計を返す、様々な方法を示しています。

from pyflink.table.udf import AggregateFunction, udaf

# option 1: extending the base class `AggregateFunction`
class MaxAdd(AggregateFunction):

    def open(self, function_context):
        mg = function_context.get_metric_group()
        self.counter = mg.add_group("key", "value").counter("my_counter")
        self.counter_sum = 0

    def get_value(self, accumulator):
        # counter
        self.counter.inc(10)
        self.counter_sum += 10
        return accumulator[0]

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, *args):
        result = 0
        for arg in args:
            result += arg.max()
        accumulator.append(result)

max_add = udaf(MaxAdd(), result_type='BIGINT', func_type="pandas")

# option 2: Python function
@udaf(result_type='BIGINT', func_type="pandas")
def max_add(i, j):
  return i.max() + j.max()

# option 3: lambda function
max_add = udaf(lambda i, j: i.max() + j.max(), result_type='BIGINT', func_type="pandas")

# option 4: callable function
class CallableMaxAdd(object):
  def __call__(self, i, j):
    return i.max() + j.max()

max_add = udaf(CallableMaxAdd(), result_type='BIGINT', func_type="pandas")

# option 5: partial function
def partial_max_add(i, j, k):
  return i.max() + j.max() + k
  
max_add = udaf(functools.partial(partial_max_add, k=1), result_type='BIGINT', func_type="pandas")
inserted by FC2 system