Conversions between PyFlink Table and Pandas DataFrame
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

PyFlink TableとPnadas DataFrameの間の変換 #

PyFlink Table APIはPyFlink TableとPandas DataFrameの間の変換をサポートします。

Pandas DataFramesはPyFlink Tableに変換できます。 内部的には、PyFlinkはクライアント上でArrow列形式を使ってPandas DataFrameをシリアル化します。 シリアル化されたデータは実行中にArrowソースで処理および逆シリアル化されます。 Arrowソースはストリーミングジョブでも使うことができ、チェックポイントと統合されて、確実に1回の保証を提供します。

以下の例はPnadas DataFrameからPyFlink Tableを作成する方法を示しています:

from pyflink.table import DataTypes

import pandas as pd
import numpy as np

# Create a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))

# Create a PyFlink Table from a Pandas DataFrame
table = t_env.from_pandas(pdf)

# Create a PyFlink Table from a Pandas DataFrame with the specified column names
table = t_env.from_pandas(pdf, ['f0', 'f1'])

# Create a PyFlink Table from a Pandas DataFrame with the specified column types
table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

# Create a PyFlink Table from a Pandas DataFrame with the specified row type
table = t_env.from_pandas(pdf,
                          DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
                                         DataTypes.FIELD("f1", DataTypes.DOUBLE())]))

さらに、PyFlink TablesはPandas DataFrameに変換できます。 結果の行は、クライアント上でArrow列形式の複数のArrowバッチとしてシリアル化されます。 Arrowバッチの最大サイズは、オプションpython.fn-execution.arrow.batch.sizeで設定されます。 次に、シリアル化されたデータはPandas DataFrameに変換されます。 テーブルの内容はクライアント上で収集されるため、このメソッドが呼ばれる前にテーブルの結果がメモリ内に収まることを確認してください。 Table.limit を使って、クライアント側に収集される行数を制限できます。

以下の例は、PyFlink TableをPandas DataFrameに変換する方法を示しています:

from pyflink.table.expressions import col

import pandas as pd
import numpy as np

# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.limit(100).to_pandas()
inserted by FC2 system