Intro to the Python DataStream API
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Python DataStream APIの紹介 #

FlinkのDataStreamプログラムは、データストリームの変換(例えば、フィルタリング、状態の更新、ウィンドウの定義、集約など)を実装する通常のプログラムです。データストリームは、最初に様々なソース(例えば、メッセージキュー、ソケットストリーム、ファイル)から作成されます。結果はシンクから返されます。シンクは、データをファイルや標準出力(例えばコマンドラインターミナル)に書き込むことができます。

Python DataStream APIは、PythonユーザがPython DataStream APIジョブを書くことができるようにするDataStream APIのPythonバージョンです。

Python DataStream APIプログラムの共通構造 #

以下のコード例は、Python DataStream APIプログラムの共通構造を示しています。

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.PICKLED_BYTE_ARRAY())
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None or cnt < 2:
            self.cnt_state.update(1 if cnt is None else cnt + 1)
            return value[0], value[1] + 1
        else:
            return value[0], value[1]


def state_access_demo():
    # 1. create a StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. create source DataStream
    seq_num_source = NumberSequenceSource(1, 10000)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. define the execution logic
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. create sink and emit result to sink
    output_path = '/opt/output/'
    file_sink = FileSink \
        .for_row_format(output_path, Encoder.simple_string_encoder()) \
        .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
        .build()
    ds.sink_to(file_sink)

    # 5. execute the job
    env.execute('state_access_demo')


if __name__ == '__main__':
    state_access_demo()

Back to top

StreamExecutionEnvironmentの作成 #

StreamExecutionEnvironmentはDataStream APIプログラムの中心的な概念です。 以下のコード例はStreamExecutionEnvironmentの作成法を示します:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

Back to top

DataStreamの作成 #

データストリームAPIは、Flinkプログラムのデータのコレクションを表すために使われる特別なDataStreamクラスから名前を取得します。これらは、重複を含むデータの不変のコレクションと考えることができます。データは有限または無制限のいずれかであり、データを操作するために使うAPIは同じです。

DataStreamは、使用方法に関しては通常のPython Collectionに似ていますが、いくつかの重要な点で全く異なります。これらは不変です。つまり、作成後に要素を追加または削除することはできません。また、単に内部の要素を検査するだけでなく、DataStream APIオペレーション、変換とも呼ばれます、を使って要素を操作することもできます。

Flinkプログラムにソースを追加することで、初期のDataStreamを作成することもできます。 次に、そこから新しいストリームを派生し、mapfilterなどのAPIメソッドを使ってそれらを結合できます。

リストオブジェクトからの作成 #

リストオブジェクトからDataStreamを作成できます:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

パラメータtype_infoはオプションです。指定した場合は、返されるDataStreamの出力型がTypes.PICKLED_BYTE_ARRAY()になります。

DataStreamコネクタを使った作成 #

以下のように、メソッドadd_sourceでDataStreamコネクタを使ってDataStreamを作成することもできます:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
    topics='test_source_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds = env.add_source(kafka_consumer)

注意 現在、メソッドadd_sourceでDataStreamソースコネクタとして使われるFlinkKafkaConsumerのみをサポートします。

注意 add_sourceを使って作成されたDataStreamは、streaming実行モードでのみ実行できます。

from_sourceメソッドを呼び出して、統合DataStreamソースコネクタを使ってDataStreamを作成することもできます。

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.number_seq import NumberSequenceSource

env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='seq_num_source',
    type_info=Types.LONG())

注意 現在、統合DataStreamソースコネクタとしてNumberSequenceSourceFileSourceのみをサポートします。

注意 from_sourceを使って作成されたDataStreamは、batch実行モードとstreaming実行モードの両方で実行できます。

Table & SQLコネクタの作成 #

Table & SQLコネクタをDataStreamを作成するために使うこともできます。まず、Table & SQLコネクタを使ってTableを作成し、次にそれをDataStreamに変換します。

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

t_env.execute_sql("""
        CREATE TABLE my_source (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '10'
        )
    """)

ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    Types.ROW([Types.INT(), Types.STRING()]))

注意 StreamExecutionEnvironment envは、TableEnvironment t_envを作成する時に指定する必要があります。

注意 全てのJava Table & SQLコネクタは、PyFlink Table APIで使えます。これはそれらすべてがPyFlink DataStream APIで使えることも意味します。

Back to top

データストリームの変換 #

オペレータは、1つ以上のDataStreamを新しいDataStreamに変換します。プログラムは複数の変換を組み合わせて洗練されたデータフロートポロジーを作成できます。

以下の例は、DataStreamを別のmap変換を使ってDataStreamに変換する方法の簡単な例を示しています:

ds = ds.map(lambda a: a + 1)

利用可能なDataStream変換の概要については、operatorsを参照してください。

DataStreamとテーブル間の変換 #

DataStreamからTableへの変換、およびその逆の変換もサポートします。

# convert a DataStream to a Table
table = t_env.from_data_stream(ds, 'a, b, c')

# convert a Table to a DataStream
ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
# or
ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()]))

Back to top

結果の発行 #

Print #

printメソッドを呼び出すを呼び出して、DataStream のデータを標準出力に出力することができます:

ds.print()

クライアントの結果の収集 #

execute_and_collectメソッドを呼び出して、DataStreamのデータをクライアントに収集することができます:

with ds.execute_and_collect() as results:
    for result in results:
        print(result)

注意 execute_and_collectDataStreamのデータをクライアントのメモリに収集するため、収集する行数を制限することは良い習慣です。

結果をDataStreamシンクコネクタに発行 #

add_sinkメソッドを呼び出してDataStreamのデータをDataStreamシンクコネクタに発行できます:

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream.formats.json import JsonRowSerializationSchema

serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
    type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_producer = FlinkKafkaProducer(
    topic='test_sink_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds.add_sink(kafka_producer)

注意 現在、メソッドadd_sinkを使ってDetaStreamシンクコネクタとして使われるFlinkKafkaProducerとJdbcSinkのみがサポートされます。

注意 メソッドadd_sinkstreaming実行モードのみで使えます。 executing mode.

sink_toメソッドを呼び出して、DataStreamのデータを統合DataStreamシンクコネクタに発行することもできます。 sink connector:

from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder

output_path = '/opt/output/'
file_sink = FileSink \
    .for_row_format(output_path, Encoder.simple_string_encoder()) \
    .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()
ds.sink_to(file_sink)

注意 現在、統合DataStreamシンクコネクタとしてFileSinkのみをサポートします。

注意 メソッドsink_tobatch実行モードとstreaming実行モードの両方で使えます。

結果をTable & SQLシンクコネクタに発行 #

Table & SQLコネクタをDataStreamを書き出すために使うこともできます。まず、DataStreamTableに変換し、次にそれをTable & SQLシンクコネクタに書き出します。

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# option 1:the result type of ds is Types.ROW
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield Row(s[0], sp)

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: Row(i[0] + j[0], i[1]))

# option 1:the result type of ds is Types.TUPLE
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: (i[0] + j[0], i[1]))

# emit ds to print sink
t_env.execute_sql("""
        CREATE TABLE my_sink (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'print'
        )
    """)

table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")

注意 DataStream dsの出力の型は複合型である必要があります。

ジョブの送信 #

最後に、StreamExecutionEnvironment.executeメソッドを呼び出して、実行のためのDataStream APIジョブを送信する必要があります:

env.execute()

DataStreamTableに変換してそれをTable API & SQLシンクコネクタに書き込む場合、TableEnvironment.executeメソッドを使ってジョブを送信する必要がある場合があります。

t_env.execute()
inserted by FC2 system