テーブルAPIとSQLは実験的な機能です。
テーブルAPIは簡単にFlinkのデータセットとデータストリームAPI(JavaとScala)に組み込むことができる関連ストリームおよびバッチ処理のためのSQLのような表現言語です。テーブルAPIとSQLインタフェースは関連テーブル
抽象上で操作します。これは外部データソースあるいは既存のデータセットとデータストリームから生成することができます。テーブルAPIを使って、テーブル
に selection, aggregation および joins のような関連オペレータを適用することができます。
テーブル
はそれらが登録(Registering Tablesを見てください)されている限り、通常のSQLを使ってクエリすることもできます。テーブルAPIとSQLは透過の機能を提供し、同じプログラム内で混ぜることができます。テーブル
がDataSet
あるいは DataStream
に変換される時、関連オペレータとSQLクエリによって定義された論理プランはApache Calcite を使って最適化され、DataSet
または DataStream
プログラムに変換されます。
テーブルAPIとSQLはflink-table Mavenプロジェクトの一部です。テーブルAPIおよびSQLを使うために以下の依存をプロジェクトに追加する必要があります:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.5-SNAPSHOT</version>
</dependency>
注意: テーブルAPIは現在のところバイナリ配布物の一部ではありません。クラスタ実行のためにそれをリンクするには ここを見てください。
TableEnvironment
はテーブルをユニークな名前を使って登録することができる内部的なテーブルのカタログを持ちます。登録の後でテーブルは名前を使ってTableEnvironment
からアクセスすることができます。
注意: DataSet
あるいはDataStream
はTableEnvironment
内でそれらを登録すること無しに直接テーブル
に変換することができます。
DataSet
は以下のようにBatchTableEnvironment
の中にTable
として登録されます:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)
// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, "user, product, amount");
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)
// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)
注意: DataSet
Table
の名前は内部的な利用のためのみに保持される ^_DataSetTable_[0-9]+
パターンに一致しなければなりません。
DataStream
は以下のようにStreamTableEnvironment
の中に Table
として登録されます:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)
// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, "user, product, amount");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)
// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)
注意: DataStream
Table
の名前は内部的な利用のためのみに保持される ^_DataStreamTable_[0-9]+
パターンに一致しなければなりません。
テーブルAPIオペレータあるいはSQLクエリを起源とするTable
は以下のようにTableEnvironment
の中で登録されます:
// works for StreamExecutionEnvironment identically
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// convert a DataSet into a Table
Table custT = tableEnv
.toTable(custDs, "name, zipcode")
.where("zipcode = '12345'")
.select("name")
// register the Table custT as table "custNames"
tableEnv.registerTable("custNames", custT)
// works for StreamExecutionEnvironment identically
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// convert a DataSet into a Table
val custT = custDs
.toTable(tableEnv, 'name, 'zipcode)
.where('zipcode === "12345")
.select('name)
// register the Table custT as table "custNames"
tableEnv.registerTable("custNames", custT)
テーブルAPIオペレータあるいはSQLクエリを起源とする登録されたTable
はリレーショナルDBMSでのビューと似たものとして扱われます。つまり、クエリを最適化する時にインラインにされるかもしれません。
外部的なテーブルは以下のようにTableSource
を使ってTableEnvironment
内に登録されます:
// works for StreamExecutionEnvironment identically
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
TableSource custTS = new CsvTableSource("/path/to/file", ...)
// register a `TableSource` as external table "Customers"
tableEnv.registerTableSource("Customers", custTS)
// works for StreamExecutionEnvironment identically
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val custTS: TableSource = new CsvTableSource("/path/to/file", ...)
// register a `TableSource` as external table "Customers"
tableEnv.registerTableSource("Customers", custTS)
TableSource
はデータベース (MySQL, HBase, …)、ファイル形式 (CSV, Apache Parquet, Avro, ORC, …) あるいはメッセージングシステム (Apache Kafka, RabbitMQ, …) のような様々なストレージシステム内に格納されたデータへのアクセスを提供します。
現在のところ、FlinkはCSVファイルを読むために CsvTableSource
を、KafkaからJSONファイルを読むためにKafka08JsonTableSource
/Kafka09JsonTableSource
を提供します。独自のTableSource
はBatchTableSource
または StreamTableSource
インタフェースを実装することで定義することができます。
クラス名 | Maven 依存 | Batch? | Streaming? | 解説 |
CsvTableSouce |
flink-table |
Y | Y | CSVファイルのための単純なソース。 |
Kafka08JsonTableSource |
flink-connector-kafka-0.8 |
N | Y | JSONデータのための Kafka 0.8 ソース。 |
Kafka09JsonTableSource |
flink-connector-kafka-0.9 |
N | Y | JSONデータのための Kafka 0.9 ソース。 |
flink-table
依存に付属する全てのソースはTableプログラムを使って直接使うことができます。他の全てのテーブルソースについては、flink-table
依存に加えてそれぞれの依存を使いする必要があります。
KafkaJSONソースを使うには、プロジェクトにKafkaコネクタ依存を追加する必要があります:
flink-connector-kafka-0.8
を、flink-connector-kafka-0.9
を。そして、以下のようにソースを生成することができます (Kafka 0.8の例):
// The JSON field names and types
String[] fieldNames = new String[] { "id", "name", "score"};
Class>[] fieldTypes = new Class>[] { Integer.class, String.class, Double.class };
KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
kafkaTopic,
kafkaProperties,
fieldNames,
fieldTypes);
デフォルトでは、JSONフィールドの欠落はソースを失敗しません。以下のようにこれを設定することができます:
// Fail on missing JSON field
tableSource.setFailOnMissingField(true);
Table APIガイドの後で説明されるようにテーブルを連携することができます:
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
Table result = tableEnvironment.ingest("kafka-source");
CsvTableSource
は追加の依存無しに既にflink-table
に含まれています。
CsvTableSource
を生成する最も簡単な方法は同梱されたCsvTableSource.builder()
を使うことで、ビルダーはプロパティを設定するために以下のメソッドを持ちます:
path(String path)
CSVファイルへのパスを設定します。必須。field(String fieldName, TypeInformation> fieldType)
フィールド名とフィールド型の情報を持つフィールドを追加します。複数回呼ぶことができます。必須。このメソッドの呼び出し順は行内のフィールドの順番も定義します。fieldDelimiter(String delim)
フィールドのデリミタを設定します。デフォルトは","
。lineDelimiter(String delim)
行のデリミタを設定します。デフォルトは"\n"
。quoteCharacter(Character quote)
文字列値のためのクォート文字を設定します。デフォルトはnull
。commentPrefix(String prefix)
コメントを示すプレフィックスを設定します。デフォルトはnull
。ignoreFirstLine()
最初の行を無視します。デフォルトは無効です。ignoreParseErrors()
パースに失敗したレコードを失敗する代わりにスキップします。デフォルトでは例外を投げます。以下のようにソースを生成することができます:
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("/path/to/your/file.csv")
.field("name", Types.STRING())
.field("id", Types.INT())
.field("score", Types.DOUBLE())
.field("comments", Types.STRING())
.fieldDelimiter("#")
.lineDelimiter("$")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%");
val csvTableSource = CsvTableSource
.builder
.path("/path/to/your/file.csv")
.field("name", Types.STRING)
.field("id", Types.INT)
.field("score", Types.DOUBLE)
.field("comments", Types.STRING)
.fieldDelimiter("#")
.lineDelimiter("$")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
ストリームとバッチのTableEnvironment
の中のTable APIガイドの後で説明されるようにテーブルを連携することができます:
tableEnvironment.registerTableSource("mycsv", csvTableSource);
Table streamTable = streamTableEnvironment.ingest("mycsv");
Table batchTable = batchTableEnvironment.scan("mycsv");
tableEnvironment.registerTableSource("mycsv", csvTableSource)
val streamTable = streamTableEnvironment.ingest("mycsv")
val batchTable = batchTableEnvironment.scan("mycsv")
テーブルは以下のメソッドを使って登録を解除することができます。後に続くSQLクエリは登録を解除されたテーブル名をもう見つけることができないでしょう。
tableEnvironment.unregisterTable("Customers");
tableEnvironment.unregisterTable("Customers")
テーブルAPIはScalaおよびJavaでのデータセットとデータストリーム上にリレーショナル オペレーションを適用するためのメソッドを提供します。
テーブルAPIの中核となる概念は、リレーショナル スキーマ(あるいはリレーション)を使ってテーブルを表現する Table
です。テーブルはDataSet
あるいは DataStream
から生成され、DataSet
あるいは DataStream
から変換され、あるいはTableEnvironment
を使ってテーブルカタログ内に登録することができます。Table
は常に特定のTableEnvironment
に束縛されます。異なる TableEnvironments のテーブルを組み合わせることはできません。
FlinkのJava Dataset APIを使う場合、データセットは TableEnvironment
を使ってテーブルとデータセットへのテーブルに変換されます。以下の例は、以下の事を説明します:
DataSet
がどのように Table
に変換されるか、Table
がどのように DataSet
に変換されるか。public class WC {
public WC(String word, int count) {
this.word = word; this.count = count;
}
public WC() {} // empty constructor to satisfy POJO requirements
public String word;
public int count;
}
...
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
new WC("Hello", 1));
Table table = tEnv.fromDataSet(input);
Table wordCounts = table
.groupBy("word")
.select("word, count.sum as count");
DataSet<WC> result = tableEnv.toDataSet(wordCounts, WC.class);
Javaを使って、表現は文字列で指定される必要があります。埋め込み表現のDSLはサポートされません。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)
// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, "user, product, amount");
サポートされるオペレータの完全なリストと表現構文の説目いについてはJavadocを参照してください。
Table API はorg.apache.flink.table.api.scala._
をインポートすることで有効にされます。これはDataSet
あるいは DataStream
のテーブルへの暗黙的な変換を有効にします。以下の例は、以下の事を説明します:
DataSet
がどのように Table
に変換されるか、Table
がどのように DataSet
に変換されるか。import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
case class WC(word: String, count: Int)
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable(tEnv)
val result = expr
.groupBy('word)
.select('word, 'count.sum as 'count)
.toDataSet[WC]
表現DSLはフィールド名を参照するためにScalaシンボルを使い、ランタイムコードを効率的にするために表現を変換するためにコード生成を使用します。テーブルから、への変換は、Scala caseクラスあるいはJava POJOを使う時にのみ動作することに注意してください。有効なPOJOの特性を学ぶには、型の抽出とシリアライズ化 を参照してください。
別の例は2つのテーブルをjoinする方法を示します:
case class MyResult(a: String, d: Int)
val input1 = env.fromElements(...).toTable(tEnv).as('a, 'b)
val input2 = env.fromElements(...).toTable(tEnv, 'c, 'd)
val joined = input1.join(input2)
.where("a = c && d > 42")
.select("a, d")
.toDataSet[MyResult]
データセットをテーブルに変換する時に、テーブルのフィールド名が as()
あるいは toTable()
を使った指定でどのように変更することができるかについて注意してください。更に、例はリレーショナル表現を指定するために文字列をどのように使うかを示します。
DataStream
からのTable
の生成も似たようなやりかたで動作します。以下の例はDataStream
をどのように Table
に変換するか、そしてテーブルAPIを使ってそれをフィルターする方法を示します。
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val inputStream = env.addSource(...)
val result = inputStream
.toTable(tEnv, 'a, 'b, 'c)
.filter('a === 3)
val resultStream = result.toDataStream[Row]
サポートされるオペレータの完全なリストと表現構文の説目いについてはScaladocを参照してください。
登録されたテーブルは以下のようにしてTableEnvironment
からアクセスすることができます:
tEnv.scan("tName")
はBatchTableEnvironment
内で"tName"
として登録されたTable
を走査します。tEnv.ingest("tName")
はStreamTableEnvironment
内で"tName"
として登録されたStreamTableEnvironment
を取り込みます。テーブルAPIはScalaおよびJavaで構造化データ上の言語統合クエリを実行するためにドメイン固有の言語を特徴とします。この章は利用可能なオペレータについての短い概要を説明します。Javadoc内でオペレータのもっと詳細を見つけることができます。
オペレータ | 解説 |
---|---|
Select |
SQLのSELECT文と似ています。selectオペレーションを実行します。
テーブル内の全てのカラムを選択するワイルドカードとして振舞う星記号 (
|
As |
フィールドの改名。
|
Where / Filter |
SQLのWHERE句に似ています。フィルター意味するものを通過できない行をフィルターアウトします。
|
GroupBy |
SQLのGROUPBY句に似ています。行をグループ単位で集約するために以下の集約オペレータを使って、グルーピング キー上で行をグループ化します。
|
Join |
SQLのJOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinの等しさを意味するものがjoinオペレータあるいはwhereまたはfilterオペレータを使って定義されなければなりません。
|
LeftOuterJoin |
SQLのLEFT OUTER JOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。
|
RightOuterJoin |
SQLの RIGHT OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。
|
FullOuterJoin |
SQLの FULL OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。
|
和集合 |
SQLの UNION 句に似ています。重複するレコードを削除して2つのテーブルを結合します。両方のテーブルは等しいフィールド型を持つ必要があります。
|
UnionAll |
SQLの UNION ALL 句に似ています。2つのテーブルを結合します。両方のテーブルは等しいフィールド型を持つ必要があります。
|
Intersect |
SQLの INTERSECT 句に似ています。intersectは両方のテーブルに存在するレコードを返します。もしレコードが1つ以上のテーブルに1回以上存在する場合、1度だけ返します。つまり、結果のテーブルは重複するレコードを持ちません。両方のテーブルは等しいフィールド型を持つ必要があります。
|
IntersectAll |
SQLのINTERSECT ALL句に似ています。intersectAllは両方のテーブルに存在するレコードを返します。もしレコードが両方のテーブルに1度以上存在する場合、両方のテーブルに存在するだけの数だけ返します。つmり結果のレコードは重複するレコードを持つかもしれません。両方のテーブルは等しいフィールド型を持つ必要があります。
|
Minus |
SQLのEXCEPT句に似ています。右のテーブル内に存在しないレコードを左テーブルから引いたものを返します。左のテーブル内の重複レコードは確実に1回だけ返されます。つまり、重複は削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。
|
MinusAll |
SQLの EXCEPT ALL句に似ています。MinusAll は右のレコードに存在しないレコードを返します。左のテーブル内でn回、右のテーブルでm回存在するレコードは (n-m)回返されます。つまり、右のテーブルに存在する重複の数だけ削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。
|
Distinct |
SQLの DISTINCT 句に似ています。全く異なる組み合わせを持つレコードを返します。
|
Order By |
SQLのORDER BY句に似ています。全ての並行パーティションに渡ってグローバルにソートされたレコードを返します。
|
Limit |
SQLの LIMIT句に似ています。ソートされた結果をオフセットの位置からの指定された数のレコードに制限します。Limit は技術的には Order By オペレータの一部で、従って先にそれが行われなければなりません。
|
オペレータ | 解説 |
---|---|
Select |
SQLのSELECT文と似ています。selectオペレーションを実行します。
テーブル内の全てのカラムを選択するワイルドカードとして振舞う星記号 (
|
As |
フィールドの改名。
|
Where / Filter |
SQLのWHERE句に似ています。フィルター意味するものを通過できない行をフィルターアウトします。
|
GroupBy |
SQLのGROUPBY句に似ています。Groups rows on the grouping keys, with a following aggregation operator to aggregate rows group-wise.
|
Join |
SQLのJOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、joinの等しさを意味するものがwhereまたはfilterオペレータを使って定義されなければなりません。
|
LeftOuterJoin |
SQLのLEFT OUTER JOIN句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。
|
RightOuterJoin |
SQLの RIGHT OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。
|
FullOuterJoin |
SQLの FULL OUTER JOIN 句に似ています。2つのテーブルをjoinします。両方のテーブルは異なるフィールド名を持ち、少なくとも1つのjoinを等しさを意味するものが定義されなければなりません。
|
和集合 |
SQLの UNION 句に似ています。重複するレコードを削除して2つのテーブルを結合します。両方のテーブルは同じフィールド型を持つ必要があります。
|
UnionAll |
SQLの UNION ALL 句に似ています。2つのテーブルを結合します。両方のテーブルは同じフィールド型を持つ必要があります。
|
Intersect |
SQLの INTERSECT 句に似ています。intersectは両方のテーブルに存在するレコードを返します。もしレコードが1つ以上のテーブルに1回以上存在する場合、1度だけ返します。つまり、結果のテーブルは重複するレコードを持ちません。両方のテーブルは等しいフィールド型を持つ必要があります。
|
IntersectAll |
SQLのINTERSECT ALL句に似ています。intersectAllは両方のテーブルに存在するレコードを返します。もしレコードが両方のテーブルに1度以上存在する場合、両方のテーブルに存在するだけの数だけ返します。つmり結果のレコードは重複するレコードを持つかもしれません。両方のテーブルは等しいフィールド型を持つ必要があります。
|
Minus |
SQLのEXCEPT句に似ています。右のテーブル内に存在しないレコードを左テーブルから引いたものを返します。左のテーブル内の重複レコードは確実に1回だけ返されます。つまり、重複は削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。
|
MinusAll |
SQLの EXCEPT ALL句に似ています。MinusAll は右のレコードに存在しないレコードを返します。左のテーブル内でn回、右のテーブルでm回存在するレコードは (n-m)回返されます。つまり、右のテーブルに存在する重複の数だけ削除されます。両方のテーブルは等しいフィールド型を持つ必要があります。
|
Distinct |
SQLの DISTINCT 句に似ています。全く異なる組み合わせを持つレコードを返します。
|
Order By |
SQLのORDER BY句に似ています。全ての並行パーティションに渡ってグローバルにソートされたレコードを返します。
|
Limit |
SQLの LIMIT句に似ています。ソートされた結果をオフセットの位置からの指定された数のレコードに制限します。Limit は技術的には Order By オペレータの一部で、従って先にそれが行われなければなりません。
|
前の章の幾つかのオペレータは1つ以上の表現を期待します。表現は組み込みのScala DSLあるいは文字列を使って指定することができます。表現がどのように指定されるかについて学ぶには上の例を参照してください。
以下は表現のEBNF文法です:
expressionList = expression , { "," , expression } ;
expression = alias ;
alias = logic | ( logic , "AS" , fieldReference ) ;
logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;
term = product , [ ( "+" | "-" ) , product ] ;
product = unary , [ ( "*" | "/" | "%") , unary ] ;
unary = [ "!" | "-" ] , composite ;
composite = suffixed | atom ;
suffixed = interval | cast | as | aggregation | if | functionCall ;
timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
rowInterval = composite , "." , "rows" ;
cast = composite , ".cast(" , dataType , ")" ;
dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ;
as = composite , ".as(" , fieldReference , ")" ;
aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" ) , [ "()" ] ;
if = composite , ".?(" , expression , "," , expression , ")" ;
functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
fieldReference = "*" | identifier ;
nullLiteral = "Null(" , dataType , ")" ;
timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
ここで、literal
は有効なJavaリテラル、fieldReference
はデータ内のカラムを指定 (あるいはもし*
が使われた場合は全てのカラム)、そしてfunctionIdentifier
はサポートされるscalar関数を指定します。カラム名と関数名はJavaの識別子の構文に従います。カラム名rowtime
はストリーミング環境内での保持された論理属性です。文字列として指定された表現はオペレータと関数を呼ぶためにサフィックスの表記の代わりにプリフィックスの表記を使うこともできます。
厳密な数値あるいは大きな10進数との連携が必要な場合、テーブルAPIはJavaのBigDecimal型もサポートします。Scala テーブルAPIの10進数はBigDecimal("123456")
で定義することができ、Javaでは精度のための “p” を追加することで定義することができます。例えば、123456p
。
一時的な値と連携するために、テーブルAPIはJavaのSQLのDate、Time および Timestamp型をサポートします。ScalaのテーブルAPIではリテラルはjava.sql.Date.valueOf("2016-06-27")
, java.sql.Time.valueOf("10:10:42")
または java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")
を使って定義することができます。Java と Scala Table API は文字列を一時的な型に変換するために"2016-06-27".toDate()
, "10:10:42".toTime()
および "2016-06-27 10:10:42.123".toTimestamp()
の呼び出しもサポートします。注意: Javaの一時的なSQL型はタイムゾーンに依存するため、Flinkクライアントと全てのタスクマネージャが同じタイムゾーンを使うようにしてください。
一時的な間隔は月数(Types.INTERVAL_MONTHS
) あるいはミリ秒数 (Types.INTERVAL_MILLIS
) として表現することができます。同じ型の間隔は足し算あるいは引き算することができます (例えば 1.hour + 10.minutes
)。ミリ秒の間隔は time point に追加することができます (例えば "2016-08-10".toDate + 5.days
)。
テーブルAPIはバッチおよびストリーミングテーブル上でクエリを定義するための宣言型APIです。投射、選択、結合オペレーションは追加のセマンティクス無しにストリーミングおよびバッチテーブルの両方に適用することができます。しかし、(おそらく)無限のストリーミングテーブル上での集約は、レコードの有限のグループ上でのみ計算することができます。ウィンドウは行のグループを時間あるいは行数の間隔に基づいて有限のグループに集約し、グループごとに集約関数を評価します。バッチテーブルについては、ウィンドウは時間の間隔によってレコードをグループ化するための便利なショートカットです。
ウィンドウはwindow(w: Window)
句を使って定義することができ、エイリアスを必要とします。これはas
句を使って指定されます。ウィンドウを使ってテーブルをグループ化するには、ウィンドウのエイリアスは通常のグループ化属性のように groupBy(...)
句の中で参照されなければなりません。以下の例はテーブル上にウィンドウの集約を定義する方法を示します。
Table table = input
.window([Window w].as("w")) // define window with alias w
.groupBy("w") // group the table by window w
.select("b.sum") // aggregate
val table = input
.window([w: Window] as 'w) // define window with alias w
.groupBy('w) // group the table by window w
.select('b.sum) // aggregate
ストリーミング環境では、もしウィンドウの集約がウィンドウに加えて1つ以上の属性でグループ化する場合のみ、それらは並行して計算することができます。つまり、groupBy(...)
句はウィンドウのエイリアスと少なくとも1つの追加の属性を参照します。(上の例のような)ウィンドウのエイリアスのみを参照するgroupBy(...)
句は並行のタスクでは無く1つでのみ評価することができます。以下の例は追加のグループ属性を使ってウィンドウ集約を定義する方法を示します。
Table table = input
.window([Window w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, b.sum") // aggregate
val table = input
.window([w: Window] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'b.sum) // aggregate
Window
パラメータは行がどうやってウィンドウにマップされるかを定義します。Window
はユーザが実装することができるインタフェースではありません。代わりに、テーブルAPIは特定のセマンティクスを持つ事前定義されたWindow
クラスのセットを提供します。これらは背後にある DataStream
あるいは DataSet
オペレーションに変換されます。サポートされるウィンドウ定義は以下にリスト化されます。時間ウィンドウの開始及び終了タイムスタンプのようなウィンドウプロパティは、それぞれw.start
と w.end
のようなウィンドウ エイリアスのプロパティとしてselect文の中で追加することができます。
Table table = input
.window([Window w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, w.start, w.end, b.count") // aggregate and add window start and end timestamps
val table = input
.window([w: Window] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps
タンブリング ウィンドウは行をオーバーラップしない固定の長さの連続するウィンドウに割り当てます。例えば、5分のタンブリング ウィンドウは行を5分の間隔にグループ化します。タンブリング ウィンドウはイベント時間、処理時間あるいは行カウント上で定義することができます。
タンブリング ウィンドウは以下のようにTumble
クラスを使って定義することができます:
メソッド | Required? | 解説 |
---|---|---|
over |
必須。 | 時間あるいは行カウントの間隔のどちらかで、ウィンドウの長さを定義します。 |
on |
ストリーミング イベント時間ウィンドウおよびバッチテーブル上のウィンドウで必要です。 | ストリーミングテーブルのための時間モードを定義します(rowtime は論理的なシステム属性です); バッチテーブルについては、どのレコードがグループ化されるかの時間属性。 |
as |
必須。 | エイリアスをウィンドウに割り当てます。エイリアスは後に続くgroupBy() 句でのウィンドウを参照し、任意でselect() 句内のウィンドウの開始あるいは終了時間のようなウィンドウ属性を選択するために使われます。 |
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").as("w"))
// Tumbling Row-count Window
.window(Tumble.over("10.rows").as("w"))
// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)
// Tumbling Processing-time Window
.window(Tumble over 10.minutes as 'w)
// Tumbling Row-count Window
.window(Tumble over 10.rows as 'w)
スライディング ウィンドウは固定サイズを持ち、指定されたスライドの間隔によってスライドします。もしスライドの間隔がウィンドウサイズよりも小さい場合、スライディング ウィンドウはオーバーラップします。従って、行は複数のウィンドウに割り当てられるかもしれません。例えば、15分のサイズのスライディングウィンドウと5分のスライド間隔は、各行を15分のサイズの3つの異なるウィンドウに割り当てます。これは5分の間隔で評価されます。スライディング ウィンドウはイベント時間、処理時間あるいは行カウント上で定義することができます。
スライディング ウィンドウは以下のようにSlide
クラスを使って定義することができます:
メソッド | Required? | 解説 |
---|---|---|
over |
必須。 | 時間あるいは行カウントの間隔のどちらかで、ウィンドウの長さを定義します。 |
every |
必須。 | 時間あるいは行カウントの間隔のどちらかで、スライドの長さを定義します。スライドの間隔はサイズの間隔として同じ型でなければなりません。 |
on |
イベント時間ウィンドウおよびバッチテーブル上のウィンドウで必要です。 | ストリーミングテーブルのための時間モードを定義します(rowtime は論理的なシステム属性です); バッチテーブルについては、どのレコードがグループ化されるかの時間属性 |
as |
必須。 | エイリアスをウィンドウに割り当てます。エイリアスは後に続くgroupBy() 句でのウィンドウを参照し、任意でselect() 句内のウィンドウの開始あるいは終了時間のようなウィンドウ属性を選択するために使われます。 |
// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding Processing-time window
.window(Slide.over("10.minutes").every("5.minutes").as("w"))
// Sliding Row-count window
.window(Slide.over("10.rows").every("5.rows").as("w"))
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
// Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes as 'w)
// Sliding Row-count window
.window(Slide over 10.rows every 5.rows as 'w)
セッション ウィンドウは固定のサイズを持ちませんが、それらの境界は不活性の間隔によって定義されます。つまり、もし定義された隙間の間にイベントが起きない場合にセッションウィンドウは閉じられます。例えば、30分の不活性の後で1つの行が観測された場合、30分の隙間を持つセッションウィンドウが開始します(そうでなければ、行は既存のウィンドウに追加されます)。そして30分以内に行が追加されない場合に閉じられます。セッションウィンドウはイベント時間あるいは処理時間で動作することができます。
セッションウィンドウは以下のようにSession
クラスを使って定義されます:
メソッド | Required? | 解説 |
---|---|---|
withGap |
必須。 | 時間間隔として2つのウィンドウ間の隙間を定義します。 |
on |
イベント時間ウィンドウおよびバッチテーブル上のウィンドウで必要です。 | ストリーミングテーブルのための時間モードを定義します(rowtime は論理的なシステム属性です); バッチテーブルについては、どのレコードがグループ化されるかの時間属性 |
as |
必須。 | エイリアスをウィンドウに割り当てます。エイリアスは後に続くgroupBy() 句でのウィンドウを参照し、任意でselect() 句内のウィンドウの開始あるいは終了時間のようなウィンドウ属性を選択するために使われます。 |
// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session Processing-time Window
.window(Session.withGap("10.minutes").as("w"))
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session Processing-time Window
.window(Session withGap 10.minutes as 'w)
現在のところ以下の機能はまだサポートされません:
SQLのクエリはTableEnvironment
のsql()
メソッドを使って指定されます。メソッドはDataSet
あるいはDataStream
に変換、続くテーブルAPIテーブル クエリ内で使用、あるいはTableSink
に書き込むことができるTable
として、SQLクエリの結果を返します(外部シンクへのテーブルの書き込みを見てください)。SQL と Table API のクエリはシームレスに混合され、全体的に最適化され、1つのデータストリームあるいはデータセット プログラムに変換することができます。
Table
, DataSet
, DataStream
あるいは外部TableSource
はSQLクエリによってアクセス可能にするためにTableEnvironment
に登録されなければなりません(テーブルの登録を見てください)。
注意: Flinkの SQLのサポートはまだ完全ではありません。サポートされないSQL機能を含むクエリはTableException
を起こすでしょう。バッチおよびストリーミングテーブル上のSQLの制限は以下の章でリスト化されます。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// read a DataSet from an external source
DataSet<Tuple3<Long, String, Integer>> ds = env.readCsvFile(...);
// register the DataSet as table "Orders"
tableEnv.registerDataSet("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
"SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'");
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)
// register the DataSet under the name "Orders"
tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
"SELECT SUM(amount) FROM Orders WHERE product LIKE '%Rubber%'")
現在のバージョンはバッチテーブル上の selection (filter), projection, inner equi-joins, grouping, non-distinct aggregates および sorting をサポートします。
中でも、以下のSQL機能はまだサポートされません:
COUNT(DISTINCT name)
)注意: テーブルは FROM
句で指定された順番でjoinされます。幾つかの場合において、テーブルの順番はデカルト積を解くために手動で調整されます。
SQLのクエリは、標準的なSQLのようにストリーミング テーブル(DataStream
あるいは StreamTableSource
で支援されたテーブル)上で実行することができます。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sql(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
ストリーミングSQLの現在のバージョンはSELECT
, FROM
, WHERE
とUNION
句のみをサポートします。集約とjoinはまだサポートされません。
Flink はSQLのパースのためにApache Calcite を使います。現在のところ、FlinkのSQLはクエリ関連SQL構文のみと包括的なSQL標準のサブセットのみをサポートします。以下の BNF構文はサポートされるSQLの機能を説明します:
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
Java文字列内でのより良いSQLクエリの定義のために、FlinkのSQLはJavaに似た語彙ポリシーを使います:
"SELECT a AS `my field` FROM t"
)。全てのSQL機能はまだ実装されていませんが、幾つかの文字の組み合わせは将来の使用のためのキーワードとして予約されています。フィールド名として以下の文字列のうちの1つを使いたい場合は、それらを backticks で囲むようにしてください (例えば `value`
, `count`
)。
A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE
テーブルAPIはFlinkのデータセットとデータストリームAPIの上に構築されています。内部的には、それは型を識別するためにFlinkの TypeInformation
も使います。テーブルAPIは今のところ全てのFlinkの型をサポートしません。サポートされる単純な型の全てはorg.apache.flink.table.api.Types
の中でリスト化されています。以下の表はテーブルAPIの型、SQLの型、そして結果のJavaクラス間の関連を要約します。
テーブルAPI | SQL | Java の型 |
---|---|---|
Types.STRING |
VARCHAR |
java.lang.String |
Types.BOOLEAN |
BOOLEAN |
java.lang.Boolean |
Types.BYTE |
TINYINT |
java.lang.Byte |
Types.SHORT |
SMALLINT |
java.lang.Short |
Types.INT |
INTEGER, INT |
java.lang.Integer |
Types.LONG |
BIGINT |
java.lang.Long |
Types.FLOAT |
REAL, FLOAT |
java.lang.Float |
Types.DOUBLE |
DOUBLE |
java.lang.Double |
Types.DECIMAL |
DECIMAL |
java.math.BigDecimal |
Types.DATE |
DATE |
java.sql.Date |
Types.TIME |
TIME |
java.sql.Time |
Types.TIMESTAMP |
TIMESTAMP(3) |
java.sql.Timestamp |
Types.INTERVAL_MONTHS |
INTERVAL YEAR TO MONTH |
java.lang.Integer |
Types.INTERVAL_MILLIS |
INTERVAL DAY TO SECOND(3) |
java.lang.Long |
generic型、composite型(つまり、POJOあるいはTuple) および配列型(オブジェクトあるいはprimitiveの配列)のような進化型は行のフィールドになるかもしれません。
generic型はテーブルAPIおよびSQL内ではまだブラックボックスとして扱われます。
しかし、composite型はcomposite型のフィールドがテーブルAPI内の.get()
およびSQL内のdotオペレータ(たとえばMyTable.pojoColumn.myField
)オペレータを使ってアクセスすることができる完全にサポートされた型です。composite型はテーブルAPI内で.flatten()
あるいはSQL内でMyTable.pojoColumn.*
を使ってflattenにすることもできます。
配列型はテーブルAPI内でmyArray.at(1)
オペレータおよびSQL内でmyArray[1]
オペレータを使ってアクセスすることができます。配列のリテラルはテーブルAPI内でarray(1, 2, 3)
およびSQL内で ARRAY[1, 2, 3]
を使って生成することができます。
テーブルAPIおよびSQLの両方はデータ変換のための組み込み関数のセットが付属します。この章は今のところ利用可能な関数の短い概要を説明します。
比較関数 | 解説 |
---|---|
|
等しい |
|
等しくない |
|
より大きい |
|
以上 |
|
より少ない |
|
以下 |
|
指定された表現がnullの時にtrueを返す。 |
|
指定された表現がnullではない時にtrueを返す。 |
|
文字列が指定されたLIKEパターンと一致する時にtrueを返す。例えば、"Jo_n%" は"Jo(arbitrary letter)n"で始まる全ての文字列と一致します。 |
|
文字列が指定されたSQL正規表現パターンと一致する時にtrueを返す。例えば、"A+" は少なくとも1つの "A" から成る全ての文字列と一致します。 |
論理関数 | 解説 |
---|---|
|
boolean1 が true あるいは boolean2 がtrueの時にtrueを返す。3値ロジックがサポートされます。 |
|
boolean1 と boolean2 の両方がtrueの時にtrueを返す。3値ロジックがサポートされます。 |
|
boolean表現がtrueではない時にtrueを返す; booleanがnullの時にnullを返す。 |
|
指定されたboolean表現がtrueの時にtrueを返す。そうでなければ false (nullおよびfalseについて)。 |
|
指定されたboolean表現がfalseの時にtrueを返す。そうでなければ false (nullおよびtrueについて)。 |
|
指定されたboolean表現がtrueではない時にtrueを返す (nullおよびfalseについて)。そうでなければfalse。 |
|
指定されたboolean表現がfalseではない時にtrue返す (nullおよびtrueについて)。そうでなければfalse。 |
計算関数 | 解説 |
---|---|
|
numericを返す。 |
|
負のnumericを返す |
|
numeric1足すnumeric2を返す。 |
|
numeric1 引く numeric2を返す。 |
|
numeric1掛ける numeric2を返す。 |
|
numeric1 割るnumeric2を返す。 |
|
numeric1 の numeric2乗を返す。 |
|
指定された値の絶対値を計算します。 |
|
numeric1 割る numeric2 の剰余 (モジュラス)を返す。numeric1が負の時だけ結果は負です。 |
|
指定された値の平方根を計算します。 |
|
指定された値の自然対数を計算します。 |
|
指定された値の基底10の対数を計算します。 |
|
オイラー数の指定乗数を計算します。 |
|
指定された値以上の最小の整数を計算します。 |
|
指定された値以下の最大の整数を計算します。 |
文字列関数 | 解説 |
---|---|
|
2つの文字列を連結する。 |
|
文字列の長さを返す。 |
|
デフォルトのロケールのルールを使って文字列内の全ての文字列の大文字を返す。 |
|
デフォルトのロケールのルールを使って文字列内の全ての文字列の小文字を返す。 |
|
他の文字列内での文字列の1から始まる位置を返す。文字列が見つからない場合は0を返す。例えば、 |
|
指定された文字列から、最初 および/あるいは 最後の文字を削除します。デフォルトでは、両方の端の空白文字が削除されます。 |
|
文字列の部分文字列を(1から始まる)開始位置の文字列で置き換えます。任意の長さはどれだけの数の文字が削除されるべきかを指定します。例えば、 |
|
指定された文字列の部分文字列を指定されたインデックスから最後までの文字列で作成します。開始のインデックスは1から始まり、それは含まれます。 |
|
指定された文字列の部分文字列を指定されたインデックスから指定された長さまでの文字列で生成します。インデックスは1から始まり、含まれます。つまり、インデックスの位置の文字は部分文字列に含まれます。部分文字列は指定された長さ以下です。 |
|
文字列内の各単語の最初の文字を大文字に変換します。文字列は [A-Za-z0-9] だけを含むと仮定し、それ以外の全ては空白文字として扱われます。 |
条件関数 | 解説 |
---|---|
|
評価されたboolean条件に基づいて2つの他の表現のどっちが評価されるべきかを決定する3条件オペレータ。例えば、 |
型変換関数 | 解説 |
---|---|
|
値を指定された型に変換します。例えば、 |
値のコンストラクタ関数 | 解説 |
---|---|
|
配列内の特定の位置の要素を返す。インデックスは1から始まります。 |
|
値のリストから配列を生成する。配列は(primitiveでは無い)オブジェクトの配列になるでしょう。 |
|
行の間隔を生成する。 |
一時的な関数 | 解説 |
---|---|
|
"yy-mm-dd"形式の日付文字列をSQLの日付にパースする。 |
|
"hh:mm:ss"形式の時間の文字列をSQLの時間にパースする。 |
|
"yy-mm-dd hh:mm:ss.fff"形式のタイムスタンプ文字列をSQLのタイムスタンプにパースする。 |
|
指定された年数について月の間隔を生成する。 |
|
指定された月数について月の間隔を生成する。 |
|
指定された日数についてミリ秒の間隔を生成する。 |
|
指定された時間数についてミリ秒の間隔を生成する。 |
|
指定された分数についてミリ秒の間隔を生成する。 |
|
指定された秒数についてミリ秒の間隔を生成する。 |
|
ミリ秒の間隔を生成する。 |
|
UTCタイムゾーンの現在のSQLの日付を返す。 |
|
UTCタイムゾーンの現在のSQLの時間を返す。 |
|
UTCタイムゾーンの現在のSQLのタイムスタンプを返す。 |
|
ローカルタイムゾーンの現在のSQLの時間を返す。 |
|
ローカルタイムゾーンの現在のSQLのタイムスタンプを返す。 |
|
time pointあるいは時間間隔の各部分を抽出する。部分をlong値として返します。例えば、 |
|
time point を指定された単位に上方に丸める。例えば、 |
|
time point を指定された単位に下方に丸めます。例えば、 |
|
SQLの日付から年の四半期を返す。例えば、 |
|
2つの固定された時間の間隔がオーバーラップするかどうかを決定する。time point と temporal は2つのtime point (開始、終了)によって定義される範囲に変換されます。関数は |
集約関数 | 解説 |
---|---|
|
フィールドがnullではない入力行の数を返す。 |
|
全ての入力値に渡って数値フィールドの平均(数値平均)を返す。 |
|
全ての入力値に渡って数値フィールドの合計を返す。 |
|
全ての入力値に渡ってフィールドの最大の値を返す。 |
|
全ての入力値に渡ってフィールドの最小の値を返す。 |
値へのアクセス関数 | 解説 |
---|---|
|
インデックスあるいは名前を使って(Tuple, POJOなどのような)Flinkのcomposite型のフィールドへアクセスし、その値を返す。例えば、 |
|
(Tuple, Pojoなどのような)Flinkのcomposite型とその直接のsubtypeの全てを、各subtypeが分割フィールドであるflatな表現に変換する。ほとんどの場合で、flatな表現のフィールドは元のフィールドに似ているがダラー セパレイターを使った名前を付けられます (例えば、 |
配列関数 | 解説 |
---|---|
|
配列の要素の数を返す。 |
|
1つの要素を持つ配列の唯一の要素を返す。配列が空の場合は |
補助関数 | 解説 |
---|---|
|
表現のための名前、つまりフィールド、を指定します。もし表現が複数のフィールドに拡張する場合は、追加の名前を指定することができます。 |
比較関数 | 解説 |
---|---|
|
等しい |
|
等しくない |
|
より大きい |
|
以上 |
|
より少ない |
|
以下 |
|
指定された表現がnullの時にtrueを返す。 |
|
指定された表現がnullではない時にtrueを返す。 |
|
文字列が指定されたLIKEパターンと一致する時にtrueを返す。例えば、"Jo_n%" は"Jo(arbitrary letter)n"で始まる全ての文字列と一致します。 |
|
文字列が指定されたSQL正規表現パターンと一致する時にtrueを返す。例えば、"A+" は少なくとも1つの "A" から成る全ての文字列と一致します。 |
論理関数 | 解説 |
---|---|
|
boolean1 が true あるいは boolean2 がtrueの時にtrueを返す。3値ロジックがサポートされます。 |
|
boolean1 と boolean2 の両方がtrueの時にtrueを返す。3値ロジックがサポートされます。 |
|
boolean表現がtrueではない時にtrueを返す; booleanがnullの時にnullを返す。 |
|
指定されたboolean表現がtrueの時にtrueを返す。そうでなければ false (nullおよびfalseについて)。 |
|
指定されたboolean表現がfalseの時にtrueを返す。そうでなければ false (nullおよびtrueについて)。 |
|
指定されたboolean表現がtrueではない時にtrueを返す (nullおよびfalseについて)。そうでなければfalse。 |
|
指定されたboolean表現がfalseではない時にtrue返す (nullおよびtrueについて)。そうでなければfalse。 |
計算関数 | 解説 |
---|---|
|
numericを返す。 |
|
負のnumericを返す |
|
numeric1足すnumeric2を返す。 |
|
numeric1 引く numeric2を返す。 |
|
numeric1掛ける numeric2を返す。 |
|
numeric1 割るnumeric2を返す。 |
|
numeric1 の numeric2乗を返す。 |
|
指定された値の絶対値を計算します。 |
|
numeric1 割る numeric2 の剰余 (モジュラス)を返す。numeric1が負の時だけ結果は負です。 |
|
指定された値の平方根を計算します。 |
|
指定された値の自然対数を計算します。 |
|
指定された値の基底10の対数を計算します。 |
|
オイラー数の指定乗数を計算します。 |
|
指定された値以上の最小の整数を計算します。 |
|
指定された値以下の最大の整数を計算します。 |
計算関数 | 解説 |
---|---|
|
2つの文字列を連結する。 |
|
文字列の長さを返す。 |
|
デフォルトのロケールのルールを使って文字列内の全ての文字列の大文字を返す。 |
|
デフォルトのロケールのルールを使って文字列内の全ての文字列の小文字を返す。 |
|
他の文字列内での文字列の1から始まる位置を返す。文字列が見つからない場合は0を返す。例えば、 |
|
指定された文字列から、最初 および/あるいは 最後の文字を削除します。 |
|
文字列の部分文字列を(1から始まる)開始位置の文字列で置き換えます。任意の長さはどれだけの数の文字が削除されるべきかを指定します。例えば、 |
|
指定された文字列の部分文字列を指定されたインデックスから最後までの文字列で作成します。開始のインデックスは1から始まり、それは含まれます。 |
|
指定された文字列の部分文字列を指定されたインデックスから指定された長さまでの文字列で生成します。インデックスは1から始まり、含まれます。つまり、インデックスの位置の文字は部分文字列に含まれます。部分文字列は指定された長さ以下です。 |
|
文字列内の各単語の最初の文字を大文字に変換します。文字列は [A-Za-z0-9] だけを含むと仮定し、それ以外の全ては空白文字として扱われます。 |
条件関数 | 解説 |
---|---|
|
評価されたboolean条件に基づいて2つの他の表現のどっちが評価されるべきかを決定する3条件オペレータ。例えば、 |
型変換関数 | 解説 |
---|---|
|
値を指定された型に変換します。例えば、 |
値のコンストラクタ関数 | 解説 |
---|---|
|
配列内の特定の位置の要素を返す。インデックスは1から始まります。 |
|
値のリストから配列を生成する。配列は(primitiveでは無い)オブジェクトの配列になるでしょう。 |
|
行の間隔を生成する。 |
一時的な関数 | 解説 |
---|---|
|
"yy-mm-dd"形式の日付文字列をSQLの日付にパースする。 |
|
"hh:mm:ss"形式の時間の文字列をSQLの時間にパースする。 |
|
"yy-mm-dd hh:mm:ss.fff"形式のタイムスタンプ文字列をSQLのタイムスタンプにパースする。 |
|
指定された年数について月の間隔を生成する。 |
|
指定された月数について月の間隔を生成する。 |
|
指定された日数についてミリ秒の間隔を生成する。 |
|
指定された時間数についてミリ秒の間隔を生成する。 |
|
指定された分数についてミリ秒の間隔を生成する。 |
|
指定された秒数についてミリ秒の間隔を生成する。 |
|
ミリ秒の間隔を生成する。 |
|
UTCタイムゾーンの現在のSQLの日付を返す。 |
|
UTCタイムゾーンの現在のSQLの時間を返す。 |
|
UTCタイムゾーンの現在のSQLのタイムスタンプを返す。 |
|
ローカルタイムゾーンの現在のSQLの時間を返す。 |
|
ローカルタイムゾーンの現在のSQLのタイムスタンプを返す。 |
|
time pointあるいは時間間隔の各部分を抽出する。部分をlong値として返します。例えば、 |
|
time point を指定された単位に上方に丸める。例えば、 |
|
time point を指定された単位に下方に丸めます。例えば、 |
|
SQLの日付から年の四半期を返す。例えば、 |
|
2つの固定された時間の間隔がオーバーラップするかどうかを決定する。time point と temporal は2つのtime point (開始、終了)によって定義される範囲に変換されます。関数は |
集約関数 | 解説 |
---|---|
|
フィールドがnullではない入力行の数を返す。 |
|
全ての入力値に渡って数値フィールドの平均(数値平均)を返す。 |
|
全ての入力値に渡って数値フィールドの合計を返す。 |
|
全ての入力値に渡ってフィールドの最大の値を返す。 |
|
全ての入力値に渡ってフィールドの最小の値を返す。 |
値へのアクセス関数 | 解説 |
---|---|
|
インデックスあるいは名前を使って(Tuple, POJOなどのような)Flinkのcomposite型のフィールドへアクセスし、その値を返す。例えば、 |
|
(Tuple, Pojoなどのような)Flinkのcomposite型とその直接のsubtypeの全てを、各subtypeが分割フィールドであるflatな表現に変換する。ほとんどの場合で、flatな表現のフィールドは元のフィールドに似ているがダラー セパレイターを使った名前を付けられます (例えば、 |
配列関数 | 解説 |
---|---|
|
配列の要素の数を返す。 |
|
1つの要素を持つ配列の唯一の要素を返す。配列が空の場合は |
補助関数 | 解説 |
---|---|
|
表現のための名前、つまりフィールド、を指定します。もし表現が複数のフィールドに拡張する場合は、追加の名前を指定することができます。 |
(構文を含む) Flink の SQL関数は Apache Calciteの組み込み関数の部分集合です。ドキュメントのほとんどはCalcite SQL リファレンスから採用されました。
比較関数 | 解説 |
---|---|
|
等しい |
|
等しくない |
|
より大きい |
|
以上 |
|
より少ない |
|
以下 |
|
value が null の時にTRUEを返す。 |
|
value が nullでは無い時にTRUEを返す。 |
|
2つの値が等しく無い時にTRUEを返し、null値は同じものとして扱われる。 |
|
2つの値が等しい時にTRUEを返し、null値は同じものとして扱われる。 |
|
value1 がvalue2以上で、value3以下の時にTRUEを返す。 |
|
value1 が value2 未満か、value3より大きい時にTRUEを返す。 |
|
string1 がパターンstring2に一致する時にTRUEを返す。必要であればエスケープ文字を定義することができます。 |
|
string1 がパターン string2 に一致しない時にTRUEを返す。必要であればエスケープ文字を定義することができます。 |
|
string1 が正規表現 string2 に一致する時にTRUEを返す。必要であればエスケープ文字を定義することができます。 |
|
string1 が正規表現 string2 に一致しない時にTRUEを返す。必要であればエスケープ文字を定義することができます。 |
|
value がリスト内の値に等しい時にTRUEを返す。 |
|
value がリスト内の各値に等しく無い時にTRUEを返す。 |
|
sub-query が少なくとも1行を返す時にTRUEを返す。オペレーションがjoinおよびgroupオペレーション内で上書きできる時のみサポートされます。 |
論理関数 | 解説 |
---|---|
|
boolean1がTRUEか、boolean2 がTRUEの時にTRUEを返す。3値ロジックがサポートされます。 |
|
boolean1 と boolean2 の両方がTRUEの時にTRUEを返す。3値ロジックがサポートされます。 |
|
boolean がTRUEでは無い時にTRUEを返す; boolean が UNKNOWN の時に UNKNOWN を返す。 |
|
boolean がFALSEの時にTRUEを返す; boolean が UNKNOWN の時に FALSE を返す。 |
|
boolean が FALSE では無い時にTRUEを返す; boolean が UNKNOWN の時にTRUEを返す。 |
|
boolean が TRUE の時にTRUEを返す; boolean が UNKNOWN の時にFALSEを返す。 |
|
boolean が TRUEでは無い時にTRUEを返す; boolean が UNKNOWN の時にTRUEを返す。 |
|
boolean が UNKNOWN の時にTRUEを返す。 |
|
boolean が UNKNOWN では無い時にTRUEを返す。 |
計算関数 | 解説 |
---|---|
|
numericを返す。 |
|
負のnumericを返す |
|
numeric1足すnumeric2を返す。 |
|
numeric1 引く numeric2を返す。 |
|
numeric1掛ける numeric2を返す。 |
|
numeric1 割るnumeric2を返す。 |
|
numeric1 の numeric2乗を返す。 |
|
numericの絶対値を返す。 |
|
numeric1 割る numeric2 の剰余 (モジュラス)を返す。numeric1が負の時だけ結果は負です。 |
|
numericの平方根を返す。 |
|
numericの自然対数 (基底 e) を返す。 |
|
numericの基底10の対数を返す。 |
|
numericの乗数を返す。 |
|
numericを上に丸め、numeric以上の最小の数を返す。 |
|
numericを下に丸め、numeric以下の最大の数を返す。 |
文字列関数 | 解説 |
---|---|
|
2つの文字列を連結する。 |
|
文字列内の文字の数を返す。 |
|
CHAR_LENGTH(string)の通り。 |
|
大文字に変換された文字列を返す。 |
|
小文字に変換された文字列を返す。 |
|
string2内にstring1が最初に現れた位置を返す。 |
|
string2から、最初 および/あるいは 最後の文字を削除する。デフォルトでは、両方の端の空白文字が削除されます。 |
|
string1の部分文字列をstring2で置き換える。 |
|
指定された位置から始まる部分文字列を返す。 |
|
指定された位置から指定された長さの部分文字列を返す。 |
|
各単語の最初の文字が大文字で残りが小文字の文字列を返す。単語は非英数字の文字で分割された英数字の文字のシーケンスです。 |
条件関数 | 解説 |
---|---|
|
簡単な例。 |
|
検索された場合。 |
|
値が同じ場合はNULLを返す。例えば、 |
|
最初の値がnullの時に値を提供する。例えば、 |
型変換関数 | 解説 |
---|---|
|
値を指定された型に変換します。 |
値のコンストラクタ関数 | 解説 |
---|---|
|
配列内の特定の位置の要素を返す。インデックスは1から始まります。 |
|
値のリストから配列を生成する。 |
一時的な関数 | 解説 |
---|---|
|
"yy-mm-dd"形式の日付文字列をSQLの日付にパースする。 |
|
"hh:mm:ss"の形式の時間stringをSQLの時間にパースする。 |
|
"yy-mm-dd hh:mm:ss.fff"の形式のタイムスタンプstring をSQLのタイムスタンプにパースする。 |
|
"dd hh:mm:ss.fff"形式の間隔stringをSQLのミリ秒間隔に、"yyyy-mm"をSQLの月間隔にパースする。間隔の範囲は、例えば ミリ秒の間隔については、 |
|
UTCタイムゾーンの現在のSQLの日付を返す。 |
|
UTCタイムゾーンの現在のSQLの時間を返す。 |
|
UTCタイムゾーンの現在のSQLのタイムスタンプを返す。 |
|
ローカルタイムゾーンの現在のSQLの時間を返す。 |
|
ローカルタイムゾーンの現在のSQLのタイムスタンプを返す。 |
|
time pointあるいは時間間隔の各部分を抽出する。部分をlong値として返します。例えば、 |
|
time point を指定された単位に上方に丸める。例えば、 |
|
time point を指定された単位に下方に丸めます。例えば、 |
|
SQLの日付から年の四半期を返す。例えば、 |
|
2つの固定された時間の間隔がオーバーラップするかどうかを決定する。time point と temporal は2つのtime point (開始、終了)によって定義される範囲に変換されます。関数は |
集約関数 | 解説 |
---|---|
|
value が null では無い入力行の数を返す。 |
|
入力行の数を返す。 |
|
全ての入力値に渡ってnumericの平均(数値平均)を返す。 |
|
全ての入力値に渡ってnumericの合計を返す。 |
|
全ての入力値に渡ってvalueの最大の値を返す。 |
|
全ての入力値に渡ってvalueの最小の値を返す。 |
グルーピング 関数 | 解説 |
---|---|
|
グルーピング キーの組み合わせをユニークに識別する数値を返す。 |
|
expressionが現在の行のグルーピング セット内で集まる場合は1を、そうでなければ 0 を返す。 |
|
指定されたグルーピング表現のビット ベクトルを返す。 |
値へのアクセス関数 | 解説 |
---|---|
|
名前を使って(Tuple, POJOなどのような)Flinkのcomposite型のフィールドへアクセスし、その値を返す。 |
|
(Tuple, Pojoなどのような)Flinkのcomposite型とその直接のsubtypeの全てを、各subtypeが分割フィールドであるflatな表現に変換する。 |
配列関数 | 解説 |
---|---|
|
配列の要素の数を返す。 |
|
1つの要素を持つ配列の唯一の要素を返す。配列が空の場合は |
必要なscalar関数が組み込み関数内に含まれていない場合、テーブルAPIおよびSQLの両方のための独自のユーザ定義のscalar関数を定義することができます。ユーザ定義のscalar関数は0, 1, あるいは多数の値を新しいscalar値にマップします。
scalar関数を定義するには、org.apache.flink.table.functions
内のScalarFunction
を拡張し、(1つ以上の)評価メソッドを実装する必要があります。scalar関数の挙動は評価関数によって決定されます。評価メソッドは公的に宣言され、eval
という名前でなければなりません。評価メソッドのパラメータの型と返り値の型はscalar関数のパラメータと返り値の型も決定します。評価メソッドは多数のeval
という名前のメソッドを実装することで上書きすることができます。
以下の例の断片は独自のハッシュコード関数を定義する方法を示します:
public static class HashCode extends ScalarFunction {
public int eval(String s) {
return s.hashCode() * 12;
}
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode())
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
// must be defined in static/object context
object hashCode extends ScalarFunction {
def eval(s: String): Int = {
s.hashCode() * 12
}
}
val tableEnv = TableEnvironment.getTableEnvironment(env)
// use the function in Scala Table API
myTable.select('string, hashCode('string))
// register and use the function in SQL
tableEnv.registerFunction("hashCode", hashCode)
tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
デフォルトで評価メソッドの結果の型はFlinkの型の抽出ファシリティによって決定されます。これは基本的な型あるいは単純なPOJOにとって十分ですが、もっと複雑、独自、あるいは複合の型にとっては具合が悪いかもしれません。これらの場合、結果の型のTypeInformation
はScalarFunction#getResultType()
を上書くことで手動で定義することができます。
内部的には、テーブルAPIとSQLのコード生成は出来る限りprimitive値と連携します。もしユーザ定義のscalar関数が実行時にオブジェクトの生成/キャストによってあまり大きなオーバーヘッドを導入すべきでないなら、それらの箱詰めの代わりにprimitiveの型としてパラメータと結果の型を宣言することをお勧めします。Types.DATE
と Types.TIME
は int
として表現することもできます。Types.TIMESTAMP
は long
として表現されるかもしれません。
以下の例は内部的なタイムスタンプ表現を取り、またlong値として内部的なタイムスタンプ表現を返す、より進んだ例です。ScalarFunction#getResultType()
を上書くことで、返されるlong値はコードの生成によってTypes.TIMESTAMP
として解釈されるべきだと定義します。
public static class TimestampModifier extends ScalarFunction {
public long eval(long t) {
return t % 1000;
}
public TypeInformation> getResultType(signature: Class>[]) {
return Types.TIMESTAMP;
}
}
object TimestampModifier extends ScalarFunction {
def eval(t: Long): Long = {
t % 1000
}
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
Types.TIMESTAMP
}
}
ユーザ定義のscalar関数と似て、ユーザ定義のテーブル関数は0, 1 あるいは多数のscalar値を入力パラメータとして取ります。しかし、scalar関数とは対照的に、出力として1つの値の代わりに任意の数の行を返すことができます。返される行は1つ以上のカラムから成るかもしれません。
テーブル関数を定義するには、org.apache.flink.table.functions
内のTableFunction
を拡張し、(1つ以上の)評価メソッドを実装する必要があります。テーブル関数の挙動は評価関数によって決定されます。評価メソッドはpublic
に宣言され、eval
という名前でなければなりません。TableFunction
は多数のeval
という名前のメソッドを実装することで上書きすることができます。評価メソッドのパラメータの型はテーブル関数の全ての有効なパラメータを決定します。返されるテーブルの型はTableFunction
のgeneric型によって決定されます。評価メソッドは protected collect(T)
メソッドを使って出力行を発行します。
テーブルAPI内では、テーブル関数はScalarユーザについては .join(Expression)
あるいは .leftOuterJoin(Expression)
と一緒に、Javaユーザについては.join(String)
あるいは .leftOuterJoin(String)
と一緒に使われます。join
オペレータは、outerテーブル(オペレータの左のテーブル)からの各行とテーブル値関数によって生成された全ての行(オペレータの右のテーブル)を (cross)joinします。leftOuterJoin
オペレータはouterテーブル(オペレータの左のテーブル)からの各行と、テーブル値関数によって生成された全ての行(オペレータの右のテーブル)をjoinし、テーブル関数が空のテーブルを返すouterの行を保持します。In SQL use LATERAL TABLE(<TableFunction>)
with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
以下の例はテーブル値関数をどうやって定義し使うかを示します:
// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
public void eval(String str) {
for (String s : str.split(" ")) {
// use collect(...) to emit a row
collect(new Tuple2<String, Integer>(s, s.length()));
}
}
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ... // table schema: [a: String]
// Register the function.
tableEnv.registerFunction("split", new Split());
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join("split(a) as (word, length)").select("a, word, length");
myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
class Split extends TableFunction[(String, Int)] {
def eval(str: String): Unit = {
// use collect(...) to emit a row.
str.split(" ").foreach(x -> collect((x, x.length))
}
}
val tableEnv = TableEnvironment.getTableEnvironment(env)
val myTable = ... // table schema: [a: String]
// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
val split = new Split()
// "as" specifies the field names of the generated table.
myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);
// Register the table function to use it in SQL queries.
tableEnv.registerFunction("split", new Split())
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API)
tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
重要: テーブル関数をScalaオブジェクトとして実装しないでください。Scala オブジェクトはシングルトンで並行処理の問題を起こすでしょう。
POJO型は決定論的フィールドの順番を持たないことに注意してください。従って、テーブル関数を使って返されたPOJOのフィールドをAS
を使って名前を変えることができません。
デフォルトではTableFunction
の結果の型はFlinkの自動型抽出ファシリティを使って決定されます。これは基本型と単純なPOJOについては良く動作しますが、もっと複雑、独自、あるいは複合の型については具合が悪いかもしれません。そのような場合、結果の型はTypeInformation
を返すTableFunction#getResultType()
を上書きすることで手動で指定することができます。
The following example shows an example of a TableFunction
that returns a Row
type which requires explicit type information. We define that the returned table type should be RowTypeInfo(String, Integer)
by overriding TableFunction#getResultType()
.
public class CustomTypeSplit extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split(" ")) {
Row row = new Row(2);
row.setField(0, s);
row.setField(1, s.length);
collect(row);
}
}
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
}
}
class CustomTypeSplit extends TableFunction[Row] {
def eval(str: String): Unit = {
str.split(" ").foreach({ s =>
val row = new Row(2)
row.setField(0, s)
row.setField(1, s.length)
collect(row)
})
}
override def getResultType: TypeInformation[Row] = {
new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO))
}
}
以下のオペレーションはまだサポートされません:
Table
はTableSink
に書き込むことができます。これは広く様々なフォーマット(例えば、CSV, Apache Parquet, Apache Avro)、ストレージシステム (例えば、JDBC, Apache HBase, Apache Cassandra, Elasticsearch)、あるいはメッセージシステム (例えば Apache Kafka, RabbitMQ)をサポートするための一般的なインタフェースです。バッチTable
はBatchTableSink
に書き込むことのみできます。これはStreamTableSink
を必要とするストリーミングテーブルです。TableSink
は同時に両方のインタフェースを実装することができます。
現在のところ、FlinkはバッチあるいはストリーミングTable
を CSV-フォーマットのファイルに書き込むCsvTableSink
のみを提供します。独自のTableSink
はBatchTableSink
および/あるいは StreamTableSink
インタフェースを実装することで定義することができます。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// compute the result Table using Table API operators and/or SQL queries
Table result = ...
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
// write the result Table to the TableSink
result.writeToSink(sink);
// execute the program
env.execute();
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// compute the result Table using Table API operators and/or SQL queries
val result: Table = ...
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// write the result Table to the TableSink
result.writeToSink(sink)
// execute the program
env.execute()
テーブルAPIはランタイムの挙動を修正するための設定(TableConfig
と呼ばれます)を提供します。TableEnvironment
を使ってアクセスすることができます。
デフォルトで、テーブルAPIはnull
値をサポートします。テーブルAPIは今のところ全てのFlinkの型をサポートしません。
テーブルAPIはTable
を計算するための論理的で最適化されたクエリ計画を説明するための仕組みを提供します。これはTableEnvironment#explain(table)
メソッドを使って行われます。3つの計画を説明する文字列を返します。
以下のコードは例と対応する出力を示します:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, 'F%')])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, 'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE