This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Python REPL #
Flinkには、統合されたインタラクティブなPythonシェルが付属しています。 それはローカルセットアップおよびクラスタセットアップで使うことができます。 ローカルFlinkのセットアップ方法の詳細については、スタンドアローンリソースプロバイダのページを参照してください。 ソースからローカルセットアップをビルドすることもできます。
注意 Pythonシェルはコマンド“python”を実行します。Python実行環境のセットアップ方法については、Python Table API インストレーションガイドを参照してください。
統合されたFlinkクラスタでシェルを使うには、PyPiでPyFlinkをインストールし、シェルを直接実行するだけです:
# install PyFlink
$ python -m pip install apache-flink
# execute the shell
$ pyflink-shell.sh local
クラスタでシェルを実行するには、以下のセットアップセクションを参照してください。
使い方 #
現在シェルはTable APIのみをサポートします。 Table Environmentsは起動後に自動的に事前バインドされます。 BatchTableEnvironmentとStreamTableEnvironmentにアクセスするいは、それぞれ"bt_env"と"st_env"を使います。
テーブルAPI #
以下の例はPythonシェルの単純なプログラムです:
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
>>> if os.path.exists(sink_path):
... if os.path.isfile(sink_path):
... os.remove(sink_path)
... else:
... shutil.rmtree(sink_path)
>>> s_env.set_parallelism(1)
>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
... .schema(Schema.new_builder()
... .column("a", DataTypes.BIGINT())
... .column("b", DataTypes.STRING())
... .column("c", DataTypes.STRING())
... .build())
... .option("path", path)
... .format(FormatDescriptor.for_format("csv")
... .option("field-delimiter", ",")
... .build())
... .build())
>>> t.select("a + 1, b, c")\
... .execute_insert("stream_sink").wait()
>>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
... print(f.read())
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/batch.csv'
>>> if os.path.exists(sink_path):
... if os.path.isfile(sink_path):
... os.remove(sink_path)
... else:
... shutil.rmtree(sink_path)
>>> b_env.set_parallelism(1)
>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
... .schema(Schema.new_builder()
... .column("a", DataTypes.BIGINT())
... .column("b", DataTypes.STRING())
... .column("c", DataTypes.STRING())
... .build())
... .option("path", path)
... .format(FormatDescriptor.for_format("csv")
... .option("field-delimiter", ",")
... .build())
... .build())
>>> t.select("a + 1, b, c")\
... .execute_insert("batch_sink").wait()
>>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
... print(f.read())
セットアップ #
Pythonシェルが提供するオプションの概要を取得するには、以下を使ってください
pyflink-shell.sh --help
ローカル #
統合されたFlinkクラスタと一緒にシェルを使うには、単に以下のように実行します:
pyflink-shell.sh local
リモート #
実行中のクラスタで使うには、キーワードremote
を使ってPythonシェルを起動し、JobManagerのホストとポートを以下のように指定します:
pyflink-shell.sh remote <hostname> <portnumber>
Yarn Pythonシェルクラスタ #
シェルは、YARNにFlinkクラスタをデプロイできます。これはシェルによって排他的に使われます。 shell. シェルはYARNに新しいFlinkクラスタをデプロイし、クラスタに接続します。JobManagerのメモリ、YARNアプリケーション名など、YARNクラスタのオプションを指定することもできます。
例えば、2つのTaskManagerを使ってPythonシェルのためのYarnクラスタを開始するには、以下を使います:
pyflink-shell.sh yarn -n 2
他の全てのオプションに関しては、一番下の完全なリファレンスを見てください。
Yarn セッション #
Flink Yarnセッションを使ってFlinkクラスタを以前にデプロイしたことがある場合は、Pythonシェルは次のコマンドを使って接続できます:
pyflink-shell.sh yarn
完全なリファレンス #
Flink Python Shell
Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...
Command: local [options]
Starts Flink Python shell with a local Flink cluster
usage:
-h,--help Show the help message with descriptions of all options.
Command: remote [options] <host> <port>
Starts Flink Python shell connecting to a remote cluster
<host>
Remote host name as string
<port>
Remote port as integer
usage:
-h,--help Show the help message with descriptions of all options.
Command: yarn [options]
Starts Flink Python shell connecting to a yarn cluster
usage:
-h,--help Show the help message with descriptions of
all options.
-jm,--jobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-nm,--name <arg> Set a custom name for the application on
YARN
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-h | --help
Prints this usage text