In order to keep a fair amount of consistency between the Scala and Java APIs, some of the features that allow a high level of expressiveness in Scala have been left out of the standard APIs for both batch and streaming.
If you want to enjoy the full Scala experience, you can opt in to extensions that enhance the Scala API via implicit conversions.
To use all the available extensions, simply add the following import for the DataSet API:
import org.apache.flink.api.scala.extensions._
or, for the DataStream API:
import org.apache.flink.streaming.api.scala.extensions._
Alternatively, you can import individual extensions à la carte to use only the ones you need.
Normally, neither the DataSet nor the DataStream API accepts anonymous pattern matching functions to deconstruct tuples, case classes or collections, like the following:
val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
  // The previous line causes the following compilation error:
  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
}
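Without the extensions, a common workaround is to give the lambda a named parameter and deconstruct it with an ordinary match expression inside the body. The sketch below assumes the standard Flink Scala DataSet API (org.apache.flink.api.scala._) is on the classpath; the object name WorkaroundExample and the sample data are made up for illustration.

```scala
import org.apache.flink.api.scala._

object WorkaroundExample {
  // Returns "id:name" for every reading (hypothetical helper for this sketch).
  def run(): Seq[String] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Sample data made up for this sketch.
    val data: DataSet[(Int, String, Double)] =
      env.fromElements((1, "a", 20.5), (2, "b", 22.0))

    // The lambda gets a named parameter, so the compiler can type it;
    // the tuple is then deconstructed with an ordinary match expression.
    val names = data.map { reading =>
      reading match {
        case (id, name, _) => s"$id:$name"
      }
    }
    names.collect()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

This keeps the standard API but adds a level of nesting to every function; the extension described next removes that boilerplate.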
This extension introduces new methods in both the DataSet and DataStream Scala APIs that have a one-to-one correspondence in the extended API. These delegating methods do support anonymous pattern matching functions.
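The delegation idea can be illustrated without Flink. The following is a simplified sketch in plain Scala, not Flink's actual implementation: MiniMapFunction, MiniDataSet, OnMiniDataSet, MiniExtensions and MiniDemo are hypothetical stand-ins. Like map in the Flink Scala API, MiniDataSet.map is overloaded; the added mapWith has a single signature and simply forwards to map, so the compiler knows the expected function type and can type a { case ... } literal against it.

```scala
import scala.language.implicitConversions

// Hypothetical Java-style function interface, standing in for Flink's MapFunction.
trait MiniMapFunction[T, R] { def map(value: T): R }

// Hypothetical stand-in for a DataSet; map is overloaded, as in the Flink Scala API.
final class MiniDataSet[T](val elements: Seq[T]) {
  def map[R](mapper: MiniMapFunction[T, R]): MiniDataSet[R] =
    new MiniDataSet(elements.map(e => mapper.map(e)))
  def map[R](fun: T => R): MiniDataSet[R] =
    new MiniDataSet(elements.map(fun))
}

// Hypothetical extension class: one non-overloaded method that delegates to map.
final class OnMiniDataSet[T](ds: MiniDataSet[T]) {
  def mapWith[R](fun: T => R): MiniDataSet[R] = ds.map(fun)
}

object MiniExtensions {
  // Implicit conversion that makes mapWith available on every MiniDataSet.
  implicit def acceptPartialFunctions[T](ds: MiniDataSet[T]): OnMiniDataSet[T] =
    new OnMiniDataSet(ds)
}

object MiniDemo {
  import MiniExtensions._

  val data = new MiniDataSet(Seq((1, "a", 20.5), (2, "b", 22.0)))

  // The tuple is deconstructed directly in the function passed to mapWith.
  val names: MiniDataSet[String] = data.mapWith {
    case (id, name, _) => s"$id:$name"
  }
}
```

The conversion is only applied where MiniExtensions._ is imported, which mirrors the opt-in import style of the real extensions.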
Method | Original |
---|---|
mapWith | map (DataSet) |
mapPartitionWith | mapPartition (DataSet) |
flatMapWith | flatMap (DataSet) |
filterWith | filter (DataSet) |
reduceWith | reduce (DataSet, GroupedDataSet) |
reduceGroupWith | reduceGroup (GroupedDataSet) |
groupingBy | groupBy (DataSet) |
sortGroupWith | sortGroup (GroupedDataSet) |
combineGroupWith | combineGroup (GroupedDataSet) |
projecting | apply (JoinDataSet, CrossDataSet) |
projecting | apply (CoGroupDataSet) |
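Several of the grouping methods in the DataSet table can be chained. The sketch below assumes a Flink version whose Scala DataSet API ships these extensions; the Sale case class, its fields and the sample data are hypothetical. It groups sales by shop with groupingBy, sorts each group by day with sortGroupWith, and uses reduceGroupWith to keep the earliest sale of each shop. It also assumes, as the #:: pattern implies, that reduceGroupWith hands each group to the function as a Scala Stream.

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

object GroupedExtensionsExample {
  // Hypothetical record type, made up for this sketch.
  case class Sale(shop: String, day: Int, amount: Double)

  // Returns, for each shop, the amount of its earliest sale.
  def run(): Seq[(String, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val sales = env.fromElements(
      Sale("north", 2, 10.0),
      Sale("north", 1, 4.0),
      Sale("south", 3, 7.5))

    sales
      .groupingBy { case Sale(shop, _, _) => shop }                    // instead of groupBy
      .sortGroupWith(Order.ASCENDING) { case Sale(_, day, _) => day } // instead of sortGroup
      .reduceGroupWith {                                              // instead of reduceGroup
        // Groups are never empty, so the first element is always present.
        case Sale(shop, _, amount) #:: _ => (shop, amount)
      }
      .collect()
  }
}
```

Sorting by key selector is used here because the grouping itself is done with a key selector (groupingBy).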
Method | Original |
---|---|
mapWith | map (DataStream) |
mapPartitionWith | mapPartition (DataStream) |
flatMapWith | flatMap (DataStream) |
filterWith | filter (DataStream) |
keyingBy | keyBy (DataStream) |
mapWith | map (ConnectedDataStream) |
flatMapWith | flatMap (ConnectedDataStream) |
keyingBy | keyBy (ConnectedDataStream) |
reduceWith | reduce (KeyedDataStream, WindowedDataStream) |
foldWith | fold (KeyedDataStream, WindowedDataStream) |
applyWith | apply (WindowedDataStream) |
projecting | apply (JoinedDataStream) |
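A comparable sketch for the DataStream API, assuming the streaming Scala API (org.apache.flink.streaming.api.scala._) and its extensions are on the classpath; the Reading case class, the object name and the sample values are made up. It drops readings at or below zero, converts them to tuples, keys the stream by sensor with keyingBy, and keeps a rolling maximum per sensor with reduceWith (one result is emitted per incoming element).

```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

object StreamExtensionsExample {
  // Hypothetical sensor reading, made up for this sketch.
  case class Reading(sensor: String, celsius: Double)

  def run(): JobExecutionResult = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val readings = env.fromElements(
      Reading("a", 21.0), Reading("b", -3.0), Reading("a", 25.0))

    readings
      .filterWith { case Reading(_, celsius) => celsius > 0 }           // instead of filter
      .mapWith { case Reading(sensor, celsius) => (sensor, celsius) }   // instead of map
      .keyingBy { case (sensor, _) => sensor }                          // instead of keyBy
      .reduceWith { case ((sensor, t1), (_, t2)) => (sensor, math.max(t1, t2)) }
      .print()

    // Unlike the DataSet API, a streaming job only runs when execute is called.
    env.execute("acceptPartialFunctions on DataStream")
  }

  def main(args: Array[String]): Unit = run()
}
```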
For more information on the semantics of each method, please refer to the DataSet and DataStream API documentation.
To use only this extension, add the following import for the DataSet API:
import org.apache.flink.api.scala.extensions.acceptPartialFunctions
or, for the DataStream API:
import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
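With only the DataSet import in scope (plus the regular Scala API import), the snippet that failed to compile earlier can be written with mapWith. A minimal sketch; ALaCarteExample and the sample data are hypothetical.

```scala
import org.apache.flink.api.scala._
// Only the partial-function extension, not every available extension.
import org.apache.flink.api.scala.extensions.acceptPartialFunctions

object ALaCarteExample {
  def run(): Seq[String] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Sample data made up for this sketch.
    val data: DataSet[(Int, String, Double)] =
      env.fromElements((1, "a", 20.5), (2, "b", 22.0))

    // mapWith accepts the anonymous pattern matching function that map rejects.
    data.mapWith {
      case (id, name, temperature) => s"$id:$name:$temperature"
    }.collect()
  }
}
```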
The following snippet shows a minimal example of how to use these extension methods together (with the DataSet API):
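import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

object Main {
  case class Point(x: Double, y: Double)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    ds.filterWith {
      // keep only the points whose x coordinate is greater than 1
      case Point(x, _) => x > 1
    }.reduceWith {
      // add the remaining points component-wise
      case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
    }.mapWith {
      // turn the resulting point into a tuple
      case Point(x, y) => (x, y)
    }.flatMapWith {
      // emit one (label, value) pair per coordinate
      case (x, y) => Seq("x" -> x, "y" -> y)
    }.groupingBy {
      // group the pairs by their label
      case (id, _) => id
    }.reduceWith {
      // combine the pairs that share a label (each label occurs once here)
      case ((id, v1), (_, v2)) => (id, v1 + v2)
    }.print() // print() triggers execution of the DataSet program
  }
}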