This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
メトリクス #
PyFlinkは、メトリクスを収集して外部システムに公開できるメトリクスシステムを公開します。
メトリクスの登録 #
open
メソッドでfunction_context.get_metric_group()
を呼び出すことで、Pythonのユーザ定義関数からメトリクスシステムにアクセスできます。
get_metric_group()
メソッドは、新しいメトリクスを作成して登録するMetricGroup
オブジェクトを返します。
メトリクスの型 #
PyFlinkは、カウンター
、ゲージ
、分布
、メーター
をサポートします。
カウンター #
カウンター
は何かを数えるために使われます。現在の値は、inc()/inc(n: int)
またはdec()/dec(n: int)
を使って増減できます。
MetricGroup
でcounter(name: str)
を呼び出すことで、カウンター
を作成して登録できます。
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.counter = None
def open(self, function_context):
self.counter = function_context.get_metric_group().counter("my_counter")
def eval(self, i):
self.counter.inc(i)
return i
ゲージ #
Gauge
は、オンデマンドで値を提供します。MetricGroupでgauge(name: str, obj: Callable[[], int])
を呼び出すことで、ゲージを登録できます。Callableオブジェクトは値を報告するために使われます。ゲージメトリクスは整数値のみに制限されます。
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.length = 0
def open(self, function_context):
function_context.get_metric_group().gauge("my_gauge", lambda : self.length)
def eval(self, i):
self.length = i
return i - 1
分散 #
報告される値の分布に関する情報(sum, count, min, max, mean)を報告するメトリクス。値は、update(n: int)
を使って更新できます。MetricGroupでdistribution(name: str)
を呼び出すことで分布を登録できます。分布メトリクスは整数のみの分布に制限されます。
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.distribution = None
def open(self, function_context):
self.distribution = function_context.get_metric_group().distribution("my_distribution")
def eval(self, i):
self.distribution.update(i)
return i - 1
メーター #
メーターは平均スループットを計測します。イベントの出現はmark_event()
メソッドを使って登録できます。mark_event(n: int)メソッドで複数のイベントを同時発生を登録できます。MetricGroupでmeter(self, name: str, time_span_in_seconds: int = 60)
を呼び出すことでメーターを登録できます。
time_span_in_secondsのデフォルト値は60です。
from pyflink.table.udf import ScalarFunction
class MyUDF(ScalarFunction):
def __init__(self):
self.meter = None
def open(self, function_context):
# an average rate of events per second over 120s, default is 60s.
self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)
def eval(self, i):
self.meter.mark_event(i)
return i - 1
スコープ #
スコープ定義の詳細については、Javaメトリクスドキュメントを参照してください。
ユーザ スコープ #
MetricGroup.add_group(key: str, value: str = None)
を呼び出すことで、ユーザスコープを定義できます。
value
がNone
では無い場合、新しいキーバリューMetricGroupペアが作成されます。
キーグループはこのグループのサブグループに追加され、値グループはキーグループのサブグループに追加されます。この場合、値グループが返され、ユーザ変数が定義されます。
function_context \
.get_metric_group() \
.add_group("my_metrics") \
.counter("my_counter")
function_context \
.get_metric_group() \
.add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
システム スコープ #
システムスコープの詳細については、Javaメトリクスドキュメントを参照してください。
全ての変数のリスト #
全ての変数のリストの詳細については、Javaメトリクスドキュメントを参照してください。
ユーザ変数 #
MetricGroup.addGroup(key: str, value: str = None)
を呼び出して、値パラメータを指定することで、ユーザ変数を定義できます。
重要: ユーザ変数はスコープ形式では使えません。
function_context \
.get_metric_group() \
.add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
PyFlinkとFlinkの共通部分 #
以下のセクションの詳細については、Javaメトリクスドキュメントを参照してください。