Flinkでの解析プログラムはデータセット上の変換を実装する通常のプログラムです (例えば、フィルタリング、マッピング、ジョイニング、グルーピング)。データセットは最初に特定のソースから生成されます(例えば、ファイルからの読み込み、あるいはコレクションから)。結果はsinkを使って返されます。これは例えば(分散された)ファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。
独自のFlinkデータセットプログラムを作成するには、プログラムのスケルトンから始め、次第に独自の 変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと上級の特徴についてのリファレンスとして振る舞います。
以下のプログラムはWordCountの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。
from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator])
collector.collect((count, word))
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute(local=True)
例で既に見たように、Flinkプログラムは通常のpythonプログラムのように見えます。各プログラムは同じ基本的な部分からできています:
環境
を取得、ここでそれぞれのステップの概要を示すつもりですが、詳細はそれぞれの章を参照してください。
環境
は全てのFlinkプログラムの基本です。クラス環境
上でこれらの静的メソッドを使って取得することができます:
get_environment()
データソースを指定するために実行環境はファイルから読み込むための幾つかのメソッドを持ちます。連続する行のテキストファイルを単に読むには、以下を使うことができます:
env = get_environment()
text = env.read_text("file:///path/to/file")
これは変換を適用できるデータセットを与えるでしょう。データソースと入力フォーマットの詳しい情報はデータソースを参照してください。
いったんデータセットを持つと、新しいデータセットを生成するために変換を適用することができます。そして、それらをファイルに書き込み、再変換、あるいは他のデータセットと組み合わせをすることができます。独自の変換関数を使ってデータセット上にメソッドを呼ぶことで変換を適用します。例えば、map変換はこのようになります:
data.map(lambda x: x*2)
これは元のデータセット内の各値を二倍にした新しいデータセットを生成するでしょう。更に詳しい情報および全ての変換のリストについては、変換を参照してください。
ディスクに書き込む必要があるデータセットがある場合、データセットのこれらのメソッドのうちの一つを呼ぶことができます。
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()
最後のメソッドはローカルマシーン上での開発/デバッグにのみ便利です。それはデータセットの内容を標準出力に出力するでしょう。(クラスター内では、結果はクラスタノードの標準出力ストリームに行き、最後にはワーカーの.out 中に入ることに注意してください)。最初の2つは名前の示唆として行われます。ファイルへの書き込みについての詳細はデータ シンク を参照してください。
いったん終了したプログラムを指定した場合、Environment
上でexecute
を呼び出す必要があります。これは、Flinkがどう開始されたかによって、ローカルマシーン上で実行するか、実行のためにクラスタ上にプログラムをサブミットするでしょう。execute(local=True)
を使ってローカル実行を強制することができます。
Flinkのセットアップを別として、余分な作業は必要ありません。python パッケージはFlinkの配布物の /resource フォルダ内で見つかるでしょう。The flink package, along with the plan and optional packages are automatically distributed among the cluster via HDFS when running a job.
Python API は Python 2.7 あるいは 3.4 がインストールされたLinux/Windows システムでテストされました。
デフォルトでは、Flinkはどのstart-scriptが使われたかによって “python” あるいは “python3” を呼び出すことでpythonプロセスが開始するでしょう。flink-conf.ymlに “python.binary.python[2/3]” キーを設定することで、この挙動を選択したバイナリを使うように修正することができます。
全てのFlinkプログラムは怠惰に実行されます: プログラムのメインメソッドが実行される時にデータのロードと変換は直接起こりません。むしろ、各操作は生成され、プログラムの計画に追加されます。操作は実際のところEnvironmentオブジェクト上でexecute()
メソッドの一つが呼び出された時に実行されます。プログラムがローカルあるいはクラスタ上で実行されるかは、プログラムの環境に依存します。
The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
データ変換は1つ以上のデータセットを新しいデータセットに変換します。プログラムは複数の変換を洗練された集合体に組み合わせることができます。
この章は利用可能な変換の短い概要を説明します。変換ドキュメント には例付きの全ての変換の完全な説明があります。
変換 | 解説 |
---|---|
Map |
一つの要素を取り、一つの要素を生成します。
|
FlatMap |
一つの要素を取り、0、1、あるいはそれ以上の要素を生成します。
|
MapPartition |
1回の関数の呼び出しで並行するパーティションを変換する。関数は'Iterator'としてパーティションを取り、任意の数の結果値を生成することができます。各パーティション内の要素の数は並行度と以前の操作に依存します。
|
フィルター |
各要素についてのboolean関数を評価し、関数がtrueを返す関数を維持します。
|
Reduce |
再帰的に二つの要素を1つに結合することで、要素のグループを1つの要素に結合します。Reduceは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。
|
ReduceGroup |
要素のグループを1つ以上の要素に結合します。ReduceGroupは完全なデータセット、あるいはグループ化されたデータセットに適用されるかも知れません。
|
Aggregate |
データセットあるいはデータセットの各グループ内の全てのタプルの1つのフィールド上で、組み込みの操作(sum, min, max) が実行されます。集約は完全なデータセットあるいはグループ化されたデータセット上に適用することができます。
|
Join |
キーが等しい二つのデータセットの要素の全てのペアを生成し結合します。任意で要素のペアを1つの要素に変換するためにJoinFunctionを使います。joinキーを指定する方法はキーを見てください。
|
CoGroup |
reduce オペレーションの二次元の変数。1つ以上のフィールドの各入力をグループ化し、グループをjoinします。変換関数はグループの各ペアごとに呼ばれます。coGroupキーを指定する方法はキーを見てください。
|
Cross |
要素の全てのペアを作成して、二つの入力のデカルト積(クロス積)を構築します。任意で要素のペアを1つの要素に変換するためにCrossFunctionを使います。
|
Union |
二つのデータセットの和集合を生成します。
|
ZipWithIndex |
連続するインデックスを各要素に割り当てます。もっと詳しい情報は、[Zip 要素ガイド(zip_elements_guide.html#zip-with-a-dense-index) を参照してください。
|
(JoinあるいはCoGroupのような)幾つかの変換はキーが引数のデータセット上で定義されていることを必要とし、他の変換(Reduce, GroupReduce)によってそれらが適用される前にデータセットをキー上でグループ化することができます。
データセットは以下のようにグループ化されます
reduced = data \
.group_by(<define key here>) \
.reduce_group(<do something>)
Flinkのデータモデルはキー-値ペアに基づいていません。従って、データセットの型を物理的にキーと値にまとめる必要はありません。キーは“virtual”です: それらはグループ化のオペレータを導くために実際のデータ上で関数として定義されています。
もっとも単純な場合は、タプルの1つ以上のフィールド上にタプルのデータセットをグループ化します:
reduced = data \
.group_by(0) \
.reduce_group(<do something>)
データセットはタプルの最初のフィールド上でグループ化されます。group-reduce関数は従って最初のフィールドに同じ値を持つタプルのグループを受け取るでしょう。
grouped = data \
.group_by(0,1) \
.reduce(/*do something*/)
データセットは最初と二つ目のフィールドから成る合成キー上でグループ化されます。従ってreduce関数は両方のフィールドについて同じ値を持つグループを受け取るでしょう。
入れ子のタプルについての注意: 入れ子になったタプルを持つデータセットがある場合、group_by(<index of tuple>)
の指定はシステムにキーとして完全なタプルを使うようにさせるでしょう。
ある操作はユーザ定義の関数を必要とします。一方で、それら全ては引数としてlambda関数と機能豊富な関数を受け付けます。
data.filter(lambda x: x > 5)
class Filter(FilterFunction):
def filter(self, value):
return value > 5
data.filter(Filter())
Rich functions allow the use of imported functions, provide access to broadcast-variables, can be parameterized using init(), and are the go-to-option for complex functions. それらはreduce操作のための任意のcombine
関数を定義する唯一の方法でもあります。
Lambda 関数は1行の簡単な挿入ができます。もし操作が複数の値を返すことができる場合は、lambda関数はiterableを返す必要があることに注意してください。(collector引数を受け取るすべての関数)
Flinkの Python API は現在のところ primitive なpython 型 (int, float, bool, string) とバイト配列のためのネイティブなサポートだけを提供します。
型のサポートはシリアライザ、デシリアライザ および環境への型クラスを渡すことで拡張することができます。
class MyObj(object):
def __init__(self, i):
self.value = i
class MySerializer(object):
def serialize(self, value):
return struct.pack(">i", value.value)
class MyDeserializer(object):
def _deserialize(self, read):
i = struct.unpack(">i", read(4))[0]
return MyObj(i)
env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
合成型のためにタプル(あるいはリスト)を使うことができます。Python のタプルはFlinkのタプル型にマップされ、さまざまな型の(25までの)固定数のフィールドを含みます。タプルの各フィールドはプリミティブな型が可能です - さらなるタプル、結果的に入れ子のタプル、を含みます。
word_counts = env.from_elements(("hello", 1), ("world",2))
counts = word_counts.map(lambda x: x[1])
レコードのグルーピングあるいはマッチングのためのキーを必要とするオペレータと連携する時、タプルによってキーとして使われるフィールドの場所を単純に指定します。合成キーとして使うために1つ以上の場所を指定することができます (データ変換の章を見てください)。
wordCounts \
.group_by(0) \
.reduce(MyReduceFunction())
データソースはファイルあるいはコレクションから初期データを作成します。
ファイルベース:
read_text(path)
- ファイルを行ごとに読み込み、文字列として返します。read_csv(path, type)
- カンマ(あるいは他の文字)区切りのファイルをパースする。タプルのデータセットを返す。基本java型とフィールド型としてそれらのValueに対応するものをサポートします。コレクション ベース:
from_elements(*args)
- シーケンスからデータセットを作成する。全ての要素generate_sequence(from, to)
- 並行して渡された間隔内の数のシーケンスを生成します。例
env = get_environment
\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")
\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)
データ シンクはデータセットを消費し、それらを格納あるいは返すために使う事ができます:
write_text()
- 行ごとの要素を文字列として書き込む。文字列は各要素のstr() メソッドを呼ぶことで取得されます。write_csv(...)
- タプルをカンマ区切りの値のファイルとして書き込みます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトのstr()メソッドから来ます。output()
- 各要素のstr()の値を標準出力に出力します。データセットは複数のオペレータの入力かも知れません。プログラムはデータセットを書き込みあるいは出力することができ、同時にそれらに追加の変換を実行します。
例
標準的なデータ シンク メソッド:
write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")
write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")
ブロードキャスト変数により、操作の通常の入力に加えて、データが操作の全ての並行インスタンスに利用可能にします。これは予備のデータセット、あるいはデータに依存するパラメータ化として役に立ちます。データセットはコレクションとしてオペレータでアクセス可能でしょう。
with_broadcast_set(DataSet, String)
を使って名前で登録されます。self.context.get_broadcast_variable(String)
を使ってアクセス可能です。class MapperBcv(MapFunction):
def map(self, value):
factor = self.context.get_broadcast_variable("bcv")[0][0]
return value * factor
# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")
# 2. Broadcast the DataSet
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)
ブロードキャストされたデータセットを登録およびアクセスする場合には、名前 (前の例ではbcv
) が一致するようにします。
注意: ブロードキャスト変数の内容は各ノード上のインメモリに保存されます。あまりに大きくするべきではありません。スカラ値のようにシンプルにするために、リッチな関数を単にパラメータすることができます。
この章はプログラムの並行実行をFlink内でどのように設定することができるかを説明します。Flinkプログラムは複数のタスク(オペレータ、データソース、およびシンク)からできています。タスクは実行のために幾つかの並行インスタンスに分割され、各並行インタウンスはタスクの入力データのサブセットを処理します。タスクの並行インスタンスの数は並行度 あるいは 並行次数 (DOP)と呼ばれます。
タスクの並行次数はFlink内で異なるレベルに設定することができます。
Flinkプログラムは実行環境のコンテキスト内で実行されます。実行環境はそれを実行する全てのオペレータ、データソース、およびデータシンクのためのデフォルトの並列度を定義します。実行環境の並行度はオペレータの並行度を明示的に設定することで上書きすることができます。
実行環境のデフォルトの並行度はset_parallelism()
メソッドを呼び出すことで指定することができます。WordCountの例のプログラムの全てのオペレータ、データソース、およびデータシンク を並行度3
で実行するには、実行環境のデフォルトの並行度を以下のように設定します:
env = get_environment()
env.set_parallelism(3)
text.flat_map(lambda x,c: x.lower().split()) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute()
全ての実行環境のためのシステム全体のデフォルトの並行度は./conf/flink-conf.yaml
内のparallelism.default
プロパティを設定することで定義することができます。詳細は設定 ドキュメントを見てください。
Flinkを使って計画を実行するには、Flinkの配布物の場所に行き、/binフォルダから pyflink.sh スクリプトを実行します。python 2.7 には pyflink2.sh を、python 3.4 には pyflink3.sh を使います。計画を含むスクリプトは最初の引数として渡される必要があり、付加的にpythonパッケージの数が続き、最後にスクリプトによって消費されるだろう - で分割された付加的な引数が続きます。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]