Row-based Operations
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

行ベースのオペレーション #

このページでは、PyFlink Table APIで行ベースのオペレーションを使う方法を説明します。

マップ #

Pythonの一般スカラー関数またはベクトル化スカラー関数を使ってmapオペレーションを実行します。 出力型が複合型の場合、出力はフラット化されます。

from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

@udf(result_type='ROW<id BIGINT, data STRING>')
def func1(id: int, data: str) -> Row:
    return Row(id, data * 2)

# the input columns are specified as the inputs
table.map(func1(col('id'), col('data'))).execute().print()
# result is 
#+----------------------+--------------------------------+
#|                   id |                           data |
#+----------------------+--------------------------------+
#|                    1 |                           HiHi |
#|                    2 |                     HelloHello |
#+----------------------+--------------------------------+

また、Rowオブジェクト(入力tableの全ての列を含む)を入力として受け取ることもサポートされます。

@udf(result_type='ROW<id BIGINT, data STRING>')
def func2(data: Row) -> Row:
    return Row(data.id, data.data * 2)

# specify the function without the input columns
table.map(func2).execute().print()
# result is 
#+----------------------+--------------------------------+
#|                   id |                           data |
#+----------------------+--------------------------------+
#|                    1 |                           HiHi |
#|                    2 |                     HelloHello |
#+----------------------+--------------------------------+

注意 mapオペレーションでfunc2を使う場合は、入力カラムを指定しないでください。

mapオペレーションでベクトル化スカラ関数の使用もサポートされます。 この場合、入力型と出力型はRowではなく、pandas.DataFrameである必要があることに注意してください。

import pandas as pd
@udf(result_type='ROW<id BIGINT, data STRING>', func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
    res = pd.concat([data.id, data.data * 2], axis=1)
    return res

table.map(func3).execute().print()
# result is 
#+----------------------+--------------------------------+
#|                   id |                           data |
#+----------------------+--------------------------------+
#|                    1 |                           HiHi |
#|                    2 |                     HelloHello |
#+----------------------+--------------------------------+

FlatMap #

python table関数を使って、flat_mapオペレーションを実行します。

from pyflink.common import Row
from pyflink.table.udf import udtf
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])

@udtf(result_types=['INT', 'STRING'])
def split(x: Row) -> Row:
    for s in x.data.split(","):
        yield x.id, s

# use split in `flat_map`
table.flat_map(split).execute().print()
# result is
#+-------------+--------------------------------+
#|          f0 |                             f1 |
#+-------------+--------------------------------+
#|           1 |                             Hi |
#|           1 |                          Flink |
#|           2 |                          Hello |
#+-------------+--------------------------------+

python table関数join_lateralleft_outer_join_lateralでも使えます。

# use table function in `join_lateral` or `left_outer_join_lateral`
table.join_lateral(split.alias('a', 'b')).execute().print()
# result is 
#+----------------------+--------------------------------+-------------+--------------------------------+
#|                   id |                           data |           a |                              b |
#+----------------------+--------------------------------+-------------+--------------------------------+
#|                    1 |                       Hi,Flink |           1 |                             Hi |
#|                    1 |                       Hi,Flink |           1 |                          Flink |
#|                    2 |                          Hello |           2 |                          Hello |
#+----------------------+--------------------------------+-------------+--------------------------------+

集約 #

python 一般集計関数またはベクトル化集計関数を使って、aggregateオペレーションを実行します。

from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import AggregateFunction, udaf

class CountAndSumAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return Row(accumulator[0], accumulator[1])

    def create_accumulator(self):
        return Row(0, 0)

    def accumulate(self, accumulator, row):
        accumulator[0] += 1
        accumulator[1] += row.b

    def retract(self, accumulator, row):
        accumulator[0] -= 1
        accumulator[1] -= row.b

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            accumulator[0] += other_acc[0]
            accumulator[1] += other_acc[1]

    def get_accumulator_type(self):
        return 'ROW<a BIGINT, b BIGINT>'

    def get_result_type(self):
        return 'ROW<a BIGINT, b BIGINT>'

function = CountAndSumAggregateFunction()
agg = udaf(function,
           result_type=function.get_result_type(),
           accumulator_type=function.get_accumulator_type(),
           name=str(function.__class__.__name__))

# aggregate with a python general aggregate function

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

result = t.group_by(col('a')) \
    .aggregate(agg.alias("c", "d")) \
    .select(col('a'), col('c'), col('d'))
result.execute().print()

# the result is
#+----+----------------------+----------------------+----------------------+
#| op |                    a |                    c |                    d |
#+----+----------------------+----------------------+----------------------+
#| +I |                    1 |                    2 |                    5 |
#| +I |                    2 |                    1 |                    1 |
#+----+----------------------+----------------------+----------------------+

# aggregate with a python vectorized aggregate function
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                   result_type='ROW<a FLOAT, b INT>',
                   func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
 .select(col('a'), col('b')).execute().print()

# the result is
#+--------------------------------+-------------+
#|                              a |           b |
#+--------------------------------+-------------+
#|                            2.0 |           3 |
#+--------------------------------+-------------+

注意 mapオペレーションと同様に、aggregateオペレーションで入力列無しで集計関数を指定する場合、グループ化キーを含む入力テーブル全ての列を含む入力としてのRowまたはPandas.DataFrameを受け取ります。 注意 selectステートメントを使って"aggregate"を閉じる必要があり、selectステートメントに集計関数を含むことはできません。 また、複合型の場合、集計の出力はフラット化されます。

FlatAggregate #

pythonの一般的なTable集計関数を使ってflat_aggregateオペレーションを実行します。

GroupBy Aggregationと同様に、FlatAggregateはグループ化キーで入力をグループ化します。 AggregateFunctionとは異なり、TableAggregateFunctionはグループ化キーに対して0、1、それ以上のレコードを返す可能性があります。 aggregateと同様に、selectステートメントを使ってflat_aggregateを閉じる必要があり、selectステートメントは集計関数を含めないでください。

from pyflink.common import Row
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.udf import udtaf, TableAggregateFunction

class Top2(TableAggregateFunction):

    def emit_value(self, accumulator):
        yield Row(accumulator[0])
        yield Row(accumulator[1])

    def create_accumulator(self):
        return [None, None]

    def accumulate(self, accumulator, row):
        if row.a is not None:
            if accumulator[0] is None or row.a > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = row.a
            elif accumulator[1] is None or row.a > accumulator[1]:
                accumulator[1] = row.a

    def get_accumulator_type(self):
        return 'ARRAY<BIGINT>'

    def get_result_type(self):
        return 'ROW<a BIGINT>'


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# the result type and accumulator type can also be specified in the udtaf decorator:
# top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
top2 = udtaf(Top2())
t = table_env.from_elements([(1, 'Hi', 'Hello'),
                             (3, 'Hi', 'hi'),
                             (5, 'Hi2', 'hi'),
                             (7, 'Hi', 'Hello'),
                             (2, 'Hi', 'Hello')],
                            ['a', 'b', 'c'])

# call function "inline" without registration in Table API
result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()

# the result is:
#+----+--------------------------------+----------------------+
#| op |                              b |                    a |
#+----+--------------------------------+----------------------+
#| +I |                            Hi2 |                    5 |
#| +I |                            Hi2 |               <NULL> |
#| +I |                             Hi |                    7 |
#| +I |                             Hi |                    3 |
#+----+--------------------------------+----------------------+
inserted by FC2 system