TableEnvironment
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

TableEnvironment #

このドキュメントはPyFlink TableEnvironmentの紹介です。 これには、TableEnvironmentクラスの全てのパブリックなインタフェースの詳細な説明が含まれています。

TableEnvironmentの作成 #

TableEnvironmentを作成する推奨方法は、EnvironmentSettingsオブジェクトから作成することです:

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment
config = Configuration()
config.set_string('execution.buffer-timeout', '1 min')
env_settings = EnvironmentSettings \
    .new_instance() \
    .in_streaming_mode() \
    .with_configuration(config) \
    .build()

table_env = TableEnvironment.create(env_settings)

あるいは、ユーザは既存のStreamExecutionEnvironmentからStreamTableEnvironmentを作成して、DataStream APIと相互運用することができます。

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# create a streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

TableEnvironment API #

Table/SQLオペレーション #

これらのAPIはTable API/SQL tableを作成/削除し、クエリを書くのに使われます:

API 説明 ドキュメント
from_elements(elements, schema=None, verify_schema=True) 要素のコレクションからtableを作成します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_elements" name="link">}}
from_pandas(pdf, schema=None, split_num=1) pandas DataFrameからtableを作成します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_pandas" name="link">}}
from_path(path) 指定されたパスの下に登録されたtableからtableを作成します。例えば、create_temporary_viewを介して登録されたtable。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_path" name="link">}}
create_temporary_view(view_path, table) SQL一時ビューと同様の一時ビューとして、`Table`オブジェクトを登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_temporary_view" name="link">}}
drop_temporary_view(view_path) 指定されたパスの下に登録されている一時ビューを削除します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_temporary_view" name="link">}}
drop_temporary_table(table_path) 指定されたパスの下に登録されている一時tableを削除します。 このインタフェースを使って一時ソースtableと一時シンクtableを削除できます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_temporary_table" name="link">}}
execute_sql(stmt) 指定された単一のステートメントを実行し、実行結果を返します。 ステートメントには、DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USEがあります。

"INSERT INTO"ステートメントの場合、これは非同期オペレーションであることに注意してください。これは通常、ジョブをリモートクラスタに送信する時に実行されます。 ただし、ミニクラスタやIDEでジョブを実行する場合、ジョブの実行が終了するまで待つ必要があります。詳細については、}}#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster">ここを参照してください。
SQLステートメントの詳細については、}}">SQLドキュメントを参照してください。
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.execute_sql" name="link">}}
sql_query(query) SQLクエリを評価し、結果を`Table`オブジェクトとして取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.sql_query" name="link">}}

非推奨のAPI

API 説明 ドキュメント
from_table_source(table_source) tableソースからtableを作成します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_table_source" name="link">}}
scan(*table_path) カタログから登録されたtableをスキャンし、結果のtableを返します。 from_pathで置き換えることができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.scan" name="link">}}
register_table(name, table) `Table`オブジェクトを一意の名前でTableEnvironmentカタログに登録します。 登録されたtableはSQLクエリで参照できます。 create_temporary_viewで置き換えることができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_table" name="link">}}
register_table_source(name, table_source) 外部`TableSource`をTableEnvironmentカタログに登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_table_source" name="link">}}
register_table_sink(name, table_sink) 外部`TableSink`をTableEnvironmentカタログに登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_table_sink" name="link">}}
insert_into(target_path, table) `Table`オブジェクトの内容をシンクtableに書き込むように指示します。 このインタフェースはジョブの実行をトリガーしないことに注意してください。 ジョブを実行す類は、"execute"メソッドを呼び出す必要があります。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.insert_into" name="link">}}
sql_update(stmt) INSERT、UPDATE、DELETEのようなSQLステートメントやDDLステートメントを評価します。 execute_sqlで置き換えることができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.sql_update" name="link">}}

ジョブの実行/explain #

これらのAPIを使ってジョブをexplain/実行します。API execute_sqlもジョブを実行するために使うことができます。

API 説明 ドキュメント
explain_sql(stmt, *extra_details) 指定されたステートメントのASTと実行プランを返します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.explain_sql" name="link">}}
create_statement_set() DMLステートメントやtableを受け入れるStatementSetインスタンスを作成します。 複数のシンクjobを実行するために使うことができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_statement_set" name="link">}}

非推奨のAPI

API 説明 ドキュメント
explain(table=None, extended=False) 指定されたTable APIとSQLクエリのASTと、指定された`Table`オブジェクトまたはマルチシンクプランの結果を計算する実行プランを返します。 the result of the given `Table` object or multi-sinks plan. insert_intosql_updateメソッドを使ってデータを複数のシンクに発行する場合は、このメソッドを使ってプランんを取得することができます。 method to get the plan. TableEnvironment.explain_sqlTable.explainStatementSet.explainで置き換えることができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.explain" name="link">}}
execute(job_name) プログラムの時実行をトリガーします。この環境でプログラムの全ての部分を実行します。 insert_intosql_updateメソッドをつあkってシンクにデータを発行する場合、このメソッドを使ってプログラムの実行をトリガーできます。 method trigger the program execution. このメソッドはジョブが終了/中止/失敗するまで、クライアントプログラムをブロックします。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.execute" name="link">}}

ユーザ定義関数を作成/削除します。 #

これらのAPIを使って、UDFを登録したり、登録されたUDFを削除します。 API execute_sqlを使ってUDFを登録/削除できることに注意してください。 様々な種類のUDFの詳細については、ユーザ定義関数を参照してください。

API 説明 ドキュメント
create_temporary_function(path, function) Pythonのユーザ定義関数クラスを一時カタログ関数として登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_temporary_function" name="link">}}
create_temporary_system_function(name, function) Pythonのユーザ定義関数を一時システム関数として登録します。 一時システム関数の名前が一時カタログ関数の名前と同じ場合、 一時システム関数が優先されます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_temporary_system_function" name="link">}}
create_java_function(path, function_class_name, ignore_if_exists=None) Javaのユーザ定義関数クラスをカタログ関数として指定されたパスに登録します。 カタログが永続的な場合、登録されたカタログ関数は複数のFlinkセッションとクラスタに渡って使えます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_java_function" name="link">}}
create_java_temporary_function(path, function_class_name) Javaのユーザ定義関数クラスを一時カタログ関数として登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_java_temporary_function" name="link">}}
create_java_temporary_system_function(name, function_class_name) Javaのユーザ定義関数クラスを一時システム関数として登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.create_java_temporary_system_function" name="link">}}
drop_function(path) 指定されたパスの下に登録されているカタログ関数を削除します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_function" name="link">}}
drop_temporary_function(path) 指定された名前で登録された一時システム関数を削除します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_temporary_function" name="link">}}
drop_temporary_system_function(name) 指定された名前で登録された一時システム関数を削除します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.drop_temporary_system_function" name="link">}}

非推奨のAPI

API 説明 ドキュメント
register_function(name, function) Pythonのユーザ定義関数を一意の名前で登録します。 この名前の既存のユーザ定義関数を置き換えます。 create_temporary_system_functionで置き換えることができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_function" name="link">}}
register_java_function(name, function_class_name) Javaのユーザ定義関数を一意の名前で登録します。 この名前の既存のユーザ定義関数を置き換えます。 create_java_temporary_system_function.で置き換えることができます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_java_function" name="link">}}

依存性の管理 #

これらのAPIを使ってPython UDFに必要なPython依存関係を管理します。 詳細については、依存関係の管理ドキュメントを参照してください。

API 説明 ドキュメント
add_python_file(file_path) Pythonファイル、Pythonパッケージ、ローカルディレクトリなどのPython依存関係を追加します。 Python UDFワーカーのPYTHONPATHに追加されます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.add_python_file" name="link">}}
set_python_requirements(requirements_file_path, requirements_cache_dir=None) サードパーティの依存関係を定義するrequirements.txtファイルを指定します。 これらの依存関係は一時ディレクトリにインストールされ、Python UDFワーカーのPYTHONPATHに追加されます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.set_python_requirements" name="link">}}
add_python_archive(archive_path, target_dir=None) Pythonのアーカイブファイルを追加します。ファイルはPython UDFワーカーの作業ディレクトリに抽出されます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.add_python_archive" name="link">}}

設定 #

API 説明 ドキュメント
get_config() Table APIの実行時の挙動を定義するtable設定を返します。 利用可能な全ての設定オプションは}}">Configuration}}">Python設定にあります。

以下のコードは、このAPIを介して設定オプションを設定する方法を示す例です: # set the parallelism to 8
table_env.get_config().set("parallelism.default", "8")
# set the job name
table_env.get_config().set("pipeline.name", "my_first_job")
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_config" name="link">}}

カタログAPI #

これらのAPIを使ってカタログとモジュールにアクセスします。詳細な使用については、モジュールカタログドキュメントにあります。

API 説明 ドキュメント
register_catalog(catalog_name, catalog) `Catalog`を一意の名前で登録します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_catalog" name="link">}}
get_catalog(catalog_name) 登録された`Catalog`を名前で取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_catalog" name="link">}}
use_catalog(catalog_name) 現在のカタログを指定された値に設定します。 これは、デフォルトのデータベースをカタログのデフォルトのデータベースに設定します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.use_catalog" name="link">}}
get_current_catalog() 現在のセッションの現在のデフォルトのカタログ名を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_current_catalog" name="link">}}
get_current_database() 実行中のセッションの現在のデフォルトのデータベース名を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_current_database" name="link">}}
use_database(database_name) 現在のデフォルトのデータベースを設定します。 現在のカタログに存在する必要があります。 このパスは、非完全オブジェクト名を検索する時にデフォルトのパスとして使われます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.use_database" name="link">}}
load_module(module_name, module) 一意の名前で`Module`をロードします。 モジュールはロードされた順番で保持されます。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.load_module" name="link">}}
unload_module(module_name) 指定された名前の`Module`をアンロードします。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.unload_module" name="link">}}
use_modules(*module_names) ロードされたモジュールの解決順序を有効にして変更します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.use_modules" name="link">}}
list_catalogs() この環境に登録されている全てのカタログの名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_catalogs" name="link">}}
list_modules() この環境に登録されている全ての有効なモジュールの名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_modules" name="link">}}
list_full_modules() この環境に登録されている全てのロードされたモジュール(無効なモジュールを含む)の名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_full_modules" name="link">}}
list_databases() 現在のカタログ内の全てのデータベースの名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_databases" name="link">}}
list_tables() 現在のカタログの現在のデータベース内の全てのtableの名前を取得します。 一時、永続のテーブルとビューの両方を返します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_tables" name="link">}}
list_views() 現在のカタログの現在のデータベース内の全てのビューの名前を取得します。 一時ビューと永続ビューの両方を返します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_views" name="link">}}
list_user_defined_functions() この環境に登録されている全てのユーザ定義関数の名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_user_defined_functions" name="link">}}
list_functions() この環境の全ての関数の名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_functions" name="link">}}
list_temporary_tables() 現在の名前空間(現在のカタログの現在のデータベース)で利用可能な全ての一時テーブルと一時ビューの名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_temporary_tables" name="link">}}
list_temporary_views() 現在の名前空間(現在のカタログの現在のデータベース)で利用可能な全ての一時ビューの名前を取得します。 {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.list_temporary_views" name="link">}}

状態バックエンド、チェックポイント、再起動戦略 #

Flink 1.10より前では、StreamExecutionEnvironmentを介して状態バックエンド、チェックポイント、再起動戦略を設定できます。 今は、TableConfigでキーバリュー設定を設定することで設定できます。詳細については、耐障害性状態バックエンドチェックポイントを参照してください。

以下のコードは、Table APIを介して、状態バックエンド、チェックポイント、再起動戦略を設定する方法を示す例です:

# set the restart strategy to "fixed-delay"
table_env.get_config().set("restart-strategy.type", "fixed-delay")
table_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")

# set the checkpoint mode to EXACTLY_ONCE
table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().set("execution.checkpointing.interval", "3min")

# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
table_env.get_config().set("state.backend.type", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
table_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
inserted by FC2 system