JSON
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

JSON形式 #

Format: Serialization Schema Format: Deserialization Schema

JSONフォーマットを使って、JSONスキーマに基づいたJSONデータを読み書きできます。現在、JSONスキーマはテーブルスキーマから派生しています。

JSON形式は、Upsert Kafkaコネクタなど、取り消しストリームや更新ストリームを明示的にサポートするコネクタを使っている場合を除き、追加専用ストリームをサポートします。If you need to write retract streams and/or upsert streams, we suggest you to look at CDC JSON formats like Debezium JSON and Canal JSON.

依存 #

In order to use the Json format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client
Built-in

JSON形式を持つテーブルの作成法 #

ここでは、KafkaコネクタとJSON形式を使ってテーブルを作成する方法を示します。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

フォーマットオプション #

オプション 必要条件 Forwarded デフォルト 種類 説明
形式
必須 no (none) 文字列 使用する形式を指定します。ここでは'json'にする必要があります。
json.fail-on-missing-field
オプション no false 真偽値 フィールドが欠落している場合に失敗するかどうか。
json.ignore-parse-errors
オプション no false 真偽値 解析エラーが発生したフィールドと行を失敗せずにスキップします。 エラーが発生した場合は、フィールドはnullに設定されます。
json.timestamp-format.standard
オプション はい 'SQL' 文字列 TIMESTAMPTIMESTAMP_LTZ型のために、入力および出力のタイムスタンプを指定します。現在サポートされている値は、'SQL''ISO-8601'です:
  • Option 'SQL' will parse input TIMESTAMP values in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g "2020-12-30 12:13:14.123", parse input TIMESTAMP_LTZ values in "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30 12:13:14.123Z" and output timestamp in the same format.
  • Option 'ISO-8601'will parse input TIMESTAMP in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g "2020-12-30T12:13:14.123" parse input TIMESTAMP_LTZ in "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" format, e.g "2020-12-30T12:13:14.123Z" and output timestamp in the same format.
json.map-null-key.mode
オプション はい 'FAIL' 文字列 マップデータのnullキーをシリアライズする際のモードを指定します。現在サポートされる値は'FAIL''DROP''LITERAL'です:
  • オプション'FAIL'は、nullキーを持つマップ値に遭遇した場合に例外を投げます。
  • オプション'DROP'はマップデータのnullキーエントリを削除します。
  • オプション'LITERAL'はnullキーを文字列リテラルに置き換えます。文字列のリテラルはjson.map-null-key.literalオプションによって定義されます。
json.map-null-key.literal
オプション はい 'null' 文字列 'json.map-null-key.mode'がLITERALの場合、nullキーを置き開ける文字列リテラルを指定します。
json.encode.decimal-as-plain-number
オプション はい false 真偽値 全ての小数を科学的な表記法ではなく単純な数値としてエンコードします。デフォルトでは、小数は科学的な表記法を使って記述できます。例えば、0.000000027はデフォルトで2.7E-8としてエンコードされ、このオプションをtrueに設定すると0.000000027として書き込まれます。
decode.json-parser.enabled
オプション true 真偽値 jsonをでコードするために、Jackson JsonParserを使うかどうか。JsonParserはJSONデータを読み込むためのJackson JSONストリーミングAPIです。これは、以前のJsonNodeのやり方と比較して、はるかに高速で消費するメモリも少なくなります。一方、JsonParserは、データ読み込み時のネストされたプrじぇくションプッシュダウンもサポートします。このオプションはデフォルトで有効です。非互換性の問題が発生した場合は、無効して以前のJsonNodeのやり方にフォールバックできます。

データ型マッピング #

現在、JSONスキーマは常にテーブルスキーマから派生します。JSONスキーマの明示的な定義はまだサポートされません。

FlinkのJSO形式は、jackson databind APIを使ってJSON文字列を解析して生成します。

以下の表は、Flink型からJSON型への型マッピングを一覧表示しています。

Flink SQL型 JSON 型
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE string with format: date-time (with UTC time zone)
INTERVAL number
ARRAY array
MAP / MULTISET object
ROW object
inserted by FC2 system