This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
行ベースのオペレーション #
このページでは、PyFlink Table APIで行ベースのオペレーションを使う方法を説明します。
マップ #
Pythonの一般スカラー関数またはベクトル化スカラー関数を使ってmap
オペレーションを実行します。
出力型が複合型の場合、出力はフラット化されます。
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
@udf(result_type='ROW<id BIGINT, data STRING>')
def func1(id: int, data: str) -> Row:
return Row(id, data * 2)
# the input columns are specified as the inputs
table.map(func1(col('id'), col('data'))).execute().print()
# result is
#+----------------------+--------------------------------+
#| id | data |
#+----------------------+--------------------------------+
#| 1 | HiHi |
#| 2 | HelloHello |
#+----------------------+--------------------------------+
また、Rowオブジェクト(入力tableの全ての列を含む)を入力として受け取ることもサポートされます。
@udf(result_type='ROW<id BIGINT, data STRING>')
def func2(data: Row) -> Row:
return Row(data.id, data.data * 2)
# specify the function without the input columns
table.map(func2).execute().print()
# result is
#+----------------------+--------------------------------+
#| id | data |
#+----------------------+--------------------------------+
#| 1 | HiHi |
#| 2 | HelloHello |
#+----------------------+--------------------------------+
注意 mapオペレーションでfunc2を使う場合は、入力カラムを指定しないでください。
map
オペレーションでベクトル化スカラ関数の使用もサポートされます。
この場合、入力型と出力型はRowではなく、pandas.DataFrameである必要があることに注意してください。
import pandas as pd
@udf(result_type='ROW<id BIGINT, data STRING>', func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
res = pd.concat([data.id, data.data * 2], axis=1)
return res
table.map(func3).execute().print()
# result is
#+----------------------+--------------------------------+
#| id | data |
#+----------------------+--------------------------------+
#| 1 | HiHi |
#| 2 | HelloHello |
#+----------------------+--------------------------------+
FlatMap #
python table関数を使って、flat_map
オペレーションを実行します。
from pyflink.common import Row
from pyflink.table.udf import udtf
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])
@udtf(result_types=['INT', 'STRING'])
def split(x: Row) -> Row:
for s in x.data.split(","):
yield x.id, s
# use split in `flat_map`
table.flat_map(split).execute().print()
# result is
#+-------------+--------------------------------+
#| f0 | f1 |
#+-------------+--------------------------------+
#| 1 | Hi |
#| 1 | Flink |
#| 2 | Hello |
#+-------------+--------------------------------+
python table関数をjoin_lateral
とleft_outer_join_lateral
でも使えます。
# use table function in `join_lateral` or `left_outer_join_lateral`
table.join_lateral(split.alias('a', 'b')).execute().print()
# result is
#+----------------------+--------------------------------+-------------+--------------------------------+
#| id | data | a | b |
#+----------------------+--------------------------------+-------------+--------------------------------+
#| 1 | Hi,Flink | 1 | Hi |
#| 1 | Hi,Flink | 1 | Flink |
#| 2 | Hello | 2 | Hello |
#+----------------------+--------------------------------+-------------+--------------------------------+
集約 #
python 一般集計関数またはベクトル化集計関数を使って、aggregate
オペレーションを実行します。
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import AggregateFunction, udaf
class CountAndSumAggregateFunction(AggregateFunction):
def get_value(self, accumulator):
return Row(accumulator[0], accumulator[1])
def create_accumulator(self):
return Row(0, 0)
def accumulate(self, accumulator, row):
accumulator[0] += 1
accumulator[1] += row.b
def retract(self, accumulator, row):
accumulator[0] -= 1
accumulator[1] -= row.b
def merge(self, accumulator, accumulators):
for other_acc in accumulators:
accumulator[0] += other_acc[0]
accumulator[1] += other_acc[1]
def get_accumulator_type(self):
return 'ROW<a BIGINT, b BIGINT>'
def get_result_type(self):
return 'ROW<a BIGINT, b BIGINT>'
function = CountAndSumAggregateFunction()
agg = udaf(function,
result_type=function.get_result_type(),
accumulator_type=function.get_accumulator_type(),
name=str(function.__class__.__name__))
# aggregate with a python general aggregate function
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
result = t.group_by(col('a')) \
.aggregate(agg.alias("c", "d")) \
.select(col('a'), col('c'), col('d'))
result.execute().print()
# the result is
#+----+----------------------+----------------------+----------------------+
#| op | a | c | d |
#+----+----------------------+----------------------+----------------------+
#| +I | 1 | 2 | 5 |
#| +I | 2 | 1 | 1 |
#+----+----------------------+----------------------+----------------------+
# aggregate with a python vectorized aggregate function
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
result_type='ROW<a FLOAT, b INT>',
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
.select(col('a'), col('b')).execute().print()
# the result is
#+--------------------------------+-------------+
#| a | b |
#+--------------------------------+-------------+
#| 2.0 | 3 |
#+--------------------------------+-------------+
注意 map
オペレーションと同様に、aggregate
オペレーションで入力列無しで集計関数を指定する場合、グループ化キーを含む入力テーブル全ての列を含む入力としてのRowまたはPandas.DataFrameを受け取ります。
注意 selectステートメントを使って"aggregate"を閉じる必要があり、selectステートメントに集計関数を含むことはできません。
また、複合型の場合、集計の出力はフラット化されます。
FlatAggregate #
pythonの一般的なTable集計関数を使ってflat_aggregate
オペレーションを実行します。
GroupBy Aggregation
と同様に、FlatAggregate
はグループ化キーで入力をグループ化します。
AggregateFunction
とは異なり、TableAggregateFunction
はグループ化キーに対して0、1、それ以上のレコードを返す可能性があります。
aggregate
と同様に、selectステートメントを使ってflat_aggregate
を閉じる必要があり、selectステートメントは集計関数を含めないでください。
from pyflink.common import Row
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.udf import udtaf, TableAggregateFunction
class Top2(TableAggregateFunction):
def emit_value(self, accumulator):
yield Row(accumulator[0])
yield Row(accumulator[1])
def create_accumulator(self):
return [None, None]
def accumulate(self, accumulator, row):
if row.a is not None:
if accumulator[0] is None or row.a > accumulator[0]:
accumulator[1] = accumulator[0]
accumulator[0] = row.a
elif accumulator[1] is None or row.a > accumulator[1]:
accumulator[1] = row.a
def get_accumulator_type(self):
return 'ARRAY<BIGINT>'
def get_result_type(self):
return 'ROW<a BIGINT>'
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# the result type and accumulator type can also be specified in the udtaf decorator:
# top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
top2 = udtaf(Top2())
t = table_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(7, 'Hi', 'Hello'),
(2, 'Hi', 'Hello')],
['a', 'b', 'c'])
# call function "inline" without registration in Table API
result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
# the result is:
#+----+--------------------------------+----------------------+
#| op | b | a |
#+----+--------------------------------+----------------------+
#| +I | Hi2 | 5 |
#| +I | Hi2 | <NULL> |
#| +I | Hi | 7 |
#| +I | Hi | 3 |
#+----+--------------------------------+----------------------+