Flink 实例:欺诈检测

  • Post author:
  • Post category:其他


出自:https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/walkthroughs/datastream_api.html

FraudDetectionJob.java

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.walkthrough.common.sink.AlertSink;

import org.apache.flink.walkthrough.common.entity.Alert;

import org.apache.flink.walkthrough.common.entity.Transaction;

import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Entry point of the fraud-detection streaming job.
 *
 * <p>Wires a {@link TransactionSource} into a {@link FraudDetector} keyed by account id and
 * forwards the resulting {@link Alert}s to an {@link AlertSink}.
 */
public class FraudDetectionJob {

    /**
     * Builds the pipeline (source, keyed detector, sink) and submits it for execution.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Alert> detected = detectFraud(readTransactions(environment));
        detected.addSink(new AlertSink()).name("send-alerts");

        environment.execute("Fraud Detection");
    }

    /** Attaches the transaction source to the environment and names the operator. */
    private static DataStream<Transaction> readTransactions(StreamExecutionEnvironment environment) {
        return environment.addSource(new TransactionSource()).name("transactions");
    }

    /** Partitions transactions by account id and runs the fraud detector on each partition. */
    private static DataStream<Alert> detectFraud(DataStream<Transaction> transactions) {
        return transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");
    }
}

package spendreport;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.walkthrough.common.entity.Alert;

import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Keyed process function that emits an {@link Alert} when, for the current key, a transaction
 * below {@code SMALL_AMOUNT} is immediately followed by one above {@code LARGE_AMOUNT} before
 * the one-minute processing-time timer has cleared the flag.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    /** Transactions strictly below this amount arm the detector. */
    private static final double SMALL_AMOUNT = 1.00;

    /** Transactions strictly above this amount trigger an alert when the detector is armed. */
    private static final double LARGE_AMOUNT = 500.00;

    /** Lifetime of the armed flag, in milliseconds. */
    private static final long ONE_MINUTE = 60 * 1000;

    /** Non-null while the previous transaction for the current key was a small one. */
    private transient ValueState<Boolean> flagState;

    /** Timestamp of the registered processing-time timer that will clear the flag. */
    private transient ValueState<Long> timerState;

    /**
     * Obtains the two state handles from the runtime context.
     *
     * @param parameters configuration passed in by the runtime; not read here
     */
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    /**
     * Processes one transaction: alerts if the detector was armed and the amount is large,
     * then re-arms the detector if the amount is small.
     *
     * @param transaction the incoming transaction
     * @param context gives access to the timer service
     * @param collector receives any alert produced
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // The flag is only ever written as true, so "non-null" means "armed".
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            // Whatever the amount was, the armed window ends with this transaction.
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Arm the detector for the next transaction.
            flagState.update(true);

            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            // Remember the timestamp so cleanUp can delete this exact timer.
            timerState.update(timer);
        }
    }

    /**
     * Timer callback: disarms the detector by clearing both pieces of state.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx timer context; not read here
     * @param out output collector; nothing is emitted here
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

    /**
     * Deletes the pending timer, if one is recorded, and clears all state.
     *
     * @param ctx gives access to the timer service
     * @throws Exception if state access fails
     */
    private void cleanUp(Context ctx) throws Exception {
        // Guard against an empty timer state: unboxing a null Long into the
        // long parameter of deleteProcessingTimeTimer would throw a NullPointerException.
        Long timer = timerState.value();
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
        }

        // clean up all state
        timerState.clear();
        flagState.clear();
    }

}

在这里插入图片描述

使用提供的 TransactionSource 运行此代码,将会发出针对帐户 3 的欺诈警报。您应该在任务管理器(TaskManager)日志中看到以下输出:

2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink – Alert{id=3}

2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink – Alert{id=3}

2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink – Alert{id=3}

2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink – Alert{id=3}

2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink – Alert{id=3}



版权声明:本文为cliper9768原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。