This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Scala API Extensions
全てのFlink Scala APIは非推奨となり、将来のFlinkバージョンでは削除される予定です。引き続きScalaでアプリケーションをビルドできますが、DataStream APIやTable APIのJavaバージョンへ移行する必要があります。
Scala API 拡張 #
Scala APIとJava APIの間でかなりの一貫性を維持するために、Scalaで高度な表現力を可能にする機能の一部は、バッチとストリーミングの両方の標準のAPIから除外されました。
_完全なScalaの経験を楽しむ_ために、暗黙的な変換によってScala APIを強化する拡張機能をオプトインすることを選択することができます。
全ての利用可能な拡張を使うためには、データセットAPIのために単純にimport
を追加するだけで良いです。
import org.apache.flink.streaming.api.scala.extensions._
その他の方法として、好きなものを使うためだけに個々の拡張_a-là-carte_をインポートすることができます。
部分的な関数の許容 #
通常、DataStream APIは、以下のようにタプル、ケースクラス、あるいはコレクションを分解するための匿名のパターン一致関数を受け付けません:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
// The previous line causes the following compilation error:
// "The argument types of an anonymous function must be fully known. (SLS 8.5)"
}
この拡張は、拡張されたAPI内で1対1に対応するデータセットとデータストリーム Scala APIでの新しいメソッドを導入します。これらの移譲されたメソッドは匿名のパターンマッチング関数をサポートします。
データストリーム API #
メソッド | オリジナル | 例 |
---|---|---|
mapWith | map (DataStream) | {{< highlight scala >}} data.mapWith { case (_, value) => value.toString } {{< /highlight >}} |
flatMapWith | flatMap (DataStream) | {{< highlight scala >}} data.flatMapWith { case (_, name, visits) => visits.map(name -> _) } {{< /highlight >}} |
filterWith | filter (DataStream) | {{< highlight scala >}} data.filterWith { case Train(_, isOnTime) => isOnTime } {{< /highlight >}} |
keyingBy | keyBy (DataStream) | {{< highlight scala >}} data.keyingBy { case (id, _, _) => id } {{< /highlight >}} |
mapWith | map (ConnectedDataStream) | {{< highlight scala >}} data.mapWith( map1 = case (_, value) => value.toString, map2 = case (_, _, value, _) => value + 1 ) {{< /highlight >}} |
flatMapWith | flatMap (ConnectedDataStream) | {{< highlight scala >}} data.flatMapWith( flatMap1 = case (_, json) => parse(json), flatMap2 = case (_, _, json, _) => parse(json) ) {{< /highlight >}} |
keyingBy | keyBy (ConnectedDataStream) | {{< highlight scala >}} data.keyingBy( key1 = case (_, timestamp) => timestamp, key2 = case (id, _, _) => id ) {{< /highlight >}} |
reduceWith | reduce (KeyedStream, WindowedStream) | {{< highlight scala >}} data.reduceWith { case ((_, sum1), (_, sum2) => sum1 + sum2 } {{< /highlight >}} |
projecting | apply (JoinedStream) | {{< highlight scala >}} data1.join(data2). whereClause(case (pk, _) => pk). isEqualTo(case (_, fk) => fk). projecting { case ((pk, tx), (products, fk)) => tx -> products } {{< /highlight >}} |
各メソッドのセマンティクスの詳細については、DataStream APIドキュメントを参照してください。
個の拡張機能を排他的に使うには、以下のimport
を追加します:
import org.apache.flink.api.scala.extensions.acceptPartialFunctions
データセット拡張については
import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
次のスニペットは、これらの拡張メソッドを一緒に(DataSet APIを使って)使う方法の最小限の例を示します:
object Main {
import org.apache.flink.streaming.api.scala.extensions._
case class Point(x: Double, y: Double)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
ds.filterWith {
case Point(x, _) => x > 1
}.reduceWith {
case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
}.mapWith {
case Point(x, y) => (x, y)
}.flatMapWith {
case (x, y) => Seq("x" -> x, "y" -> y)
}.keyingBy {
case (id, value) => id
}
}
}