Fraud Detection with the DataStream API
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

DataStream APIを使った不正行為の検出 #

Apache Flinkは、堅牢でステートフルなストリーミングアプリケーションを構築するためのDataStream APIを提供します。 状態と時間をきめ細かく制御できるため、高度なイベント駆動型システムの実装が可能になります。 このステップバイステップのガイドで、FlinkのDataStream APIを使ってステートフルストリーミングアプリケーションを構築する方法を学習します。

何を構築していますか? #

クレジットカード詐欺は、デジタル時代において懸念が高まっています。 犯罪者は詐欺を行ったり、安全でないシステムに侵入したりして、クレジットカード番号を盗みます。 盗まれた番号は、1回以上の少額の購入(多くの場合1ドル以下)をすることでテストされます。 それがうまくいけば、彼らは販売したり自分用に保管できる商品を手に入れるために、より高額な購入をするようになります。

このチュートリアルでは、不審なクレジットカード取引を警告するための不正検出システムを構築します。 単純なルールのセットを使って、Flinkによる高度なビジネスロジックを実装でき、リアルタイムで動作することが分かります。

必要条件 #

このウォークスルーは、Javaにある程度知識があることを前提としていますが、別のプログラミング言語を使っている場合でも理解できるはずです。

IDEで実行 #

IDEでプロジェクトを実行すると、java.lang.NoClassDefFoundError例外が発生する可能性があります。これはおそらく、必要な全てのFlink依存関係がクラスパスに暗黙的に読み込まれていないことが原因です。

  • IntelliJ IDEA: Go to Run > Edit Configurations > Modify options > Select include dependencies with "Provided" scope. この実行構成で、IDEからアプリケーションを実行するために必要な全てのクラスが含まれるようになります。

助けてください。行き詰まりました! #

行き詰った場合は、コミュニティサポートリソースを調べてください。 特に、Apache Flinkのuser mailing listは、あらゆるApacheプロジェクトの中で最も活発なプロジェクトの1つとして常にランク付けされており、すばやく助けを受けるのに最適な方法です。

How to Follow Along #

この手順に従って進めたい場合は、次の機能を備えたコンピュータが必要です:

  • Java 11
  • Maven

提供されているFlink Maven Archetypeは、必要な全ての依存関係を含むスケルトンプロジェクトを素早く作成するため、ビジネスロジックを記入することだけに集中できます。 これらの依存関係は、全てのFlinkストリーミングアプリケーションの中核となる依存関係であるflink-streaming-javaと、このウォークスルーに固有のデータジェネレータとその他のクラスを持つflink-walkthrough-commonが含まれます。

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.19-SNAPSHOT \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

Maven 3.0以降では、コマンドラインからリポジトリを指定(-DarchetypeCatalog)することはできなくなりました。この変更についての詳細は、Maven公式ドキュメントを参照してください レポジトリのスナップショットを使いたい場合は、settings.xml にレポジトリのエントリを追加する必要があります。例えば:

<settings>
  <activeProfiles>
    <activeProfile>apache</activeProfile>
  </activeProfiles>
  <profiles>
    <profile>
      <id>apache</id>
      <repositories>
        <repository>
          <id>apache-snapshots</id>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>

必要に応じて、groupIdartifactIdpackageを編集できます。上記のパラメータを使うと、Mavenは、このチュートリアルを完了するための全ての依存関係を含むプロジェクトを含むfrauddetectionという名前のフォルダを作成します。 プロジェクトをエディタにインポートすると、IDE内で直接実行できる次のコードを含むファイルFraudDetectionJob.javaが見つかります。 データストリーム全体にブレークポイントを設定し、DEBUGモードでコードを実行して、全てがどのように機能するかを感じてみてください。

FraudDetectionJob.java #

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");
        
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

FraudDetector.java #

package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());

        collector.collect(alert);
    }
}

コードの分解 #

これら2つのファイルのコードを段階的に見てみましょう。FraudDetectionJobクラスはアプリケーションのデータフローを定義し、FraudDetectorクラスは不正なトランザクションを検知する機能のビジネスロジックを定義します。

FraudDetectionJobクラスのmainメソッドで、ジョブがどのように組み立てられるかを説明します。

実行環境 #

最初の行はStreamExecutionEnvironmentを設定します。 execution environmentは、ジョブのプロパティを設定し、ソースを作成し、最後にジョブの実行を起動する方法です。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ソースの作成 #

ソースは、Apache Kafka、Rabbit MQ、Apache Pulsarのような外部システムからデータをFlink Jobsに取り込みます。 このウォークスルーでは、処理するクレジットカードトランザクションの無限のストリームを生成するソースを使います。 各トランザクションには、アカウントID (accountId)、トランザクションが発生した時のタイムスタンプ(timestamp)、US$金額(amount)が含まれます。 ソースに付加されたnameはデバッグのみを目的としているため、何か問題が発生した場合、エラーの発生場所が分かります。

DataStream<Transaction> transactions = env
    .addSource(new TransactionSource())
    .name("transactions");

イベントの分割と、不正行為の検出 #

transactionsストリームには、多数のユーザからの大量のトランザクションが含まれているため、複数の不正検出タスクによって並行して処理する必要があります。不正行為はアカウントごとに発生するため、同じアカウントの全てのトランザクションが不正検出オペレータの同じ並列タスクによって処理されるようにする必要があります。

同じ物理タスクで特定のキーの全てのレコードを確実に処理するには、DataStream#keyByを使ってストリームを分割します。 process()の呼び出しは、ストリーム内の分割された各要素に関数を適用するオペレータを追加します。 keyByの直後のオペレータ(この場合はFraudDetector)が_keyed context_内で実行されることはよくあることです。

DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getAccountId)
    .process(new FraudDetector())
    .name("fraud-detector");

結果の出力 #

sinkは、Apache Kafka、Cassandra、AWS Kinesisなどの外部システムへDataStreamを書き込みます。 AlertSinkは、各Alertレコードを、永続的なストレージに書き込むのではなく、ログレベルINFOで記録するため、結果を簡単に確認できます。

alerts.addSink(new AlertSink());

The Fraud Detector #

不正検知はKeyedProcessFunctionとして実装されます。 そのメソッドKeyedProcessFunction#processElementはトランザクションイベントごとに呼び出されます。 この最初のバージョンでは、トランザクションごとにアラートが生成されますが、保守的すぎると言う人もいるかもしれません。

このチュートリアルの次のステップでは、より有意義なビジネスロジックを使って不正検知機能を拡張する方法を説明します。

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
  
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());

        collector.collect(alert);
    }
}

実際のアプリケーションの作成 (v1) #

最初のバージョンでは、不正検出機能は小さなトランザクションの直後に大きなトランザクションを実行するアカウントに対してアラートを出力する必要があります。小さいというのは$1.00未満で、大きいというのは$500より大きいものです。 不正検出機能が特定のアカウントの次のトランザクションストリームを処理することを考えます。

Fraud Transaction

トランザクション 3 と 4 は、$0.09という小さなトランザクションの後に$510という大きなトランザクションが続くため、不正としてマークされる必要があります。あるいは、トランザクション 7, 8, 9 は、$0.02という少額の直後に高額が続くわけではないため、詐欺ではありません; それどころか、パターンを破る中間トランザクションがあります。

これを行うために、不正検知機能はイベント全体に渡る情報を_記憶_する必要があります; 高額なトランザクションは、前のトランザクションが少額だった場合にのみ不正行為となります。 イベント全体に渡って情報を記録するには、stateが必要で、そのためにKeyedProcessFunctionを使うことにしました。 これにより、状態と時間の両方をきめ細かく制御できるため、このウォークスルーを通じてより複雑な要件に合わせてアルゴリズムを進化させることができます。

最も簡単な実装は、少額のトランザクションが処理されるtびに設定されるブールフラグです。 高額のトランザクションが発生した場合、そのアカウントにフラグが設定されているかを簡単に確認できます。

ただし、フラグをFraudDetectorクラスのメンバー変数として実装するだけでは動作しません。 FlinkはFraudDetectorの同じオブジェクトインスタンスを使って複数のアカウントのトランザクションを処理します。つまり、アカウントAとBがFraudDetectorの同じインスタンスにルーティングされる場合、アカウントAがフラグをtrueに設定し、アカウントBのトランザクションが誤ったアラートを発する可能性があります。 もちろん、Mapのようなデータ構造を使って個々のキーのフラグを追跡することができますが、単純なメンバー変数は耐障害性が無く、障害時には全ての情報が失われます。 したがって、障害から回復するためにアプリケーションを再起動する必要がある場合は、不正検出機能はアラートを見逃す可能性があります。

これらの課題を解決するために、Flinkは通常のメンバー変数と同じくらい使いやすい耐障害性状態のプリミティブを提供します。

Flinkのもっとも基本的な状態の型は、ValueStateで、これはラップする変数に耐障害性を追加するデータ型です。 ValueStateは_キー付き状態_の形式です。つまり、_キー付きコンテキスト_で適用されるオペレータでのみ使えます; DataStream#keyByの直後に続く任意のオペレータ。 オペレータの_キー付き状態_は、現在処理されているレコードのキーに自動的にスコープされます。 この例では、キーは現在のトランザクションのアカウントID(keyBy()で宣言)であり、FraudDetectorは各アカウントごとに独立した状態を維持します。 ValueStateは、Flinkが変数を管理する方法に関するメタデータを含むValueStateDescriptorを使って作成されます。関数がデータの処理を開始する前に、状態を登録する必要があります。 これに適したフックはopen()メソッドです。

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private transient ValueState<Boolean> flagState;

    @Override
    public void open(OpenContext openContext) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
    }

ValueStateは、Java標準ライブラリのAtomicReferenceAtomicLongに似たラッパークラスです。 コンテンツを操作するための3つの方法が提供されます; updateは状態を設定し、valueは現在の値を取得し、clearはコンテンツを削除します。 アプリケーションの開始時やValueState#clearの呼び出し後など、特定のキーの状態が空の場合、ValueState#valuenullを返します。 ValueState#valueによって返されたオブジェクトの修正はシステムによって認識されることが保証されていないため、全ての変換はValueState#updateを使って実行する必要があります。 それ以外の場合、耐障害性は内部でFlinkによって自動的に管理されるため、標準の変数と同様に操作できます。

以下に、フラグ状態を使って潜在的な不正トランザクションを追跡する方法の例を示します。

@Override
public void processElement(
        Transaction transaction,
        Context context,
        Collector<Alert> collector) throws Exception {

    // Get the current state for the current key
    Boolean lastTransactionWasSmall = flagState.value();

    // Check if the flag is set
    if (lastTransactionWasSmall != null) {
        if (transaction.getAmount() > LARGE_AMOUNT) {
            // Output an alert downstream
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());

            collector.collect(alert);            
        }

        // Clean up our state
        flagState.clear();
    }

    if (transaction.getAmount() < SMALL_AMOUNT) {
        // Set the flag to true
        flagState.update(true);
    }
}

全てのトランザクションについて、不正検知機能はそのアカウントのフラグ状態をチェックします。 ValueStateは常に現在のキー、つまりアカウントにスコープされることに注意してください。 フラグがnull以外の場合、そのアカウントで確認された最後のトランザクションは少額だったため、このトランザクションが高額の場合、検知器は不正アラートを出力します。

そのチェックの後、フラグの状態は無条件にクリアされます。 現在のトランザクションによって不正アラートが発生しパターンが終了したか、現在のトランザクションによってアラートが発生せずにパターンが崩れたため、再起動する必要があります。

最後に、トランザクションの額が少額かどうかをチェックします。 そうであれば、次のイベントでチェックできるようにフラグが設定されます。 全てのValueStateはnull許容のため、ValueState<Boolean>には、unset (null)、truefalseの3つの状態があることに注意してください。 このジョブは、フラグが設定されているかどうかを確認するために、unset(null)とtrueのみを使います。

Fraud Detector v2: State + Time = ❤️ #

詐欺師は、テストトランザクションが気づかれる可能性を減らすために、高額の購入をするまであまり待ちません。 例えば、不正検知機能に1分のタイムアウトを設定するとします; つまり、前の例では、トランザクション3と4はお互いに1分以内に発生した場合にのみ不正と見なされます。 FlinkのKeyedProcessFunctionを使うと、将来のある時点でコールバックメソッドを呼び出すタイマーを設定できます。

新しい要件に殉教するためにジョブを変更する方法を見てみましょう:

  • フラグがtrueに設定されるたびに、今後1分間のタイマーも設定します。
  • タイマーが起動したら、フラグの状態をクリアしてフラグをリセットします。
  • フラグがクリアされた場合は、タイマーをキャンセルする必要があります。

タイマーをキャンセルするには、設定された時間を覚えておく必要があり、覚えているということは状態を意味するため、フラグの状態とともにタイマーの状態を作成することから始めます。

private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;

@Override
public void open(OpenContext openContext) {
    ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
            "flag",
            Types.BOOLEAN);
    flagState = getRuntimeContext().getState(flagDescriptor);

    ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
            "timer-state",
            Types.LONG);
    timerState = getRuntimeContext().getState(timerDescriptor);
}

KeyedProcessFunction#processElementは、タイマーサービスを含むContextを使って呼び出されます。 タイマーサービスを使って現在時刻の照会、タイマーの登録、タイマーの削除ができます。 これにより、フラグが設定されるたびに将来1分間のタイマーを設定し、タイムスタンプをtimerStateに保存できます。

if (transaction.getAmount() < SMALL_AMOUNT) {
    // set the flag to true
    flagState.update(true);

    // set the timer and timer state
    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
    context.timerService().registerProcessingTimeTimer(timer);
    timerState.update(timer);
}

処理時間は実時間であり、オペレータを実行しているマシンのシステムクロックによって決まります。

タイマーが起動すると、KeyedProcessFunction#onTimerを呼び出します。 このメソッドを上書きすることで、フラグをリセットするコールバックを実装できます。

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
    // remove flag after 1 minute
    timerState.clear();
    flagState.clear();
}

最後に、タイマーをキャンセルするために、登録されているタイマーを削除しタイマーの状態を削除する必要があります。 これをヘルパーメソッドでラップし、flagState.clear()の代わりにこのメソッドを呼び出すことができます。

private void cleanUp(Context ctx) throws Exception {
    // delete timer
    Long timer = timerState.value();
    ctx.timerService().deleteProcessingTimeTimer(timer);

    // clean up all state
    timerState.clear();
    flagState.clear();
}

これで、完全に機能するステートフルな分散ストリーミングアプリケーションが完成しました!

最終的なアプリケーション #

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(OpenContext openContext) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            // Clean up our state
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            flagState.update(true);

            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // delete timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state
        timerState.clear();
        flagState.clear();
    }
}

予想される出力 #

提供されたTransactionSourceを使ってこのコードを実行すると、アカウント3に対して不正アラートを発行します。 task managerのログに次の出力が表示されるはずです:

2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
inserted by FC2 system