Concepts & Common API
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

概念 & 共通 API #

テーブルAPIとSQLはjoint APIに統合されています。 このAPIの中心的な概念は、クエリの入力と出力として機能するTableです。 このドキュメントでは、テーブルAPIとSQLクエリを使うプログラムの一般的な構造、Tableの登録方法、Tableのクエリ方法、Tableの発行方法を示します。

テーブルAPIとSQLプログラムの構造 #

以下のコード例は、Table APIとSQLプログラムの共通構造を示しています。

全てのFlink Scala APIは非推奨となり、将来のFlinkバージョンでは削除される予定です。引き続きScalaでアプリケーションをビルドできますが、DataStream APIやTable APIのJavaバージョンへ移行する必要があります。

詳細は、 FLIP-265 Scala APIサポートの廃止と削除

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;

// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
TableEnvironment tableEnv = TableEnvironment.create(/*…*/);

// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.STRING())
      .build())
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .build());

// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ");

// Create a Table object from a Table API query
Table table1 = tableEnv.from("SourceTable");

// Create a Table object from a SQL query
Table table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable");

// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table1.insertInto("SinkTable").execute();
import org.apache.flink.table.api._
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions

// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
val tableEnv = TableEnvironment.create(/*…*/)

// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
  .schema(Schema.newBuilder()
    .column("f0", DataTypes.STRING())
    .build())
  .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
  .build())

// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ")

// Create a Table object from a Table API query
val table1 = tableEnv.from("SourceTable")

// Create a Table object from a SQL query
val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")

// Emit a Table API result Table to a TableSink, same for SQL result
val tableResult = table1.insertInto("SinkTable").execute()
from pyflink.table import *

# Create a TableEnvironment for batch or streaming execution
table_env = ... # see "Create a TableEnvironment" section

# Create a source table
table_env.executeSql("""CREATE TEMPORARY TABLE SourceTable (
  f0 STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
)
""")

# Create a sink table
table_env.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ")

# Create a Table from a Table API query
table1 = table_env.from_path("SourceTable").select(...)

# Create a Table from a SQL query
table2 = table_env.sql_query("SELECT ... FROM SourceTable ...")

# Emit a Table API result Table to a TableSink, same for SQL result
table_result = table1.execute_insert("SinkTable")
Table API and SQL queries can be easily integrated with and embedded into DataStream programs. Have a look at the DataStream API Integration page to learn how DataStreams can be converted into Tables and vice versa.

Back to top

TableEnvironmentの作成 #

TableEnvironmentは、テーブルAPIとSQL統合のエントリポイントで、次の役割を果たします:

  • 内部カタログにTableを登録
  • カタログを登録
  • プラグ可能なモジュールのロード
  • SQLクエリの実行
  • ユーザ定義(スカラー、テーブル、集計)関数の登録
  • DataStreamTableの間の変換(StreamTableEnvironmentの場合)

Tableは常に特定のTableEnvironmentにバインドされます。 同じクエリ内で異なるTableEnvironmentのテーブルをjoinしたりunionしたりすることはできません。 TableEnvironmentはstatic TableEnvironment.create()メソッドを呼び出すことで作成されます。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    //.inBatchMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    //.inBatchMode()
    .build()

val tEnv = TableEnvironment.create(settings)
from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

別のやり方として、ユーザは既存のStreamExecutionEnvironmentからStreamTableEnvironmentを作成してDataStream APIを共同利用することができます。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(s_env)

Back to top

カタログ内でのテーブルの作成 #

TableEnvironmentは作成されたテーブルのカタログのマップをidentifierを使って保持します。各識別子は3つの部分から構成されます: カタログ名、データベース名、オブジェクト名。カタログまたはデータベースが指定されていない場合、現在のデフォルト値が使われます(Table識別子の拡張セクションの例を見てください)。

テーブルは、仮想(VIEWS)または通常(TABLES)のいずれかです。VIEWSは既存のTableオブジェクト、通常はTable APIまたはSQLクエリの結果、から作成できます。TABLESはファイル、データベーステーブル、メッセージキューなどの外部データを記述します。

一時vs永続テーブル #

テーブルは、単一のFlinkセッションのライフサイクルに関連付けられた一時的なテーブル、または複数のFlinkセッションとクラスタに渡って表示される永続的なテーブルのいずれかです。

永続テーブルには、テーブルに関するメタデータを維持するためのcatalog (Hiveメタデータなど)が必要です。永続テーブルが作成されると、カタログに接続されている全てのFlinkセッションに表示され、テーブルが明示的に削除されるまで存在し続けます。

一方、一時テーブルは常にメモリに保存され、作成されたFlinkセッションの間のみ存在します。これらのテーブルは他のセッションには表示されません。これらは、カタログやデータベースにバインドされていませんが、カタログやデータベースの名前空間内に作成できます。一時テーブルは、対応するデータベースが削除されても削除されません。

シャドーイング #

既存の永続テーブルと同じ識別子を持つ一時テーブルを登録することができます。一時テーブルは永続テーブルをシャドウし、一時テーブルが存在する限り永続テーブルにアクセスできなくなります。その識別子を持つ全てのクエリは、一時テーブルに対して実行されます。

これは実験に役立つかもしれません。It allows running exactly the same query first against a temporary table that e.g. has just a subset of data, or the data is obfuscated. クエリが正しいことが確認されたら、実際の本番データに対して実行できます。

テーブルの作成 #

仮想テーブル #

SQL用語のVIEW (仮想テーブル)に対応するTable APIオブジェクト。論理クエリ計画をカプセル化します。以下のようにして、カタログ内に作成できます:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query 
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// table is the result of a simple projection query 
val projTable: Table = tableEnv.from("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section

# table is the result of a simple projection query 
proj_table = table_env.from_path("X").select(...)

# register the Table projTable as table "projectedTable"
table_env.register_table("projectedTable", proj_table)

注意: TableオブジェクトはリレーショナルデータベースシステムのVIEWに似ています。つまり、Tableを定義するクエリは最適化されていませんが、別のクエリが登録されたTableを参照する時にインライン化されます。複数のクエリが同じ登録ずみのTableを参照する場合、クエリは参照するクエリごとにインライン化され、複数回実行されます。つまり、登録されたTableは共有されません

Back to top

コネクタテーブル #

リレーショナルデータベースとして知られるTABLEconnector宣言から作成することもできます。 コネクタはテーブルのデータを保持する外部システムを記述します。Apache Kafkaや通常のファイルシステムのようなストレージシステムをここで宣言できます。

このようなテーブルはTable APIを直接使うか、SQL DDLに切り替えることによっても作成できます。

// Using table descriptors
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
    .column("f0", DataTypes.STRING())
    .build())
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .build();

tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);

// Using SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
# Using table descriptors
source_descriptor = TableDescriptor.for_connector("datagen") \
    .schema(Schema.new_builder()
            .column("f0", DataTypes.STRING())
            .build()) \
    .option("rows-per-second", "100") \
    .build()

t_env.create_table("SourceTableA", source_descriptor)
t_env.create_temporary_table("SourceTableB", source_descriptor)

# Using SQL DDL
t_env.execute_sql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

テーブル識別子の拡張 #

テーブルは常に、カタログ、データベース、テーブル名で構成される3つの部分からなる識別子を使って登録されます。

ユーザは1つのカタログと1つのデータベースを“current catalog”と“current database”として設定できます。 これらを使うと、上記の3つの部分からなる識別子の最初の2つの部分はオプションになります。これらが指定されない倍は、現在のカタログと現在のデータベースが参照されます。ユーザは現在のカタログと現在のデータベースをtable APIまたはSQLを介して切り替えることができます。table API or SQL.

識別子はSQL要件に従います。つまり、バッククォート文字(`)でエスケープできることを意味します。

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

Table table = ...;

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table);

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table);

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
// get a TableEnvironment
val tEnv: TableEnvironment = ...
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")

val table: Table = ...

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table)

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table)

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table)

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
# get a TableEnvironment
t_env = TableEnvironment.create(...)
t_env.use_catalog("custom_catalog")
t_env.use_database("custom_database")

table = ...

# register the view named 'exampleView' in the catalog named 'custom_catalog'
# in the database named 'custom_database'
t_env.create_temporary_view("other_database.exampleView", table)

# register the view named 'example.View' in the catalog named 'custom_catalog'
# in the database named 'custom_database'
t_env.create_temporary_view("`example.View`", table)

# register the view named 'exampleView' in the catalog named 'other_catalog'
# in the database named 'other_database'
t_env.create_temporary_view("other_catalog.other_database.exampleView", table)

テーブルのクエリ #

テーブルAPI #

Table APIはScalaとJava用の言語統合クエリAPIです。SQLとは対照的に、クエリは文字列として指定されませんが、ホストの言語で段階的に作成されます。

APIは、テーブル(ストリーミングまたはバッチ)を表すTableクラスに基づいており、リレーショナル演算を適用するメソッドを提供します。これらのメソッドは新しいTableオブジェクトを返します。これは入力Tableにリレーショナル演算を適用した結果を表します。Some relational operations are composed of multiple method calls such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table, and select(...) the projection on the grouping of table.

Table APIドキュメントは、ストリーミングとバッチテーブルでサポートされる全てのTable APIオペレーションについて説明します。

以下の例は、単純なTable API集計クエリを示します:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter($("cCountry").isEqual("FRANCE"))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders
  .filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  .select($"cID", $"cName", $"revenue".sum AS "revSum")

// emit or convert Table
// execute query

注意: Scala Table APIはドル記号($)で始まるScala文字列補完を使って、Tableの属性を参照します。Table APIはScalaの暗黙的メソッドを使います。以下を必ずインポートしてください

  • org.apache.flink.table.api._ - 暗黙的な式の変換用
  • org.apache.flink.api.scala._org.apache.flink.table.api.bridge.scala._ DataStreamから/へ変換したい場合
# get a TableEnvironment
table_env = # see "Create a TableEnvironment" section

# register Orders table

# scan registered Orders table
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
    .filter(col('cCountry') == 'FRANCE') \
    .group_by(col('cID'), col('cName')) \
    .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))

# emit or convert Table
# execute query

Back to top

SQL #

FlinkのSQL統合は、Apache Calciteに基づいており、SQL標準を実装します。SQLクエリは通常の文字列として指定されます。

SQLドキュメントは、ストリーミングとバッチtableに対するFlink SQLサポートを説明します。

以下の例は、クエリを指定し、結果をTableとして返す方法を示しています。

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// emit or convert Table
// execute query
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section

# register Orders table

# compute revenue for all customers from France
revenue = table_env.sql_query(
    "SELECT cID, cName, SUM(revenue) AS revSum "
    "FROM Orders "
    "WHERE cCountry = 'FRANCE' "
    "GROUP BY cID, cName"
)

# emit or convert Table
# execute query

以下の例は、結果を登録されたtableに挿入する更新クエリを指定する方法を示しています。

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section

# register "Orders" table
# register "RevenueFrance" output table

# compute revenue for all customers from France and emit to "RevenueFrance"
table_env.execute_sql(
    "INSERT INTO RevenueFrance "
    "SELECT cID, cName, SUM(revenue) AS revSum "
    "FROM Orders "
    "WHERE cCountry = 'FRANCE' "
    "GROUP BY cID, cName"
)

Back to top

Table APIとSQLの混合 #

Table APIとSQLクエリは、どちらもTableオブジェクトを返すため、簡単に混合できます:

  • Table APIクエリはSQLクエリで返されるTableオブジェクトに対して定義できます。
  • SQLクエリは、TableEnvironment内の結果のTableの登録と、それをSQLクエリの FROM句で参照することで、Table APIクエリの結果を定義できます。

Back to top

テーブルの出力 #

Tableは、TableSinkに書き込むことで出力されます。TableSinkは、様々なファイル形式(CSV、Apache Parquet、Apache Avro)、ストレージシステム(JDBC、Apache HBase、Apache Cassandra、Elasticsearch)、メッセージングシステム(Apache Kafka、RabbitMQ)をサポートする汎用インタフェースです。

バッチTableBatchTableSinkにのみ書き込むことができますが、ストリーミングTableは、AppendStreamTableSinkRetractStreamTableSinkUpsertStreamTableSinkのいずれかを必要とします。

利用可能なシンクの詳細とカスタムDynamicTableSinkの実装方法については、Table Sources & Sinksに関するドキュメントを参照してください。

Table.insertInto(String tableName)メソッドは、ソーステーブルを登録済みのシンクテーブルに出力する完全なエンドツーエンドパイプラインを定義します。 このメソッドは、カタログからテーブルシンクを名前で検索し、Tableのスキーマがシンクのスキーマと同一であることを検証します。 パイプラインはTablePipeline.explain()でexplainし、TablePipeline.execute()を呼び出して実行できます。

以下の例はTableを出力する方法を示しています:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = Schema.newBuilder()
    .column("a", DataTypes.INT())
    .column("b", DataTypes.STRING())
    .column("c", DataTypes.BIGINT())
    .build();

tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/path/to/file")
    .format(FormatDescriptor.forFormat("csv")
        .option("field-delimiter", "|")
        .build())
    .build());

// compute a result Table using Table API operators and/or SQL queries
Table result = ...;

// Prepare the insert into pipeline
TablePipeline pipeline = result.insertInto("CsvSinkTable");

// Print explain details
pipeline.printExplain();

// emit the result Table to the registered TableSink
pipeline.execute();
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// create an output Table
val schema = Schema.newBuilder()
    .column("a", DataTypes.INT())
    .column("b", DataTypes.STRING())
    .column("c", DataTypes.BIGINT())
    .build()

tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/path/to/file")
    .format(FormatDescriptor.forFormat("csv")
        .option("field-delimiter", "|")
        .build())
    .build())

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// Prepare the insert into pipeline
val pipeline = result.insertInto("CsvSinkTable")

// Print explain details
pipeline.printExplain()

// emit the result Table to the registered TableSink
pipeline.execute()
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section

# create a TableSink
schema = Schema.new_builder()
    .column("a", DataTypes.INT())
    .column("b", DataTypes.STRING())
    .column("c", DataTypes.BIGINT())
    .build()
    
table_env.create_temporary_table("CsvSinkTable", TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .option("path", "/path/to/file")
    .format(FormatDescriptor.for_format("csv")
        .option("field-delimiter", "|")
        .build())
    .build())

# compute a result Table using Table API operators and/or SQL queries
result = ...

# emit the result Table to the registered TableSink
result.execute_insert("CsvSinkTable")

Back to top

クエリの翻訳と実行 #

Table APIとSQLクエリは、入力がストリーミングかバッチかに関係なく、DataStreamプログラムに変換されます。 クエリは内部的に論理クエリ計画として表され、次の2つのフェーズで変換されます:

  1. 論理計画の最適化
  2. DataStreamプログラムへの変換

Table APIまたはSQLクエリは、次の場合に変換されます:

  • TableEnvironment.executeSql()が呼ばれる。このメソッドは特定のステートメントを実行するために使われ、このメソッドが呼ばれるとすぐにsqlクエリが変換されます。
  • TablePipeline.execute()が呼ばれる。このメソッドはソースからシンクへのパイプラインを実行するために使われ、メソッドが呼び出されるとすぐにTable APIプログラムは変換されます。
  • Table.execute()が呼ばれる。このメソッドはテーブルの内容をローカルクライアントに収集するために使われ、このメソッドが呼び出されるとすぐにTable APIは変換されます。
  • StatementSet.execute()が呼ばれる。TablePipeline (StatementSet.add()を通じてシンクに送信されます)またはINSERTステートメント(StatementSet.addInsertSql()を通じて指定されます)は、最初にStatementSetにバッファされます。これらはStatementSet.execute()が呼び出されると変換されます。全てのシンクは1つのDAGに最適化されます。
  • Tableは、DataStreamに変換される時に変換されます(Integration with DataStreamを参照してください)。変換されると、これは通常のDataStreamプログラムとなり、StreamExecutionEnvironment.execute()が呼ばれる時に実行されます。

Back to top

クエリの最適化 #

Apache FlinkはApache Calciteを活用および拡張して、高度なクエリ最適化を実行します。 これには以下のような一連のルールとコストベースの最適化が含まれます:

  • Apache Calciteに基づくサブクエリの非相関化
  • Project pruning
  • Partition pruning
  • Filter push-down
  • 重複した計算を回避するためのサブプランの重複排除
  • 以下の2つの部分が含まれる、特別なサブクエリの書き換え:
    • INとEXISTSをleft semi-joinに変換します
    • NOT INとNOT EXISTSをleft anti-joinに変換します
  • オプションのjoinの並び替え
    • table.optimizer.join-reorder-enabledで有効化

注意: IN/EXISTS/NOT IN/NOT EXISTSは現在のところ、サブクエリの書き換えにおける結合条件でのみサポートされます。

このoptimizerは、計画だけでなく、データソースから入手可能な豊富な統計と、io、cpu、network、memoryなどの各オペレーションの詳細なコストに基づいて、知的な決定を下します。

上級者は、TableEnvironment#getConfig#setPlannerConfigを呼び出すことでテーブル環境に提供できるCalciteConfigオブジェクトを介してカスタム最適化を提供できます。

テーブルの説明 #

Table APIは、Tableを計算するための論理的で最適化されたクエリ計画をexplainするための仕組みです。 これは、Table.explain()メソッドやStatementSet.explain()メソッドを通じて行われます。Table.explain()Tableの計画を返します。StatementSet.explain()は複数のシンクの計画を返します。以下の3つの計画を説明する文字列を返します:

  1. 関係クエリの抽象構文木。つまり最適化されていない論理クエリ計画、
  2. 最適化された論理クエリ計画、そして
  3. 物理実行計画。

TableEnvironment.explainSql()TableEnvironment.executeSql()は、計画を取得するためのEXPLAINステートメントの実行をサポートします。EXPLAINページを参照してください。

以下のコードは、Table.explain()メソッドを使った例と、指定されたTableに対応する出力を示しています:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1
  .where($("word").like("F%"))
  .unionAll(table2);

System.out.println(table.explain());
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table = table1
  .where($"word".like("F%"))
  .unionAll(table2)

println(table.explain())
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
    .where(col('word').like('F%')) \
    .union_all(table2)
print(table.explain())

上記の例の結果は、

Explain
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:  +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])

== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])

以下のコードは、StatementSet.explain()メソッドを使った例と、複数のシンク計画に対応する出力を示しています:

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

final Schema schema = Schema.newBuilder()
    .column("count", DataTypes.INT())
    .column("word", DataTypes.STRING())
    .build();

tEnv.createTemporaryTable("MySource1", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/source/path1")
    .format("csv")
    .build());
tEnv.createTemporaryTable("MySource2", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/source/path2")
    .format("csv")
    .build());
tEnv.createTemporaryTable("MySink1", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/sink/path1")
    .format("csv")
    .build());
tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/sink/path2")
    .format("csv")
    .build());

StatementSet stmtSet = tEnv.createStatementSet();

Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.add(table1.insertInto("MySink1"));

Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.add(table2.insertInto("MySink2"));

String explanation = stmtSet.explain();
System.out.println(explanation);
val settings = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(settings)

val schema = Schema.newBuilder()
    .column("count", DataTypes.INT())
    .column("word", DataTypes.STRING())
    .build()

tEnv.createTemporaryTable("MySource1", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/source/path1")
    .format("csv")
    .build())
tEnv.createTemporaryTable("MySource2", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/source/path2")
    .format("csv")
    .build())
tEnv.createTemporaryTable("MySink1", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/sink/path1")
    .format("csv")
    .build())
tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/sink/path2")
    .format("csv")
    .build())
    
val stmtSet = tEnv.createStatementSet()

val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.add(table1.insertInto("MySink1"))

val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.add(table2.insertInto("MySink2"))

val explanation = stmtSet.explain()
println(explanation)
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings=settings)

schema = Schema.new_builder()
    .column("count", DataTypes.INT())
    .column("word", DataTypes.STRING())
    .build()

t_env.create_temporary_table("MySource1", TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .option("path", "/source/path1")
    .format("csv")
    .build())
t_env.create_temporary_table("MySource2", TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .option("path", "/source/path2")
    .format("csv")
    .build())
t_env.create_temporary_table("MySink1", TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .option("path", "/sink/path1")
    .format("csv")
    .build())
t_env.create_temporary_table("MySink2", TableDescriptor.for_connector("filesystem")
    .schema(schema)
    .option("path", "/sink/path2")
    .format("csv")
    .build())

stmt_set = t_env.create_statement_set()

table1 = t_env.from_path("MySource1").where(col('word').like('F%'))
stmt_set.add_insert("MySink1", table1)

table2 = table1.union_all(t_env.from_path("MySource2"))
stmt_set.add_insert("MySink2", table2)

explanation = stmt_set.explain()
print(explanation)

the result of multiple-sinks plan is

MultiTable Explain
== Abstract Syntax Tree ==
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])

LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- LogicalUnion(all=[true])
   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])

== Optimized Physical Plan ==
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
   :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

== Optimized Execution Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])(reuse_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
   :- Reused(reference_id=[1])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])

Back to top

inserted by FC2 system