发布日期:2021-11-18 VIP内容

信用卡欺诈检测

在本案例中,我们将构建一个用于对可疑信用卡交易发出警报的欺诈检测系统。通过使用一组简单的规则,Flink可以实现高级业务逻辑和实时操作。

主要内容包括:

在实际应用中,数据源负责把来自外部系统(如Apache Kafka、RabbitMQ或Apache Pulsar)的数据导入Flink作业。而在本案例中,我们将创建一个自定义的数据源,用来模拟生成信用卡交易数据流。该源(构造参数bounded为false时)生成一个无限的信用卡交易流供流程序处理,每个交易包含一个帐户ID(accountId)、交易发生时的时间戳(timestamp)和交易金额(amount)。附加到源上的name(通过.name("transactions")设置)只是为了便于调试:如果出现错误,我们就能知道错误来自何处。

交易流包含来自大量用户的交易数据,因此它需要在多个欺诈检测任务中并行处理。由于欺诈发生在每个帐户的基础上,所以必须确保相同帐户的所有交易都由欺诈检测操作符的相同并行任务处理。

数据源

定义信用卡交易事件类Transaction,代码实现如下:

import java.io.Serializable;
import java.util.Objects;

/**
 * Entity class representing a single credit-card transaction.
 *
 * <p>The class is public, has a public no-argument constructor, and exposes its fields both
 * directly and through getters/setters, which satisfies Flink's POJO type rules. The fields are
 * kept public because other code (e.g. the job's {@code keyBy}) may read them directly.
 */
public class Transaction implements Serializable {
    // Explicit UID so the serialized form does not depend on compiler-generated defaults;
    // matches the serialVersionUID convention of the other serializable classes in this project.
    private static final long serialVersionUID = 1L;

    public long accountId;      // account the transaction belongs to
    public long timestamp;      // time the transaction happened
    public double amount;       // transaction amount

    /** No-argument constructor; all fields start at zero. */
    public Transaction() { }

    /**
     * @param accountId account the transaction belongs to
     * @param timestamp time the transaction happened
     * @param amount    transaction amount
     */
    public Transaction(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }

    public long getAccountId() {
        return accountId;
    }

    public void setAccountId(long accountId) {
        this.accountId = accountId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    /** Two transactions are equal when account id, timestamp and amount all match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Transaction that = (Transaction) o;
        return accountId == that.accountId &&
                timestamp == that.timestamp &&
                Double.compare(that.amount, amount) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, timestamp, amount);
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "accountId=" + accountId +
                ", timestamp=" + timestamp +
                ", amount=" + amount +
                '}';
    }
}

自定义数据源类MyTransactionSource,代码实现如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

/**
 * Custom Flink source that simulates a stream of credit-card transactions.
 *
 * <p>It replays a fixed list of sample transactions, emitting one record roughly every 100 ms of
 * wall-clock time. Each emitted record gets an event timestamp that starts at
 * 2021-01-01 00:00:00 (parsed by {@link Timestamp#valueOf} in the JVM's default time zone) and
 * advances by six minutes per record. With {@code bounded == true} the list is emitted once and
 * the source finishes; otherwise it is replayed from the beginning forever.
 */
public class MyTransactionSource implements SourceFunction<Transaction> {
    private static final long serialVersionUID = 1L;

    private static final Timestamp INITIAL_TIMESTAMP = Timestamp.valueOf("2021-01-01 00:00:00");
    private static final long SIX_MINUTES = 6 * 60 * 1000;
    // Wall-clock delay before each emitted record, in milliseconds.
    private static final long EMIT_INTERVAL_MS = 100;

    // true: emit the sample data once (batch-like); false: replay it forever (streaming).
    private final boolean bounded;

    private int index = 0;              // index of the next sample record to emit
    private long timestamp;             // event timestamp assigned to the next emitted record

    // Cleared by cancel(); checked by the loop in run().
    private volatile boolean isRunning = true;

    // Pre-recorded sample transactions; in practice they would come from an external system
    // such as Kafka. Records are copied before emission, so these templates are never modified.
    private final List<Transaction> data;

    /**
     * @param bounded {@code true} to emit the sample data once, {@code false} to replay it forever
     */
    public MyTransactionSource(boolean bounded) {
        this.bounded = bounded;
        this.timestamp = INITIAL_TIMESTAMP.getTime();

        this.data = Arrays.asList(
                new Transaction(1, 0L, 188.23),
                new Transaction(2, 0L, 374.79),
                new Transaction(3, 0L, 112.15),
                new Transaction(4, 0L, 478.75),
                new Transaction(5, 0L, 208.85),
                new Transaction(1, 0L, 379.64),
                new Transaction(2, 0L, 351.44),
                new Transaction(3, 0L, 320.75),
                new Transaction(4, 0L, 259.42),
                new Transaction(5, 0L, 273.44),
                new Transaction(1, 0L, 267.25),
                new Transaction(2, 0L, 397.15),
                new Transaction(3, 0L, 0.219),
                new Transaction(4, 0L, 231.94),
                new Transaction(5, 0L, 384.73),
                new Transaction(1, 0L, 419.62),
                new Transaction(2, 0L, 412.91),
                new Transaction(3, 0L, 0.77),
                new Transaction(4, 0L, 22.10),
                new Transaction(5, 0L, 377.54),
                new Transaction(1, 0L, 375.44),
                new Transaction(2, 0L, 230.18),
                new Transaction(3, 0L, 0.80),
                new Transaction(4, 0L, 350.89),
                new Transaction(5, 0L, 127.55),
                new Transaction(1, 0L, 483.91),
                new Transaction(2, 0L, 228.22),
                new Transaction(3, 0L, 871.15),
                new Transaction(4, 0L, 64.19),
                new Transaction(5, 0L, 79.43),
                new Transaction(1, 0L, 56.12),
                new Transaction(2, 0L, 256.48),
                new Transaction(3, 0L, 148.16),
                new Transaction(4, 0L, 199.95),
                new Transaction(5, 0L, 252.37),
                new Transaction(1, 0L, 274.73),
                new Transaction(2, 0L, 473.54),
                new Transaction(3, 0L, 119.92),
                new Transaction(4, 0L, 323.59),
                new Transaction(5, 0L, 353.16),
                new Transaction(1, 0L, 211.90),
                new Transaction(2, 0L, 280.93),
                new Transaction(3, 0L, 347.89),
                new Transaction(4, 0L, 459.86),
                new Transaction(5, 0L, 82.31),
                new Transaction(1, 0L, 373.26),
                new Transaction(2, 0L, 479.83),
                new Transaction(3, 0L, 454.25),
                new Transaction(4, 0L, 83.64),
                new Transaction(5, 0L, 292.44)
        );
    }

    /**
     * Emits records until the sample data is exhausted (bounded mode) or the source is cancelled.
     *
     * @throws InterruptedException if the thread is interrupted while the source is still running
     */
    @Override
    public void run(SourceContext<Transaction> sourceContext) throws Exception {
        while (isRunning && hasNext()) {
            try {
                Thread.sleep(EMIT_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Keep the interrupt status visible to whoever runs this thread.
                Thread.currentThread().interrupt();
                if (!isRunning) {
                    // cancel() has already been called: finish quietly instead of failing the task.
                    return;
                }
                throw e;
            }
            sourceContext.collect(next());
        }
    }

    /** Stops the emit loop in {@link #run}. */
    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * Returns whether another record can be emitted. In unbounded mode, reaching the end of the
     * sample data rewinds {@link #index} to the start.
     */
    private boolean hasNext() {
        if (index < data.size()) {
            // Records remain in the current pass.
            return true;
        } else if (bounded) {
            // Batch-like mode: one pass only.
            return false;
        } else {
            // Streaming mode: replay from the beginning.
            index = 0;
            return true;
        }
    }

    /**
     * Returns a copy of the next sample transaction stamped with the current event time, then
     * advances the event time by six minutes. Callers must check {@link #hasNext()} first.
     */
    private Transaction next() {
        Transaction sample = data.get(index++);
        // Emit a fresh object instead of mutating and re-emitting the shared sample: downstream
        // operators (e.g. with object reuse enabled) may still hold previously emitted records.
        Transaction transaction = new Transaction(sample.getAccountId(), timestamp, sample.getAmount());
        timestamp += SIX_MINUTES;
        return transaction;
    }
}

另外,对于检测到的疑似欺诈事件,使用如下的Alert类来表示:

import java.util.Objects;

/**
 * 实体类,代表告警信息
 */
public class Alert {
    private long id;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Alert alert = (Alert) o;
        return id == alert.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    @Override
    public String toString() {
        return "Alert{" +
                "id=" + id +
                '}';
    }
}

V1-版本一

根据我们的分析,欺诈操作通常所具有的特征是:先花很小的金额(例如1元或更少)尝试购买一件或多件小物品来测试被盗的号码。如果发现购买有效,他们就会进行更大规模的购买。因此,针对任何在完成小额交易之后紧接着进行大额交易的账户,欺诈检测器应该输出一个警报。

这里我们对小额交易的定义是交易金额小于$1.00,大额交易的定义是交易金额大于$500。假设我们的欺诈检测器为某个帐户处理以下交易流。

要实现这样的功能,欺诈检测器必须记住跨交易的信息;一笔大额交易只有在前一笔交易额很小的情况下才算欺诈。跨交易记忆信息需要状态,所以应该使用KeyedProcessFunction。它提供了对状态和时间的细粒度控制,可根据复杂的需求改进算法。

在本例中,key是当前交易的帐户id(由keyBy()声明),而欺诈检测器为每个帐户维护一个独立的状态。ValueState是使用ValueStateDescriptor创建的,它包含关于Flink应该如何管理该变量的元数据。应该在函数开始处理数据之前注册状态,通常是在open()方法中实现的。

Scala代码:

@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  @transient private var flagState: ValueState[java.lang.Boolean] = _

  // Registers the keyed "flag" state before the function processes any element.
  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    // "flag" holds a java.lang.Boolean per key; a value that was never set reads as null.
    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    flagState = getRuntimeContext.getState(flagDescriptor)
  }

Java代码:

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    // Keyed state: non-null (true) when the current account's previous transaction was small.
    private transient ValueState<Boolean> flagState;

    /**
     * Registers the keyed "flag" state before the function processes any element.
     */
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
    }

在下面的代码中,使用标记状态跟踪潜在欺诈交易。

Scala代码:

    // Called once per transaction of the current key (account id).
    // Emits an alert when the previous transaction was small and this one is large.
    override def processElement(
          transaction: Transaction,
          context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
          collector: Collector[Alert]): Unit = {

        // Read the state for the current key
        val lastTransactionWasSmall = flagState.value

        // Was the flag set by the previous transaction?
        if (lastTransactionWasSmall != null) {
          if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
            // Emit an alert downstream
            val alert = new Alert
            alert.setId(transaction.getAccountId)

            collector.collect(alert)
          }
          // Clear the state unconditionally: the pattern either completed or was broken
          flagState.clear()
        }

        if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
          // Set the flag to true so the next transaction of this account can check it
          flagState.update(true)
        }
    }

Java代码:

    /**
     * Called for each transaction of the current key (account id).
     *
     * <p>Emits an alert when the previous transaction of this account was small and the current
     * one is large. The flag is then cleared, and set again only if the current transaction is
     * itself small.
     */
    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Read the state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Was the flag set by the previous transaction?
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Emit an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }

            // Clear the state unconditionally: the pattern either completed or was broken
            flagState.clear();
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Set the flag to true so the next transaction of this account can check it
            flagState.update(true);
        }
    }

对于每个交易,欺诈检测器检查该帐户的标志状态。记住,ValueState的作用域始终是当前key,即账户。如果标志为非空,说明该帐户上一次的交易额很小,因此如果这次交易的金额很大,则该检测器输出一个欺诈警报。

检查之后,该标志状态将被无条件清除。要么是当前交易导致了欺诈警报,并且模式已经结束,要么是当前交易没有导致警报,模式已经被打破,需要重新建立。

最后,检查交易金额,看看它是否属于小额交易。如果是,则设置该标志,以便处理该账户的下一笔交易时可以检查它。注意,ValueState<Boolean>实际上有三种状态:unset(null)、true和false。本例中仅使用unset(null)和true来判断是否设置了标志。

V2-版本二,状态 + 时间 = 心跳

在先尝试用小额购买之后,骗子不会等很长时间再来进行大的购买,以减少他们的测试交易被注意到的机会。假设我们想要为欺诈检测器设置1分钟的超时,例如,在前面的例子中,交易3和交易4只有在间隔1分钟内发生时才被认为是欺诈。Flink的KeyedProcessFunction提供了设置定时器的方法,我们可以设置在将来某个时间点调用回调方法的定时器。

我们需要修改之前的作业,以符合新的要求:

  • 每当将标志flag设置为true,也要设置一个1分钟的定时器。
  • 当定时器触发时,通过清除其状态来重置标记。
  • 如果标记被清除,那么定时器应该被取消。

要取消定时器,就必须记住它的设置时间,而记住意味着状态,因此我们从创建计时器状态和标记状态开始。

Scala代码:

@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  @transient private var flagState: ValueState[java.lang.Boolean] = _	// flag state: set when the previous transaction was small
  @transient private var timerState: ValueState[java.lang.Long] = _		// timer state: firing time of the pending timer

  // Registers both keyed states before the function processes any element.
  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    flagState = getRuntimeContext.getState(flagDescriptor)

    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
    timerState = getRuntimeContext.getState(timerDescriptor)
  }

Java代码:

    private transient ValueState<Boolean> flagState;		// flag state: set when the previous transaction was small
    private transient ValueState<Long> timerState;		// timer state: firing time of the pending timer

    /**
     * Registers both keyed states ("flag" and "timer-state") before the function processes any
     * element.
     */
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

调用KeyedProcessFunction#processElement时,传入的Context参数提供了一个定时器服务(TimerService),可用于查询当前时间、注册定时器和删除定时器。这样,就可以在每次设置标志时注册一个1分钟后触发的定时器,并将该定时器的触发时间戳存储在timerState中。

Scala代码:

if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
   // Set the flag to true
   flagState.update(true)

   // Register a processing-time timer one minute from now and remember its timestamp,
   // so that it can be deleted later if the pattern completes or is broken first
   val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
   context.timerService.registerProcessingTimeTimer(timer)
   timerState.update(timer)
}

Java代码:

if (transaction.getAmount() < SMALL_AMOUNT) {
    // Set the flag to true
    flagState.update(true);

    // Register a processing-time timer one minute from now and remember its timestamp,
    // so that it can be deleted later if the pattern completes or is broken first
    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
    context.timerService().registerProcessingTimeTimer(timer);		// register the timer
    timerState.update(timer);
}

处理时间为挂钟时间,由运行操作符的机器的系统时钟决定。当定时器触发时,它调用KeyedProcessFunction#onTimer方法。重写此方法实现回调以重置标记。

Scala代码:

// Invoked when the timer registered after a small transaction fires.
override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
    out: Collector[Alert]): Unit = {
  // The timeout elapsed without a large transaction: the timer has already fired,
  // so drop its bookkeeping and reset the flag (nothing is emitted)
  timerState.clear()
  flagState.clear()
}

Java代码:

/**
 * Invoked when the timer registered after a small transaction fires.
 *
 * <p>The timeout elapsed without a large transaction, so the pattern has expired: the timer
 * bookkeeping and the flag are both cleared. Nothing is emitted.
 *
 * @param timestamp firing time of the timer
 * @param ctx       timer context (unused)
 * @param out       alert collector (unused); typed to match {@code KeyedProcessFunction<?, ?, Alert>}
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
    timerState.clear();
    flagState.clear();
}

最后,要取消定时器,需要删除已注册的定时器并删除定时器状态。我们将其封装在辅助方法中并调用该方法,而不是直接调用flagState.clear()。

Scala代码:

/**
 * Deletes the pending timer (if any) and clears both keyed states of the current account.
 * Called instead of flagState.clear() once the flag has been checked.
 */
@throws[Exception]
private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
  val timer = timerState.value
  // timer is null when no timer state exists (e.g. state restored from the flag-only version
  // of this job); passing it on would unbox null and throw a NullPointerException
  if (timer != null) {
    ctx.timerService.deleteProcessingTimeTimer(timer)
  }

  // Clear all state
  timerState.clear()
  flagState.clear()
}

Java代码:

/**
 * Deletes the pending timer (if any) and clears both keyed states of the current account.
 * Called instead of {@code flagState.clear()} once the flag has been checked.
 *
 * @param ctx context giving access to the timer service
 * @throws Exception propagated from keyed-state access
 */
private void cleanUp(Context ctx) throws Exception {
    Long timer = timerState.value();
    // timer is null when no timer state exists (e.g. state restored from the flag-only version
    // of this job); unboxing it would throw a NullPointerException
    if (timer != null) {
        ctx.timerService().deleteProcessingTimeTimer(timer);
    }

    // Clear all state
    timerState.clear();
    flagState.clear();
}

这就完成了一个功能齐全、有状态的分布式流应用程序!

完整代码如下。

Scala代码:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.entity.Transaction

/** Thresholds and timeout shared by all [[FraudDetector]] instances. */
object FraudDetector {
  /** Amounts strictly below this value count as small (card-testing) transactions. */
  val SMALL_AMOUNT: Double = 1.00
  /** Amounts strictly above this value count as large transactions. */
  val LARGE_AMOUNT: Double = 500.00
  /** How long, in processing-time milliseconds, a small transaction keeps the pattern armed. */
  val ONE_MINUTE: Long = 60 * 1000L
}

/**
 * Fraud detector keyed by account id: emits an [[Alert]] when a small transaction is followed
 * by a large one on the same account within one minute of processing time.
 */
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  // Per-account flag: non-null (true) when the previous transaction was small.
  @transient private var flagState: ValueState[java.lang.Boolean] = _
  // Per-account firing time of the timer that expires the flag.
  @transient private var timerState: ValueState[java.lang.Long] = _

  /** Registers both keyed states before any element is processed. */
  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    flagState = getRuntimeContext.getState(flagDescriptor)

    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
    timerState = getRuntimeContext.getState(timerDescriptor)
  }

  /**
   * Called for every transaction of the current key. Alerts if the previous transaction was
   * small and this one is large, then resets the pattern; re-arms it (with a one-minute timer)
   * when this transaction is itself small.
   */
  override def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    // Read the state for the current key
    val lastTransactionWasSmall = flagState.value

    // Was the flag set by the previous transaction?
    if (lastTransactionWasSmall != null) {
      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
        // Emit an alert downstream
        val alert = new Alert
        alert.setId(transaction.getAccountId)

        collector.collect(alert)
      }
      // The pattern either completed or was broken: delete the timer and clear all state
      cleanUp(context)
    }

    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
      // Set the flag and a timer that expires it one minute from now
      flagState.update(true)
      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE

      context.timerService.registerProcessingTimeTimer(timer)
      timerState.update(timer)
    }
  }

  /** The timeout elapsed without a large transaction: forget the small one (nothing emitted). */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
      out: Collector[Alert]): Unit = {
    timerState.clear()
    flagState.clear()
  }

  /** Deletes the pending timer (if any) and clears both keyed states of the current account. */
  @throws[Exception]
  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
    val timer = timerState.value
    // timer is null when no timer state exists (e.g. state restored from the flag-only V1 job);
    // passing it on would unbox null and throw a NullPointerException
    if (timer != null) {
      ctx.timerService.deleteProcessingTimeTimer(timer)
    }

    // Clear all state
    timerState.clear()
    flagState.clear()
  }
}

Java代码:

import com.xueai8.fraud.entity.Alert;
import com.xueai8.fraud.entity.Transaction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Custom fraud detector built on {@link KeyedProcessFunction} and keyed by account id.
 *
 * <p>Emits an {@link Alert} for an account when a small transaction (amount below
 * {@code SMALL_AMOUNT}) is followed by a large transaction (amount above {@code LARGE_AMOUNT})
 * within one minute of processing time.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000L;

    // Per-account flag: non-null (true) when the account's previous transaction was small.
    private transient ValueState<Boolean> flagState;
    // Per-account firing time of the processing-time timer that expires the flag.
    private transient ValueState<Long> timerState;

    /** Registers both keyed states before any element is processed. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
        flagState = this.getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timer-state", Types.LONG);
        timerState = this.getRuntimeContext().getState(timerDescriptor);
    }

    /**
     * Called for every transaction of the current key.
     *
     * <p>If the previous transaction of this account was small and this one is large, an alert
     * is emitted. In either case the pending pattern is then reset; a new one is armed (with a
     * one-minute timer) when this transaction is itself small.
     *
     * @param transaction the incoming transaction
     * @param context     gives access to the timer service
     * @param collector   receives emitted alerts
     * @throws Exception  propagated from keyed-state access
     */
    @Override
    public void processElement(Transaction transaction,
                               Context context,
                               Collector<Alert> collector) throws Exception {
        // Read the state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Was the flag set by the previous transaction?
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Emit an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            // The pattern either completed or was broken: delete the timer and clear all state
            cleanUp(context);
        }

        // The current transaction is very small (< 1.00)
        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Set the flag
            flagState.update(true);

            // Register a timer that expires the flag one minute from now
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            // Remember the timer so cleanUp() can delete it
            timerState.update(timer);
        }
    }

    /**
     * The one-minute timeout elapsed without a large transaction: forget the small one.
     * Nothing is emitted.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        timerState.clear();
        flagState.clear();
    }

    /** Deletes the pending timer (if any) and clears both keyed states of the current account. */
    private void cleanUp(Context ctx) throws Exception {
        Long timer = timerState.value();
        // timer is null when no timer state exists (e.g. state restored from the flag-only V1
        // job); unboxing it would throw a NullPointerException
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
        }

        // Clear all state
        timerState.clear();
        flagState.clear();
    }
}

测试程序如下。

Java代码:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FraudDetectionJob {

    /**
     * Builds and runs the fraud-detection pipeline:
     * MyTransactionSource (unbounded) -> keyBy(account id) -> FraudDetector -> print().
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Custom source replaying the sample transactions forever
        DataStream<Transaction> transactions = env
                .addSource(new MyTransactionSource(false))
                .name("transactions");

        // Print the generated transactions for inspection
        transactions.print();

        // Detect fraud per account and emit alerts
        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");

        alerts.print();

        // Run the streaming program
        env.execute("Transaction Stream");
    }
}

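使用上面自定义的 MyTransactionSource 数据源运行这段代码,将会检测到账户3的欺诈行为并输出报警信息。由于本例通过 print() 输出告警,告警会以 Alert{id=3} 的形式出现在 TaskManager 的标准输出(.out 文件)中。下面是 Flink 官方 walkthrough(使用 AlertSink 将告警写入 TaskManager 日志)中对应的输出示例: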

2019-08-19 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}
2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink            - Alert{id=3}