当前位置: 首页 > 知识库问答 >
问题:

空闲源计时器上的Flink Keyedprocess函数

景靖琪
2023-03-14

我有一个听Kafka的Flink过程。然后,将消耗的消息保存在并发哈希映射中一段时间,然后需要将其放入cassandra。

操作员链类似于

DataStream<Message> datastream = KafkaSource.createsource();
DataStream<Message> decodededMessage = datastream.flatmap(new DecodeMessage());

decodedMessage.assigneTimestampsandWatermarks(new AscendingTimestampExtractor<Message>(){

  public long extractAscendingTimestamp(Message m){

      return message.getTimestamp();
  }
  
}).keyBy((KeySelector<Message>) x-> x.getID())
  .process(new Timerfunction())
  .addSink(new MySink());



class TimerFunction extends KeyedProcessFunction<Integer,Message,Message>{

    private ValueState<Message> x;

    public void processElement(){
//some logic to create timestamp for one minute

     context.timerService().registerEventTimeTimer(x.getTimestamp());
     }
  
    public void onTimer()
  // output values on trigger
}


在使用EventTime时,我有一些疑问

  • 消息将具有唯一的id、时间戳和其他一些属性。一分钟内可能有一百万个唯一的钥匙。keyBy操作会影响性能吗

我需要涵盖以下场景

>

  • ID为1的X消息在8小时1分1秒到达

    ID为2的Y消息在8小时1分4秒到达

    由于我使用Id作为密钥,这两条消息都应该设置一个计时器,以8小时2分钟0秒触发。根据闪烁留档,如果计时器的时间戳相同,它将只触发一次。我面临一个问题,源空闲几分钟,计时器一直在等待下一个水印,永远不会触发。如何处理空闲源?

    在这种情况下,使用processingtime是更好的选择吗?

    我也有使用Flink v1的限制。8所以需要一些关于该版本的信息。

    提前感谢

  • 共有1个答案

    满子实
    2023-03-14

    我不完全理解你的问题;缺少太多背景。但是我可以提供几点:

    (1) keyBy代价高昂:它强制序列化/反序列化以及网络洗牌。

    (2) 当且仅当计时器用于相同的时间戳和相同的密钥时,才会对其进行重复数据消除。

    (3) 至于空闲源问题,当事件再次开始流动时,事件时间计时器将最终触发,因为这将推进水印。如果迫不及待,可以使用https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java,或切换到处理时间。

     类似资料:
    • 打印空闲线程钩子函数的运行次数 打印空闲线程钩子函数的运行次数 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the first v

    • 我有两个WebServices。一个webservice调用另一个webservice。这些web服务托管在两个不同的AWS实例上。如果被调用的webservice空闲了一段时间,我会得到这个异常- com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:从服务器成功接收的最后一个数据包是在49,381,933毫秒之前。最后一个成功发送到服务

    • 函数功能:获得设备内存总量 函数方法 free = device.getFreeMemory() 返回值 类型 说明 free string 设备空闲内存,单位:字节 函数用例 free = device.getFreeMemory() dialog(free,5000) 注意事项 目前积木编程函数和触动精灵函数不通用,请仔细查看本手册,此手册中函数仅支持积木编程,不支持触动精灵,同理请勿将触动

    • 问题内容: 如何在Go HTTP客户端中使空闲超时? 空闲超时意味着从HTTP客户端内部调用Conn接口的Read / Write方法时发生超时。当客户端下载文件并且由于读取超时而导致下载失败时,此功能很有用。 问题答案: 您需要创建自己的,并返回,以设置适当的读写期限。 该会是这个样子:

    • 检测空闲连接和超时是为了及时释放资源。常见的方法发送消息用于测试一个不活跃的连接来,通常称为“心跳”,到远端来确定它是否还活着。(一个更激进的方法是简单地断开那些指定的时间间隔的不活跃的连接)。 处理空闲连接是一项常见的任务,Netty 提供了几个 ChannelHandler 实现此目的。表8.4概述。 Table 8.4 ChannelHandlers for idle connections

    • 问题内容: 我正在运行通过node- mysql模块连接到MySQL的Node服务器。连接和查询MySQL最初运行良好,没有任何错误,但是,将Node服务器闲置几个小时后的第一个查询会导致错误。错误是熟悉的,来自node-mysql模块的内部。 堆栈跟踪(请注意,跟踪的三个条目属于我的应用程序的错误报告代码): 我的云节点服务器和MySQL服务器以及两者的本地设置都发生此错误。 我的问题: 这个问