This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
JSON
JSON形式 #
Format: Serialization Schema Format: Deserialization Schema
JSONフォーマットを使って、JSONスキーマに基づいたJSONデータを読み書きできます。現在、JSONスキーマはテーブルスキーマから派生しています。
JSON形式は、Upsert Kafkaコネクタなど、取り消しストリームや更新ストリームを明示的にサポートするコネクタを使っている場合を除き、追加専用ストリームをサポートします。If you need to write retract streams and/or upsert streams, we suggest you to look at CDC JSON formats like Debezium JSON and Canal JSON.
依存 #
In order to use the Json format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
Maven dependency | SQL Client |
---|---|
|
Built-in |
JSON形式を持つテーブルの作成法 #
ここでは、KafkaコネクタとJSON形式を使ってテーブルを作成する方法を示します。
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
フォーマットオプション #
オプション | 必要条件 | Forwarded | デフォルト | 種類 | 説明 |
---|---|---|---|---|---|
形式 |
必須 | no | (none) | 文字列 | 使用する形式を指定します。ここでは'json' にする必要があります。 |
json.fail-on-missing-field |
オプション | no | false | 真偽値 | フィールドが欠落している場合に失敗するかどうか。 |
json.ignore-parse-errors |
オプション | no | false | 真偽値 | 解析エラーが発生したフィールドと行を失敗せずにスキップします。 エラーが発生した場合は、フィールドはnullに設定されます。 |
json.timestamp-format.standard |
オプション | はい | 'SQL' |
文字列 | TIMESTAMP とTIMESTAMP_LTZ 型のために、入力および出力のタイムスタンプを指定します。現在サポートされている値は、'SQL' と'ISO-8601' です:
|
json.map-null-key.mode |
オプション | はい | 'FAIL' |
文字列 | マップデータのnullキーをシリアライズする際のモードを指定します。現在サポートされる値は'FAIL' 、'DROP' 、'LITERAL' です:
|
json.map-null-key.literal |
オプション | はい | 'null' | 文字列 | 'json.map-null-key.mode' がLITERALの場合、nullキーを置き開ける文字列リテラルを指定します。 |
json.encode.decimal-as-plain-number |
オプション | はい | false | 真偽値 | 全ての小数を科学的な表記法ではなく単純な数値としてエンコードします。デフォルトでは、小数は科学的な表記法を使って記述できます。例えば、0.000000027 はデフォルトで2.7E-8 としてエンコードされ、このオプションをtrueに設定すると0.000000027 として書き込まれます。 |
decode.json-parser.enabled |
オプション | true | 真偽値 | jsonをでコードするために、Jackson JsonParser を使うかどうか。JsonParser はJSONデータを読み込むためのJackson JSONストリーミングAPIです。これは、以前のJsonNode のやり方と比較して、はるかに高速で消費するメモリも少なくなります。一方、JsonParser は、データ読み込み時のネストされたプrじぇくションプッシュダウンもサポートします。このオプションはデフォルトで有効です。非互換性の問題が発生した場合は、無効して以前のJsonNode のやり方にフォールバックできます。 |
データ型マッピング #
現在、JSONスキーマは常にテーブルスキーマから派生します。JSONスキーマの明示的な定義はまだサポートされません。
FlinkのJSO形式は、jackson databind APIを使ってJSON文字列を解析して生成します。
以下の表は、Flink型からJSON型への型マッピングを一覧表示しています。
Flink SQL型 | JSON 型 |
---|---|
CHAR / VARCHAR / STRING |
string |
BOOLEAN |
boolean |
BINARY / VARBINARY |
string with encoding: base64 |
DECIMAL |
number |
TINYINT |
number |
SMALLINT |
number |
INT |
number |
BIGINT |
number |
FLOAT |
number |
DOUBLE |
number |
DATE |
string with format: date |
TIME |
string with format: time |
TIMESTAMP |
string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
string with format: date-time (with UTC time zone) |
INTERVAL |
number |
ARRAY |
array |
MAP / MULTISET |
object |
ROW |
object |