当前位置: 首页 > 知识库问答 >
问题:

KStreams-如何处理一个主题的消息延迟

陶宏浚
2023-03-14

我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。

下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码:

private void sendEmpData() {
    IntStream.range(0, 1).forEach(index -> {
        EmployeeKey key = new EmployeeKey();
        key.setEmployeeId(1);

        Employee employee = new Employee();
        employee.setDepartmentId(1000);
        employee.setEmployeeFirstName("John);
        employee.setEmployeeId(1);
        employee.setEmployeeLastName("Doe");

        kafkaTemplateForEmp.send(EMP_TOPIC, key, employee);
    });
}

对于金融主题也是如此:

private void sendFinanceData() {
    IntStream.range(0, 1).forEach(index -> {
        FinanceKey key = new FinanceKey();
        key.setEmployeeId(1);
        key.setDepartmentId(1000);

        Finance finance = new Finance();
        finance.setDepartmentId(1000);
        finance.setEmployeeId(1);
        finance.setSalary(2000);

        kafkaTemplateForFinance.send(FINANCE_TOPIC, key, finance);
    });
}
employeeKStream.join(reKeyedStream,
            (employee, finance) -> new EmployeeFinance(employee.getEmployeeId(),
                    employee.getEmployeeFirstName(),
                    employee.getEmployeeLastName(),
                    employee.getDepartmentId(),
                    finance.getSalary(),
                    finance.getSalaryGrade()),
            JoinWindows.of(windowRetentionTimeMs), //30 seconds
            Joined.with(
                    employeeKeySerde,
                    employeeSerde,
                    financeSerde)).to(outputTopic, Produced.with(employeeKeySerde, employeeFinanceSerde));

共有1个答案

漆雕亮
2023-03-14

如果在Financial主题中有一个匹配键的记录在30秒后到达,那么连接就不会发生。

默认情况下,Kafka Streams使用24小时的宽限期,因此,即使存在滞后或无序的数据,您的联接也应该工作。注意,Kafka中的滞后总是指读取路径!

在财经话题30秒后到达

 类似资料:
  • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

  • 我有以下代码 我的问题:当我添加多个主题订阅时(即上面的A,B,C),Kstream代码停止接收记录。 参考文献:https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html 相关文件 我想实现的是:让一个Kstream(即上面的“源”)消耗/处理多个主题。

  • 我有一个 kafka 消费者类,它有一个主主题侦听器和一个 DLQ 侦听器。当主主题监听器无法处理消费者记录时,根据我的 bean 工厂,记录被推送到 DLQ 主题中。因此,DLQ 成功处理了该消息。但是,当我重新启动使用者应用程序时,我看到 DLQ 处理的消息再次被主主题侦听器使用,尽管它已成功处理。有人可以帮助我如何防止主要主题重新使用DLQ处理的消息吗?提前感谢您! Kafka·Consum

  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。

  • 我想将来自AWS Kinesis流的消息的处理延迟一个小时。我已将KCL消费者配置为每四分钟读取一批记录,检查每条记录的时间戳,如果任何记录不到一个小时,则停止处理该批次,无需检查点。我希望同一个消费者实例每四分钟重读一次相同的消息,直到整个批次足够旧可以处理,然后检查点消费者。但是,在实践中,消费者只读取一次消息,这意味着它们被忽略,并且在准备好处理时永远不会再次读取。有没有办法将消费者配置为每