1、版本 & 语言
- Flink 1.7.2
- Java 1.8
- Kafka 0.10.2
2、实时热门商品统计(窗口聚合、窗口分组、TopN)
2.1 需求
- 以 1 小时为窗口长度、5 分钟为滑动步长(滑动窗口),实时统计浏览量(pv)最高的前 N 个热门商品(TopN)
2.2 数据
2.2.1 数据来源
2.2.2 数据存储
2.2.3 数据格式
543462,1715,1464116,pv,1511658000
2.3 实现
2.3.1 核心逻辑
// Parse each raw line into a UserBehavior and assign event time / watermarks from its timestamp.
// NOTE(review): `source` is declared outside this snippet — presumably a DataStream<String>
// (e.g. from the Kafka source mentioned above); confirm.
DataStream dataStream = source
.map(new ParseSourceData())
.assignTimestampsAndWatermarks(new WatermarkExtractor());
// Keep only "pv" events, count them per item in 1-hour windows sliding every 5 minutes,
// then regroup by window end and let TopNHotItems rank the items of each window.
// NOTE(review): raw DataStream types are used. keyBy("windowEnd") produces a Tuple key while
// TopNHotItems declares KeyedProcessFunction<Long, ...>; this compiles only because of the raw
// types (TopNHotItems never reads the key).
DataStream processedStream = dataStream
.filter((FilterFunction<UserBehavior>) value -> value.getBehavior().equals("pv"))
.keyBy("itemId")
.timeWindow( Time.hours(1), Time.minutes(5) )
.aggregate( new CountAgg(), new WindowResult() ) // window aggregation: one count per item and window
.keyBy("windowEnd") // group by window: all item counts sharing the same window end
.process( new TopNHotItems() );
2.3.2 UserBehavior.class
@Data
public class UserBehavior {
private long userId;
private long itemId;
private long categoryId;
private String behavior;
private long timestamp;
}
2.3.3 ParseSourceData
/**
 * Parses one CSV line {@code userId,itemId,categoryId,behavior,timestamp} into a {@link UserBehavior}.
 *
 * <p>Every field is trimmed, so surrounding whitespace breaks neither the numeric parses nor the
 * downstream {@code "pv"} comparison on the behavior field.
 */
private static class ParseSourceData implements MapFunction<String, UserBehavior> {
    /**
     * @param s one raw input line
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than five comma-separated fields
     * @throws NumberFormatException if a numeric field is not a valid long
     */
    @Override
    public UserBehavior map(String s) throws Exception {
        String[] fields = s.split(",");
        if (fields.length < 5) {
            throw new IllegalArgumentException(
                    "Expected 5 comma-separated fields but got " + fields.length + ": " + s);
        }
        return new UserBehavior(
                Long.parseLong(fields[0].trim()),
                Long.parseLong(fields[1].trim()),
                Long.parseLong(fields[2].trim()),
                fields[3].trim(),
                Long.parseLong(fields[4].trim()));
    }
}
2.3.4 WatermarkExtractor
/**
 * Uses each record's own timestamp as its event time, assuming records arrive in ascending
 * timestamp order. The source data is in seconds, while Flink event time is in milliseconds.
 */
private static class WatermarkExtractor extends AscendingTimestampExtractor<UserBehavior> {
    private static final long MILLIS_PER_SECOND = 1000L;

    @Override
    public long extractAscendingTimestamp(UserBehavior userBehavior) {
        long epochSeconds = userBehavior.getTimestamp();
        return epochSeconds * MILLIS_PER_SECOND;
    }
}
2.3.5 CountAgg
public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
public Long createAccumulator(){
return 0L;
}
public Long add(UserBehavior in, Long acc){
return acc + 1;
}
public Long getResult(Long acc){
return acc;
}
public Long merge(Long a, Long b){
return a + b;
}
}
2.3.6 WindowResult
/**
 * Wraps the pre-aggregated count of one item and window into an {@code ItemViewCount}
 * (item id, window end, count).
 *
 * <p>The key is expected to be the single-field tuple produced by {@code keyBy("itemId")}.
 */
private static class WindowResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        @SuppressWarnings("unchecked")
        Tuple1<Long> itemKey = (Tuple1<Long>) key;
        // With an incremental AggregateFunction the window hands over exactly one pre-aggregated value.
        Long viewCount = input.iterator().next();
        out.collect(new ItemViewCount(itemKey.f0, window.getEnd(), viewCount));
    }
}
2.3.7 TopNHotItems
/**
 * Ranks the per-item view counts of one window and emits the top entries as a formatted string.
 *
 * <p>Each incoming {@code ItemViewCount} is buffered in list state, and an event-time timer is
 * registered at {@code windowEnd + 1}. When the timer fires, all counts of that window are assumed
 * to have arrived. The timer sorts them by count (highest first), emits at most {@code topSize} of
 * them, and clears the buffer.
 *
 * <p>NOTE(review): the pipeline keys this function with {@code keyBy("windowEnd")}, which yields a
 * Tuple key, while {@code Long} is declared here. It appears to work only because the pipeline uses
 * raw types; the key is never read in this class.
 */
public class TopNHotItems extends KeyedProcessFunction<Long, ItemViewCount, String> {
    /** Number of items emitted per window by the no-arg constructor. */
    private static final int DEFAULT_TOP_SIZE = 3;

    /** Maximum number of items emitted per window. */
    private final int topSize;

    /** Buffered counts of the window whose end is the current key. */
    private ListState<ItemViewCount> itemState;

    /** Emits the top 3 items of each window. */
    public TopNHotItems() {
        this(DEFAULT_TOP_SIZE);
    }

    /**
     * @param topSize maximum number of items to emit per window; must be positive
     * @throws IllegalArgumentException if {@code topSize} is not positive
     */
    public TopNHotItems(int topSize) {
        if (topSize <= 0) {
            throw new IllegalArgumentException("topSize must be positive, got " + topSize);
        }
        this.topSize = topSize;
    }

    @Override
    public void open(Configuration parameters) {
        itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("item-state", ItemViewCount.class));
    }

    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> collector) throws Exception {
        itemState.add(value);
        // +1: fire only once the watermark has moved past the window end.
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // The window is complete; release its buffered state.
        itemState.clear();

        // Highest count first. Long.compare avoids the overflow of casting a long difference to int.
        allItems.sort((a, b) -> Long.compare(b.getCount(), a.getCount()));
        // A window may contain fewer distinct items than topSize.
        int emitted = Math.min(topSize, allItems.size());

        // Format the ranking.
        StringBuilder result = new StringBuilder();
        // The timer fires at windowEnd + 1, so subtract 1 to print the window end itself.
        result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < emitted; i++) {
            ItemViewCount currentItem = allItems.get(i);
            result.append("No").append(i + 1).append(":")
                    .append(" 商品ID=").append(currentItem.getItemId())
                    .append(" 浏览量=").append(currentItem.getCount())
                    .append("\n");
        }
        result.append("================================");

        // Throttles console output for the demo.
        // NOTE(review): this blocks the task thread for one second per fired window; remove for real workloads.
        Thread.sleep(1000);
        out.collect(result.toString());
    }
}
3、订单超时失效(基于CEP)
3.1 需求
3.2 数据
字段 | 描述 |
---|---|
orderId | 订单ID |
eventType | 订单事件类型(create、pay) |
txId | 交易编号(仅当 eventType 为 pay 时有值) |
eventTime | 订单事件发生的时间(单位:秒) |
3.3 实现
3.3.1 读取数据
// Order events from a local socket, one CSV line per event, with fields in OrderEvent
// constructor order (eventTime in seconds); event time is assigned in ms and the stream is keyed by order id.
val orderEventStream = env.socketTextStream("localhost", 7777)
  .map { line =>
    val fields = line.split(",").map(_.trim)
    OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
  }
  .assignAscendingTimestamps(event => event.eventTime * 1000L)
  .keyBy(_.orderId)
3.3.2 定义一个 Pattern(类似于 12306 买票:订单创建后须在一定时间(此处为 30 分钟)内完成支付,才视为订单完成)
// A "create" event followed (not necessarily immediately) by a "pay" event, both within 30 minutes.
val orderPayPattern = Pattern
  .begin[OrderEvent]("begin")
  .where(event => event.eventType == "create")
  .followedBy("follow")
  .where(event => event.eventType == "pay")
  .within(Time.minutes(30))
3.3.3 超时预警
// Partial matches that exceed the pattern's time limit go to this side output.
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
val patternStream = CEP.pattern(orderEventStream, orderPayPattern)

// Timed-out partial matches -> side output; complete matches -> main output.
val resultStream = patternStream.select(
  orderTimeoutOutputTag,
  new OrderTimeoutSelect(),
  new OrderPaySelect()
)

resultStream.print("payed")
resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")
4、实时对账
4.1 需求
4.2 数据
字段 | 描述 |
---|---|
txId | 交易编号 |
payChannel | 支付渠道(wechat、alipay) |
eventTime | 支付到账时间(单位:秒) |
4.3 实现
4.3.1 核心逻辑(双流Join)
/**
 * A payment-receipt event, one per line of the receipt log.
 *
 * @param txId       transaction id, matched against the txId of order pay events
 * @param payChannel payment channel, e.g. wechat or alipay
 * @param eventTime  event time in seconds
 */
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  eventTime: Long
)
/**
 * Real-time reconciliation job: matches order pay events (OrderLog.csv) with payment-receipt
 * events (ReceiptLog.csv) on their transaction id.
 *
 * Matched (pay, receipt) pairs go to the main output. An event whose counterpart has not arrived
 * within 5 seconds of event time is emitted to the `unmatchedPays` / `unmatchedReceipts` side output.
 *
 * NOTE(review): "Macth" is a typo for "Match"; the name is kept because this object is the
 * job's entry point and may be referenced by launch scripts.
 */
object TxMacthDetect {
  // Side-output tags for events whose counterpart never showed up.
  val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")

  /** Event-time grace period (ms) to wait for the counterpart event before reporting a mismatch. */
  private val MatchWaitMillis: Long = 5000L

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Order events; only those carrying a transaction id (pay events) can be reconciled.
    // For live testing, the file source can be replaced by env.socketTextStream("localhost", 7777).
    val orderEventStream = env.readTextFile(resourcePath("/OrderLog.csv"))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      })
      .filter(_.txId.nonEmpty)
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // Payment-receipt events.
    // For live testing, the file source can be replaced by env.socketTextStream("localhost", 8888).
    val receiptEventStream = env.readTextFile(resourcePath("/ReceiptLog.csv"))
      .map(data => {
        val dataArray = data.split(",")
        // eventTime is trimmed like every other field so stray whitespace cannot break toLong.
        ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).trim.toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // Both streams are keyed by txId, so a pay event and its receipt share one key context.
    val processedStream = orderEventStream.connect(receiptEventStream)
      .process(new TxPayMatch())

    processedStream.print("matched")
    processedStream.getSideOutput(unmatchedPays).print("unmatchedPays")
    processedStream.getSideOutput(unmatchedReceipts).print("unmatchReceipts")

    env.execute("tx match job")
  }

  /** Filesystem path of a classpath resource; fails fast with a clear message when it is missing. */
  private def resourcePath(name: String): String =
    Option(getClass.getResource(name))
      .getOrElse(throw new IllegalArgumentException(s"Resource $name not found on the classpath"))
      .getPath

  /**
   * Pairs pay and receipt events per txId. Whichever side arrives first is buffered in keyed state,
   * and an event-time timer is armed. If the counterpart arrives first, the pair is emitted and the
   * timer is cancelled. Otherwise the timer reports the buffered event on a side output.
   */
  class TxPayMatch() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
    // Buffers for the side that arrived first; a value is only buffered when the other side is empty,
    // so at most one of the two is non-null at any time.
    lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("pay-state", classOf[OrderEvent])
    )
    lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent])
    )

    /** Event-time instant (ms) at which a buffered event with the given time (s) is reported unmatched. */
    private def deadline(eventTimeSeconds: Long): Long = eventTimeSeconds * 1000L + MatchWaitMillis

    // Pay events.
    override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent,
      (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      val receipt = receiptState.value()
      if (receipt != null) {
        // Counterpart already buffered: emit the pair and drop the receipt together with its timer,
        // so that stale timer cannot later flush a newer event buffered for the same txId.
        out.collect((pay, receipt))
        receiptState.clear()
        ctx.timerService().deleteEventTimeTimer(deadline(receipt.eventTime))
      } else {
        // Wait for the receipt until the deadline.
        payState.update(pay)
        ctx.timerService().registerEventTimeTimer(deadline(pay.eventTime))
      }
    }

    // Receipt events: mirror image of processElement1.
    override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent,
      (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      val pay = payState.value()
      if (pay != null) {
        out.collect((pay, receipt))
        payState.clear()
        ctx.timerService().deleteEventTimeTimer(deadline(pay.eventTime))
      } else {
        receiptState.update(receipt)
        ctx.timerService().registerEventTimeTimer(deadline(receipt.eventTime))
      }
    }

    override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent,
      (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      // Deadline reached: whatever is still buffered never met its counterpart.
      Option(payState.value()).foreach(pay => ctx.output(unmatchedPays, pay))
      Option(receiptState.value()).foreach(receipt => ctx.output(unmatchedReceipts, receipt))
      payState.clear()
      receiptState.clear()
    }
  }
}