Data Types
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

データの型 #

Apache FlinkのPython DataStream APIでは、データの型はDataStreamエコシステムでの値の型を記述します。 オペレーションの入力と出力の型を宣言するために使うことができ、システムに要素をシリアライズ化する方法を伝えます。

Pickleシリアライズ化 #

型が宣言されなかった場合、データはPickleを使ってシリアライズ化または逆シリアライズ化されます。 例えば、以下のプログラムはデータ型を指定しません。

from pyflink.datastream import StreamExecutionEnvironment


def processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
        .map(lambda record: (record[0]+1, record[1].upper())) \
        .print()  # note: print to stdout on the worker machine

    env.execute()


if __name__ == '__main__':
    processing()

ただし、次の場合に型を指定する必要があります:

  • PythonのレコードをJavaのオペレーションに渡す場合。
  • シリアライズ化と逆シリアライズ化のパフォーマンスを改善する場合。

PythonのレコードをJavaのオペレーションに渡す場合 #

Javaのオペレーターや関数はPythonデータを識別できないため、処理のためにPythonの型をJavaの型に変換するのに役立つ型を提供する必要があります。 例えば、Javaで実装されたFlinkSinkを使ってデータを出力したい場合、型が提供される必要があります。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink


def file_sink():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
        .map(lambda record: (record[0]+1, record[1].upper()),
             output_type=Types.ROW([Types.INT(), Types.STRING()])) \
        .add_sink(FileSink
                  .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                  .build())

    env.execute()


if __name__ == '__main__':
    file_sink()

シリアライズ化と逆シリアライズ化のパフォーマンスを改善する #

データはPickleを使ってシリアライズ化と逆シリアライズ化できますが、型が提供されるとパフォーマンスが向上します。 明示的な型により、PyFlinkはパイプライン経由でレコードを移動する時に効率的なserializerを使えます。

サポートされるデータ型 #

Python DataStream APIで型を定義するためにpyflink.common.typeinfo.Typesが使えます。 以下の表は、現在サポートされる型と、それらを定義する方法を示しています:

PyFlinkの型 Pythonの型 Javaの型
Types.BOOLEAN() bool java.lang.Boolean
Types.BYTE() int java.lang.Byte
Types.SHORT() int java.lang.Short
Types.INT() int java.lang.Integer
Types.LONG() int java.lang.Long
Types.FLOAT() float java.lang.Float
Types.DOUBLE() float java.lang.Double
Types.CHAR() str java.lang.Character
Types.STRING() str java.lang.String
Types.BIG_INT() int java.math.BigInteger
Types.BIG_DEC() decimal.Decimal java.math.BigDecimal
Types.INSTANT() pyflink.common.time.Instant java.time.Instant
Types.TUPLE() tuple org.apache.flink.api.java.tuple.Tuple0 ~ org.apache.flink.api.java.tuple.Tuple25
Types.ROW() pyflink.common.Row org.apache.flink.types.Row
Types.ROW_NAMED() pyflink.common.Row org.apache.flink.types.Row
Types.MAP() dict java.util.Map
Types.PICKLED_BYTE_ARRAY() The actual unpickled Python object byte[]
Types.SQL_DATE() datetime.date java.sql.Date
Types.SQL_TIME() datetime.time java.sql.Time
Types.SQL_TIMESTAMP() datetime.datetime java.sql.Timestamp
Types.LIST() list of Python object java.util.List

以下の表はサポートされる配列型を示しています:

PyFlinkの配列型 Pythonの型 Javaの型
Types.PRIMITIVE_ARRAY(Types.BYTE()) bytes byte[]
Types.PRIMITIVE_ARRAY(Types.BOOLEAN()) list of bool boolean[]
Types.PRIMITIVE_ARRAY(Types.SHORT()) list of int short[]
Types.PRIMITIVE_ARRAY(Types.INT()) list of int int[]
Types.PRIMITIVE_ARRAY(Types.LONG()) list of int long[]
Types.PRIMITIVE_ARRAY(Types.FLOAT()) list of float float[]
Types.PRIMITIVE_ARRAY(Types.DOUBLE()) list of float double[]
Types.PRIMITIVE_ARRAY(Types.CHAR()) list of str char[]
Types.BASIC_ARRAY(Types.BYTE()) list of int java.lang.Byte[]
Types.BASIC_ARRAY(Types.BOOLEAN()) list of bool java.lang.Boolean[]
Types.BASIC_ARRAY(Types.SHORT()) list of int java.lang.Short[]
Types.BASIC_ARRAY(Types.INT()) list of int java.lang.Integer[]
Types.BASIC_ARRAY(Types.LONG()) list of int java.lang.Long[]
Types.BASIC_ARRAY(Types.FLOAT()) list of float java.lang.Float[]
Types.BASIC_ARRAY(Types.DOUBLE()) list of float java.lang.Double[]
Types.BASIC_ARRAY(Types.CHAR()) list of str java.lang.Character[]
Types.BASIC_ARRAY(Types.STRING()) list of str java.lang.String[]
Types.OBJECT_ARRAY() list of Python object Array
inserted by FC2 system