Intro to the Python Table API
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Python Table APIの概要 #

このドキュメントは、PyFlink Table APIの簡単な紹介であり、初心者ユーザがPyFlink Table APIの基本的な使い方をすぐに理解できるようにするために使われます。 高度な使用法については、個のユーザガイドの他のドキュメントを参照してください。

Python Table APIプログラムの共通構造 #

全てのTable APIとSQLプログラムは、バッチとストリーミングの両方で同じパターンに従います。以下のコード例は、Table APIとSQLプログラムの共通構造を示しています。

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2. create source Table
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '10'
    )
""")

# 3. create sink Table
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# 4. query from source table and perform calculations
# create a Table from a Table API query:
source_table = table_env.from_path("datagen")
# or create a Table from a SQL query:
# source_table = table_env.sql_query("SELECT * FROM datagen")

result_table = source_table.select(col("id") + 1, col("data"))

# 5. emit query result to sink table
# emit a Table API result Table to a sink table:
result_table.execute_insert("print").wait()
# or emit results via SQL query:
# table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

Back to top

TableEnvironmentの作成 #

TableEnvironment is a central concept of the Table API and SQL integration. The following code example shows how to create a TableEnvironment:

from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# or create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

TableEnvironmentを作成する様々な方法の詳細については、TableEnvironment Documentationを参照してください。

TableEnvironmentは以下を担当します:

Back to top

Tableの作成 #

TableはPython Table APIのコアコンポーネントです。Tableオブジェクトは、データ変換のパイプラインを記述します。データ自体は決して含まれません。代わりに、テーブルソースからデータを読み取る方法と、最終的にテーブルシンクにデータを書き込む方法について説明します。千g年されたパイプラインは、出力、最適化され、最終的にクラスタで実行されます。パイプラインは、ストリーミングとバッチシナリオの両方を可能にする、制限付きまたは制限ン足のストリームを操作できます。

Tableは常に特定のTableEnvironmentにバインドされます。同じクエリ内で異なるTableEnvironmentsのテーブルを結合することはできません。例えば、結合、統合など。

リスとオブジェクトを使って作成する #

リストからテーブルを作成できます。これは通常、例や単体テストを書くために使われます。

from pyflink.table import EnvironmentSettings, TableEnvironment

# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
table.execute().print()

結果は次の通りです:

+----------------------+--------------------------------+
|                   _1 |                             _2 |
+----------------------+--------------------------------+
|                    1 |                             Hi |
|                    2 |                          Hello |
+----------------------+--------------------------------+

特定のカラム名を使ってテーブルを作成することもできます:

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute().print()

結果は次の通りです:

+----------------------+--------------------------------+
|                   id |                           data |
+----------------------+--------------------------------+
|                    1 |                             Hi |
|                    2 |                          Hello |
+----------------------+--------------------------------+

デフォルトでは、テーブルのスキーマはデータから自動的に抽出されます。自動生成されたテーブルスキーマが期待通りではない場合は、手動で指定することもできます:

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
# by default, the type of the "id" column is BIGINT
print('By default the type of the "id" column is %s.' % table.get_schema().get_field_data_type("id"))

from pyflink.table import DataTypes
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                                DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                                               DataTypes.FIELD("data", DataTypes.STRING())]))
# now the type of the "id" column is set as TINYINT
print('Now the type of the "id" column is %s.' % table.get_schema().get_field_data_type("id"))

結果は次の通りです:

By default the type of the "id" column is BIGINT.
Now the type of the "id" column is TINYINT.

DDLステートメントを使って作成する #

SQL DDLステートメントを使ってテーブルを作成することもできます。指定された外部ストレージからデータを読み取るテーブルを表します。

from pyflink.table import EnvironmentSettings, TableEnvironment

# create a stream TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT, 
        data TINYINT 
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='3',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='6'
    )
""")
table = table_env.from_path("random_source")
table.execute().print()

結果は次の通りです:

+----+----------------------+--------+
| op |                   id |   data |
+----+----------------------+--------+
| +I |                    1 |      4 |
| +I |                    2 |      5 |
| +I |                    3 |      6 |
+----+----------------------+--------+

TableDescriptorを使って作成する #

TableDescriptorはテーブルを定義するもう1つの方法です。これはSQL DDLステートメントと等価です。

from pyflink.table import EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes

# create a stream TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.create_temporary_table(
    'random_source',
    TableDescriptor.for_connector('datagen')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.TINYINT())
                .build())
        .option('fields.id.kind', 'sequence')
        .option('fields.id.start', '1')
        .option('fields.id.end', '3')
        .option('fields.data.kind', 'sequence')
        .option('fields.data.start', '4')
        .option('fields.data.end', '6')
        .build())

table = table_env.from_path("random_source")
table.execute().print()

結果は次の通りです:

+----+----------------------+--------+
| op |                   id |   data |
+----+----------------------+--------+
| +I |                    1 |      4 |
| +I |                    2 |      5 |
| +I |                    3 |      6 |
+----+----------------------+--------+

カタログを使って作成する #

TableEnvironmentは識別子を使って作成されたテーブルのカタログのマップを維持します。

カタログ内のテーブルは、単一のFlinkセッションのライフサイクルに関連付けられた一時的なテーブル、または複数のFlinkセッションに渡って表示される永続的なテーブルのいずれかです。

SQL DDL、“create table …“と"create view …“経由で作成されたテーブルとビューもカタログに格納されます。

カタログ内のテーブルへSLを介してアクセスできます。

Table APIを使ってカタログからテーブルを使う場合は、“from_path"メソッドを使ってTable APIオブジェクトを作成できます:

# prepare the catalog
# register Table API tables in the catalog
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('source_table', table)

# create Table API table from catalog
new_table = table_env.from_path('source_table')
new_table.execute().print()

結果は次の通りです:

+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                    1 |                             Hi |
| +I |                    2 |                          Hello |
+----+----------------------+--------------------------------+

Back to top

クエリの書き込み #

Table APIクエリの書き込み #

Tableオブジェクトは、リレーショナルオペレーションを適用するために多くのメソッドを提供します。 これらのメソッドは、入力Table上にリレーショナルオペレーションを適用した結果を表す新しいTableオブジェクトを返します。 これらのリレーショナルオペレーションは、table.group_by(...).select(...)のような複数のメソッド呼び出しで構成される場合があります。

Table APIドキュメントは、ストリーミングとバッチtableでサポートされる全てのTable APIオペレーションを説明します。

以下の例は、単純なTable API集計クエリを示します:

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

# using batch table environment to execute the queries
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
                                 ['name', 'country', 'revenue'])

# compute revenue for all customers from France
revenue = orders \
    .select(col("name"), col("country"), col("revenue")) \
    .where(col("country") == 'FRANCE') \
    .group_by(col("name")) \
    .select(col("name"), col("country").sum.alias('rev_sum'))

revenue.execute().print()

結果は次の通りです:

+--------------------------------+----------------------+
|                           name |              rev_sum |
+--------------------------------+----------------------+
|                           Jack |                   30 |
+--------------------------------+----------------------+

行ベースのオペレーションも、MapオペレーションFlatMapオペレーションAggregatオペレーションFlatAggregateオペレーションを含むPython Table APIでサポートされます。

以下の例は、単純な行ベースのオペレーションクエリを示しています:

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd

# using batch table environment to execute the queries
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
                                 ['name', 'country', 'revenue'])

map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
                   result_type=DataTypes.ROW(
                               [DataTypes.FIELD("name", DataTypes.STRING()),
                                DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
                   func_type="pandas")

orders.map(map_function).execute().print()

結果は次の通りです:

+--------------------------------+----------------------+
|                           name |              revenue |
+--------------------------------+----------------------+
|                           Jack |                  100 |
|                           Rose |                  300 |
|                           Jack |                  200 |
+--------------------------------+----------------------+

SQLクエリの書き込み #

FlinkのSQL統合は、Apache Calciteに基づいており、SQL標準を実装します。SQLクエリはStringsとして指定されます。

SQLドキュメントは、ストリーミングとバッチtableに対するFlink SQLサポートを説明します。

以下の例は、単純なSQL集計クエリを示します:

from pyflink.table import EnvironmentSettings, TableEnvironment

# use a stream TableEnvironment to execute the queries
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT, 
        data TINYINT
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='8',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='11'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print_sink (
        id BIGINT, 
        data_sum TINYINT 
    ) WITH (
        'connector' = 'print'
    )
""")

table_env.execute_sql("""
    INSERT INTO print_sink
        SELECT id, sum(data) as data_sum FROM 
            (SELECT id / 2 as id, data FROM random_source)
        WHERE id > 1
        GROUP BY id
""").wait()

結果は次の通りです:

2> +I(4,11)
6> +I(2,8)
8> +I(3,10)
6> -U(2,8)
8> -U(3,10)
6> +U(2,15)
8> +U(3,19)

実際、これはprintシンクが受信する変更ログを示しています。 変更ログの出力フォーマットは以下の通りです:

{subtask id}> {message type}{string format of the value}

例えば、“2> +I(4,11)“はこのメッセージが2番目のサブタスクからのものであることを意味し、"+I"は挿入メッセージであることを意味します。"(4, 11)“はメッセージの内容です。 さらに、"-U"はレコードの取り消し(つまり、update-before)を意味し、このメッセージはシンクから削除または取り消される必要があることを意味します。 “+U"は、これが更新レコード(つまり、update-after)であることを意味し、このメッセージはシンクによって更新または挿入される必要があることを意味します。

つまり、上記の変更ログから次の結果を得ます:

(4, 11)
(2, 15) 
(3, 19)

Table APIとSQLの組み合わせ #

Table APIで使われるTableオブジェクトとSQLで使われるtableは、相互に自由に変換できます。

以下の例は、SQLでTableオブジェクトを使う方法を示しています:

# create a sink table to emit results
table_env.execute_sql("""
    CREATE TABLE table_sink (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")

# convert the Table API table to a SQL view
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)

# emit the Table API table
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

結果は次の通りです:

6> +I(1,Hi)
6> +I(2,Hello)

以下の例はTable APIでSQL tableを使う方法を示しています:

# create a sql source table
table_env.execute_sql("""
    CREATE TABLE sql_source (
        id BIGINT, 
        data TINYINT 
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='4',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='7'
    )
""")

# convert the sql table to Table API table
table = table_env.from_path("sql_source")

# or create the table from a sql query
# table = table_env.sql_query("SELECT * FROM sql_source")

# emit the table
table.execute().print()

結果は次の通りです:

+----+----------------------+--------+
| op |                   id |   data |
+----+----------------------+--------+
| +I |                    1 |      4 |
| +I |                    2 |      5 |
| +I |                    3 |      6 |
| +I |                    4 |      7 |
+----+----------------------+--------+

Back to top

結果の発行 #

Tableの出力 #

TableResult.printメソッドを呼び出してTableの内容をコンソールに出力できます。 これは通常、tableをプレビューする場合に使われます。

# prepare source tables 
source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])

# Get TableResult
table_result = table_env.execute_sql("select a + 1, b, c from %s" % source)

# Print the table
table_result.print()

結果は次の通りです:

+----+----------------------+--------------------------------+--------------------------------+
| op |               EXPR$0 |                              b |                              c |
+----+----------------------+--------------------------------+--------------------------------+
| +I |                    2 |                             Hi |                          Hello |
| +I |                    3 |                          Hello |                          Hello |
+----+----------------------+--------------------------------+--------------------------------+

注意 tableの実体化がトリガーされ、tableの内容がクライアントのメモリに収集されます。 Table.limit を介して収集される行数を制限することをお勧めします。

結果をクライアントに収集する #

TableResult.collectメソッドを呼び出して、tableの結果をクライアントに収集できます。 結果の型は自動クローズ可能なイテレータです。

以下のコードはTableResult.collect()メソッドの使用方法を示しています:

# prepare source tables 
source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])

# Get TableResult
table_result = table_env.execute_sql("select a + 1, b, c from %s" % source)

# Traversal result
with table_result.collect() as results:
   for result in results:
       print(result)

結果は以下の通りです:

<Row(2, 'Hi', 'Hello')>
<Row(3, 'Hello', 'Hello')>

注意 tableの実体化がトリガーされ、tableの内容がクライアントのメモリに収集されます。 Table.limit を介して収集される行数を制限することをお勧めします。

pandas DataFrameに変換してクライアントに結果を収集する #

“to_pandas"メソッドを呼び出してTableオブジェクトをpandas DataFrameに変換できます:

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
print(table.to_pandas())

結果は次の通りです:

   id   data
0   1     Hi
1   2  Hello

注意 tableの実体化がトリガーされ、tableの内容がクライアントのメモリに収集されます。 Table.limit を介して収集される行数を制限することをお勧めします。

注意 全てのデータ型がサポートされるわけではありません。

結果を1つのシンクTableに発行する #

“execute_insert"メソッドを呼び出して、Tableオブジェクト内のデータをシンクtableに発行できます:

table_env.execute_sql("""
    CREATE TABLE sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute_insert("sink_table").wait()

結果は次の通りです:

6> +I(1,Hi)
6> +I(2,Hello)

これは、SQLを使って行うこともできます:

table_env.create_temporary_view("table_source", table)
table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

結果を複数のシンクTableに発行する #

StatementSetを使って1つのジョブ内でTableを複数のsink tableに発行できます:

# prepare source tables and sink tables
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
    CREATE TABLE first_sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")
table_env.execute_sql("""
    CREATE TABLE second_sink_table (
        id BIGINT, 
        data VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")

# create a statement set
statement_set = table_env.create_statement_set()

# emit the "table" object to the "first_sink_table"
statement_set.add_insert("first_sink_table", table)

# emit the "simple_source" to the "second_sink_table" via a insert sql query
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

# execute the statement set
statement_set.execute().wait()

結果は次の通りです:

7> +I(1,Hi)
7> +I(1,Hi)
7> +I(2,Hello)
7> +I(2,Hello)

TablesのExplain #

Table APIはTableの計算に使われる論理的で最適化されたクエリ計画を説明する仕組みを提供します。 これは、Table.explain()StatementSet.explain()メソッドを使って行われます。Table.explain()Tableの計画を返します。 StatementSet.explain()は複数のシンクを含むジョブに関する計画を取得するために使われます。これらのメソッドは、以下の3つのことを説明する文字列を返します:

  1. 関係クエリの抽象構文木。つまり最適化されていない論理クエリ計画、
  2. 最適化された論理クエリ計画、そして
  3. 物理実行計画。

TableEnvironment.explain_sql()TableEnvironment.execute_sql()は、計画を取得するためのEXPLAINステートメントの実行をサポートしています。詳細については、 EXPLAINページを参照してください。

以下のコードは、Table.explain()メソッドを使う方法を示しています:

# using a stream TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table = table1 \
    .where(col("data").like('H%')) \
    .union_all(table2)
print(table.explain())

結果は次の通りです:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
:  +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]])

== Optimized Logical Plan ==
Union(all=[true], union=[id, data])
:- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

== Physical Execution Plan ==
Stage 133 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

        Stage 134 : Operator
                content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
                ship_strategy : FORWARD

                Stage 135 : Operator
                        content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
                        ship_strategy : FORWARD

Stage 136 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

        Stage 137 : Operator
                content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
                ship_strategy : FORWARD

以下のコードは、StatementSet.explain()メソッドを使う方法を示しています:

# using a stream TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.execute_sql("""
    CREATE TABLE print_sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")
table_env.execute_sql("""
    CREATE TABLE black_hole_sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'blackhole'
    )
""")

statement_set = table_env.create_statement_set()

statement_set.add_insert("print_sink_table", table1.where(col("data").like('H%')))
statement_set.add_insert("black_hole_sink_table", table2)

print(statement_set.explain())

結果は次の通りです:

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]]])

LogicalSink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
+- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

Sink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

== Physical Execution Plan ==
Stage 139 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

        Stage 140 : Operator
                content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
                ship_strategy : FORWARD

                Stage 141 : Operator
                        content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
                        ship_strategy : FORWARD

Stage 143 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

        Stage 144 : Operator
                content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
                ship_strategy : FORWARD

                Stage 142 : Data Sink
                        content : Sink: Sink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
                        ship_strategy : FORWARD

                        Stage 145 : Data Sink
                                content : Sink: Sink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
                                ship_strategy : FORWARD
inserted by FC2 system