当前位置: 首页 > 知识库问答 >
问题:

使用Apache Flink流处理缓冲转换后的消息(例如,1000计数)

邹俊友
2023-03-14

我正在使用Apache Flink进行流处理。

订阅源消息(例如:Kafka、AWS Kinesis数据流)后,然后使用Flink操作符对流数据应用转换、聚合等,我想缓冲最终消息(例如:1000条),并在单个请求中将每个批发布到外部REST API。

如何在Apache Flink中实现缓冲机制(将每1000条记录创建为批处理)?

Flink Pipiline:流媒体源--

感谢您的帮助!

共有1个答案

潘璞瑜
2023-03-14

我将创建一个具有状态的接收器,该接收器将保留传入的消息。当计数足够高(1000)时,接收器发送批。状态可以在内存中(例如,一个实例变量保存消息的ArrayList),但您应该使用检查点,以便在出现某种故障时可以恢复该状态。

当您的接收器具有检查点状态时,它需要实现Checkpoint edFunction(org.apache.flink.streaming.api.checkpoint),这意味着您需要向接收器添加两个方法:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class 
    // that just holds a collection of messages (Strings, in this case)
    //
    // Buffer is declared as ArrayList<String>

    checkpointedState.add(new HttpSinkStateItem(buffer));

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //        - types are list and union        
    //   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //        - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using state name (e.g. "HttpSink-State")      

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {

        for (HttpSinkStateItem item: checkpointedState.get()) {
            buffer = new ArrayList<>(item.getPending());  
        }

    }       

}

如果计数未达到阈值,还可以使用接收器中的计时器(如果输入流已设置关键帧/分区)定期发送。

 类似资料:
  • 而不是使用关系。关系GetRequest作为请求和响应。有什么方法可以将请求/响应转换为POJO? 我见过这个解决方案,但它比我想要的要复杂一些:将协议缓冲区转换为POJO 我正在使用翻新和谷歌协议缓冲区。 我所拥有的: 我想用的是: 关系: 我的请求最终是这样的,请求必须在这里构建...

  • 流处理和传统消息处理的基本区别是什么?正如人们所说,kafka是流处理的好选择,但本质上,kafka是一个类似于ActivMQ、RabbitMQ等的消息传递框架。 为什么我们通常不说ActiveMQ也适合流处理呢。 消费者消费消息的速度是否决定了它是否是流?

  • 无法理解我用于开发涉及图像/视频的聊天应用程序的stomp over websocket配置中的不同参数: 我注意到网页中的SockJ发送的消息帧大小为16K。我还测试了消息大小限制是什么决定了我可以传输的消息的最大大小。 你能让我知道什么是: > 流字节限制 发送缓冲区大小限制 http消息缓存大小 什么是部分消息以及如何使用它们,它们在这里有用吗? 此外,我计划将图像/视频的最大大小设置为2G

  • 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 我正在使用NSeriveBus构建一个系统,它应该只在特定的时间段将消息发送给远程处理程序。到目前为止,我设法将所有消息放在一个处理队列上,并从那里检查远程处理程序的可用性,如果处理程序不可用,我就不会通过边界发送消息。要做到这一点,我正在使用 但国家安全局会继续努力。这不像其他侦听器将在几分钟内启动并运行,但它可能有几个小时的停机窗口,因此这并不完全有效。 想知道是否有办法让总线稍后重试消息,或