This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
概念 & 共通 API #
テーブルAPIとSQLはjoint APIに統合されています。
このAPIの中心的な概念は、クエリの入力と出力として機能するTable
です。
このドキュメントでは、テーブルAPIとSQLクエリを使うプログラムの一般的な構造、Table
の登録方法、Table
のクエリ方法、Table
の発行方法を示します。
テーブルAPIとSQLプログラムの構造 #
以下のコード例は、Table APIとSQLプログラムの共通構造を示しています。
全てのFlink Scala APIは非推奨となり、将来のFlinkバージョンでは削除される予定です。引き続きScalaでアプリケーションをビルドできますが、DataStream APIやTable APIのJavaバージョンへ移行する必要があります。
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
TableEnvironment tableEnv = TableEnvironment.create(/*…*/);
// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build());
// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ");
// Create a Table object from a Table API query
Table table1 = tableEnv.from("SourceTable");
// Create a Table object from a SQL query
Table table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table1.insertInto("SinkTable").execute();
import org.apache.flink.table.api._
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions
// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
val tableEnv = TableEnvironment.create(/*…*/)
// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build())
// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ")
// Create a Table object from a Table API query
val table1 = tableEnv.from("SourceTable")
// Create a Table object from a SQL query
val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")
// Emit a Table API result Table to a TableSink, same for SQL result
val tableResult = table1.insertInto("SinkTable").execute()
from pyflink.table import *
# Create a TableEnvironment for batch or streaming execution
table_env = ... # see "Create a TableEnvironment" section
# Create a source table
table_env.executeSql("""CREATE TEMPORARY TABLE SourceTable (
f0 STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
)
""")
# Create a sink table
table_env.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ")
# Create a Table from a Table API query
table1 = table_env.from_path("SourceTable").select(...)
# Create a Table from a SQL query
table2 = table_env.sql_query("SELECT ... FROM SourceTable ...")
# Emit a Table API result Table to a TableSink, same for SQL result
table_result = table1.execute_insert("SinkTable")
Table API and SQL queries can be easily integrated with and embedded into DataStream programs. Have a look at the DataStream API Integration page to learn how DataStreams can be converted into Tables and vice versa.
TableEnvironmentの作成 #
TableEnvironment
は、テーブルAPIとSQL統合のエントリポイントで、次の役割を果たします:
- 内部カタログに
Table
を登録 - カタログを登録
- プラグ可能なモジュールのロード
- SQLクエリの実行
- ユーザ定義(スカラー、テーブル、集計)関数の登録
DataStream
とTable
の間の変換(StreamTableEnvironment
の場合)
Table
は常に特定のTableEnvironment
にバインドされます。
同じクエリ内で異なるTableEnvironmentのテーブルをjoinしたりunionしたりすることはできません。
TableEnvironment
はstatic TableEnvironment.create()
メソッドを呼び出すことで作成されます。
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings)
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
別のやり方として、ユーザは既存のStreamExecutionEnvironment
からStreamTableEnvironment
を作成してDataStream
APIを共同利用することができます。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(s_env)
カタログ内でのテーブルの作成 #
TableEnvironment
は作成されたテーブルのカタログのマップをidentifierを使って保持します。各識別子は3つの部分から構成されます: カタログ名、データベース名、オブジェクト名。カタログまたはデータベースが指定されていない場合、現在のデフォルト値が使われます(Table識別子の拡張セクションの例を見てください)。
テーブルは、仮想(VIEWS
)または通常(TABLES
)のいずれかです。VIEWS
は既存のTable
オブジェクト、通常はTable APIまたはSQLクエリの結果、から作成できます。TABLES
はファイル、データベーステーブル、メッセージキューなどの外部データを記述します。
一時vs永続テーブル #
テーブルは、単一のFlinkセッションのライフサイクルに関連付けられた一時的なテーブル、または複数のFlinkセッションとクラスタに渡って表示される永続的なテーブルのいずれかです。
永続テーブルには、テーブルに関するメタデータを維持するためのcatalog (Hiveメタデータなど)が必要です。永続テーブルが作成されると、カタログに接続されている全てのFlinkセッションに表示され、テーブルが明示的に削除されるまで存在し続けます。
一方、一時テーブルは常にメモリに保存され、作成されたFlinkセッションの間のみ存在します。これらのテーブルは他のセッションには表示されません。これらは、カタログやデータベースにバインドされていませんが、カタログやデータベースの名前空間内に作成できます。一時テーブルは、対応するデータベースが削除されても削除されません。
シャドーイング #
既存の永続テーブルと同じ識別子を持つ一時テーブルを登録することができます。一時テーブルは永続テーブルをシャドウし、一時テーブルが存在する限り永続テーブルにアクセスできなくなります。その識別子を持つ全てのクエリは、一時テーブルに対して実行されます。
これは実験に役立つかもしれません。It allows running exactly the same query first against a temporary table that e.g. has just a subset of data, or the data is obfuscated. クエリが正しいことが確認されたら、実際の本番データに対して実行できます。
テーブルの作成 #
仮想テーブル #
SQL用語のVIEW
(仮想テーブル)に対応するTable
APIオブジェクト。論理クエリ計画をカプセル化します。以下のようにして、カタログ内に作成できます:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# table is the result of a simple projection query
proj_table = table_env.from_path("X").select(...)
# register the Table projTable as table "projectedTable"
table_env.register_table("projectedTable", proj_table)
注意: Table
オブジェクトはリレーショナルデータベースシステムのVIEW
に似ています。つまり、Table
を定義するクエリは最適化されていませんが、別のクエリが登録されたTable
を参照する時にインライン化されます。複数のクエリが同じ登録ずみのTable
を参照する場合、クエリは参照するクエリごとにインライン化され、複数回実行されます。つまり、登録されたTable
は共有されません。
コネクタテーブル #
リレーショナルデータベースとして知られるTABLE
をconnector宣言から作成することもできます。
コネクタはテーブルのデータを保持する外部システムを記述します。Apache Kafkaや通常のファイルシステムのようなストレージシステムをここで宣言できます。
このようなテーブルはTable APIを直接使うか、SQL DDLに切り替えることによっても作成できます。
// Using table descriptors
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build();
tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
// Using SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
# Using table descriptors
source_descriptor = TableDescriptor.for_connector("datagen") \
.schema(Schema.new_builder()
.column("f0", DataTypes.STRING())
.build()) \
.option("rows-per-second", "100") \
.build()
t_env.create_table("SourceTableA", source_descriptor)
t_env.create_temporary_table("SourceTableB", source_descriptor)
# Using SQL DDL
t_env.execute_sql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
テーブル識別子の拡張 #
テーブルは常に、カタログ、データベース、テーブル名で構成される3つの部分からなる識別子を使って登録されます。
ユーザは1つのカタログと1つのデータベースを“current catalog”と“current database”として設定できます。 これらを使うと、上記の3つの部分からなる識別子の最初の2つの部分はオプションになります。これらが指定されない倍は、現在のカタログと現在のデータベースが参照されます。ユーザは現在のカタログと現在のデータベースをtable APIまたはSQLを介して切り替えることができます。table API or SQL.
識別子はSQL要件に従います。つまり、バッククォート文字(`
)でエスケープできることを意味します。
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
// get a TableEnvironment
val tEnv: TableEnvironment = ...
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")
val table: Table = ...
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
# get a TableEnvironment
t_env = TableEnvironment.create(...)
t_env.use_catalog("custom_catalog")
t_env.use_database("custom_database")
table = ...
# register the view named 'exampleView' in the catalog named 'custom_catalog'
# in the database named 'custom_database'
t_env.create_temporary_view("other_database.exampleView", table)
# register the view named 'example.View' in the catalog named 'custom_catalog'
# in the database named 'custom_database'
t_env.create_temporary_view("`example.View`", table)
# register the view named 'exampleView' in the catalog named 'other_catalog'
# in the database named 'other_database'
t_env.create_temporary_view("other_catalog.other_database.exampleView", table)
テーブルのクエリ #
テーブルAPI #
Table APIはScalaとJava用の言語統合クエリAPIです。SQLとは対照的に、クエリは文字列として指定されませんが、ホストの言語で段階的に作成されます。
APIは、テーブル(ストリーミングまたはバッチ)を表すTable
クラスに基づいており、リレーショナル演算を適用するメソッドを提供します。これらのメソッドは新しいTable
オブジェクトを返します。これは入力Table
にリレーショナル演算を適用した結果を表します。Some relational operations are composed of multiple method calls such as table.groupBy(...).select()
, where groupBy(...)
specifies a grouping of table
, and select(...)
the projection on the grouping of table
.
Table APIドキュメントは、ストリーミングとバッチテーブルでサポートされる全てのTable APIオペレーションについて説明します。
以下の例は、単純なTable API集計クエリを示します:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders
.filter($"cCountry" === "FRANCE")
.groupBy($"cID", $"cName")
.select($"cID", $"cName", $"revenue".sum AS "revSum")
// emit or convert Table
// execute query
注意: Scala Table APIはドル記号($
)で始まるScala文字列補完を使って、Table
の属性を参照します。Table APIはScalaの暗黙的メソッドを使います。以下を必ずインポートしてください
org.apache.flink.table.api._
- 暗黙的な式の変換用org.apache.flink.api.scala._
とorg.apache.flink.table.api.bridge.scala._
DataStreamから/へ変換したい場合
# get a TableEnvironment
table_env = # see "Create a TableEnvironment" section
# register Orders table
# scan registered Orders table
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
.filter(col('cCountry') == 'FRANCE') \
.group_by(col('cID'), col('cName')) \
.select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
# emit or convert Table
# execute query
SQL #
FlinkのSQL統合は、Apache Calciteに基づいており、SQL標準を実装します。SQLクエリは通常の文字列として指定されます。
SQLドキュメントは、ストリーミングとバッチtableに対するFlink SQLサポートを説明します。
以下の例は、クエリを指定し、結果をTable
として返す方法を示しています。
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register Orders table
# compute revenue for all customers from France
revenue = table_env.sql_query(
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
# emit or convert Table
# execute query
以下の例は、結果を登録されたtableに挿入する更新クエリを指定する方法を示しています。
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register "Orders" table
# register "RevenueFrance" output table
# compute revenue for all customers from France and emit to "RevenueFrance"
table_env.execute_sql(
"INSERT INTO RevenueFrance "
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
Table APIとSQLの混合 #
Table APIとSQLクエリは、どちらもTable
オブジェクトを返すため、簡単に混合できます:
- Table APIクエリはSQLクエリで返される
Table
オブジェクトに対して定義できます。 - SQLクエリは、
TableEnvironment
内の結果のTableの登録と、それをSQLクエリのFROM
句で参照することで、Table APIクエリの結果を定義できます。
テーブルの出力 #
Table
は、TableSink
に書き込むことで出力されます。TableSink
は、様々なファイル形式(CSV、Apache Parquet、Apache Avro)、ストレージシステム(JDBC、Apache HBase、Apache Cassandra、Elasticsearch)、メッセージングシステム(Apache Kafka、RabbitMQ)をサポートする汎用インタフェースです。
バッチTable
はBatchTableSink
にのみ書き込むことができますが、ストリーミングTable
は、AppendStreamTableSink
、RetractStreamTableSink
、UpsertStreamTableSink
のいずれかを必要とします。
利用可能なシンクの詳細とカスタムDynamicTableSink
の実装方法については、Table Sources & Sinksに関するドキュメントを参照してください。
Table.insertInto(String tableName)
メソッドは、ソーステーブルを登録済みのシンクテーブルに出力する完全なエンドツーエンドパイプラインを定義します。
このメソッドは、カタログからテーブルシンクを名前で検索し、Table
のスキーマがシンクのスキーマと同一であることを検証します。
パイプラインはTablePipeline.explain()
でexplainし、TablePipeline.execute()
を呼び出して実行できます。
以下の例はTable
を出力する方法を示しています:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build();
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", "|")
.build())
.build());
// compute a result Table using Table API operators and/or SQL queries
Table result = ...;
// Prepare the insert into pipeline
TablePipeline pipeline = result.insertInto("CsvSinkTable");
// Print explain details
pipeline.printExplain();
// emit the result Table to the registered TableSink
pipeline.execute();
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create an output Table
val schema = Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build()
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", "|")
.build())
.build())
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// Prepare the insert into pipeline
val pipeline = result.insertInto("CsvSinkTable")
// Print explain details
pipeline.printExplain()
// emit the result Table to the registered TableSink
pipeline.execute()
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# create a TableSink
schema = Schema.new_builder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build()
table_env.create_temporary_table("CsvSinkTable", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.for_format("csv")
.option("field-delimiter", "|")
.build())
.build())
# compute a result Table using Table API operators and/or SQL queries
result = ...
# emit the result Table to the registered TableSink
result.execute_insert("CsvSinkTable")
クエリの翻訳と実行 #
Table APIとSQLクエリは、入力がストリーミングかバッチかに関係なく、DataStreamプログラムに変換されます。 クエリは内部的に論理クエリ計画として表され、次の2つのフェーズで変換されます:
- 論理計画の最適化
- DataStreamプログラムへの変換
Table APIまたはSQLクエリは、次の場合に変換されます:
TableEnvironment.executeSql()
が呼ばれる。このメソッドは特定のステートメントを実行するために使われ、このメソッドが呼ばれるとすぐにsqlクエリが変換されます。TablePipeline.execute()
が呼ばれる。このメソッドはソースからシンクへのパイプラインを実行するために使われ、メソッドが呼び出されるとすぐにTable APIプログラムは変換されます。Table.execute()
が呼ばれる。このメソッドはテーブルの内容をローカルクライアントに収集するために使われ、このメソッドが呼び出されるとすぐにTable APIは変換されます。StatementSet.execute()
が呼ばれる。TablePipeline
(StatementSet.add()
を通じてシンクに送信されます)またはINSERTステートメント(StatementSet.addInsertSql()
を通じて指定されます)は、最初にStatementSet
にバッファされます。これらはStatementSet.execute()
が呼び出されると変換されます。全てのシンクは1つのDAGに最適化されます。Table
は、DataStream
に変換される時に変換されます(Integration with DataStreamを参照してください)。変換されると、これは通常のDataStreamプログラムとなり、StreamExecutionEnvironment.execute()
が呼ばれる時に実行されます。
クエリの最適化 #
Apache FlinkはApache Calciteを活用および拡張して、高度なクエリ最適化を実行します。 これには以下のような一連のルールとコストベースの最適化が含まれます:
- Apache Calciteに基づくサブクエリの非相関化
- Project pruning
- Partition pruning
- Filter push-down
- 重複した計算を回避するためのサブプランの重複排除
- 以下の2つの部分が含まれる、特別なサブクエリの書き換え:
- INとEXISTSをleft semi-joinに変換します
- NOT INとNOT EXISTSをleft anti-joinに変換します
- オプションのjoinの並び替え
table.optimizer.join-reorder-enabled
で有効化
注意: IN/EXISTS/NOT IN/NOT EXISTSは現在のところ、サブクエリの書き換えにおける結合条件でのみサポートされます。
このoptimizerは、計画だけでなく、データソースから入手可能な豊富な統計と、io、cpu、network、memoryなどの各オペレーションの詳細なコストに基づいて、知的な決定を下します。
上級者は、TableEnvironment#getConfig#setPlannerConfig
を呼び出すことでテーブル環境に提供できるCalciteConfig
オブジェクトを介してカスタム最適化を提供できます。
テーブルの説明 #
Table APIは、Table
を計算するための論理的で最適化されたクエリ計画をexplainするための仕組みです。
これは、Table.explain()
メソッドやStatementSet.explain()
メソッドを通じて行われます。Table.explain()
はTable
の計画を返します。StatementSet.explain()
は複数のシンクの計画を返します。以下の3つの計画を説明する文字列を返します:
- 関係クエリの抽象構文木。つまり最適化されていない論理クエリ計画、
- 最適化された論理クエリ計画、そして
- 物理実行計画。
TableEnvironment.explainSql()
とTableEnvironment.executeSql()
は、計画を取得するためのEXPLAIN
ステートメントの実行をサポートします。EXPLAINページを参照してください。
以下のコードは、Table.explain()
メソッドを使った例と、指定されたTable
に対応する出力を示しています:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1
.where($("word").like("F%"))
.unionAll(table2);
System.out.println(table.explain());
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table = table1
.where($"word".like("F%"))
.unionAll(table2)
println(table.explain())
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
.where(col('word').like('F%')) \
.union_all(table2)
print(table.explain())
上記の例の結果は、
Explain
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
以下のコードは、StatementSet.explain()
メソッドを使った例と、複数のシンク計画に対応する出力を示しています:
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
final Schema schema = Schema.newBuilder()
.column("count", DataTypes.INT())
.column("word", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("MySource1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build());
tEnv.createTemporaryTable("MySource2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build());
tEnv.createTemporaryTable("MySink1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build());
tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build());
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.add(table1.insertInto("MySink1"));
Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.add(table2.insertInto("MySink2"));
String explanation = stmtSet.explain();
System.out.println(explanation);
val settings = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(settings)
val schema = Schema.newBuilder()
.column("count", DataTypes.INT())
.column("word", DataTypes.STRING())
.build()
tEnv.createTemporaryTable("MySource1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build())
tEnv.createTemporaryTable("MySource2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build())
tEnv.createTemporaryTable("MySink1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build())
tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build())
val stmtSet = tEnv.createStatementSet()
val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.add(table1.insertInto("MySink1"))
val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.add(table2.insertInto("MySink2"))
val explanation = stmtSet.explain()
println(explanation)
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings=settings)
schema = Schema.new_builder()
.column("count", DataTypes.INT())
.column("word", DataTypes.STRING())
.build()
t_env.create_temporary_table("MySource1", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build())
t_env.create_temporary_table("MySource2", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build())
t_env.create_temporary_table("MySink1", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build())
t_env.create_temporary_table("MySink2", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build())
stmt_set = t_env.create_statement_set()
table1 = t_env.from_path("MySource1").where(col('word').like('F%'))
stmt_set.add_insert("MySink1", table1)
table2 = table1.union_all(t_env.from_path("MySource2"))
stmt_set.add_insert("MySink2", table2)
explanation = stmt_set.explain()
print(explanation)
the result of multiple-sinks plan is
MultiTable Explain
== Abstract Syntax Tree ==
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Physical Plan ==
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Optimized Execution Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])(reuse_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Reused(reference_id=[1])
LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])