テーブルとSQL Beta

テーブルAPIとSQLは実験的な機能です。

テーブルAPIは簡単にFlinkのデータセットとデータストリームAPI(JavaとScala)に組み込むことができる関連ストリームおよびバッチ処理のためのSQLのような表現言語です。テーブルAPIとSQLインタフェースは関連テーブル抽象上で操作します。これは外部データソースあるいは既存のデータセットとデータストリームから生成することができます。テーブルAPIを使って、テーブルに selection, aggregation および joins のような関連オペレータを適用することができます。

テーブルはそれらが登録(Registering Tablesを見てください)されている限り、通常のSQLを使ってクエリすることもできます。テーブルAPIとSQLは透過の機能を提供し、同じプログラム内で混ぜることができます。テーブルDataSet あるいは DataStreamに変換される時、関連オペレータとSQLクエリによって定義された論理プランはApache Calcite を使って最適化され、DataSet または DataStream プログラムに変換されます。

テーブルAPIとSQLの使用

テーブルAPIとSQLはflink-table Mavenプロジェクトの一部です。テーブルAPIおよびSQLを使うために以下の依存をプロジェクトに追加する必要があります:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.5-SNAPSHOT</version>
</dependency>

注意: テーブルAPIは現在のところバイナリ配布物の一部ではありません。クラスタ実行のためにそれをリンクするには ここを見てください。

テーブルの登録

TableEnvironmentはテーブルをユニークな名前を使って登録することができる内部的なテーブルのカタログを持ちます。登録の後でテーブルは名前を使ってTableEnvironmentからアクセスすることができます。

注意: DataSetあるいはDataStreamTableEnvironment内でそれらを登録すること無しに直接テーブルに変換することができます。

データセットの登録

DataSetは以下のようにBatchTableEnvironment の中にTableとして登録されます:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, "user, product, amount");
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

注意: DataSet Table の名前は内部的な利用のためのみに保持される ^_DataSetTable_[0-9]+ パターンに一致しなければなりません。

データストリームの登録

DataStream は以下のようにStreamTableEnvironmentの中に Table として登録されます:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)

// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, "user, product, amount");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)

// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注意: DataStream Table の名前は内部的な利用のためのみに保持される ^_DataStreamTable_[0-9]+ パターンに一致しなければなりません。

テーブルの登録

テーブルAPIオペレータあるいはSQLクエリを起源とするTableは以下のようにTableEnvironmentの中で登録されます:

// works for StreamExecutionEnvironment identically
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// convert a DataSet into a Table
Table custT = tableEnv
  .toTable(custDs, "name, zipcode")
  .where("zipcode = '12345'")
  .select("name")

// register the Table custT as table "custNames"
tableEnv.registerTable("custNames", custT)
// works for StreamExecutionEnvironment identically
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// convert a DataSet into a Table
val custT = custDs
  .toTable(tableEnv, 'name, 'zipcode)
  .where('zipcode === "12345")
  .select('name)

// register the Table custT as table "custNames"
tableEnv.registerTable("custNames", custT)

テーブルAPIオペレータあるいはSQLクエリを起源とする登録されたTableはリレーショナルDBMSでのビューと似たものとして扱われます。つまり、クエリを最適化する時にインラインにされるかもしれません。

TableSourceを使った外部テーブルの登録

外部的なテーブルは以下のようにTableSourceを使ってTableEnvironment内に登録されます:

// works for StreamExecutionEnvironment identically
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

TableSource custTS = new CsvTableSource("/path/to/file", ...)

// register a `TableSource` as external table "Customers"
tableEnv.registerTableSource("Customers", custTS)
// works for StreamExecutionEnvironment identically
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val custTS: TableSource = new CsvTableSource("/path/to/file", ...)

// register a `TableSource` as external table "Customers"
tableEnv.registerTableSource("Customers", custTS)

TableSourceはデータベース (MySQL, HBase, …)、ファイル形式 (CSV, Apache Parquet, Avro, ORC, …) あるいはメッセージングシステム (Apache Kafka, RabbitMQ, …) のような様々なストレージシステム内に格納されたデータへのアクセスを提供します。

現在のところ、FlinkはCSVファイルを読むために CsvTableSourceを、KafkaからJSONファイルを読むためにKafka08JsonTableSource/Kafka09JsonTableSource を提供します。独自のTableSourceBatchTableSource または StreamTableSource インタフェースを実装することで定義することができます。

利用可能なテーブルソース

クラス名 Maven 依存 Batch? Streaming? 解説
CsvTableSouce flink-table Y Y CSVファイルのための単純なソース。
Kafka08JsonTableSource flink-connector-kafka-0.8 N Y JSONデータのための Kafka 0.8 ソース。
Kafka09JsonTableSource flink-connector-kafka-0.9 N Y JSONデータのための Kafka 0.9 ソース。

flink-table 依存に付属する全てのソースはTableプログラムを使って直接使うことができます。他の全てのテーブルソースについては、flink-table 依存に加えてそれぞれの依存を使いする必要があります。

KafkaJsonTableSource

KafkaJSONソースを使うには、プロジェクトにKafkaコネクタ依存を追加する必要があります:

  • Kafka 0.8 については flink-connector-kafka-0.8 を、
  • Kafka 0.9 については flink-connector-kafka-0.9 を。

そして、以下のようにソースを生成することができます (Kafka 0.8の例):

// The JSON field names and types
String[] fieldNames =  new String[] { "id", "name", "score"};
Class[] fieldTypes = new Class[] { Integer.class, String.class, Double.class };

KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    fieldNames,
    fieldTypes);

デフォルトでは、JSONフィールドの欠落はソースを失敗しません。以下のようにこれを設定することができます:

// Fail on missing JSON field
tableSource.setFailOnMissingField(true);

Table APIガイドの後で説明されるようにテーブルを連携することができます:

tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
Table result = tableEnvironment.ingest("kafka-source");

CsvTableSource

CsvTableSourceは追加の依存無しに既にflink-tableに含まれています。

CsvTableSource を生成する最も簡単な方法は同梱されたCsvTableSource.builder()を使うことで、ビルダーはプロパティを設定するために以下のメソッドを持ちます:

  • path(String path) CSVファイルへのパスを設定します。必須。
  • field(String fieldName, TypeInformation fieldType) フィールド名とフィールド型の情報を持つフィールドを追加します。複数回呼ぶことができます。必須。このメソッドの呼び出し順は行内のフィールドの順番も定義します。
  • fieldDelimiter(String delim) フィールドのデリミタを設定します。デフォルトは","
  • lineDelimiter(String delim) 行のデリミタを設定します。デフォルトは"\n"
  • quoteCharacter(Character quote) 文字列値のためのクォート文字を設定します。デフォルトはnull
  • commentPrefix(String prefix) コメントを示すプレフィックスを設定します。デフォルトはnull
  • ignoreFirstLine() 最初の行を無視します。デフォルトは無効です。
  • ignoreParseErrors() パースに失敗したレコードを失敗する代わりにスキップします。デフォルトでは例外を投げます。

以下のようにソースを生成することができます:

CsvTableSource csvTableSource = CsvTableSource
    .builder()
    .path("/path/to/your/file.csv")
    .field("name", Types.STRING())
    .field("id", Types.INT())
    .field("score", Types.DOUBLE())
    .field("comments", Types.STRING())
    .fieldDelimiter("#")
    .lineDelimiter("$")
    .ignoreFirstLine()
    .ignoreParseErrors()
    .commentPrefix("%");
val csvTableSource = CsvTableSource
    .builder
    .path("/path/to/your/file.csv")
    .field("name", Types.STRING)
    .field("id", Types.INT)
    .field("score", Types.DOUBLE)
    .field("comments", Types.STRING)
    .fieldDelimiter("#")
    .lineDelimiter("$")
    .ignoreFirstLine
    .ignoreParseErrors
    .commentPrefix("%")

ストリームとバッチのTableEnvironmentの中のTable APIガイドの後で説明されるようにテーブルを連携することができます:

tableEnvironment.registerTableSource("mycsv", csvTableSource);

Table streamTable = streamTableEnvironment.ingest("mycsv");

Table batchTable = batchTableEnvironment.scan("mycsv");
tableEnvironment.registerTableSource("mycsv", csvTableSource)

val streamTable = streamTableEnvironment.ingest("mycsv")

val batchTable = batchTableEnvironment.scan("mycsv")

テーブルの登録解除

テーブルは以下のメソッドを使って登録を解除することができます。後に続くSQLクエリは登録を解除されたテーブル名をもう見つけることができないでしょう。

tableEnvironment.unregisterTable("Customers");
tableEnvironment.unregisterTable("Customers")

テーブルAPI

テーブルAPIはScalaおよびJavaでのデータセットとデータストリーム上にリレーショナル オペレーションを適用するためのメソッドを提供します。

テーブルAPIの中核となる概念は、リレーショナル スキーマ(あるいはリレーション)を使ってテーブルを表現する Table です。テーブルはDataSet あるいは DataStreamから生成され、DataSet あるいは DataStreamから変換され、あるいはTableEnvironmentを使ってテーブルカタログ内に登録することができます。Table は常に特定のTableEnvironmentに束縛されます。異なる TableEnvironments のテーブルを組み合わせることはできません。

FlinkのJava Dataset APIを使う場合、データセットは TableEnvironmentを使ってテーブルとデータセットへのテーブルに変換されます。以下の例は、以下の事を説明します:

  • DataSet がどのように Table に変換されるか、
  • リレーショナル クエリがどのように指定されるか、そして
  • Table がどのように DataSet に変換されるか。
public class WC {

  public WC(String word, int count) {
    this.word = word; this.count = count;
  }

  public WC() {} // empty constructor to satisfy POJO requirements

  public String word;
  public int count;
}

...

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataSet<WC> input = env.fromElements(
        new WC("Hello", 1),
        new WC("Ciao", 1),
        new WC("Hello", 1));

Table table = tEnv.fromDataSet(input);

Table wordCounts = table
        .groupBy("word")
        .select("word, count.sum as count");

DataSet<WC> result = tableEnv.toDataSet(wordCounts, WC.class);

Javaを使って、表現は文字列で指定される必要があります。埋め込み表現のDSLはサポートされません。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, "user, product, amount");

サポートされるオペレータの完全なリストと表現構文の説目いについてはJavadocを参照してください。

Table API はorg.apache.flink.table.api.scala._をインポートすることで有効にされます。これはDataSet あるいは DataStream のテーブルへの暗黙的な変換を有効にします。以下の例は、以下の事を説明します:

  • DataSet がどのように Table に変換されるか、
  • リレーショナル クエリがどのように指定されるか、そして
  • Table がどのように DataSet に変換されるか。
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

case class WC(word: String, count: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable(tEnv)
val result = expr
               .groupBy('word)
               .select('word, 'count.sum as 'count)
               .toDataSet[WC]

表現DSLはフィールド名を参照するためにScalaシンボルを使い、ランタイムコードを効率的にするために表現を変換するためにコード生成を使用します。テーブルから、への変換は、Scala caseクラスあるいはJava POJOを使う時にのみ動作することに注意してください。有効なPOJOの特性を学ぶには、型の抽出とシリアライズ化 を参照してください。

別の例は2つのテーブルをjoinする方法を示します:

case class MyResult(a: String, d: Int)

val input1 = env.fromElements(...).toTable(tEnv).as('a, 'b)
val input2 = env.fromElements(...).toTable(tEnv, 'c, 'd)

val joined = input1.join(input2)
               .where("a = c && d > 42")
               .select("a, d")
               .toDataSet[MyResult]

データセットをテーブルに変換する時に、テーブルのフィールド名が as() あるいは toTable()を使った指定でどのように変更することができるかについて注意してください。更に、例はリレーショナル表現を指定するために文字列をどのように使うかを示します。

DataStreamからのTableの生成も似たようなやりかたで動作します。以下の例はDataStream をどのように Tableに変換するか、そしてテーブルAPIを使ってそれをフィルターする方法を示します。

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val inputStream = env.addSource(...)
val result = inputStream
                .toTable(tEnv, 'a, 'b, 'c)
                .filter('a === 3)
val resultStream = result.toDataStream[Row]

サポートされるオペレータの完全なリストと表現構文の説目いについてはScaladocを参照してください。

上に戻る

登録されたテーブルへのアクセス

登録されたテーブルは以下のようにしてTableEnvironmentからアクセスすることができます:

  • tEnv.scan("tName")BatchTableEnvironment内で"tName"として登録されたTable を走査します。
  • tEnv.ingest("tName")StreamTableEnvironment内で"tName"として登録されたStreamTableEnvironmentを取り込みます。

上に戻る

テーブルAPIのオペレータ

テーブルAPIはScalaおよびJavaで構造化データ上の言語統合クエリを実行するためにドメイン固有の言語を特徴とします。この章は利用可能なオペレータについての短い概要を説明します。Javadoc内でオペレータのもっと詳細を見つけることができます。

オペレータ 解説
Select

SQLのSELECT文と似ています。selectオペレーションを実行します。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.select("a, c as d");

テーブル内の全てのカラムを選択するワイルドカードとして振舞う星記号 (*) を使うことができます。

Table result = in.select("*");
As

フィールドの改名。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.as("d, e, f");
Where / Filter

SQLのWHERE句に似ています。フィルター意味するものを通過できない行をフィルターアウトします。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.where("b = 'red'");
あるいは
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.filter("a % 2 = 0");
GroupBy

SQLのGROUPBY句に似ています。行をグループ単位で集約するために以下の集約オペレータを使って、グルーピング キー上で行をグループ化します。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.groupBy("a").select("a, b.sum as d");
Join

SQLのJOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinの等しさを意味するものがjoinオペレータあるいはwhereまたはfilterオペレータを使って定義されなければなりません。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right).where("a = d").select("a, b, e");
LeftOuterJoin

SQLのLEFT OUTER JOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.leftOuterJoin(right, "a = d").select("a, b, e");
RightOuterJoin

SQLの RIGHT OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.rightOuterJoin(right, "a = d").select("a, b, e");
FullOuterJoin

SQLの FULL OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.fullOuterJoin(right, "a = d").select("a, b, e");
和集合

SQLの UNION 句に似ています。重複するレコードを削除して2つのテーブルを結合します。両方のテーブルは等しいフィールド型を持つ必要があります。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
UnionAll

SQLの UNION ALL 句に似ています。2つのテーブルを結合します。両方のテーブルは等しいフィールド型を持つ必要があります。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);
Intersect

SQLの INTERSECT 句に似ています。intersectは両方のテーブルに存在するレコードを返します。もしレコードが1つ以上のテーブルに1回以上存在する場合、1度だけ返します。つまり、結果のテーブルは重複するレコードを持ちません。両方のテーブルは等しいフィールド型を持つ必要があります。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);
IntersectAll

SQLのINTERSECT ALL句に似ています。intersectAllは両方のテーブルに存在するレコードを返します。もしレコードが両方のテーブルに1度以上存在する場合、両方のテーブルに存在するだけの数だけ返します。つmり結果のレコードは重複するレコードを持つかもしれません。両方のテーブルは等しいフィールド型を持つ必要があります。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);
Minus

SQLのEXCEPT句に似ています。右のテーブル内に存在しないレコードを左テーブルから引いたものを返します。左のテーブル内の重複レコードは確実に1回だけ返されます。つまり、重複は削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);
MinusAll

SQLの EXCEPT ALL句に似ています。MinusAll は右のレコードに存在しないレコードを返します。左のテーブル内でn回、右のテーブルでm回存在するレコードは (n-m)回返されます。つまり、右のテーブルに存在する重複の数だけ削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);
Distinct

SQLの DISTINCT 句に似ています。全く異なる組み合わせを持つレコードを返します。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.distinct();
Order By

SQLのORDER BY句に似ています。全ての並行パーティションに渡ってグローバルにソートされたレコードを返します。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");
Limit

SQLの LIMIT句に似ています。ソートされた結果をオフセットの位置からの指定された数のレコードに制限します。Limit は技術的には Order By オペレータの一部で、従って先にそれが行われなければなりません。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
あるいは
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
オペレータ 解説
Select

SQLのSELECT文と似ています。selectオペレーションを実行します。

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.select('a, 'c as 'd);

テーブル内の全てのカラムを選択するワイルドカードとして振舞う星記号 (*) を使うことができます。

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.select('*);
As

フィールドの改名。

val in = ds.toTable(tableEnv).as('a, 'b, 'c);
Where / Filter

SQLのWHERE句に似ています。フィルター意味するものを通過できない行をフィルターアウトします。

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.filter('a % 2 === 0)
あるいは
val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.where('b === "red");
GroupBy

SQLのGROUPBY句に似ています。Groups rows on the grouping keys, with a following aggregation operator to aggregate rows group-wise.

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.groupBy('a).select('a, 'b.sum as 'd);
Join

SQLのJOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、joinの等しさを意味するものがwhereまたはfilterオペレータを使って定義されなければなりません。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'd, 'e, 'f);
val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
LeftOuterJoin

SQLのLEFT OUTER JOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。

val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
val result = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
RightOuterJoin

SQLの RIGHT OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。

val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
val result = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
FullOuterJoin

SQLの FULL OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。

val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
val result = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
和集合

SQLの UNION 句に似ています。重複するレコードを削除して2つのテーブルを結合します。両方のテーブルは同じフィールド型を持つ必要があります。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.union(right);
UnionAll

SQLの UNION ALL 句に似ています。2つのテーブルを結合します。両方のテーブルは同じフィールド型を持つ必要があります。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.unionAll(right);
Intersect

SQLの INTERSECT 句に似ています。intersectは両方のテーブルに存在するレコードを返します。もしレコードが1つ以上のテーブルに1回以上存在する場合、1度だけ返します。つまり、結果のテーブルは重複するレコードを持ちません。両方のテーブルは等しいフィールド型を持つ必要があります。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'e, 'f, 'g);
val result = left.intersect(right);
IntersectAll

SQLのINTERSECT ALL句に似ています。intersectAllは両方のテーブルに存在するレコードを返します。もしレコードが両方のテーブルに1度以上存在する場合、両方のテーブルに存在するだけの数だけ返します。つmり結果のレコードは重複するレコードを持つかもしれません。両方のテーブルは等しいフィールド型を持つ必要があります。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'e, 'f, 'g);
val result = left.intersectAll(right);
Minus

SQLのEXCEPT句に似ています。右のテーブル内に存在しないレコードを左テーブルから引いたものを返します。左のテーブル内の重複レコードは確実に1回だけ返されます。つまり、重複は削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.minus(right);
MinusAll

SQLの EXCEPT ALL句に似ています。MinusAll は右のレコードに存在しないレコードを返します。左のテーブル内でn回、右のテーブルでm回存在するレコードは (n-m)回返されます。つまり、右のテーブルに存在する重複の数だけ削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.minusAll(right);
Distinct

SQLの DISTINCT 句に似ています。全く異なる組み合わせを持つレコードを返します。

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.distinct();
Order By

SQLのORDER BY句に似ています。全ての並行パーティションに渡ってグローバルにソートされたレコードを返します。

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.orderBy('a.asc);
Limit

SQLの LIMIT句に似ています。ソートされた結果をオフセットの位置からの指定された数のレコードに制限します。Limit は技術的には Order By オペレータの一部で、従って先にそれが行われなければなりません。

val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.orderBy('a.asc).limit(3); // returns unlimited number of records beginning with the 4th record
あるいは
val in = ds.toTable(tableEnv, 'a, 'b, 'c);
val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record

上に戻る

表現構文

前の章の幾つかのオペレータは1つ以上の表現を期待します。表現は組み込みのScala DSLあるいは文字列を使って指定することができます。表現がどのように指定されるかについて学ぶには上の例を参照してください。

以下は表現のEBNF文法です:

expressionList = expression , { "," , expression } ;

expression = alias ;

alias = logic | ( logic , "AS" , fieldReference ) ;

logic = comparison , [ ( "&&" | "||" ) , comparison ] ;

comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;

term = product , [ ( "+" | "-" ) , product ] ;

product = unary , [ ( "*" | "/" | "%") , unary ] ;

unary = [ "!" | "-" ] , composite ;

composite = suffixed | atom ;

suffixed = interval | cast | as | aggregation | if | functionCall ;

timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;

rowInterval = composite , "." , "rows" ;

cast = composite , ".cast(" , dataType , ")" ;

dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ;

as = composite , ".as(" , fieldReference , ")" ;

aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" ) , [ "()" ] ;

if = composite , ".?(" , expression , "," , expression , ")" ;

functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;

atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;

fieldReference = "*" | identifier ;

nullLiteral = "Null(" , dataType , ")" ;

timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;

timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;

ここで、literal は有効なJavaリテラル、fieldReference はデータ内のカラムを指定 (あるいはもし*が使われた場合は全てのカラム)、そしてfunctionIdentifier はサポートされるscalar関数を指定します。カラム名と関数名はJavaの識別子の構文に従います。カラム名rowtime はストリーミング環境内での保持された論理属性です。文字列として指定された表現はオペレータと関数を呼ぶためにサフィックスの表記の代わりにプリフィックスの表記を使うこともできます。

厳密な数値あるいは大きな10進数との連携が必要な場合、テーブルAPIはJavaのBigDecimal型もサポートします。Scala テーブルAPIの10進数はBigDecimal("123456") で定義することができ、Javaでは精度のための “p” を追加することで定義することができます。例えば、123456p

一時的な値と連携するために、テーブルAPIはJavaのSQLのDate、Time および Timestamp型をサポートします。ScalaのテーブルAPIではリテラルはjava.sql.Date.valueOf("2016-06-27"), java.sql.Time.valueOf("10:10:42") または java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")を使って定義することができます。Java と Scala Table API は文字列を一時的な型に変換するために"2016-06-27".toDate(), "10:10:42".toTime() および "2016-06-27 10:10:42.123".toTimestamp() の呼び出しもサポートします。注意: Javaの一時的なSQL型はタイムゾーンに依存するため、Flinkクライアントと全てのタスクマネージャが同じタイムゾーンを使うようにしてください。

一時的な間隔は月数(Types.INTERVAL_MONTHS) あるいはミリ秒数 (Types.INTERVAL_MILLIS) として表現することができます。同じ型の間隔は足し算あるいは引き算することができます (例えば 1.hour + 10.minutes)。ミリ秒の間隔は time point に追加することができます (例えば "2016-08-10".toDate + 5.days)。

上に戻る

ウィンドウ

テーブルAPIはバッチおよびストリーミングテーブル上でクエリを定義するための宣言型APIです。投射、選択、結合オペレーションは追加のセマンティクス無しにストリーミングおよびバッチテーブルの両方に適用することができます。しかし、(おそらく)無限のストリーミングテーブル上での集約は、レコードの有限のグループ上でのみ計算することができます。ウィンドウは行のグループを時間あるいは行数の間隔に基づいて有限のグループに集約し、グループごとに集約関数を評価します。バッチテーブルについては、ウィンドウは時間の間隔によってレコードをグループ化するための便利なショートカットです。

ウィンドウはwindow(w: Window) 句を使って定義することができ、エイリアスを必要とします。これはas句を使って指定されます。ウィンドウを使ってテーブルをグループ化するには、ウィンドウのエイリアスは通常のグループ化属性のように groupBy(...)句の中で参照されなければなりません。以下の例はテーブル上にウィンドウの集約を定義する方法を示します。

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w")  // group the table by window w
  .select("b.sum")  // aggregate
val table = input
  .window([w: Window] as 'w)  // define window with alias w
  .groupBy('w)   // group the table by window w
  .select('b.sum)  // aggregate

ストリーミング環境では、もしウィンドウの集約がウィンドウに加えて1つ以上の属性でグループ化する場合のみ、それらは並行して計算することができます。つまり、groupBy(...) 句はウィンドウのエイリアスと少なくとも1つの追加の属性を参照します。(上の例のような)ウィンドウのエイリアスのみを参照するgroupBy(...)句は並行のタスクでは無く1つでのみ評価することができます。以下の例は追加のグループ属性を使ってウィンドウ集約を定義する方法を示します。

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, b.sum")  // aggregate
val table = input
  .window([w: Window] as 'w) // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w 
  .select('a, 'b.sum)  // aggregate

Windowパラメータは行がどうやってウィンドウにマップされるかを定義します。Windowはユーザが実装することができるインタフェースではありません。代わりに、テーブルAPIは特定のセマンティクスを持つ事前定義されたWindow クラスのセットを提供します。これらは背後にある DataStream あるいは DataSet オペレーションに変換されます。サポートされるウィンドウ定義は以下にリスト化されます。時間ウィンドウの開始及び終了タイムスタンプのようなウィンドウプロパティは、それぞれw.startw.endのようなウィンドウ エイリアスのプロパティとしてselect文の中で追加することができます。

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, w.start, w.end, b.count") // aggregate and add window start and end timestamps
val table = input
  .window([w: Window] as 'w)  // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w 
  .select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps

タンブル (タンブリング ウィンドウ)

タンブリング ウィンドウは行をオーバーラップしない固定の長さの連続するウィンドウに割り当てます。例えば、5分のタンブリング ウィンドウは行を5分の間隔にグループ化します。タンブリング ウィンドウはイベント時間、処理時間あるいは行カウント上で定義することができます。

タンブリング ウィンドウは以下のようにTumble クラスを使って定義することができます:

メソッド Required? 解説
over 必須。 時間あるいは行カウントの間隔のどちらかで、ウィンドウの長さを定義します。
on ストリーミング イベント時間ウィンドウおよびバッチテーブル上のウィンドウで必要です。 ストリーミングテーブルのための時間モードを定義します(rowtime は論理的なシステム属性です); バッチテーブルについては、どのレコードがグループ化されるかの時間属性。
as 必須。 エイリアスをウィンドウに割り当てます。エイリアスは後に続くgroupBy() 句でのウィンドウを参照し、任意でselect()句内のウィンドウの開始あるいは終了時間のようなウィンドウ属性を選択するために使われます。
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))

// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").as("w"))

// Tumbling Row-count Window
.window(Tumble.over("10.rows").as("w"))
// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)

// Tumbling Processing-time Window
.window(Tumble over 10.minutes as 'w)

// Tumbling Row-count Window
.window(Tumble over 10.rows as 'w)

スライド (スラインディング ウィンドウ)

スライディング ウィンドウは固定サイズを持ち、指定されたスライドの間隔によってスライドします。もしスライドの間隔がウィンドウサイズよりも小さい場合、スライディング ウィンドウはオーバーラップします。従って、行は複数のウィンドウに割り当てられるかもしれません。例えば、15分のサイズのスライディングウィンドウと5分のスライド間隔は、各行を15分のサイズの3つの異なるウィンドウに割り当てます。これは5分の間隔で評価されます。スライディング ウィンドウはイベント時間、処理時間あるいは行カウント上で定義することができます。

スライディング ウィンドウは以下のようにSlide クラスを使って定義することができます:

メソッド Required? 解説
over 必須。 時間あるいは行カウントの間隔のどちらかで、ウィンドウの長さを定義します。
every 必須。 時間あるいは行カウントの間隔のどちらかで、スライドの長さを定義します。スライドの間隔はサイズの間隔として同じ型でなければなりません。
on イベント時間ウィンドウおよびバッチテーブル上のウィンドウで必要です。 ストリーミングテーブルのための時間モードを定義します(rowtime は論理的なシステム属性です); バッチテーブルについては、どのレコードがグループ化されるかの時間属性
as 必須。 エイリアスをウィンドウに割り当てます。エイリアスは後に続くgroupBy() 句でのウィンドウを参照し、任意でselect()句内のウィンドウの開始あるいは終了時間のようなウィンドウ属性を選択するために使われます。
// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))

// Sliding Processing-time window
.window(Slide.over("10.minutes").every("5.minutes").as("w"))

// Sliding Row-count window
.window(Slide.over("10.rows").every("5.rows").as("w"))
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

// Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes as 'w)

// Sliding Row-count window
.window(Slide over 10.rows every 5.rows as 'w)

セッション (セッション ウィンドウ)

セッション ウィンドウは固定のサイズを持ちませんが、それらの境界は不活性の間隔によって定義されます。つまり、もし定義された隙間の間にイベントが起きない場合にセッションウィンドウは閉じられます。例えば、30分の不活性の後で1つの行が観測された場合、30分の隙間を持つセッションウィンドウが開始します(そうでなければ、行は既存のウィンドウに追加されます)。そして30分以内に行が追加されない場合に閉じられます。セッションウィンドウはイベント時間あるいは処理時間で動作することができます。

セッションウィンドウは以下のようにSession クラスを使って定義されます:

メソッド Required? 解説
withGap 必須。 時間間隔として2つのウィンドウ間の隙間を定義します。
on イベント時間ウィンドウおよびバッチテーブル上のウィンドウで必要です。 ストリーミングテーブルのための時間モードを定義します(rowtime は論理的なシステム属性です); バッチテーブルについては、どのレコードがグループ化されるかの時間属性
as 必須。 エイリアスをウィンドウに割り当てます。エイリアスは後に続くgroupBy() 句でのウィンドウを参照し、任意でselect()句内のウィンドウの開始あるいは終了時間のようなウィンドウ属性を選択するために使われます。
// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))

// Session Processing-time Window
.window(Session.withGap("10.minutes").as("w"))
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)

// Session Processing-time Window
.window(Session withGap 10.minutes as 'w)

制限事項

現在のところ以下の機能はまだサポートされません:

  • イベント時間の行カウント ウィンドウ
  • バッチテーブル上の非グループ セッションウィンドウ
  • バッチテーブル上のスライディング ウィンドウ

SQL

SQLのクエリはTableEnvironmentsql()メソッドを使って指定されます。メソッドはDataSetあるいはDataStreamに変換、続くテーブルAPIテーブル クエリ内で使用、あるいはTableSinkに書き込むことができるTableとして、SQLクエリの結果を返します(外部シンクへのテーブルの書き込みを見てください)。SQL と Table API のクエリはシームレスに混合され、全体的に最適化され、1つのデータストリームあるいはデータセット プログラムに変換することができます。

Table, DataSet, DataStream あるいは外部TableSourceはSQLクエリによってアクセス可能にするためにTableEnvironmentに登録されなければなりません(テーブルの登録を見てください)。

注意: Flinkの SQLのサポートはまだ完全ではありません。サポートされないSQL機能を含むクエリはTableExceptionを起こすでしょう。バッチおよびストリーミングテーブル上のSQLの制限は以下の章でリスト化されます。

バッチテーブル上のSQL

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// read a DataSet from an external source
DataSet<Tuple3<Long, String, Integer>> ds = env.readCsvFile(...);
// register the DataSet as table "Orders"
tableEnv.registerDataSet("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
  "SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'");
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)
// register the DataSet under the name "Orders"
tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'")

制限事項

現在のバージョンはバッチテーブル上の selection (filter), projection, inner equi-joins, grouping, non-distinct aggregates および sorting をサポートします。

中でも、以下のSQL機能はまだサポートされません:

  • タイムスタンプと間隔はミリ秒の精度に制限されます
  • 間隔の計算は現在のところ制限されています
  • distinct 集約 (例えば COUNT(DISTINCT name))
  • Non-equi joins とデカルト積
  • 効率的なグルーピング セット

注意: テーブルは FROM 句で指定された順番でjoinされます。幾つかの場合において、テーブルの順番はデカルト積を解くために手動で調整されます。

ストリーミングテーブル上のSQL

SQLのクエリは、標準的なSQLのようにストリーミング テーブル(DataStreamあるいは StreamTableSourceで支援されたテーブル)上で実行することができます。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

制限事項

ストリーミングSQLの現在のバージョンはSELECT, FROM, WHEREUNION句のみをサポートします。集約とjoinはまだサポートされません。

上に戻る

SQL構文

Flink はSQLのパースのためにApache Calcite を使います。現在のところ、FlinkのSQLはクエリ関連SQL構文のみと包括的なSQL標準のサブセットのみをサポートします。以下の BNF構文はサポートされるSQLの機能を説明します:


query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

Java文字列内でのより良いSQLクエリの定義のために、FlinkのSQLはJavaに似た語彙ポリシーを使います:

  • 識別子の大文字小文字はクォートされているかどうかによって保持されます。
  • クォートされていない場合、識別子は大文字小文字を区別して合致します。
  • Javaと異なり、back-ticks を使って識別子は非アルファベット文字列を含むことができます (例えば "SELECT a AS `my field` FROM t")。

上に戻る

予約語

全てのSQL機能はまだ実装されていませんが、幾つかの文字の組み合わせは将来の使用のためのキーワードとして予約されています。フィールド名として以下の文字列のうちの1つを使いたい場合は、それらを backticks で囲むようにしてください (例えば `value`, `count`)。

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE

上に戻る

データの種類

テーブルAPIはFlinkのデータセットとデータストリームAPIの上に構築されています。内部的には、それは型を識別するためにFlinkの TypeInformationも使います。テーブルAPIは今のところ全てのFlinkの型をサポートしません。サポートされる単純な型の全てはorg.apache.flink.table.api.Typesの中でリスト化されています。以下の表はテーブルAPIの型、SQLの型、そして結果のJavaクラス間の関連を要約します。

テーブルAPI SQL Java の型
Types.STRING VARCHAR java.lang.String
Types.BOOLEAN BOOLEAN java.lang.Boolean
Types.BYTE TINYINT java.lang.Byte
Types.SHORT SMALLINT java.lang.Short
Types.INT INTEGER, INT java.lang.Integer
Types.LONG BIGINT java.lang.Long
Types.FLOAT REAL, FLOAT java.lang.Float
Types.DOUBLE DOUBLE java.lang.Double
Types.DECIMAL DECIMAL java.math.BigDecimal
Types.DATE DATE java.sql.Date
Types.TIME TIME java.sql.Time
Types.TIMESTAMP TIMESTAMP(3) java.sql.Timestamp
Types.INTERVAL_MONTHS INTERVAL YEAR TO MONTH java.lang.Integer
Types.INTERVAL_MILLIS INTERVAL DAY TO SECOND(3) java.lang.Long

generic型、composite型(つまり、POJOあるいはTuple) および配列型(オブジェクトあるいはprimitiveの配列)のような進化型は行のフィールドになるかもしれません。

generic型はテーブルAPIおよびSQL内ではまだブラックボックスとして扱われます。

しかし、composite型はcomposite型のフィールドがテーブルAPI内の.get() およびSQL内のdotオペレータ(たとえばMyTable.pojoColumn.myField)オペレータを使ってアクセスすることができる完全にサポートされた型です。composite型はテーブルAPI内で.flatten()あるいはSQL内でMyTable.pojoColumn.*を使ってflattenにすることもできます。

配列型はテーブルAPI内でmyArray.at(1)オペレータおよびSQL内でmyArray[1]オペレータを使ってアクセスすることができます。配列のリテラルはテーブルAPI内でarray(1, 2, 3) およびSQL内で ARRAY[1, 2, 3]を使って生成することができます。

上に戻る

組み込みの関数

テーブルAPIおよびSQLの両方はデータ変換のための組み込み関数のセットが付属します。この章は今のところ利用可能な関数の短い概要を説明します。

比較関数 解説
ANY === ANY

等しい

ANY !== ANY

等しくない

ANY > ANY

より大きい

ANY >= ANY

以上

ANY < ANY

より少ない

ANY <= ANY

以下

ANY.isNull

指定された表現がnullの時にtrueを返す。

ANY.isNotNull

指定された表現がnullではない時にtrueを返す。

STRING.like(STRING)

文字列が指定されたLIKEパターンと一致する時にtrueを返す。例えば、"Jo_n%" は"Jo(arbitrary letter)n"で始まる全ての文字列と一致します。

STRING.similar(STRING)

文字列が指定されたSQL正規表現パターンと一致する時にtrueを返す。例えば、"A+" は少なくとも1つの "A" から成る全ての文字列と一致します。

論理関数 解説
boolean1 || boolean2

boolean1 が true あるいは boolean2 がtrueの時にtrueを返す。3値ロジックがサポートされます。

boolean1 && boolean2

boolean1boolean2 の両方がtrueの時にtrueを返す。3値ロジックがサポートされます。

!BOOLEAN

boolean表現がtrueではない時にtrueを返す; booleanがnullの時にnullを返す。

BOOLEAN.isTrue

指定されたboolean表現がtrueの時にtrueを返す。そうでなければ false (nullおよびfalseについて)。

BOOLEAN.isFalse

指定されたboolean表現がfalseの時にtrueを返す。そうでなければ false (nullおよびtrueについて)。

BOOLEAN.isNotTrue

指定されたboolean表現がtrueではない時にtrueを返す (nullおよびfalseについて)。そうでなければfalse。

BOOLEAN.isNotFalse

指定されたboolean表現がfalseではない時にtrue返す (nullおよびtrueについて)。そうでなければfalse。

計算関数 解説
+ numeric

numericを返す。

- numeric

負のnumericを返す

numeric1 + numeric2

numeric1足すnumeric2を返す。

numeric1 - numeric2

numeric1 引く numeric2を返す。

numeric1 * numeric2

numeric1掛ける numeric2を返す。

numeric1 / numeric2

numeric1 割るnumeric2を返す。

numeric1.power(numeric2)

numeric1numeric2乗を返す。

NUMERIC.abs()

指定された値の絶対値を計算します。

numeric1 % numeric2

numeric1 割る numeric2 の剰余 (モジュラス)を返す。numeric1が負の時だけ結果は負です。

NUMERIC.sqrt()

指定された値の平方根を計算します。

NUMERIC.ln()

指定された値の自然対数を計算します。

NUMERIC.log10()

指定された値の基底10の対数を計算します。

NUMERIC.exp()

オイラー数の指定乗数を計算します。

NUMERIC.ceil()

指定された値以上の最小の整数を計算します。

NUMERIC.floor()

指定された値以下の最大の整数を計算します。

文字列関数 解説
STRING + STRING

2つの文字列を連結する。

STRING.charLength()

文字列の長さを返す。

STRING.upperCase()

デフォルトのロケールのルールを使って文字列内の全ての文字列の大文字を返す。

STRING.lowerCase()

デフォルトのロケールのルールを使って文字列内の全ての文字列の小文字を返す。

STRING.position(STRING)

他の文字列内での文字列の1から始まる位置を返す。文字列が見つからない場合は0を返す。例えば、'a'.position('bbbbba') は6になります。

STRING.trim(LEADING, STRING)
STRING.trim(TRAILING, STRING)
STRING.trim(BOTH, STRING)
STRING.trim(BOTH)
STRING.trim()

指定された文字列から、最初 および/あるいは 最後の文字を削除します。デフォルトでは、両方の端の空白文字が削除されます。

STRING.overlay(STRING, INT)
STRING.overlay(STRING, INT, INT)

文字列の部分文字列を(1から始まる)開始位置の文字列で置き換えます。任意の長さはどれだけの数の文字が削除されるべきかを指定します。例えば、'xxxxxtest'.overlay('xxxx', 6) は "xxxxxxxxx" になり、'xxxxxtest'.overlay('xxxx', 6, 2) は "xxxxxxxxxst" になります。

STRING.substring(INT)

指定された文字列の部分文字列を指定されたインデックスから最後までの文字列で作成します。開始のインデックスは1から始まり、それは含まれます。

STRING.substring(INT, INT)

指定された文字列の部分文字列を指定されたインデックスから指定された長さまでの文字列で生成します。インデックスは1から始まり、含まれます。つまり、インデックスの位置の文字は部分文字列に含まれます。部分文字列は指定された長さ以下です。

STRING.initCap()

文字列内の各単語の最初の文字を大文字に変換します。文字列は [A-Za-z0-9] だけを含むと仮定し、それ以外の全ては空白文字として扱われます。

条件関数 解説
BOOLEAN.?(value1, value2)

評価されたboolean条件に基づいて2つの他の表現のどっちが評価されるべきかを決定する3条件オペレータ。例えば、(42 > 5).?("A", "B") は "A" になります。

型変換関数 解説
ANY.cast(TYPE)

値を指定された型に変換します。例えば、"42".cast(INT) は 42 になります。

値のコンストラクタ関数 解説
ARRAY.at(INT)

配列内の特定の位置の要素を返す。インデックスは1から始まります。

array(ANY [, ANY ]*)

値のリストから配列を生成する。配列は(primitiveでは無い)オブジェクトの配列になるでしょう。

NUMERIC.rows

行の間隔を生成する。

一時的な関数 解説
STRING.toDate()

"yy-mm-dd"形式の日付文字列をSQLの日付にパースする。

STRING.toTime()

"hh:mm:ss"形式の時間の文字列をSQLの時間にパースする。

STRING.toTimestamp()

"yy-mm-dd hh:mm:ss.fff"形式のタイムスタンプ文字列をSQLのタイムスタンプにパースする。

NUMERIC.year
NUMERIC.years

指定された年数について月の間隔を生成する。

NUMERIC.month
NUMERIC.months

指定された月数について月の間隔を生成する。

NUMERIC.day
NUMERIC.days

指定された日数についてミリ秒の間隔を生成する。

NUMERIC.hour
NUMERIC.hours

指定された時間数についてミリ秒の間隔を生成する。

NUMERIC.minute
NUMERIC.minutes

指定された分数についてミリ秒の間隔を生成する。

NUMERIC.second
NUMERIC.seconds

指定された秒数についてミリ秒の間隔を生成する。

NUMERIC.milli
NUMERIC.millis

ミリ秒の間隔を生成する。

currentDate()

UTCタイムゾーンの現在のSQLの日付を返す。

currentTime()

UTCタイムゾーンの現在のSQLの時間を返す。

currentTimestamp()

UTCタイムゾーンの現在のSQLのタイムスタンプを返す。

localTime()

ローカルタイムゾーンの現在のSQLの時間を返す。

localTimestamp()

ローカルタイムゾーンの現在のSQLのタイムスタンプを返す。

TEMPORAL.extract(TIMEINTERVALUNIT)

time pointあるいは時間間隔の各部分を抽出する。部分をlong値として返します。例えば、'2006-06-05'.toDate.extract(DAY) は 5 になります。

TIMEPOINT.floor(TIMEINTERVALUNIT)

time point を指定された単位に上方に丸める。例えば、'12:44:31'.toDate.floor(MINUTE) は 12:44:00 になります。

TIMEPOINT.ceil(TIMEINTERVALUNIT)

time point を指定された単位に下方に丸めます。例えば、'12:44:31'.toTime.floor(MINUTE) は 12:45:00 になります。

DATE.quarter()

SQLの日付から年の四半期を返す。例えば、'1994-09-27'.toDate.quarter() は 3 になります。

temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)

2つの固定された時間の間隔がオーバーラップするかどうかを決定する。time point と temporal は2つのtime point (開始、終了)によって定義される範囲に変換されます。関数は leftEnd >= rightStart && rightEnd >= leftStart を評価します。例えば、temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) は true になります。

集約関数 解説
FIELD.count

フィールドがnullではない入力行の数を返す。

FIELD.avg

全ての入力値に渡って数値フィールドの平均(数値平均)を返す。

FIELD.sum

全ての入力値に渡って数値フィールドの合計を返す。

FIELD.max

全ての入力値に渡ってフィールドの最大の値を返す。

FIELD.min

全ての入力値に渡ってフィールドの最小の値を返す。

値へのアクセス関数 解説
COMPOSITE.get(STRING)
COMPOSITE.get(INT)

インデックスあるいは名前を使って(Tuple, POJOなどのような)Flinkのcomposite型のフィールドへアクセスし、その値を返す。例えば、pojo.get('myField') または tuple.get(0)

ANY.flatten()

(Tuple, Pojoなどのような)Flinkのcomposite型とその直接のsubtypeの全てを、各subtypeが分割フィールドであるflatな表現に変換する。ほとんどの場合で、flatな表現のフィールドは元のフィールドに似ているがダラー セパレイターを使った名前を付けられます (例えば、mypojo$mytuple$f0)。

配列関数 解説
ARRAY.cardinality()

配列の要素の数を返す。

ARRAY.element()

1つの要素を持つ配列の唯一の要素を返す。配列が空の場合は null を返す。配列が1つ以上の要素を持つ場合は例外を投げる。

補助関数 解説
ANY.as(name [, name ]* )

表現のための名前、つまりフィールド、を指定します。もし表現が複数のフィールドに拡張する場合は、追加の名前を指定することができます。

比較関数 解説
ANY === ANY

等しい

ANY !== ANY

等しくない

ANY > ANY

より大きい

ANY >= ANY

以上

ANY < ANY

より少ない

ANY <= ANY

以下

ANY.isNull

指定された表現がnullの時にtrueを返す。

ANY.isNotNull

指定された表現がnullではない時にtrueを返す。

STRING.like(STRING)

文字列が指定されたLIKEパターンと一致する時にtrueを返す。例えば、"Jo_n%" は"Jo(arbitrary letter)n"で始まる全ての文字列と一致します。

STRING.similar(STRING)

文字列が指定されたSQL正規表現パターンと一致する時にtrueを返す。例えば、"A+" は少なくとも1つの "A" から成る全ての文字列と一致します。

論理関数 解説
boolean1 || boolean2

boolean1 が true あるいは boolean2 がtrueの時にtrueを返す。3値ロジックがサポートされます。

boolean1 && boolean2

boolean1boolean2 の両方がtrueの時にtrueを返す。3値ロジックがサポートされます。

!BOOLEAN

boolean表現がtrueではない時にtrueを返す; booleanがnullの時にnullを返す。

BOOLEAN.isTrue

指定されたboolean表現がtrueの時にtrueを返す。そうでなければ false (nullおよびfalseについて)。

BOOLEAN.isFalse

指定されたboolean表現がfalseの時にtrueを返す。そうでなければ false (nullおよびtrueについて)。

BOOLEAN.isNotTrue

指定されたboolean表現がtrueではない時にtrueを返す (nullおよびfalseについて)。そうでなければfalse。

BOOLEAN.isNotFalse

指定されたboolean表現がfalseではない時にtrue返す (nullおよびtrueについて)。そうでなければfalse。

計算関数 解説
+ numeric

numericを返す。

- numeric

負のnumericを返す

numeric1 + numeric2

numeric1足すnumeric2を返す。

numeric1 - numeric2

numeric1 引く numeric2を返す。

numeric1 * numeric2

numeric1掛ける numeric2を返す。

numeric1 / numeric2

numeric1 割るnumeric2を返す。

numeric1.power(numeric2)

numeric1numeric2乗を返す。

NUMERIC.abs()

指定された値の絶対値を計算します。

numeric1 % numeric2

numeric1 割る numeric2 の剰余 (モジュラス)を返す。numeric1が負の時だけ結果は負です。

NUMERIC.sqrt()

指定された値の平方根を計算します。

NUMERIC.ln()

指定された値の自然対数を計算します。

NUMERIC.log10()

指定された値の基底10の対数を計算します。

NUMERIC.exp()

オイラー数の指定乗数を計算します。

NUMERIC.ceil()

指定された値以上の最小の整数を計算します。

NUMERIC.floor()

指定された値以下の最大の整数を計算します。

計算関数 解説
STRING + STRING

2つの文字列を連結する。

STRING.charLength()

文字列の長さを返す。

STRING.upperCase()

デフォルトのロケールのルールを使って文字列内の全ての文字列の大文字を返す。

STRING.lowerCase()

デフォルトのロケールのルールを使って文字列内の全ての文字列の小文字を返す。

STRING.position(STRING)

他の文字列内での文字列の1から始まる位置を返す。文字列が見つからない場合は0を返す。例えば、"a".position("bbbbba") は 6 になります。

STRING.trim(
  leading = true,
  trailing = true,
  character = " ")

指定された文字列から、最初 および/あるいは 最後の文字を削除します。

STRING.overlay(STRING, INT)
STRING.overlay(STRING, INT, INT)

文字列の部分文字列を(1から始まる)開始位置の文字列で置き換えます。任意の長さはどれだけの数の文字が削除されるべきかを指定します。例えば、"xxxxxtest".overlay("xxxx", 6) は "xxxxxxxxx" になり、"xxxxxtest".overlay('xxxx', 6, 2) は "xxxxxxxxxst" になります。

STRING.substring(INT)

指定された文字列の部分文字列を指定されたインデックスから最後までの文字列で作成します。開始のインデックスは1から始まり、それは含まれます。

STRING.substring(INT, INT)

指定された文字列の部分文字列を指定されたインデックスから指定された長さまでの文字列で生成します。インデックスは1から始まり、含まれます。つまり、インデックスの位置の文字は部分文字列に含まれます。部分文字列は指定された長さ以下です。

STRING.initCap()

文字列内の各単語の最初の文字を大文字に変換します。文字列は [A-Za-z0-9] だけを含むと仮定し、それ以外の全ては空白文字として扱われます。

条件関数 解説
BOOLEAN.?(value1, value2)

評価されたboolean条件に基づいて2つの他の表現のどっちが評価されるべきかを決定する3条件オペレータ。例えば、(42 > 5).?("A", "B") は "A" になります。

型変換関数 解説
ANY.cast(TYPE)

値を指定された型に変換します。例えば、"42".cast(Types.INT) は 42 になります。

値のコンストラクタ関数 解説
ARRAY.at(INT)

配列内の特定の位置の要素を返す。インデックスは1から始まります。

array(ANY [, ANY ]*)

値のリストから配列を生成する。配列は(primitiveでは無い)オブジェクトの配列になるでしょう。

NUMERIC.rows

行の間隔を生成する。

一時的な関数 解説
STRING.toDate

"yy-mm-dd"形式の日付文字列をSQLの日付にパースする。

STRING.toTime

"hh:mm:ss"形式の時間の文字列をSQLの時間にパースする。

STRING.toTimestamp

"yy-mm-dd hh:mm:ss.fff"形式のタイムスタンプ文字列をSQLのタイムスタンプにパースする。

NUMERIC.year
NUMERIC.years

指定された年数について月の間隔を生成する。

NUMERIC.month
NUMERIC.months

指定された月数について月の間隔を生成する。

NUMERIC.day
NUMERIC.days

指定された日数についてミリ秒の間隔を生成する。

NUMERIC.hour
NUMERIC.hours

指定された時間数についてミリ秒の間隔を生成する。

NUMERIC.minute
NUMERIC.minutes

指定された分数についてミリ秒の間隔を生成する。

NUMERIC.second
NUMERIC.seconds

指定された秒数についてミリ秒の間隔を生成する。

NUMERIC.milli
NUMERIC.millis

ミリ秒の間隔を生成する。

currentDate()

UTCタイムゾーンの現在のSQLの日付を返す。

currentTime()

UTCタイムゾーンの現在のSQLの時間を返す。

currentTimestamp()

UTCタイムゾーンの現在のSQLのタイムスタンプを返す。

localTime()

ローカルタイムゾーンの現在のSQLの時間を返す。

localTimestamp()

ローカルタイムゾーンの現在のSQLのタイムスタンプを返す。

TEMPORAL.extract(TimeIntervalUnit)

time pointあるいは時間間隔の各部分を抽出する。部分をlong値として返します。例えば、"2006-06-05".toDate.extract(TimeIntervalUnit.DAY) は 5 になります。

TIMEPOINT.floor(TimeIntervalUnit)

time point を指定された単位に上方に丸める。例えば、"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE) は 12:44:00 になります。

TIMEPOINT.ceil(TimeIntervalUnit)

time point を指定された単位に下方に丸めます。例えば、"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE) は 12:45:00 になります。

DATE.quarter()

SQLの日付から年の四半期を返す。例えば、"1994-09-27".toDate.quarter() は 3 になります。

temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)

2つの固定された時間の間隔がオーバーラップするかどうかを決定する。time point と temporal は2つのtime point (開始、終了)によって定義される範囲に変換されます。関数は leftEnd >= rightStart && rightEnd >= leftStart を評価します。例えば、temporalOverlaps('2:55:00'.toTime, 1.hour, '3:30:00'.toTime, 2.hours) は true になります。

集約関数 解説
FIELD.count

フィールドがnullではない入力行の数を返す。

FIELD.avg

全ての入力値に渡って数値フィールドの平均(数値平均)を返す。

FIELD.sum

全ての入力値に渡って数値フィールドの合計を返す。

FIELD.max

全ての入力値に渡ってフィールドの最大の値を返す。

FIELD.min

全ての入力値に渡ってフィールドの最小の値を返す。

値へのアクセス関数 解説
COMPOSITE.get(STRING)
COMPOSITE.get(INT)

インデックスあるいは名前を使って(Tuple, POJOなどのような)Flinkのcomposite型のフィールドへアクセスし、その値を返す。例えば、'pojo.get("myField") または 'tuple.get(0)

ANY.flatten()

(Tuple, Pojoなどのような)Flinkのcomposite型とその直接のsubtypeの全てを、各subtypeが分割フィールドであるflatな表現に変換する。ほとんどの場合で、flatな表現のフィールドは元のフィールドに似ているがダラー セパレイターを使った名前を付けられます (例えば、mypojo$mytuple$f0)。

配列関数 解説
ARRAY.cardinality()

配列の要素の数を返す。

ARRAY.element()

1つの要素を持つ配列の唯一の要素を返す。配列が空の場合は null を返す。配列が1つ以上の要素を持つ場合は例外を投げる。

補助関数 解説
ANY.as(name [, name ]* )

表現のための名前、つまりフィールド、を指定します。もし表現が複数のフィールドに拡張する場合は、追加の名前を指定することができます。

(構文を含む) Flink の SQL関数は Apache Calciteの組み込み関数の部分集合です。ドキュメントのほとんどはCalcite SQL リファレンスから採用されました。

比較関数 解説
value1 = value2

等しい

value1 <> value2

等しくない

value1 > value2

より大きい

value1 >= value2

以上

value1 < value2

より少ない

value1 <= value2

以下

value IS NULL

value が null の時にTRUEを返す。

value IS NOT NULL

value が nullでは無い時にTRUEを返す。

value1 IS DISTINCT FROM value2

2つの値が等しく無い時にTRUEを返し、null値は同じものとして扱われる。

value1 IS NOT DISTINCT FROM value2

2つの値が等しい時にTRUEを返し、null値は同じものとして扱われる。

value1 BETWEEN [ASYMMETRIC | SYMMETRIC] value2 AND value3

value1value2以上で、value3以下の時にTRUEを返す。

value1 NOT BETWEEN value2 AND value3

value1value2 未満か、value3より大きい時にTRUEを返す。

string1 LIKE string2 [ ESCAPE string3 ]

string1 がパターンstring2に一致する時にTRUEを返す。必要であればエスケープ文字を定義することができます。

string1 NOT LIKE string2 [ ESCAPE string3 ]

string1 がパターン string2 に一致しない時にTRUEを返す。必要であればエスケープ文字を定義することができます。

string1 SIMILAR TO string2 [ ESCAPE string3 ]

string1 が正規表現 string2 に一致する時にTRUEを返す。必要であればエスケープ文字を定義することができます。

string1 NOT SIMILAR TO string2 [ ESCAPE string3 ]

string1 が正規表現 string2 に一致しない時にTRUEを返す。必要であればエスケープ文字を定義することができます。

value IN (value [, value]* )

value がリスト内の値に等しい時にTRUEを返す。

value NOT IN (value [, value]* )

value がリスト内の各値に等しく無い時にTRUEを返す。

EXISTS (sub-query)

sub-query が少なくとも1行を返す時にTRUEを返す。オペレーションがjoinおよびgroupオペレーション内で上書きできる時のみサポートされます。

論理関数 解説
boolean1 OR boolean2

boolean1がTRUEか、boolean2 がTRUEの時にTRUEを返す。3値ロジックがサポートされます。

boolean1 AND boolean2

boolean1boolean2 の両方がTRUEの時にTRUEを返す。3値ロジックがサポートされます。

NOT boolean

boolean がTRUEでは無い時にTRUEを返す; boolean が UNKNOWN の時に UNKNOWN を返す。

boolean IS FALSE

boolean がFALSEの時にTRUEを返す; boolean が UNKNOWN の時に FALSE を返す。

boolean IS NOT FALSE

boolean が FALSE では無い時にTRUEを返す; boolean が UNKNOWN の時にTRUEを返す。

boolean IS TRUE

boolean が TRUE の時にTRUEを返す; boolean が UNKNOWN の時にFALSEを返す。

boolean IS NOT TRUE

boolean が TRUEでは無い時にTRUEを返す; boolean が UNKNOWN の時にTRUEを返す。

boolean IS UNKNOWN

boolean が UNKNOWN の時にTRUEを返す。

boolean IS NOT UNKNOWN

boolean が UNKNOWN では無い時にTRUEを返す。

計算関数 解説
+ numeric

numericを返す。

- numeric

負のnumericを返す

numeric1 + numeric2

numeric1足すnumeric2を返す。

numeric1 - numeric2

numeric1 引く numeric2を返す。

numeric1 * numeric2

numeric1掛ける numeric2を返す。

numeric1 / numeric2

numeric1 割るnumeric2を返す。

POWER(numeric1, numeric2)

numeric1numeric2乗を返す。

ABS(numeric)

numericの絶対値を返す。

MOD(numeric1, numeric2)

numeric1 割る numeric2 の剰余 (モジュラス)を返す。numeric1が負の時だけ結果は負です。

SQRT(numeric)

numericの平方根を返す。

LN(numeric)

numericの自然対数 (基底 e) を返す。

LOG10(numeric)

numericの基底10の対数を返す。

EXP(numeric)

numericの乗数を返す。

CEIL(numeric)

numericを上に丸め、numeric以上の最小の数を返す。

FLOOR(numeric)

numericを下に丸め、numeric以下の最大の数を返す。

文字列関数 解説
string || string

2つの文字列を連結する。

CHAR_LENGTH(string)

文字列内の文字の数を返す。

CHARACTER_LENGTH(string)

CHAR_LENGTH(string)の通り。

UPPER(string)

大文字に変換された文字列を返す。

LOWER(string)

小文字に変換された文字列を返す。

POSITION(string1 IN string2)

string2内にstring1が最初に現れた位置を返す。

TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)

string2から、最初 および/あるいは 最後の文字を削除する。デフォルトでは、両方の端の空白文字が削除されます。

OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])

string1の部分文字列をstring2で置き換える。

SUBSTRING(string FROM integer)

指定された位置から始まる部分文字列を返す。

SUBSTRING(string FROM integer FOR integer)

指定された位置から指定された長さの部分文字列を返す。

INITCAP(string)

各単語の最初の文字が大文字で残りが小文字の文字列を返す。単語は非英数字の文字で分割された英数字の文字のシーケンスです。

条件関数 解説
CASE value
WHEN value1 [, value11 ]* THEN result1
[ WHEN valueN [, valueN1 ]* THEN resultN ]*
[ ELSE resultZ ]
END

簡単な例。

CASE
WHEN condition1 THEN result1
[ WHEN conditionN THEN resultN ]*
[ ELSE resultZ ]
END

検索された場合。

NULLIF(value, value)

値が同じ場合はNULLを返す。例えば、NULLIF(5, 5) は NULLを返す; NULLIF(5, 0) は 5 を返す。

COALESCE(value, value [, value ]* )

最初の値がnullの時に値を提供する。例えば、COALESCE(NULL, 5) は 5 を返す。

型変換関数 解説
CAST(value AS type)

値を指定された型に変換します。

値のコンストラクタ関数 解説
array ‘[’ index ‘]’

配列内の特定の位置の要素を返す。インデックスは1から始まります。

ARRAY ‘[’ value [, value ]* ‘]’

値のリストから配列を生成する。

一時的な関数 解説
DATE string

"yy-mm-dd"形式の日付文字列をSQLの日付にパースする。

TIME string

"hh:mm:ss"の形式の時間stringをSQLの時間にパースする。

TIMESTAMP string

"yy-mm-dd hh:mm:ss.fff"の形式のタイムスタンプstring をSQLのタイムスタンプにパースする。

INTERVAL string range

"dd hh:mm:ss.fff"形式の間隔stringをSQLのミリ秒間隔に、"yyyy-mm"をSQLの月間隔にパースする。間隔の範囲は、例えば ミリ秒の間隔については、DAY, MINUTE, DAY TO HOUR または DAY TO SECONDです; 月の間隔についてはYEAR または YEAR TO MONTH です。例えば、INTERVAL '10 00:00:00.004' DAY TO SECOND, INTERVAL '10' DAY あるいは INTERVAL '2-10' YEAR TO MONTH は 間隔を返します。

CURRENT_DATE

UTCタイムゾーンの現在のSQLの日付を返す。

CURRENT_TIME

UTCタイムゾーンの現在のSQLの時間を返す。

CURRENT_TIMESTAMP

UTCタイムゾーンの現在のSQLのタイムスタンプを返す。

LOCALTIME

ローカルタイムゾーンの現在のSQLの時間を返す。

LOCALTIMESTAMP

ローカルタイムゾーンの現在のSQLのタイムスタンプを返す。

EXTRACT(timeintervalunit FROM temporal)

time pointあるいは時間間隔の各部分を抽出する。部分をlong値として返します。例えば、EXTRACT(DAY FROM DATE '2006-06-05') は 5 になります。

FLOOR(timepoint TO timeintervalunit)

time point を指定された単位に上方に丸める。例えば、FLOOR(TIME '12:44:31' TO MINUTE) は 12:44:00 になります。

CEIL(timepoint TO timeintervalunit)

time point を指定された単位に下方に丸めます。例えば、CEIL(TIME '12:44:31' TO MINUTE) は 12:45:00 になります。

QUARTER(date)

SQLの日付から年の四半期を返す。例えば、QUARTER(DATE '1994-09-27') は 3 になります。

(timepoint, temporal) OVERLAPS (timepoint, temporal)

2つの固定された時間の間隔がオーバーラップするかどうかを決定する。time point と temporal は2つのtime point (開始、終了)によって定義される範囲に変換されます。関数は leftEnd >= rightStart && rightEnd >= leftStart を評価します。例えば、(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) は true になります; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) は false になります。

集約関数 解説
COUNT(value [, value]* )

value が null では無い入力行の数を返す。

COUNT(*)

入力行の数を返す。

AVG(numeric)

全ての入力値に渡ってnumericの平均(数値平均)を返す。

SUM(numeric)

全ての入力値に渡ってnumericの合計を返す。

MAX(value)

全ての入力値に渡ってvalueの最大の値を返す。

MIN(value)

全ての入力値に渡ってvalueの最小の値を返す。

グルーピング 関数 解説
GROUP_ID()

グルーピング キーの組み合わせをユニークに識別する数値を返す。

GROUPING(expression)

expressionが現在の行のグルーピング セット内で集まる場合は1を、そうでなければ 0 を返す。

GROUPING_ID(expression [, expression]* )

指定されたグルーピング表現のビット ベクトルを返す。

値へのアクセス関数 解説
tableName.compositeType.field

名前を使って(Tuple, POJOなどのような)Flinkのcomposite型のフィールドへアクセスし、その値を返す。

tableName.compositeType.*

(Tuple, Pojoなどのような)Flinkのcomposite型とその直接のsubtypeの全てを、各subtypeが分割フィールドであるflatな表現に変換する。

配列関数 解説
CARDINALITY(ARRAY)

配列の要素の数を返す。

ELEMENT(ARRAY)

1つの要素を持つ配列の唯一の要素を返す。配列が空の場合は null を返す。配列が1つ以上の要素を持つ場合は例外を投げる。

ユーザ定義のScalar関数

必要なscalar関数が組み込み関数内に含まれていない場合、テーブルAPIおよびSQLの両方のための独自のユーザ定義のscalar関数を定義することができます。ユーザ定義のscalar関数は0, 1, あるいは多数の値を新しいscalar値にマップします。

scalar関数を定義するには、org.apache.flink.table.functions内のScalarFunctionを拡張し、(1つ以上の)評価メソッドを実装する必要があります。scalar関数の挙動は評価関数によって決定されます。評価メソッドは公的に宣言され、evalという名前でなければなりません。評価メソッドのパラメータの型と返り値の型はscalar関数のパラメータと返り値の型も決定します。評価メソッドは多数のevalという名前のメソッドを実装することで上書きすることができます。

以下の例の断片は独自のハッシュコード関数を定義する方法を示します:

public static class HashCode extends ScalarFunction {
  public int eval(String s) {
    return s.hashCode() * 12;
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode())

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
// must be defined in static/object context
object hashCode extends ScalarFunction {
  def eval(s: String): Int = {
    s.hashCode() * 12
  }
}

val tableEnv = TableEnvironment.getTableEnvironment(env)

// use the function in Scala Table API
myTable.select('string, hashCode('string))

// register and use the function in SQL
tableEnv.registerFunction("hashCode", hashCode)
tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");

デフォルトで評価メソッドの結果の型はFlinkの型の抽出ファシリティによって決定されます。これは基本的な型あるいは単純なPOJOにとって十分ですが、もっと複雑、独自、あるいは複合の型にとっては具合が悪いかもしれません。これらの場合、結果の型のTypeInformationScalarFunction#getResultType()を上書くことで手動で定義することができます。

内部的には、テーブルAPIとSQLのコード生成は出来る限りprimitive値と連携します。もしユーザ定義のscalar関数が実行時にオブジェクトの生成/キャストによってあまり大きなオーバーヘッドを導入すべきでないなら、それらの箱詰めの代わりにprimitiveの型としてパラメータと結果の型を宣言することをお勧めします。Types.DATETypes.TIMEint として表現することもできます。Types.TIMESTAMPlongとして表現されるかもしれません。

以下の例は内部的なタイムスタンプ表現を取り、またlong値として内部的なタイムスタンプ表現を返す、より進んだ例です。ScalarFunction#getResultType()を上書くことで、返されるlong値はコードの生成によってTypes.TIMESTAMPとして解釈されるべきだと定義します。

public static class TimestampModifier extends ScalarFunction {
  public long eval(long t) {
    return t % 1000;
  }

  public TypeInformation getResultType(signature: Class[]) {
    return Types.TIMESTAMP;
  }
}
object TimestampModifier extends ScalarFunction {
  def eval(t: Long): Long = {
    t % 1000
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    Types.TIMESTAMP
  }
}

ユーザ定義のテーブル関数

ユーザ定義のscalar関数と似て、ユーザ定義のテーブル関数は0, 1 あるいは多数のscalar値を入力パラメータとして取ります。しかし、scalar関数とは対照的に、出力として1つの値の代わりに任意の数の行を返すことができます。返される行は1つ以上のカラムから成るかもしれません。

テーブル関数を定義するには、org.apache.flink.table.functions内のTableFunctionを拡張し、(1つ以上の)評価メソッドを実装する必要があります。テーブル関数の挙動は評価関数によって決定されます。評価メソッドはpublicに宣言され、evalという名前でなければなりません。TableFunctionは多数のevalという名前のメソッドを実装することで上書きすることができます。評価メソッドのパラメータの型はテーブル関数の全ての有効なパラメータを決定します。返されるテーブルの型はTableFunctionのgeneric型によって決定されます。評価メソッドは protected collect(T) メソッドを使って出力行を発行します。

テーブルAPI内では、テーブル関数はScalarユーザについては .join(Expression) あるいは .leftOuterJoin(Expression) と一緒に、Javaユーザについては.join(String) あるいは .leftOuterJoin(String) と一緒に使われます。join オペレータは、outerテーブル(オペレータの左のテーブル)からの各行とテーブル値関数によって生成された全ての行(オペレータの右のテーブル)を (cross)joinします。leftOuterJoin オペレータはouterテーブル(オペレータの左のテーブル)からの各行と、テーブル値関数によって生成された全ての行(オペレータの右のテーブル)をjoinし、テーブル関数が空のテーブルを返すouterの行を保持します。In SQL use LATERAL TABLE(<TableFunction>) with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).

以下の例はテーブル値関数をどうやって定義し使うかを示します:

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split());

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join("split(a) as (word, length)").select("a, word, length");
myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
class Split extends TableFunction[(String, Int)] {
  def eval(str: String): Unit = {
    // use collect(...) to emit a row.
    str.split(" ").foreach(x -> collect((x, x.length))
  }
}

val tableEnv = TableEnvironment.getTableEnvironment(env)
val myTable = ...         // table schema: [a: String]

// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
val split = new Split()
// "as" specifies the field names of the generated table.
myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);

// Register the table function to use it in SQL queries.
tableEnv.registerFunction("split", new Split())

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API)
tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");

重要: テーブル関数をScalaオブジェクトとして実装しないでください。Scala オブジェクトはシングルトンで並行処理の問題を起こすでしょう。

POJO型は決定論的フィールドの順番を持たないことに注意してください。従って、テーブル関数を使って返されたPOJOのフィールドをASを使って名前を変えることができません。

デフォルトではTableFunctionの結果の型はFlinkの自動型抽出ファシリティを使って決定されます。これは基本型と単純なPOJOについては良く動作しますが、もっと複雑、独自、あるいは複合の型については具合が悪いかもしれません。そのような場合、結果の型はTypeInformationを返すTableFunction#getResultType()を上書きすることで手動で指定することができます。

The following example shows an example of a TableFunction that returns a Row type which requires explicit type information. We define that the returned table type should be RowTypeInfo(String, Integer) by overriding TableFunction#getResultType().

public class CustomTypeSplit extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length);
            collect(row);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(new TypeInformation[]{
               			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
    }
}
class CustomTypeSplit extends TableFunction[Row] {
  def eval(str: String): Unit = {
    str.split(" ").foreach({ s =>
      val row = new Row(2)
      row.setField(0, s)
      row.setField(1, s.length)
      collect(row)
    })
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO))
  }
}

制限事項

以下のオペレーションはまだサポートされません:

  • バイナリの文字列オペレーションと関数
  • システム関数
  • コレクション関数
  • STDDEV_xxx, VAR_xxx および REGR_xxx のような集約関数
  • COUNT DISTINCT のようなdistinct集約
  • ウィンドウ関数
  • グルーピング 関数

上に戻る

外部シンクへのテーブルの書き込み

TableTableSinkに書き込むことができます。これは広く様々なフォーマット(例えば、CSV, Apache Parquet, Apache Avro)、ストレージシステム (例えば、JDBC, Apache HBase, Apache Cassandra, Elasticsearch)、あるいはメッセージシステム (例えば Apache Kafka, RabbitMQ)をサポートするための一般的なインタフェースです。バッチTableBatchTableSinkに書き込むことのみできます。これはStreamTableSinkを必要とするストリーミングテーブルです。TableSinkは同時に両方のインタフェースを実装することができます。

現在のところ、FlinkはバッチあるいはストリーミングTable を CSV-フォーマットのファイルに書き込むCsvTableSinkのみを提供します。独自のTableSinkBatchTableSink および/あるいは StreamTableSink インタフェースを実装することで定義することができます。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// compute the result Table using Table API operators and/or SQL queries
Table result = ...

// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
// write the result Table to the TableSink
result.writeToSink(sink);

// execute the program
env.execute();
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// compute the result Table using Table API operators and/or SQL queries
val result: Table = ...

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// write the result Table to the TableSink
result.writeToSink(sink)

// execute the program
env.execute()

上に戻る

ランタイムの設定

テーブルAPIはランタイムの挙動を修正するための設定(TableConfigと呼ばれます)を提供します。TableEnvironmentを使ってアクセスすることができます。

Nullの扱い

デフォルトで、テーブルAPIはnull 値をサポートします。テーブルAPIは今のところ全てのFlinkの型をサポートしません。

上に戻る

テーブルの説明

テーブルAPIはTableを計算するための論理的で最適化されたクエリ計画を説明するための仕組みを提供します。これはTableEnvironment#explain(table) メソッドを使って行われます。3つの計画を説明する文字列を返します。

  1. 関係クエリの抽象構文木。つまり最適化されていない論理クエリ計画、
  2. 最適化された論理クエリ計画、そして
  3. 物理実行計画。

以下のコードは例と対応する出力を示します:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
        .where("LIKE(word, 'F%')")
        .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
      .where('word.like("F%"))
      .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

    Stage 4 : Operator
      content : where: (LIKE(word, 'F%')), select: (count, word)
      ship_strategy : FORWARD

      Stage 5 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

上に戻る

TOP
inserted by FC2 system