This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
FAQ #
このページでは、PyFlinkユーザに対するよくある質問の解決策について説明します。
Python仮想環境の準備 #
便利なスクリプトをダウンロードして、Mac OSとほとんどのLinuxディストリビューションで使えるPython仮想環境のzipを準備できます。 PyFlinkバージョンを指定して、対応するPyFlinkバージョンに必要なPython仮想環境を生成できます。それ以外の場合は、最新バージョンがインストールされます。
$ sh setup-pyflink-virtual-env.sh
Python仮想環境でPyFlinkジョブを実行 #
前のセクションで説明したように、python仮想環境を設定した後で、PyFlinkジョブを実行する前に環境をアクティブ化する必要があります。
ローカル #
# activate the conda python virtual environment
$ source venv/bin/activate
$ python xxx.py
クラスタ #
$ # specify the Python virtual environment
$ table_env.add_python_archive("venv.zip")
$ # specify the path of the python interpreter which is used to execute the python UDF workers
$ table_env.get_config().set_python_executable("venv.zip/venv/bin/python")
add_python_archive
とset_python_executable
の使用法の詳細については、関連ドキュメントを参照してください。
Jarファイルの追加 #
PyFlinkジョブはjarファイル、つまりコネクタ、Java UDFsなどに依存する場合があります。 以下のPython Table APIを使うか、ジョブの送信時にコマンドライン引数を介して直接指定できます。
# 注意: ローカルファイルURL("file:"から始まる)のみがサポートされます。
table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
# 注意: パスはプロトコル(例えば"file")を指定し、URLがクライアントとクラスタの両方でアクセス可能であることを確認する必要があります。
table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
Java依存関係を追加するAPIの詳細については、the relevant documentationを参照してください。
Pythonファイルの追加 #
コマンドライン引数pyfs
やTableEnvironment
のAPI add_python_file
を使って、pythonファイル、pythonパッケージまたはローカルディレクトリであるpythonファイル依存関係を追加できます。
例えば、以下の階層構造を持つmyDir
という名前のディレクトリがあるとします:
myDir
├──utils
├──__init__.py
├──my_util.py
以下のように、ディレクトリmyDir
のPythonファイルを追加できます:
table_env.add_python_file('myDir')
def my_udf():
from utils import my_util
ミニクラスタでジョブを実行する場合は、ジョブが終了するまで待ちます #
ミニクラスタでジョブを実行する場合(例えば、IDEでジョブを実行する場合)、ジョブで以下のAPI(例えば、Python Table APIでのTableEnvironment.execute_sql、StatementSet.executeなど; Python DataStream APIでのStreamExecutionEnvironment.execute_async)を使う場合、APIは非同期のため、明示的にジョブの実行を待機することを忘れないでください。 それ以外の場合、ジョブの実行が完了する前にプログラムが終了してしまうため、実行結果が見つからない場合があります。その方法については、以下の例を参照してください:
# execute SQL / Table API query asynchronously
t_result = table_env.execute_sql(...)
t_result.wait()
# execute DataStream Job asynchronously
job_client = stream_execution_env.execute_async('My DataStream Job')
job_client.get_job_execution_result().result()
注意: リモートclusterでジョブを実行する場合、ジョブの実行を待つ必要はありません。そのため、リモートクラスタでジョブを実行する場合はこれらのコードを忘れずに削除してください。