Dependency Management
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

依存性の管理 #

Python APIプログラム内で依存関係を使うには要件があります。例えば、ユーザはPythonのユーザ定義関数でサードパーティのPythonライブラリを使う必要がある場合があります。 さらに、機械学習予想などのシナリオでは、ユーザはPythonユーザ定義関数の中に機械学習モデルをロードしたい場合があります。

PyFlinkジョブをローカルで実行する場合、ユーザはサードパーティPythonライブラリをローカルのPython環境にインストールし、機械学習モデルをローカルにダウンロードすることができます。 ただし、このやり方はユーザがPyFlinkジョブをリモートクラスタに送信する場合にはうまく動作しません。 次のセクションでは、これらの要件に対してPyFlinkで提供されるオプションを紹介します。

注意 Python DataStream APIとPython Table APIはどちらも依存関係の種類ごとにAPIを提供しています。Python DataStream APIとPython Table APIを混合して使う場合、Python DataStream APIを介して依存関係を指定し、Python DataStream APIとPython Table APIの両方で機能するようにする必要があります。

JARの依存関係 #

サードパーティのJARを使う場合は、以下のようにPython Table APIでJARを指定できます:

# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";"
# and will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

# It looks like the following on Windows:
table_env.get_config().set("pipeline.jars", "file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/udf.jar")

# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" 
# and will be added to the classpath during job execution.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

or in the Python DataStream API as following:

# Use the add_jars() to add local jars and the jars will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")

# It looks like the following on Windows:
stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar", "file:///E:/my/jar/path/connector2.jar")

# Use the add_classpaths() to add the dependent jars URLs into the classpath.
# The URLs will also be added to the classpath of both the client and the cluster.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the 
# URLs are accessible on both the client and the cluster.
stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")

あるいは、ジョブを送信する時にコマンドライン引数 --jarfileを介します。

注意 コマンドライン引数--jarfileを使って1つのjarファイルの指定のみがサポートされるため、複数のjarファイルがある場合はfat jarをビルドする必要があります。

Pythonの依存関係 #

Pythonライブラリ #

Pythonのユーザ定義関数でサードパーティのPythonライブラリを使う必要がある場合があります。 Pythonライブラリを指定するには複数の方法があります。

以下のようにPython Table APIを使ってコード内でそれらを指定できます:

table_env.add_python_file(file_path)

あるいは、次のようにPython DataStream APIを使います:

stream_execution_environment.add_python_file(file_path)

設定python.filesや、ジョブを送信する時にコマンドライン引数 -pyfsまたは--pyFilesを指定して、Pythonライブラリを指定することもできます。

注意 Pythonライブラリは、ローカルファイルまたはローカルディレクトリである可能性があります。Python UDFワーカーのPYTHONPATHに追加されます。

requirements.txt #

サードパーティのPython依存関係を定義するrequirements.txtファイルを指定することもできます。 これらのPython依存関係は、作業ディレクトリにインストールされ、Python UDFワーカーのPYTHONPATHに追加されます。

以下のようにrequirements.txtを手動で準備できます:

echo numpy==1.16.5 >> requirements.txt
echo pandas==1.0.0 >> requirements.txt

あるいは、現在のPython環境にインストールされている全てのパッケージをリストするpip freezeを使います:

pip freeze > requirements.txt

requirements.txtファイルの内容は以下のようになります:

numpy==1.16.5
pandas==1.0.0

不要なエントリを削除したり、余分なエントリを追加したりすることで、手動で編集できます。

requirements.txtファイルは、次のようにPython Table APIを使ってコード内で指定できます:

# requirements_cache_dir is optional
table_env.set_python_requirements(
    requirements_file_path="/path/to/requirements.txt",
    requirements_cache_dir="cached_dir")

あるいは、次のようにPython DataStream APIを使います:

# requirements_cache_dir is optional
stream_execution_environment.set_python_requirements(
    requirements_file_path="/path/to/requirements.txt",
    requirements_cache_dir="cached_dir")

注意 クラスタ内でアクセスできなかった依存関係については、パラメータrequirements_cached_dirを使ってこれらの依存関係のインストールパッケージを含むディレクトリを指定できます。クラスタにアップロードしてオフラインインストールをサポートします。以下のように、requirements_cache_dirを準備できます:

pip download -d cached_dir -r requirements.txt --no-binary :all:

注意 準備されたパッケージがクラスタのプラットフォームおよび使用しているPythonのバージョンと一致していることを確認してください。

設定python.requirementsを使ったり、ジョブを送信する時に コマンドライン引数を介して、requirements.txtファイルを指定できます。

or via command line arguments -pyreq or --pyRequirements when submitting the job.

注意 pipを使ってrequirements.txtファイルに指定されたパッケージがインストールされるため、pip (version >= 20.3)とsetuptools (version >= 37.0.0)が利用可能であることを確認してください。

アーカイブ #

アーカイブファイルを指定することもできます。アーカイブファイルを使って独自のPython仮想環境、データファイルなどを指定できます。

以下のようにして、Python Table APIを使って、コード内でアーカイブファイルを指定できます:

table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)

あるいは、次のようにPython DataStream APIを使います:

stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)

注意 引数target_dirはオプションです。指定した場合、アーカイブファイルは実行時にtarget_dirという指定された名前のディレクトリに抽出されます。 それ以外の場合、アーカイブファイルはアーカイブファイル名と同じ名前のディレクトリに抽出されます。

以下のように、アーカイブファイルを指定したとします:

table_env.add_python_archive("/path/to/py_env.zip", "myenv")

次に、次のようにPythonユーザ定義関数でアーカイブファイルの内容にアクセスできます:

def my_udf():
    with open("myenv/py_env/data/data.txt") as f:
        ...

パラメータtarget_dirを指定しなかった場合:

table_env.add_python_archive("/path/to/py_env.zip")

次に、次のようにPythonユーザ定義関数でアーカイブファイルの内容にアクセスできます:

def my_udf():
    with open("py_env.zip/py_env/data/data.txt") as f:
        ...

注意 アーカイブファイルはPython UDFワーカーの作業ディレクトリに抽出されるため、相対パスを使ってアーカイブファイルの中のファイルにアクセスできます。

設定python.archivesや、送信時にコマンドライン引数-pyarchまたは--pyArchivesを介して、アーカイブファイルを指定することもできます。

注意 アーカイブファイルにPython仮想環境を含む場合、Python仮想環境がクラスタが実行されているプラットフォームと一致していることを確認してください。

注意 現在、zipファイル(つまり、zip、jar、whl、eggなど)と、tarファイル(つまり、tar、tar.gz、tgz)がサポートされています。

Pythonインタプリター #

Pythonワーカーを実行するPythonインタプリターのパスの指定をサポートします。

以下のように、Python Table APIを使ってコード内でPythonインタプリターを指定できます:

table_env.get_config().set_python_executable("/path/to/python")

あるいは、次のようにPython DataStream APIを使います:

stream_execution_environment.set_python_executable("/path/to/python")

アーカイブ内でPythonインタプリタを使うこともサポートされています。

# Python Table API
table_env.add_python_archive("/path/to/py_env.zip", "venv")
table_env.get_config().set_python_executable("venv/py_env/bin/python")

# Python DataStream API
stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv")
stream_execution_environment.set_python_executable("venv/py_env/bin/python")

設定python.executableや、ジョブの送信時にコマンドライン引数-pyexecまたは--pyExecutableを介してPythonインタプリタを指定することもできます。

注意 PythonインタプリタのパスがPythonアーカイブファイルを参照する場合、絶対パスではなく相対パスを使う必要があります。

クライアントのPythonインタプリタ #

ジョブのコンパイル時にPythonユーザ定義関数を解析するために、クライアント側でPythonが必要です。 compiling the job.

現在のセッションでアクティブにすることで、クライアント側で使われる独自のPythonインタプリタを指定できます。

source my_env/bin/activate

あるいは、設定python.client.executableコマンドライン引数 -pyclientexecまたは--pyClientExecutable、環境変数PYFLINK_CLIENT_EXECUTABLEを使って指定します。

Java/ScalaプログラムでPython依存関係を指定する方法 #

Java Table APIプログラムやpure SQLプログラムで、Pythonのユーザ定義関数の使用もサポートされています。 以下のコードはJava Table APIプログラムでPythonユーザ定義関数を使う方法の簡単な例を示しています: Java Table API program:

import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.inBatchMode());
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);

// register the Python UDF
tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");

tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));

// use Python UDF in the Java Table API program
tEnv.executeSql("select add_one(a) as a from source").collect();

SQLステートメントを使ってPythonユーザ定義関数を作成する方法の詳細については、CREATE FUNCTIONについてのSQLステートメントを参照してください。

The Python dependencies could then be specified via the Python config options, such as python.archives, python.files, python.requirements, python.client.executable, python.executable. etc or through command line arguments when submitting the job.

inserted by FC2 system