This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Conversions between PyFlink Table and Pandas DataFrame
PyFlink TableとPnadas DataFrameの間の変換 #
PyFlink Table APIはPyFlink TableとPandas DataFrameの間の変換をサポートします。
Pandas DataFrameをPyFlink Tableに変換する #
Pandas DataFramesはPyFlink Tableに変換できます。 内部的には、PyFlinkはクライアント上でArrow列形式を使ってPandas DataFrameをシリアル化します。 シリアル化されたデータは実行中にArrowソースで処理および逆シリアル化されます。 Arrowソースはストリーミングジョブでも使うことができ、チェックポイントと統合されて、確実に1回の保証を提供します。
以下の例はPnadas DataFrameからPyFlink Tableを作成する方法を示しています:
from pyflink.table import DataTypes
import pandas as pd
import numpy as np
# Create a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(1000, 2))
# Create a PyFlink Table from a Pandas DataFrame
table = t_env.from_pandas(pdf)
# Create a PyFlink Table from a Pandas DataFrame with the specified column names
table = t_env.from_pandas(pdf, ['f0', 'f1'])
# Create a PyFlink Table from a Pandas DataFrame with the specified column types
table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
# Create a PyFlink Table from a Pandas DataFrame with the specified row type
table = t_env.from_pandas(pdf,
DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()),
DataTypes.FIELD("f1", DataTypes.DOUBLE())]))
PyFlink TableをPandas DataFrameに変換する #
さらに、PyFlink TablesはPandas DataFrameに変換できます。 結果の行は、クライアント上でArrow列形式の複数のArrowバッチとしてシリアル化されます。 Arrowバッチの最大サイズは、オプションpython.fn-execution.arrow.batch.sizeで設定されます。 次に、シリアル化されたデータはPandas DataFrameに変換されます。 テーブルの内容はクライアント上で収集されるため、このメソッドが呼ばれる前にテーブルの結果がメモリ内に収まることを確認してください。 Table.limit を使って、クライアント側に収集される行数を制限できます。
以下の例は、PyFlink TableをPandas DataFrameに変換する方法を示しています:
from pyflink.table.expressions import col
import pandas as pd
import numpy as np
# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.limit(100).to_pandas()