Flink Learning Project :基于Flink的实时热门商品统计和订单支付模块

逄烨
2023-12-01

1、 版本&语言

  • Flink 1.7.2
  • Java 1.8
  • Kafka 0.10.2

2、实时热门商品统计(窗口聚合、窗口分组、TopN)

2.1 需求

  • 使用长度为1小时、滑动步长为5分钟的滑动窗口,实时统计浏览量(pv)最高的前N个热门商品(TopN)

2.2 数据

2.2.1 数据来源

  • 用户行为日志

2.2.2 数据存储

  • Kafka

2.2.3 数据格式

543462,1715,1464116,pv,1511658000

字段依次为:userId(用户ID)、itemId(商品ID)、categoryId(商品类目ID)、behavior(行为类型,如pv)、timestamp(时间戳,单位:秒)

2.3 实现

2.3.1 核心逻辑

// Parse each raw record into a UserBehavior and assign event-time timestamps/watermarks.
DataStream<UserBehavior> dataStream = source
    .map(new ParseSourceData())
    .assignTimestampsAndWatermarks(new WatermarkExtractor());
// Typed streams (instead of raw DataStream) let the compiler check every operator's
// input/output/key types along the chain.
DataStream<String> processedStream = dataStream
    // Keep only page-view events; constant-first equals() tolerates a null behavior field.
    .filter(value -> "pv".equals(value.getBehavior()))
    .keyBy("itemId")
    .timeWindow( Time.hours(1), Time.minutes(5) )
    .aggregate( new CountAgg(), new WindowResult() )   // per-item count within each sliding window
    // Group the per-item counts by window end. A typed key selector produces a Long key,
    // matching the key type declared by TopNHotItems (a field-name keyBy yields a Tuple key).
    .keyBy(ItemViewCount::getWindowEnd)
    .process( new TopNHotItems() );

2.3.2 UserBehavior.class

@Data
public class UserBehavior {
    private long userId;
    private long itemId;
    private long categoryId;
    private String behavior;
    private long timestamp;
}

2.3.3 ParseSourceData

/**
 * Parses one comma-separated log line into a {@link UserBehavior}.
 *
 * <p>Expected field order: userId, itemId, categoryId, behavior, timestamp.
 * Every field is trimmed, including the behavior string, so that a value
 * such as {@code " pv"} still matches the downstream {@code "pv"} filter.
 */
private static class ParseSourceData implements MapFunction<String, UserBehavior> {
    /**
     * @param s raw input line
     * @return the parsed record
     * @throws Exception if a numeric field cannot be parsed or a field is missing
     */
    @Override
    public UserBehavior map(String s) throws Exception {
        String[] fields = s.split(",");
        // parseLong yields primitives directly; valueOf would box and immediately unbox.
        return new UserBehavior(
                Long.parseLong(fields[0].trim()),
                Long.parseLong(fields[1].trim()),
                Long.parseLong(fields[2].trim()),
                fields[3].trim(),
                Long.parseLong(fields[4].trim()));
    }
}

2.3.4 WatermarkExtractor

/**
 * Supplies the event timestamp of each {@link UserBehavior} for an input whose
 * timestamps are expected to be ascending.
 */
private static class WatermarkExtractor extends AscendingTimestampExtractor<UserBehavior> {
    /**
     * @param userBehavior the record whose timestamp is read
     * @return the record's timestamp converted from seconds to milliseconds
     */
    @Override
    public long extractAscendingTimestamp(UserBehavior userBehavior) {
        // The raw data carries seconds; the returned value is in milliseconds.
        final long epochSeconds = userBehavior.getTimestamp();
        return epochSeconds * 1000;
    }
}

2.3.5 CountAgg

public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

    public Long createAccumulator(){
        return 0L;
    }

    public Long add(UserBehavior in, Long acc){
        return acc + 1;
    }

    public Long getResult(Long acc){
        return acc;
    }

    public Long merge(Long a, Long b){
        return a + b;
    }

}

2.3.6 WindowResult

/**
 * Wraps the pre-aggregated count of one window into an {@link ItemViewCount}
 * carrying the item id, the window end and the count.
 */
private static class WindowResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    /**
     * @param key    window key; cast to a one-field tuple holding the item id
     * @param window the window being emitted; its end timestamp is recorded
     * @param input  aggregated values; only the first element is read
     * @param out    collector receiving the resulting {@link ItemViewCount}
     */
    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        final Tuple1<Long> itemKey = (Tuple1<Long>) key;
        final Long itemId = itemKey.f0;
        final long windowEnd = window.getEnd();
        final Long viewCount = input.iterator().next();
        out.collect(new ItemViewCount(itemId, windowEnd, viewCount));
    }
}

2.3.7 TopNHotItems

/**
 * Collects the {@link ItemViewCount} records of one key, and when the event-time
 * timer registered at {@code windowEnd + 1} fires, emits a formatted ranking of
 * the items with the highest counts.
 *
 * <p>The number of ranked items defaults to 3 and can be set through
 * {@link #TopNHotItems(int)}. If fewer items than requested were buffered,
 * only the available ones are listed.
 */
public class TopNHotItems extends KeyedProcessFunction<Long, ItemViewCount, String> {

    /** Maximum number of items listed in each emitted ranking. */
    private final int topSize;

    /** Buffers the records received for the current key until the timer fires. */
    private transient ListState<ItemViewCount> itemState;

    /** Creates a function that ranks the top 3 items. */
    public TopNHotItems() {
        this(3);
    }

    /**
     * @param topSize maximum number of items to list per ranking; must be positive
     * @throws IllegalArgumentException if {@code topSize} is not positive
     */
    public TopNHotItems(int topSize) {
        if (topSize <= 0) {
            throw new IllegalArgumentException("topSize must be positive, got " + topSize);
        }
        this.topSize = topSize;
    }

    @Override
    public void open(Configuration parameters) {
        itemState = getRuntimeContext().getListState(
                new ListStateDescriptor<ItemViewCount>("item-state", ItemViewCount.class));
    }

    /**
     * Buffers the record and registers an event-time timer one millisecond
     * after the record's window end.
     */
    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> collector) throws Exception {
        itemState.add(value);
        ctx.timerService().registerEventTimeTimer( value.getWindowEnd() + 1 );
    }

    /**
     * Sorts the buffered records by count (descending), clears the state and
     * emits the formatted ranking.
     *
     * @param timestamp firing time of the timer, i.e. window end + 1
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }

        // The buffered records are no longer needed once copied out.
        itemState.clear();

        // Long.compare avoids the overflow of casting a long difference to int.
        allItems.sort((o1, o2) -> Long.compare(o2.getCount(), o1.getCount()));

        // Never read past the end of the list when fewer than topSize items arrived.
        int limit = Math.min(topSize, allItems.size());

        // Format the ranking for output.
        StringBuilder result = new StringBuilder();
        result.append("时间:").append( new Timestamp( timestamp - 1 ) ).append("\n");
        // One line per ranked item.
        for (int i = 0; i < limit; i++) {
            ItemViewCount currentItem = allItems.get(i);
            result.append("No").append(i + 1).append(":")
                    .append(" 商品ID=").append(currentItem.getItemId())
                    .append(" 浏览量=").append(currentItem.getCount())
                    .append("\n");
        }
        result.append("================================");
        // Throttles the output rate for demo readability.
        // NOTE(review): this blocks the operator's processing thread for one second per
        // firing; remove it outside of local demos.
        Thread.sleep(1000);
        out.collect(result.toString());
    }

}

3、订单超时失效(基于CEP)

3.1 需求

  • 检测创建后在规定时间内(本例为30分钟)未完成支付的订单,并输出超时预警

3.2 数据

| 字段 | 描述 |
| --- | --- |
| orderId | 订单ID |
| eventType | 订单状态(create、pay) |
| txId | 交易编号(仅eventType为pay时有值) |
| eventTime | 订单状态发生的时间(单位:秒) |

3.3 实现

3.3.1 读取数据

// Read comma-separated order events from a local socket, turn each line into an
// OrderEvent, use eventTime (seconds, scaled to milliseconds) as the ascending
// event timestamp, and key the stream by order id.
val orderEventStream = env.socketTextStream("localhost", 7777)
      .map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).trim.toLong, fields(1).trim, fields(2).trim, fields(3).trim.toLong)
      }
      .assignAscendingTimestamps(event => event.eventTime * 1000L)
      .keyBy(event => event.orderId)

3.3.2 定义一个Pattern(类似于12306买票:订单在创建后的一定时间内完成支付,视为订单完成)

// A "create" event followed (not necessarily immediately) by a "pay" event,
// both within 30 minutes.
val orderPayPattern = Pattern
      .begin[OrderEvent]("begin").where(event => event.eventType == "create")
      .followedBy("follow").where(event => event.eventType == "pay")
      .within(Time.minutes(30))

3.3.3 超时预警

// Tag of the side output that receives the results produced for timed-out matches.
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
// Apply the pattern to the keyed order stream.
val patternStream = CEP.pattern(orderEventStream, orderPayPattern)
// OrderTimeoutSelect handles timed-out partial matches (sent to the side output);
// OrderPaySelect handles complete matches (sent to the main output).
val resultStream = patternStream.select(
    orderTimeoutOutputTag,
    new OrderTimeoutSelect(),
    new OrderPaySelect()
)
resultStream.print("payed")
resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")

4、实时对账

4.1 需求

  • 实时对账:将订单支付事件与支付到账事件按交易编号(txId)进行实时匹配,对在等待时间内未能匹配的支付事件或到账事件输出报警

4.2 数据

| 字段 | 描述 |
| --- | --- |
| txId | 交易编号 |
| payChannel | 支付渠道(wechat、alipay) |
| eventTime | 支付时间(单位:秒) |

4.3 实现

4.3.1 核心逻辑(双流Join)

// Case class for the events of the receipt stream.
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  eventTime: Long
)

/**
 * Reconciliation job: connects the order-payment stream with the receipt stream,
 * both keyed by txId, and pairs each payment with its receipt. Events whose
 * counterpart has not arrived when their timer fires are sent to side outputs.
 */
object TxMacthDetect {
  // Side-output tags for events that never found their counterpart
  val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")

  // Event-time delay (milliseconds) an event waits for its counterpart before it is reported
  private val MatchWaitMillis = 5000L

  // Resolves a classpath resource to a file path, failing with a clear message
  // (instead of a NullPointerException) when the resource is missing.
  private def resourcePath(name: String): String = {
    val resource = getClass.getResource(name)
    require(resource != null, s"classpath resource not found: $name")
    resource.getPath
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read the order event stream; events without a txId are dropped.
    // Alternative source: env.socketTextStream("localhost", 7777)
    val orderEventStream = env.readTextFile(resourcePath("/OrderLog.csv"))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      })
      .filter(_.txId != "")
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // Read the payment receipt event stream.
    // Alternative source: env.socketTextStream("localhost", 8888)
    val receiptEventStream = env.readTextFile(resourcePath("/ReceiptLog.csv"))
      .map(data => {
        val dataArray = data.split(",")
        // Trim the timestamp like every other field so surrounding whitespace
        // does not cause a NumberFormatException.
        ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).trim.toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // Connect the two keyed streams and process them together
    val processedStream = orderEventStream.connect(receiptEventStream)
      .process( new TxPayMatch() )

    processedStream.print("matched")
    processedStream.getSideOutput(unmatchedPays).print("unmatchedPays")
    processedStream.getSideOutput(unmatchedReceipts).print("unmatchReceipts")

    env.execute("tx match job")
  }

  /**
   * Pairs payment events with receipt events of the same key. Whichever event
   * arrives first is stored in state and a timer is registered; when the
   * counterpart arrives the pair is emitted on the main output.
   */
  class TxPayMatch() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{
    // State holding the payment event / receipt event that arrived first
    lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("pay-state", classOf[OrderEvent])
    )

    lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent])
    )

    // Handles an order payment event
    override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent,
      (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      // value() returns null when nothing is stored; Option(...) turns that into None
      Option(receiptState.value()) match {
        case Some(receipt) =>
          // The receipt is already here: emit the matched pair and clear the state
          out.collect((pay, receipt))
          receiptState.clear()
        case None =>
          // The receipt has not arrived yet: remember the payment and register a timer
          payState.update(pay)
          ctx.timerService().registerEventTimeTimer( pay.eventTime * 1000L + MatchWaitMillis )
      }
    }

    // Handles a receipt event; mirrors the payment handling
    override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent,
      (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      Option(payState.value()) match {
        case Some(pay) =>
          out.collect((pay, receipt))
          payState.clear()
        case None =>
          receiptState.update(receipt)
          ctx.timerService().registerEventTimeTimer( receipt.eventTime * 1000L + MatchWaitMillis )
      }
    }

    // Timer fired: whatever is still stored never found its counterpart, so it is
    // reported on the corresponding side output and both states are cleared.
    override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent,
      (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      // The receipt never arrived: send the payment to the side output
      Option(payState.value()).foreach(pay => ctx.output(unmatchedPays, pay))
      // The payment never arrived: send the receipt to the side output
      Option(receiptState.value()).foreach(receipt => ctx.output(unmatchedReceipts, receipt))
      payState.clear()
      receiptState.clear()
    }
  }
}
 类似资料: