This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Data Types
データの型 #
Apache FlinkのPython DataStream APIでは、データの型はDataStreamエコシステムでの値の型を記述します。 オペレーションの入力と出力の型を宣言するために使うことができ、システムに要素をシリアライズ化する方法を伝えます。
Pickleシリアライズ化 #
型が宣言されなかった場合、データはPickleを使ってシリアライズ化または逆シリアライズ化されます。 例えば、以下のプログラムはデータ型を指定しません。
from pyflink.datastream import StreamExecutionEnvironment
def processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
.map(lambda record: (record[0]+1, record[1].upper())) \
.print() # note: print to stdout on the worker machine
env.execute()
if __name__ == '__main__':
processing()
ただし、次の場合に型を指定する必要があります:
- PythonのレコードをJavaのオペレーションに渡す場合。
- シリアライズ化と逆シリアライズ化のパフォーマンスを改善する場合。
PythonのレコードをJavaのオペレーションに渡す場合 #
Javaのオペレーターや関数はPythonデータを識別できないため、処理のためにPythonの型をJavaの型に変換するのに役立つ型を提供する必要があります。 例えば、Javaで実装されたFlinkSinkを使ってデータを出力したい場合、型が提供される必要があります。
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
def file_sink():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')]) \
.map(lambda record: (record[0]+1, record[1].upper()),
output_type=Types.ROW([Types.INT(), Types.STRING()])) \
.add_sink(FileSink
.for_row_format('/tmp/output', Encoder.simple_string_encoder())
.build())
env.execute()
if __name__ == '__main__':
file_sink()
シリアライズ化と逆シリアライズ化のパフォーマンスを改善する #
データはPickleを使ってシリアライズ化と逆シリアライズ化できますが、型が提供されるとパフォーマンスが向上します。 明示的な型により、PyFlinkはパイプライン経由でレコードを移動する時に効率的なserializerを使えます。
サポートされるデータ型 #
Python DataStream APIで型を定義するためにpyflink.common.typeinfo.Types
が使えます。
以下の表は、現在サポートされる型と、それらを定義する方法を示しています:
PyFlinkの型 | Pythonの型 | Javaの型 |
---|---|---|
Types.BOOLEAN() |
bool |
java.lang.Boolean |
Types.BYTE() |
int |
java.lang.Byte |
Types.SHORT() |
int |
java.lang.Short |
Types.INT() |
int |
java.lang.Integer |
Types.LONG() |
int |
java.lang.Long |
Types.FLOAT() |
float |
java.lang.Float |
Types.DOUBLE() |
float |
java.lang.Double |
Types.CHAR() |
str |
java.lang.Character |
Types.STRING() |
str |
java.lang.String |
Types.BIG_INT() |
int |
java.math.BigInteger |
Types.BIG_DEC() |
decimal.Decimal |
java.math.BigDecimal |
Types.INSTANT() |
pyflink.common.time.Instant |
java.time.Instant |
Types.TUPLE() |
tuple |
org.apache.flink.api.java.tuple.Tuple0 ~ org.apache.flink.api.java.tuple.Tuple25 |
Types.ROW() |
pyflink.common.Row |
org.apache.flink.types.Row |
Types.ROW_NAMED() |
pyflink.common.Row |
org.apache.flink.types.Row |
Types.MAP() |
dict |
java.util.Map |
Types.PICKLED_BYTE_ARRAY() |
The actual unpickled Python object |
byte[] |
Types.SQL_DATE() |
datetime.date |
java.sql.Date |
Types.SQL_TIME() |
datetime.time |
java.sql.Time |
Types.SQL_TIMESTAMP() |
datetime.datetime |
java.sql.Timestamp |
Types.LIST() |
list of Python object |
java.util.List |
以下の表はサポートされる配列型を示しています:
PyFlinkの配列型 | Pythonの型 | Javaの型 |
---|---|---|
Types.PRIMITIVE_ARRAY(Types.BYTE()) |
bytes |
byte[] |
Types.PRIMITIVE_ARRAY(Types.BOOLEAN()) |
list of bool |
boolean[] |
Types.PRIMITIVE_ARRAY(Types.SHORT()) |
list of int |
short[] |
Types.PRIMITIVE_ARRAY(Types.INT()) |
list of int |
int[] |
Types.PRIMITIVE_ARRAY(Types.LONG()) |
list of int |
long[] |
Types.PRIMITIVE_ARRAY(Types.FLOAT()) |
list of float |
float[] |
Types.PRIMITIVE_ARRAY(Types.DOUBLE()) |
list of float |
double[] |
Types.PRIMITIVE_ARRAY(Types.CHAR()) |
list of str |
char[] |
Types.BASIC_ARRAY(Types.BYTE()) |
list of int |
java.lang.Byte[] |
Types.BASIC_ARRAY(Types.BOOLEAN()) |
list of bool |
java.lang.Boolean[] |
Types.BASIC_ARRAY(Types.SHORT()) |
list of int |
java.lang.Short[] |
Types.BASIC_ARRAY(Types.INT()) |
list of int |
java.lang.Integer[] |
Types.BASIC_ARRAY(Types.LONG()) |
list of int |
java.lang.Long[] |
Types.BASIC_ARRAY(Types.FLOAT()) |
list of float |
java.lang.Float[] |
Types.BASIC_ARRAY(Types.DOUBLE()) |
list of float |
java.lang.Double[] |
Types.BASIC_ARRAY(Types.CHAR()) |
list of str |
java.lang.Character[] |
Types.BASIC_ARRAY(Types.STRING()) |
list of str |
java.lang.String[] |
Types.OBJECT_ARRAY() |
list of Python object |
Array |