This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
ユーザ定義関数 #
PyFlink Table APIを使って、ユーザはPythonのユーザ定義関数を使ったデータ変換を行うことができます
現在、2種類のPythonユーザ定義関数をサポートします: 一度に1行のデータを処理する一般的なPythonユーザ定義関数と、一度に1つのバッチがデータを処理するベクトル化されたPythonユーザ定義関数。
UDFのバンドル #
非ローカルモードでPython UDF(およびPandas UDF)を実行するには、Python UDFがmain()
関数が定義されているファイルの外にある場合は、設定オプションpython-files
を使ってPython UDF定義をバンドルすることを強くお勧めします。
そうしなければ、Python UDFをmy_udf.py
というファイルで定義すると、ModuleNotFoundError: No module named 'my_udf'
が発生する場合があります。
UDFへのリソースのロード #
最初にUDFにいくつかのリソースをロードし、次にリソースを再ロードすることなく計算(つまりeval
)を繰り返し実行したい場合のシナリオがあります。
例えば、大規模な深層学習モデルを1回だけロードし、そのモデルに対してバッチ予測を複数回実行したい場合があります。
UserDefinedFunction
のopen
メソッドをオーバライドすることが、まさに必要なことです。
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
Accessing job parameters #
open()
メソッドは、メトリクスグループ、グローバルジョブパラメータのようなユーザ定義関数が実行されるコンテキストに関する情報を含むFunctionContext
を提供します。
以下の情報は、FunctionContext
の対応するメソッドを呼び出すことで取得できます:
メソッド | 説明 |
---|---|
get_metric_group() |
この並列サブタスクのメトリクスグループ。 |
get_job_parameter(name, default_value) |
指定されたキーに関連付けられたグローバルジョブパラメータ。 |
class HashCode(ScalarFunction):
def open(self, function_context: FunctionContext):
# access the global "hashcode_factor" parameter
# "12" would be the default value if the parameter does not exist
self.factor = int(function_context.get_job_parameter("hashcode_factor", "12"))
def eval(self, s: str):
return hash(s) * self.factor
hash_code = udf(HashCode(), result_type=DataTypes.INT())
TableEnvironment t_env = TableEnvironment.create(...)
t_env.get_config().set('pipeline.global-job-parameters', 'hashcode_factor:31')
t_env.create_temporary_system_function("hashCode", hash_code)
t_env.sql_query("SELECT myField, hashCode(myField) FROM MyTable")
ユーザ定義関数のテスト #
以下のようにPythonユーザ定義関数を定義したとします:
add = udf(lambda i, j: i + j, result_type=DataTypes.BIGINT())
これを単体テストするには、._func
を使って元のPython関数を抽出し、次にそれを単体テストする必要があります:
f = add._func
assert f(1, 2) == 3