This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Python DataStream APIの紹介 #
FlinkのDataStreamプログラムは、データストリームの変換(例えば、フィルタリング、状態の更新、ウィンドウの定義、集約など)を実装する通常のプログラムです。データストリームは、最初に様々なソース(例えば、メッセージキュー、ソケットストリーム、ファイル)から作成されます。結果はシンクから返されます。シンクは、データをファイルや標準出力(例えばコマンドラインターミナル)に書き込むことができます。
Python DataStream APIは、PythonユーザがPython DataStream APIジョブを書くことができるようにするDataStream APIのPythonバージョンです。
Python DataStream APIプログラムの共通構造 #
以下のコード例は、Python DataStream APIプログラムの共通構造を示しています。
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
class MyMapFunction(MapFunction):
def open(self, runtime_context: RuntimeContext):
state_desc = ValueStateDescriptor('cnt', Types.PICKLED_BYTE_ARRAY())
self.cnt_state = runtime_context.get_state(state_desc)
def map(self, value):
cnt = self.cnt_state.value()
if cnt is None or cnt < 2:
self.cnt_state.update(1 if cnt is None else cnt + 1)
return value[0], value[1] + 1
else:
return value[0], value[1]
def state_access_demo():
# 1. create a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# 2. create source DataStream
seq_num_source = NumberSequenceSource(1, 10000)
ds = env.from_source(
source=seq_num_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name='seq_num_source',
type_info=Types.LONG())
# 3. define the execution logic
ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
.key_by(lambda a: a[0]) \
.map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))
# 4. create sink and emit result to sink
output_path = '/opt/output/'
file_sink = FileSink \
.for_row_format(output_path, Encoder.simple_string_encoder()) \
.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
.build()
ds.sink_to(file_sink)
# 5. execute the job
env.execute('state_access_demo')
if __name__ == '__main__':
state_access_demo()
StreamExecutionEnvironmentの作成 #
StreamExecutionEnvironment
はDataStream APIプログラムの中心的な概念です。
以下のコード例はStreamExecutionEnvironment
の作成法を示します:
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
DataStreamの作成 #
データストリームAPIは、Flinkプログラムのデータのコレクションを表すために使われる特別なDataStream
クラスから名前を取得します。これらは、重複を含むデータの不変のコレクションと考えることができます。データは有限または無制限のいずれかであり、データを操作するために使うAPIは同じです。
DataStream
は、使用方法に関しては通常のPython Collection
に似ていますが、いくつかの重要な点で全く異なります。これらは不変です。つまり、作成後に要素を追加または削除することはできません。また、単に内部の要素を検査するだけでなく、DataStream
APIオペレーション、変換とも呼ばれます、を使って要素を操作することもできます。
Flinkプログラムにソースを追加することで、初期のDataStream
を作成することもできます。
次に、そこから新しいストリームを派生し、map
、filter
などのAPIメソッドを使ってそれらを結合できます。
リストオブジェクトからの作成 #
リストオブジェクトからDataStream
を作成できます:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
type_info=Types.ROW([Types.INT(), Types.STRING()]))
パラメータtype_info
はオプションです。指定した場合は、返されるDataStream
の出力型がTypes.PICKLED_BYTE_ARRAY()
になります。
DataStreamコネクタを使った作成 #
以下のように、メソッドadd_source
でDataStreamコネクタを使ってDataStream
を作成することもできます:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \
.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
kafka_consumer = FlinkKafkaConsumer(
topics='test_source_topic',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
ds = env.add_source(kafka_consumer)
注意 現在、メソッドadd_source
でDataStreamソースコネクタとして使われるFlinkKafkaConsumer
のみをサポートします。
注意 add_source
を使って作成されたDataStream
は、streaming
実行モードでのみ実行できます。
from_source
メソッドを呼び出して、統合DataStreamソースコネクタを使ってDataStream
を作成することもできます。
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(
source=seq_num_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name='seq_num_source',
type_info=Types.LONG())
注意 現在、統合DataStreamソースコネクタとしてNumberSequenceSource
とFileSource
のみをサポートします。
注意 from_source
を使って作成されたDataStream
は、batch
実行モードとstreaming
実行モードの両方で実行できます。
Table & SQLコネクタの作成 #
Table & SQLコネクタをDataStream
を作成するために使うこともできます。まず、Table & SQLコネクタを使ってTable
を作成し、次にそれをDataStream
に変換します。
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
注意 StreamExecutionEnvironment env
は、TableEnvironment t_env
を作成する時に指定する必要があります。
注意 全てのJava Table & SQLコネクタは、PyFlink Table APIで使えます。これはそれらすべてがPyFlink DataStream APIで使えることも意味します。
データストリームの変換 #
オペレータは、1つ以上のDataStream
を新しいDataStream
に変換します。プログラムは複数の変換を組み合わせて洗練されたデータフロートポロジーを作成できます。
以下の例は、DataStream
を別のmap
変換を使ってDataStream
に変換する方法の簡単な例を示しています:
ds = ds.map(lambda a: a + 1)
利用可能なDataStream変換の概要については、operatorsを参照してください。
DataStreamとテーブル間の変換 #
DataStream
からTable
への変換、およびその逆の変換もサポートします。
# convert a DataStream to a Table
table = t_env.from_data_stream(ds, 'a, b, c')
# convert a Table to a DataStream
ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
# or
ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
結果の発行 #
Print #
print
メソッドを呼び出すを呼び出して、DataStream
のデータを標準出力に出力することができます:
ds.print()
クライアントの結果の収集 #
execute_and_collect
メソッドを呼び出して、DataStream
のデータをクライアントに収集することができます:
with ds.execute_and_collect() as results:
for result in results:
print(result)
注意 execute_and_collect
はDataStream
のデータをクライアントのメモリに収集するため、収集する行数を制限することは良い習慣です。
結果をDataStreamシンクコネクタに発行 #
add_sink
メソッドを呼び出してDataStream
のデータをDataStreamシンクコネクタに発行できます:
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream.formats.json import JsonRowSerializationSchema
serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
kafka_producer = FlinkKafkaProducer(
topic='test_sink_topic',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
ds.add_sink(kafka_producer)
注意 現在、メソッドadd_sink
を使ってDetaStreamシンクコネクタとして使われるFlinkKafkaProducerとJdbcSinkのみがサポートされます。
注意 メソッドadd_sink
はstreaming
実行モードのみで使えます。
executing mode.
sink_to
メソッドを呼び出して、DataStream
のデータを統合DataStreamシンクコネクタに発行することもできます。
sink connector:
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder
output_path = '/opt/output/'
file_sink = FileSink \
.for_row_format(output_path, Encoder.simple_string_encoder()) \
.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
.build()
ds.sink_to(file_sink)
注意 現在、統合DataStreamシンクコネクタとしてFileSink
のみをサポートします。
注意 メソッドsink_to
はbatch
実行モードとstreaming
実行モードの両方で使えます。
結果をTable & SQLシンクコネクタに発行 #
Table & SQLコネクタをDataStream
を書き出すために使うこともできます。まず、DataStream
をTable
に変換し、次にそれをTable & SQLシンクコネクタに書き出します。
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# option 1:the result type of ds is Types.ROW
def split(s):
splits = s[1].split("|")
for sp in splits:
yield Row(s[0], sp)
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: Row(i[0] + j[0], i[1]))
# option 1:the result type of ds is Types.TUPLE
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
# emit ds to print sink
t_env.execute_sql("""
CREATE TABLE my_sink (
a INT,
b VARCHAR
) WITH (
'connector' = 'print'
)
""")
table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")
注意 DataStream ds
の出力の型は複合型である必要があります。
ジョブの送信 #
最後に、StreamExecutionEnvironment.execute
メソッドを呼び出して、実行のためのDataStream APIジョブを送信する必要があります:
env.execute()
DataStream
をTable
に変換してそれをTable API & SQLシンクコネクタに書き込む場合、TableEnvironment.execute
メソッドを使ってジョブを送信する必要がある場合があります。
t_env.execute()