Overview
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ストリーミングの概念 #

FlinkのTable APISQLサポートは、バッチとストリーム処理用の統合APIです。 これは、Table APIとSQLのクエリは、入力が制限付きバッチ入力であるか制限無しストリーム入力であるかに関係なく、同じセマンティクスを持つことを意味します。

以下のページでは、ストリームデータに関するFlinkのリレーショナルAPIの概念、実際的な制限、ストリーム固有の設定パラメータについて説明します。

状態管理 #

ストリーミングモードで時刻されるTableプログラムは、ステートフルストリームプロセッサとしてFlinkの全ての機能を活用します。 processor.

特に、tableプログラムは状態バックエンドと様々なチェックポイントオプションを使って、状態サイズと耐障害性に関する様々な要件を処理するために設定できます。実行中のTable APIとSQLパイプラインのセーブポイントを取得し、後でアプリケーションの状態を復元することができます。

状態の使用方法 #

Table APIとSQLプログラムの宣言的な性質により、パイプライン内のどこでどの程度の状態が使われているが必ずしも明確ではありません。プランナは正しい結果を計算するために状態が必要かどうかを決定します。パイプラインは現在のオプティマイザのルールのセットを考慮してできる限り少ない状態を要求するように最適化されます。

概念的には、ソーステーブルが完全に状態に保たれることはありません。実装者は論理テーブル(つまり、動的テーブル)を扱います。それらの状態要求は使われる操作によって異なります。

ステートフルオペレータ #

joinsaggregationsdeduplicationのようなステートフル操作を含むクエリは、Flinkの状態抽象化が使われる耐障害性ストレージに中間結果を保持する必要があります。

例えば、2つのテーブルの通常のSQL joinは、オペレータは両方の入力テーブルを完全に状態に保つ必要があります。SQLセマンティクスを正しくするためには、ランタイムはいつでも両側からマッチングが発生する可能性があると想定する必要があります。Flinkは、ウォーターマークの概念を使って状態サイズを小さく保つことを目的とする最適化されたウィンドウ結合とインターバルjoinを提供します。

別の例は、単語数を計算する次のクエリです。

CREATE TABLE doc (
    word STRING
) WITH (
    'connector' = '...'
);
CREATE TABLE word_cnt (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt  BIGINT
) WITH (
    'connector' = '...'
);

INSERT INTO word_cnt
SELECT word, COUNT(1) AS cnt
FROM doc
GROUP BY word;

wordフィールドはグループ化キーとして使われ、連続クエリは発見した各wordのカウントをシンクに書き込みます。 word値は時間の経過とともに変化していて、連続クエリは終了しないため、フレームワークは発見した各word値のカウントを維持する必要があります。 その結果、発見されたword値が増えるに連れて、クエリの合計状態サイズが継続的に増加します。

Explicit-derived stateful op

Queries such as SELECT ... FROM ... WHERE which only consist of field projections or filters are usually stateless pipelines. ただし、状況によっては、ステートフルオペレーションは入力の特性を通じて暗黙的に算出されます(例えば、入力はUPDATE_BEFOREの無い変更ログです。テーブルからストリームへの変換を参照)、またはユーザ設定による(table-exec-source-cdc-events-duplicateを参照)ものです。

The following figure illustrates a SELECT ... FROM statement that querying an upsert kafka source.

CREATE TABLE upsert_kakfa (
    id INT PRIMARY KEY NOT ENFORCED,
    message  STRING
) WITH (
    'connector' = 'upsert-kafka',
    ...
);

SELECT * FROM upsert_kakfa;

テーブルソースはINSERTUPDATE_AFTERDELETEタイプのメッセージのみを提供しますが、ダウンストリームシンクには完全な変更ログ(UPDATE_BEFOREを含む)が必要です。 その結果、このクエリ自体は明示的なステートフル計算は含まれませんが、プランナは完全な変更ログの取得に役立つ"ChangelogNormalize"と呼ばれるステートフルオペレータを生成します。 Implicit-derived stateful op

必要な状態の量と、増大し続ける状態サイズを制限する方法の詳細については、個々のオペレータのドキュメントを参照してください。

アイドル状態の保持期間 #

アイドル状態の保持期間パラメータtable.exec.state.ttlは、キーの状態が削除されるまでに更新されずに保持される期間を定義します。 前のクエリの例では、wordのカウントは設定された期間更新されないとすぐに削除されます。

キーの状態を削除すると、連続クエリは以前にこのキーを見たことが完全に忘れ去られます。以前に状態が削除されたキーを持つレコードが処理される場合、レコードはそれぞれのキーを持つ最初のレコードを持つ最初のレコードであるかのように扱われます。上の例の場合、これはwordのカウントが0から再び開始されることを意味します。

オペレータレベルの状態TTLの設定 #


これは高度な機能であるため、注意して使う必要があります。これは、パイプラインで複数の状態が使われ、状態ごとに異なるTTL(Time-to-Live)を設定する必要がある場合にのみ適しています。 パイプラインにステートフル計算が含まれない場合は、この手順に従う必要はありません。 パイプラインが1つの状態のみを使う場合は、パイプラインレベルではtable.exec.state.ttlを設定するだけで済みます。

Flink v1.18以降、Table APIとSQLは、状態の使用を改善するためにオペレータレベルでの詳細な状態TTLの設定をサポートします。 設定可能な粒度は、各状態オペレータの受信入力エッジの数として定義されます。 具体的には、OneInputStreamOperatorは1つの状態のTTLを設定できますが、2つの入力を持つ(通常のjoinなど)TwoInputStreamOperatorは左右のTTLをそれぞれ設定できます。 より一般的には、K個の入力を持つMultipleInputStreamOperatorの場合、K状態のTTLを設定できます。

一般的な使い方は次の通りです:

  • 通常のjoinsに異なるTTLを設定します。 通常のjoinは、左の入力を維持するために左の状態、右の入力を維持するために右の状態を持つTwoInputStreamOperatorを生成します。Flink v1.18以降、左の状態と右の状態に異なる状態TTLを設定できます。
  • 1つのパイプライン内の異なる変換に対して異なるTTLを設定します。 例えば、ROW_NUMBERを使ってdeduplicationを行い、GROUP BYを使ってaggregationを行うETLパイプラインがあります。 このテーブルプログラムは、独自の状態を持つ2つのOneInputStreamOperatorを生成します。 重複排除状態と集計状態に異なる状態TTLを設定できるようになりました。
ウィンドウベースのオペレーション(Window JoinWindow AggregationWindow Top-N など)とInterval Joinsは、状態維持の制御にtable.exec.state.ttlに依存依存せず、それらの状態TTLをオペレータレベルで設定できません。

コンパイルされた計画の生成

セットアッププロセスは、COMPILE PLAN文を使ってJSONファイルを生成することから始まります。これは現在のテーブルプログラムのシリアライズ化された実行計画を表します。

Currently, COMPILE PLAN statement does not support SELECT... FROM... queries.

  • COMPILE PLAN文の実行
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.executeSql(
    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
tableEnv.executeSql(
    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
tableEnv.executeSql(
    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");

// CompilePlan#writeToFile only supports a local file path, if you need to write to remote filesystem,
// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
CompiledPlan compiledPlan = 
    tableEnv.compilePlanSql(
        "INSERT INTO enriched_orders \n" 
       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
       + "FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id");

compiledPlan.writeToFile("/path/to/plan.json");
val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tableEnv.executeSql(
    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)")
tableEnv.executeSql(
    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)")
tableEnv.executeSql(
    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)")

val compiledPlan = 
    tableEnv.compilePlanSql(
       """
        |INSERT INTO enriched_orders
        |SELECT a.order_id, a.order_line_id, b.order_status, ...
        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
        |""".stripMargin)
// CompilePlan#writeToFile only supports a local file path, if you need to write to remote filesystem,
// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
compiledPlan.writeToFile("/path/to/plan.json")
Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...);
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...);
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...);
[INFO] Execute statement succeed.

Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_orders
> SELECT a.order_id, a.order_line_id, b.order_status, ...
> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
[INFO] Execute statement succeed.
  • SQL構文

    COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR <insert_statement>|<statement_set>;
    

statement_set: EXECUTE STATEMENT SET BEGIN insert_statement; … insert_statement; END;

insert_statement: <insert_from_select>|<insert_from_values>

これは`/path/to/plan.json`にJSONファイルを生成します。

COMPILE PLAN文は、hdfs://s3://のようなリモートfilesystemへの計画の書き込みをサポートします。 ターゲットパスに書き込みアクセスが設定されていることを確認してください。
**コンパイルされた計画の変更** 状態を使う全てのオペレータは、以下の構造を持つ"state"という名前のJSON配列を明示的に生成します。 理論的には、k番目の入力ストリームオペレータはk番目の状態になります。 ```json "state": [ { "index": 0, "ttl": "0 ms", "name": "${1st input state name}" }, { "index": 1, "ttl": "0 ms", "name": "${2nd input state name}" }, ... ]

変更する必要があるオペレータを見つけて、TTLの値を正の整数に変更し、時間単位"ms"が含まれるようにしてください。 例えば、状態のTTLとして1時間を設定する場合、JSONを次のように変更できます:

{
  "index": 0,
  "ttl": "3600000 ms",
  "name": "${1st input state name}"
}

ファイルを保存し、ジョブを送信するためにEXECUTE PLAN文を使います。

概念的には、ダウンストリームのステートフルオペレータのTTLは、アップストリームステートフルオペレータのTTL以上である必要があります。

コンパイルされた計画の実行

EXECUTE PLAN文は、指定されたファイルをデシリアライズして現在のテーブルプログラムの実行計画に戻し、ジョブを送信します。 EXECUTE PLAN文を介して送信されたジョブは、設定table.exec.state.ttlの代わりにファイルから読み込んだ状態TTLを適用します。

  • EXECUTE PLAN文の実行

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tableEnv.executeSql(
        "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
    tableEnv.executeSql(
        "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
    tableEnv.executeSql(
        "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");
    
    // PlanReference#fromFile only supports a local file path, if you need to read from remote filesystem,
    // please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await();
    tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
    
    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    tableEnv.executeSql(
        "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)")
    tableEnv.executeSql(
        "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)")
    tableEnv.executeSql(
        "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)")
    
    // PlanReference#fromFile only supports a local file path, if you need to read from remote filesystem,
    // please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await()
    tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
    
    Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...);
    [INFO] Execute statement succeed.
    
    Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...);
    [INFO] Execute statement succeed.
    
    Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...);
    [INFO] Execute statement succeed.
    
    Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 79fbe3fa497e4689165dd81b1d225ea8
    

  • SQL構文

    EXECUTE PLAN [IF EXISTS] <plan_file_path>;
    

    これは、JSONファイルをデシリアライズ化し、insert文のジョブを送信します。

完全な例

次のテーブルプログラムは、充実した注文出荷情報を計算します。 左側と右側で異なる状態TTLを使って通常のinner joinを実行します。

  • コンパイルされた計画の生成
    -- left source table
    

CREATE TABLE Orders ( order_id INT, line_order_id INT ) WITH ( ‘connector’=’…’ );

– right source table CREATE TABLE LineOrders ( line_order_id INT, ship_mode STRING ) WITH ( ‘connector’=’…’ );

– sink table CREATE TABLE OrdersShipInfo ( order_id INT, line_order_id INT, ship_mode STRING ) WITH ( ‘connector’ = ‘…’ );

COMPILE PLAN ‘/path/to/plan.json’ FOR INSERT INTO OrdersShipInfo SELECT a.order_id, a.line_order_id, b.ship_mode FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id;

生成されたJSONファイルには次の内容が含まれます:

```json
{
"flinkVersion" : "1.18",
"nodes" : [ {
 "id" : 1,
 "type" : "stream-exec-table-source-scan_1",
 "scanTableSource" : {
   "table" : {
     "identifier" : "`default_catalog`.`default_database`.`Orders`",
     "resolvedTable" : { ... }
   }
 },
 "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
 "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, line_order_id])",
 "inputProperties" : [ ]
}, {
 "id" : 2,
 "type" : "stream-exec-exchange_1",
 "inputProperties" : [ ... ],
 "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
 "description" : "Exchange(distribution=[hash[line_order_id]])"
}, {
 "id" : 3,
 "type" : "stream-exec-table-source-scan_1",
 "scanTableSource" : {
   "table" : {
     "identifier" : "`default_catalog`.`default_database`.`LineOrders`",
     "resolvedTable" : {...}
   }
 },
 "outputType" : "ROW<`line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
 "description" : "TableSourceScan(table=[[default_catalog, default_database, LineOrders]], fields=[line_order_id, ship_mode])",
 "inputProperties" : [ ]
}, {
 "id" : 4,
 "type" : "stream-exec-exchange_1",
 "inputProperties" : [ ... ],
 "outputType" : "ROW<`line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
 "description" : "Exchange(distribution=[hash[line_order_id]])"
}, {
 "id" : 5,
 "type" : "stream-exec-join_1",
 "joinSpec" : { ... },
 "state" : [ {
   "index" : 0,
   "ttl" : "0 ms",
   "name" : "leftState"
 }, {
   "index" : 1,
   "ttl" : "0 ms",
   "name" : "rightState"
 } ],
 "inputProperties" : [ ... ],
 "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `line_order_id0` INT, `ship_mode` VARCHAR(2147483647)>",
 "description" : "Join(joinType=[InnerJoin], where=[(line_order_id = line_order_id0)], select=[order_id, line_order_id, line_order_id0, ship_mode], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
}, {
 "id" : 6,
 "type" : "stream-exec-calc_1",
 "projection" : [ ... ],
 "condition" : null,
 "inputProperties" : [ ... ],
 "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
 "description" : "Calc(select=[order_id, line_order_id, ship_mode])"
}, {
 "id" : 7,
 "type" : "stream-exec-sink_1",
 "configuration" : { ... },
 "dynamicTableSink" : {
   "table" : {
     "identifier" : "`default_catalog`.`default_database`.`OrdersShipInfo`",
     "resolvedTable" : { ... }
   }
 },
 "inputChangelogMode" : [ "INSERT" ],
 "inputProperties" : [ ... ],
 "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` VARCHAR(2147483647)>",
 "description" : "Sink(table=[default_catalog.default_database.OrdersShipInfo], fields=[order_id, line_order_id, ship_mode])"
} ],
"edges" : [ ... ]
}
  • 計画内容の変更と計画の実行

    joinオペレータの状態のJSON表現は、次の構造を持ちます:

    "state": [
     {
       "index": 0,
       "ttl": "0 ms",
       "name": "leftState"
     },
     {
       "index": 1,
       "ttl": "0 ms",
       "name": "rightState"
     }
    ]
    

    "index"は、現在の状態がオペレータのi番目の入力であることを示し、インデックスはゼロから始まります。 左右の現在のTTL値は"0 ms"で、状態保持が有効になっていないことを意味します。 ここで、左側の状態の値を"3000 ms"に、右側の状態の値を"9000 ms"に変更します。

    "state": [
     {
       "index": 0,
       "ttl": "3000 ms",
       "name": "leftState"
     },
     {
       "index": 1,
       "ttl": "9000 ms",
       "name": "rightState"
     }
    ]
    

    ファイルに加えた変更を保温し、計画を実行します。

    EXECUTE PLAN '/path/to/plan.json'
    

ステートフルアップグレードと進化 #

ストリーミングモードで実行されるテーブルプログラムは、標準クエリを意図しており、一度定義されると静的なエンドツーエンドパイプラインんとして継続的に評価されます。

ステートフルパイプラインの場合、クエリまたはFlinkのプランナの両方に変更を加えると、全く異なる実行計画になる可能性があります。このため、現時点では、ステートフルアップグレードとテーブルプログラムの進化が困難になっています。コミュニティはこれらの欠点の改善に取り組んでいます。

例えば、フィルタ記述子を追加することで、オプティマイザはjoinの順序を変更したり、中間オペレータのスキーマを変更したりすることを決定する場合があります。これにより、トポロジの変更やオペレータの状態内の列レイアウトの違いによるセーブポイントからの復元が防止されます。

クエリ実装者は、変更の前後で最適化された計画に互換性があることを確認する必要があります。 SQLのEXPLAINコマンドまたはTable APIのtable.explain()を使って、見識を深めます

新しいオプティマイザのルールが継続的に追加され、オペレータがより効率的かつ専門化されるため、新しいFlinkバージョンにアップグレードすると計画に互換性が無くなる可能性があります。

現在、フレームワークは、セーブポイントから新しいテーブルオペレータトポロジに状態をマップできることを保証できません。

言い換えると: セーブポイントは、クエリとFlinkのバージョンの両方が一定のままである場合のみサポートされます。

コミュニティはパッチバージョン(例えば、1.13.1から1.13.2)で最適化された計画とオペレータトポロジを変更する貢献を拒否しているため、Table APIとSQLパイプラインを新しいバグ修正リリースへアップグレードしても安全です。ただし、メジャー-マイナーアップグレード(例えば、1.12から1.13)はサポートされていません。

両方の欠点(つまり、変更されたクエリと変更されたFlinkバージョン)については、リアルタイムデータに切り替える前に、更新されたテーブルプログラムの状態を履歴データで再度"warmed up"(つまり初期化)できるかどうかを調査することをお勧めします。Flinkコミュニティはこの切替をできるだけ便利にするために、ハイブリッドソースに取り組んでいます。

次はどこに行きますか? #

Back to top

inserted by FC2 system