Real Time Reporting with the Table API
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Table APIを使ったリアルタイムレポート #

Apache Flinkは、バッチとストリーム処理用の統合されたリレーショナルAPIとしてTable APIを提供します。つまり、クエリは、制限のないリアルタイムストリームまたは制限のあるバッチデータセットに対して同じセマンティクスで実行され、同じ結果を生成します。 FlinkのTable APIは、データ分析、データパイプライン、ETLアプリケーションの定義を明確にするために一般的に使われます。

何を構築するつもりですか? #

このチュートリアルでは、口座ごとに金融取引を追跡するためのリアルタイムダッシュボードを構築する方法を学習します。 パイプラインはKafkaからデータを読み取り、Grafana経由で描画されるMySQLへ結果を書き込みます。

必要条件 #

このウォークスルーは、Javaにある程度知識があることを前提としていますが、別のプログラミング言語を使っている場合でも理解できるはずです。 また、SELECT句やGROUP BY句のような基本的なリレーショナル概念を理解していることを仮定します。

助けてください。行き詰まりました! #

行き詰った場合は、コミュニティサポートリソースを調べてください。 特に、Apache Flinkのuser mailing listは、あらゆるApacheプロジェクトの中で最も活発なプロジェクトの1つとして常にランク付けされており、すばやく助けを受けるのに最適な方法です。

Windowsでdockerを実行して、データジェネレータが起動できない場合は、正しいシェルを使っていることを確認してください。 例えば、table-walkthrough_data-generator_1コンテナのdocker-entrypoint.shには、bashが必要です。 利用できない場合は、エラーstandard_init_linux.go:211: exec user process caused “no such file or directory”が投げられます。 回避策は、docker-entrypoint.shの最初の行でシェルをshに切り替えることです。

How To Follow Along #

この手順に従って進めたい場合は、次の機能を備えたコンピュータが必要です:

  • Java 11
  • Maven
  • Docker

注意: このプレイグラウンドで使われるApache Flink Dockerイメージは、Apache Flinkのリリースされたバージョンでのみ使えます。

現在、ドキュメントの最新のスナップショットバージョンを見ているため、以下の全てのバージョン参照は動作しません。 メニューの左下にあるリリースピッカーを使って、ドキュメントを最新のリリースバージョンに切り替えてください。

必要な設定ファイルはflink-playgroundsリポジトリにあります。 ダウンロードしたら、IDEでプロジェクトflink-playground/table-walkthroughを開き、ファイルSpendReportに移動します。

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql("CREATE TABLE transactions (\n" +
    "    account_id  BIGINT,\n" +
    "    amount      BIGINT,\n" +
    "    transaction_time TIMESTAMP(3),\n" +
    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
    ") WITH (\n" +
    "    'connector' = 'kafka',\n" +
    "    'topic'     = 'transactions',\n" +
    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
    "    'format'    = 'csv'\n" +
    ")");

tEnv.executeSql("CREATE TABLE spend_report (\n" +
    "    account_id BIGINT,\n" +
    "    log_ts     TIMESTAMP(3),\n" +
    "    amount     BIGINT\n," +
    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
    ") WITH (\n" +
    "   'connector'  = 'jdbc',\n" +
    "   'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
    "   'table-name' = 'spend_report',\n" +
    "   'driver'     = 'com.mysql.jdbc.Driver',\n" +
    "   'username'   = 'sql-demo',\n" +
    "   'password'   = 'demo-sql'\n" +
    ")");

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

コードの分解 #

実行環境 #

最初の2行はTableEnvironmentを設定します。 テーブル環境は、ジョブのプロパティを設定し、バッチまたはストリーミングアプリケーションを書いているかどうかを指定し、ソースを作成する方法を示します。 このウォークスルーは、ストリーミング実行を使う標準的なテーブル環境を作成します。

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

テーブルの登録 #

次に、テーブルが現在のカタログに登録されます。これを使って外部システムに接続し、バッチの読み取りとストリーミングの両方の読み書きを行うことができます。 テーブルソースは、データベース、キーバリューストア、メッセージキュー、ファイルシステムのような外部システムに格納されるデータへのアクセスを提供します。 テーブルsinkは外部ストレージシステムへテーブルを発行します。 ソースとシンクの種類に応じて、CSV、JSON、Avro、Parquetのような様々な形式をサポートします。

tEnv.executeSql("CREATE TABLE transactions (\n" +
     "    account_id  BIGINT,\n" +
     "    amount      BIGINT,\n" +
     "    transaction_time TIMESTAMP(3),\n" +
     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
     ") WITH (\n" +
     "    'connector' = 'kafka',\n" +
     "    'topic'     = 'transactions',\n" +
     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
     "    'format'    = 'csv'\n" +
     ")");

2つのテーブルが登録されています; トランザクション入力テーブルと支出レポート出力テーブル。 トランザクション(transactions)テーブルを使うと、アカウントID(account_id)、タイムスタンプ(transaction_time)、US$ 金額(amount)を含むクレジットカードトランザクションを読み取ることができます。 このテーブルは、CSVデータを含むtransactionsと呼ばれるKafkaトピックの論理ビューです。

tEnv.executeSql("CREATE TABLE spend_report (\n" +
    "    account_id BIGINT,\n" +
    "    log_ts     TIMESTAMP(3),\n" +
    "    amount     BIGINT\n," +
    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
    ") WITH (\n" +
    "    'connector'  = 'jdbc',\n" +
    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
    "    'table-name' = 'spend_report',\n" +
    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
    "    'username'   = 'sql-demo',\n" +
    "    'password'   = 'demo-sql'\n" +
    ")");

2番目のテーブルspend_reportは、集計の最終結果を保存します。 その基礎となるストレージは、MySqlデータベースです。

クエリ #

環境が設定されテーブルが登録されると、最初のアプリケーションを構築する準備が整います。 TableEnvironmentから入力テーブルからfromを読み取ってその行を読み取り、executeInsertを使ってその結果を出力テーブルに書き込むことができます。 report関数は、ビジネスロジックを実装する場所です。 現在は未実装です。

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

テスト #

プロジェクトには、レポートのロジックを検証する2次テストクラスSpendReportTestが含まれます。 バッチモードでテーブル環境を作成します。

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings); 

Flinkのユニークな特性の1つに、バッチとストリーミング全体で一貫したセマンティクスを提供することがあります。 これは、静的データセット上でバッチモードでアプリケーションを開発およびテストし、ストリーミングアプリケーションとして運用環境にデプロイできることを意味します。

試行1 #

ジョブセットアップのスケルトンが完成したので、ビジネスロジックを追加する用意ができました。 目標は、1日の各時間に渡って各アカウントの合計支出を示すレポートを作成することです。 これは、タイムスタンプ列をミリ秒から時間単位に切り捨てることを意味します。

Flinkは、純粋なSQLまたはTable APIを使ったリレーショナルアプリケーションの開発をサポートします。 Table APIはSQLからインスピレーションを得た流暢なDSLであり、JavaまたはPythonで記述でき、強力なIDE統合をサポートします。 SQLクエリと同じように、Tableプログラムは必要なフィールドを選択し、キーでグループ化することができます。 これらの機能と、floorsumなどの組み込み関数を使うと、レポートを作成できるようになります。

public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}

ユーザ定義関数 #

Flinkには限られた数の組み込み関数が含まれるため、ユーザ定義関数を使って拡張する必要がある場合があります。 floorが事前定義されていない場合、自分で実装できます。

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MyFloor extends ScalarFunction {

    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {

        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }
}

そして、それをアプリケーションにすぐに統合します。

public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            call(MyFloor.class, $("transaction_time")).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}

このクエリは、transactionsテーブルの全てのレコードを消費し、レポートを計算し、効率的かつスケーラブルな方法で結果を出力します。 この実装でテストを実行すると、合格します。

ウィンドウの追加 #

時間に基づいてデータをグループ化することは、徳に無限のストリームを扱う場合のデータ処理における一般的な操作です。 時間に基づくグループ化はwindowと呼ばれ、Flinkは柔軟なウィンドウセマンティクスを提供します。 最も基本的なウィンドウの種類は、Tumbleウィンドウと呼ばれ、固定のサイズで時間枠が重複しません。

public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts").start().as("log_ts"),
            $("amount").sum().as("amount"));
}

これにより、アプリケーションはタイムスタンプ列に基づいて1時間のタンブリングウィンドウを使うように定義されます。 従って、タイムスタンプ2019-06-01 01:23:47の行が、2019-06-01 01:00:00ウィンドウに配置されます。

時間に基づく集計は、他の属性とは対照的に、連続ストリーミングアプリケーションでは普通は時間は前進するため、独特です。 floorやUDFと異なり、window関数は組み込みであるため、ランタイムは追加の最適化を適用できます。 バッチコンテキストでは、windowはタイムスタンプ属性によってれこーどをグループ化するための便利なAPIを提供します。

この実装でテストを実行すると、合格します。

ストリーミングでもう一度! #

これで、完全に機能するステートフルな分散ストリーミングアプリケーションが完成しました! クエリは、Kafkaからのトランザクションのストリームを継続的に消費し、時間ごとの支出を計算し、準備が整うとすぐに結果を出力します。 入力は無制限であるため、クエリは手動で停止されるまで実行され続けます。 ジョブは時間windowベースの集計を使うため、フレームワークがj特定のウィンドウにこれ以上レコードが到着しないことを認識している場合、Flinkは状態のクリーンアップなどの特定の最適化を実行できます。

テーブルのプレイグラウンドは完全にDocker化されており、ストリーミングアプリケーションとしてローカルで実行できます。 この環境には、Kafkaトピック、継続的データジェネレーター、MySQL、Grafanaが含まれています。

table-walkthroughフォルダ内から、docker-composeスクリプトを開始します。

$ docker-compose build
$ docker-compose up -d

実行中のジョブに関する情報は、Flinkコンソールで確認できます。

Flink Console

MySQL内部から結果を調べます。

$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql

mysql> use sql-demo;
Database changed

mysql> select count(*) from spend_report;
+----------+
| count(*) |
+----------+
|      110 |
+----------+

最後に、Grafanaに移動して、完全に視覚化された結果を確認します!

Grafana
inserted by FC2 system