Scala API 拡張

In order to keep a fair amount of consistency between the Scala and Java APIs, some of the features that allow a high-level of expressiveness in Scala have been left out from the standard APIs for both batch and streaming.

If you want to enjoy the full Scala experience you can choose to opt-in to extensions that enhance the Scala API via implicit conversions.

全ての利用可能な拡張を使うためには、データセットAPIのために単純にimportを追加するだけで良いです。

import org.apache.flink.api.scala.extensions._

あるいは、データストリームAPIの場合

import org.apache.flink.streaming.api.scala.extensions._

その他の方法として、好きなものを使うためだけに個々の拡張a-là-carteをインポートすることができます。

部分的な関数の許容

通常、データセットとデータストリームAPIは、以下のようにタプル、caseクラス、あるいはコレクションを分解するための匿名のパターン一致関数を受け付けません:

val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
  // The previous line causes the following compilation error:
  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
}

この拡張は、拡張されたAPI内で1対1に対応するデータセットとデータストリーム Scala APIでの新しいメソッドを導入します。これらの移譲されたメソッドは匿名のパターンマッチング機能をサポートします。

データセット API

メソッド オリジナル
mapWith map (DataSet)
data.mapWith {
  case (_, value) => value.toString
}
mapPartitionWith mapPartition (DataSet)
data.mapPartitionWith {
  case head #:: _ => head
}
flatMapWith flatMap (DataSet)
data.flatMapWith {
  case (_, name, visitTimes) => visitTimes.map(name -> _)
}
filterWith filter (DataSet)
data.filterWith {
  case Train(_, isOnTime) => isOnTime
}
reduceWith reduce (DataSet, GroupedDataSet)
data.reduceWith {
  case ((_, amount1), (_, amount2)) => amount1 + amount2
}
reduceGroupWith reduceGroup (GroupedDataSet)
data.reduceGroupWith {
  case id #:: value #:: _ => id -> value
}
groupingBy groupBy (DataSet)
data.groupingBy {
  case (id, _, _) => id
}
sortGroupWith sortGroup (GroupedDataSet)
grouped.sortGroupWith(Order.ASCENDING) {
  case House(_, value) => value
}
combineGroupWith combineGroup (GroupedDataSet)
grouped.combineGroupWith {
  case header #:: amounts => amounts.sum
}
projecting apply (JoinDataSet, CrossDataSet)
data1.join(data2).
  whereClause(case (pk, _) => pk).
  isEqualTo(case (_, fk) => fk).
  projecting {
    case ((pk, tx), (products, fk)) => tx -> products
  }

data1.cross(data2).projecting {
  case ((a, _), (_, b) => a -> b
}
projecting apply (CoGroupDataSet)
data1.coGroup(data2).
  whereClause(case (pk, _) => pk).
  isEqualTo(case (_, fk) => fk).
  projecting {
    case (head1 #:: _, head2 #:: _) => head1 -> head2
  }
}

データストリーム API

メソッド オリジナル
mapWith map (DataStream)
data.mapWith {
  case (_, value) => value.toString
}
mapPartitionWith mapPartition (DataStream)
data.mapPartitionWith {
  case head #:: _ => head
}
flatMapWith flatMap (DataStream)
data.flatMapWith {
  case (_, name, visits) => visits.map(name -> _)
}
filterWith filter (DataStream)
data.filterWith {
  case Train(_, isOnTime) => isOnTime
}
keyingBy keyBy (DataStream)
data.keyingBy {
  case (id, _, _) => id
}
mapWith map (ConnectedDataStream)
data.mapWith(
  map1 = case (_, value) => value.toString,
  map2 = case (_, _, value, _) => value + 1
)
flatMapWith flatMap (ConnectedDataStream)
data.flatMapWith(
  flatMap1 = case (_, json) => parse(json),
  flatMap2 = case (_, _, json, _) => parse(json)
)
keyingBy keyBy (ConnectedDataStream)
data.keyingBy(
  key1 = case (_, timestamp) => timestamp,
  key2 = case (id, _, _) => id
)
reduceWith reduce (KeyedStream, WindowedStream)
data.reduceWith {
  case ((_, sum1), (_, sum2) => sum1 + sum2
}
foldWith fold (KeyedStream, WindowedStream)
data.foldWith(User(bought = 0)) {
  case (User(b), (_, items)) => User(b + items.size)
}
applyWith apply (WindowedStream)
data.applyWith(0)(
  foldFunction = case (sum, amount) => sum + amount
  windowFunction = case (k, w, sum) => // [...]
)
projecting apply (JoinedStream)
data1.join(data2).
  whereClause(case (pk, _) => pk).
  isEqualTo(case (_, fk) => fk).
  projecting {
    case ((pk, tx), (products, fk)) => tx -> products
  }

各メソッドのセマンティックの詳しい情報については、DataSetDataStream API ドキュメントを参照してください。

この拡張を排他的に使うためには、以下のimportを追加することができます:

import org.apache.flink.api.scala.extensions.acceptPartialFunctions

データセット拡張については

import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions

以下のコード断片はこれらの拡張メソッドを(データセットAPIを使って)お互いに使う方法の最小の例を示します:

object Main {
  import org.apache.flink.api.scala.extensions._
  case class Point(x: Double, y: Double)
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    ds.filterWith {
      case Point(x, _) => x > 1
    }.reduceWith {
      case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2)
    }.mapWith {
      case Point(x, y) => (x, y)
    }.flatMapWith {
      case (x, y) => Seq("x" -> x, "y" -> y)
    }.groupingBy {
      case (id, value) => id
    }
  }
}

上に戻る

TOP
inserted by FC2 system