Python REPL
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Python REPL #

Flinkには、統合されたインタラクティブなPythonシェルが付属しています。 それはローカルセットアップおよびクラスタセットアップで使うことができます。 ローカルFlinkのセットアップ方法の詳細については、スタンドアローンリソースプロバイダのページを参照してください。 ソースからローカルセットアップをビルドすることもできます。

注意 Pythonシェルはコマンド“python”を実行します。Python実行環境のセットアップ方法については、Python Table API インストレーションガイドを参照してください。

統合されたFlinkクラスタでシェルを使うには、PyPiでPyFlinkをインストールし、シェルを直接実行するだけです:

# install PyFlink
$ python -m pip install apache-flink
# execute the shell
$ pyflink-shell.sh local

クラスタでシェルを実行するには、以下のセットアップセクションを参照してください。

使い方 #

現在シェルはTable APIのみをサポートします。 Table Environmentsは起動後に自動的に事前バインドされます。 BatchTableEnvironmentとStreamTableEnvironmentにアクセスするいは、それぞれ"bt_env"と"st_env"を使います。

テーブルAPI #

以下の例はPythonシェルの単純なプログラムです:

>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> s_env.set_parallelism(1)
>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
...     .schema(Schema.new_builder()
...         .column("a", DataTypes.BIGINT())
...         .column("b", DataTypes.STRING())
...         .column("c", DataTypes.STRING())
...         .build())
...     .option("path", path)
...     .format(FormatDescriptor.for_format("csv")
...         .option("field-delimiter", ",")
...         .build())
...     .build())
>>> t.select("a + 1, b, c")\
...     .execute_insert("stream_sink").wait()
>>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
...     print(f.read())
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/batch.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> b_env.set_parallelism(1)
>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
...     .schema(Schema.new_builder()
...         .column("a", DataTypes.BIGINT())
...         .column("b", DataTypes.STRING())
...         .column("c", DataTypes.STRING())
...         .build())
...     .option("path", path)
...     .format(FormatDescriptor.for_format("csv")
...         .option("field-delimiter", ",")
...         .build())
...     .build())
>>> t.select("a + 1, b, c")\
...     .execute_insert("batch_sink").wait()
>>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
...     print(f.read())

セットアップ #

Pythonシェルが提供するオプションの概要を取得するには、以下を使ってください

pyflink-shell.sh --help

ローカル #

統合されたFlinkクラスタと一緒にシェルを使うには、単に以下のように実行します:

pyflink-shell.sh local

リモート #

実行中のクラスタで使うには、キーワードremoteを使ってPythonシェルを起動し、JobManagerのホストとポートを以下のように指定します:

pyflink-shell.sh remote <hostname> <portnumber>

Yarn Pythonシェルクラスタ #

シェルは、YARNにFlinkクラスタをデプロイできます。これはシェルによって排他的に使われます。 shell. シェルはYARNに新しいFlinkクラスタをデプロイし、クラスタに接続します。JobManagerのメモリ、YARNアプリケーション名など、YARNクラスタのオプションを指定することもできます。

例えば、2つのTaskManagerを使ってPythonシェルのためのYarnクラスタを開始するには、以下を使います:

pyflink-shell.sh yarn -n 2

他の全てのオプションに関しては、一番下の完全なリファレンスを見てください。

Yarn セッション #

Flink Yarnセッションを使ってFlinkクラスタを以前にデプロイしたことがある場合は、Pythonシェルは次のコマンドを使って接続できます:

pyflink-shell.sh yarn

完全なリファレンス #

Flink Python Shell
Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts Flink Python shell with a local Flink cluster
usage:
     -h,--help   Show the help message with descriptions of all options.
Command: remote [options] <host> <port>
Starts Flink Python shell connecting to a remote cluster
  <host>
        Remote host name as string
  <port>
        Remote port as integer

usage:
     -h,--help   Show the help message with descriptions of all options.
Command: yarn [options]
Starts Flink Python shell connecting to a yarn cluster
usage:
     -h,--help                       Show the help message with descriptions of
                                     all options.
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with
                                     optional unit (default: MB)
     -nm,--name <arg>                Set a custom name for the application on
                                     YARN
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with
                                     optional unit (default: MB)
-h | --help
      Prints this usage text

Back to top

inserted by FC2 system