Scala API Extensions
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

全てのFlink Scala APIは非推奨となり、将来のFlinkバージョンでは削除される予定です。引き続きScalaでアプリケーションをビルドできますが、DataStream APIやTable APIのJavaバージョンへ移行する必要があります。

詳細は、 FLIP-265 Scala APIサポートの廃止と削除

Scala API 拡張 #

Scala APIとJava APIの間でかなりの一貫性を維持するために、Scalaで高度な表現力を可能にする機能の一部は、バッチとストリーミングの両方の標準のAPIから除外されました。

_完全なScalaの経験を楽しむ_ために、暗黙的な変換によってScala APIを強化する拡張機能をオプトインすることを選択することができます。

全ての利用可能な拡張を使うためには、データセットAPIのために単純にimportを追加するだけで良いです。

import org.apache.flink.streaming.api.scala.extensions._

その他の方法として、好きなものを使うためだけに個々の拡張_a-là-carte_をインポートすることができます。

部分的な関数の許容 #

通常、DataStream APIは、以下のようにタプル、ケースクラス、あるいはコレクションを分解するための匿名のパターン一致関数を受け付けません:

val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
// The previous line causes the following compilation error:
// "The argument types of an anonymous function must be fully known. (SLS 8.5)"
}

この拡張は、拡張されたAPI内で1対1に対応するデータセットとデータストリーム Scala APIでの新しいメソッドを導入します。これらの移譲されたメソッドは匿名のパターンマッチング関数をサポートします。

データストリーム API #

メソッド オリジナル
mapWith map (DataStream) {{< highlight scala >}} data.mapWith { case (_, value) => value.toString } {{< /highlight >}}
flatMapWith flatMap (DataStream) {{< highlight scala >}} data.flatMapWith { case (_, name, visits) => visits.map(name -> _) } {{< /highlight >}}
filterWith filter (DataStream) {{< highlight scala >}} data.filterWith { case Train(_, isOnTime) => isOnTime } {{< /highlight >}}
keyingBy keyBy (DataStream) {{< highlight scala >}} data.keyingBy { case (id, _, _) => id } {{< /highlight >}}
mapWith map (ConnectedDataStream) {{< highlight scala >}} data.mapWith( map1 = case (_, value) => value.toString, map2 = case (_, _, value, _) => value + 1 ) {{< /highlight >}}
flatMapWith flatMap (ConnectedDataStream) {{< highlight scala >}} data.flatMapWith( flatMap1 = case (_, json) => parse(json), flatMap2 = case (_, _, json, _) => parse(json) ) {{< /highlight >}}
keyingBy keyBy (ConnectedDataStream) {{< highlight scala >}} data.keyingBy( key1 = case (_, timestamp) => timestamp, key2 = case (id, _, _) => id ) {{< /highlight >}}
reduceWith reduce (KeyedStream, WindowedStream) {{< highlight scala >}} data.reduceWith { case ((_, sum1), (_, sum2) => sum1 + sum2 } {{< /highlight >}}
projecting apply (JoinedStream) {{< highlight scala >}} data1.join(data2). whereClause(case (pk, _) => pk). isEqualTo(case (_, fk) => fk). projecting { case ((pk, tx), (products, fk)) => tx -> products } {{< /highlight >}}

各メソッドのセマンティクスの詳細については、DataStream APIドキュメントを参照してください。

個の拡張機能を排他的に使うには、以下のimportを追加します:

import org.apache.flink.api.scala.extensions.acceptPartialFunctions

データセット拡張については

import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions

次のスニペットは、これらの拡張メソッドを一緒に(DataSet APIを使って)使う方法の最小限の例を示します:

object Main {
import org.apache.flink.streaming.api.scala.extensions._

case class Point(x: Double, y: Double)

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))

    ds.filterWith {
    case Point(x, _) => x > 1
    }.reduceWith {
    case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    }.mapWith {
    case Point(x, y) => (x, y)
    }.flatMapWith {
    case (x, y) => Seq("x" -> x, "y" -> y)
    }.keyingBy {
    case (id, value) => id
    }
}
}

Back to top

inserted by FC2 system