当前位置: 首页 > 知识库问答 >
问题:

如何让Kafka Streams每1小时窗口每键发送一条记录?

訾淇
2023-03-14

我正在编写一个Kafka Streams应用程序。它执行以下步骤“1)消耗输入数据2)在1小时窗口内根据新密钥消重记录3)重新选择密钥4)在1小时窗口内计数密钥5)发送到下游。

我是Kafka流的新手。我的理解是,为了将窗口保持为1小时,我将 commit.interval.ms 也设置为1小时。这是正确的做法吗?

一旦我部署了我的应用程序与真实的流量,它似乎应用程序不断发送消息,而我认为它每小时只会发送一堆消息?

感谢任何帮助!!

我的配置:

commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000    
cache.max.bytes.buffering = 10485760

// dedupe by new key per window(1hr)
 stream = inputStream
        .selectKey(... )
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        // only keep the latest event for each customized key
        .reduce((event1, event2) -> event2)
        .toStream()
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        .reduce((event1, event2) -> {
            long count1 = event1.getCount();
            long count2 = event2.getCount();
            event2.setCount(count1 + count2);
            return event2;
        })
        .toStream()
        .to(OUTPUT_TOPIC);

共有2个答案

齐乐逸
2023-03-14
  1. 我建议您使用最新版本的kafka提供的精确一次保证。使用它,您不必担心重复消息的删除。https://www.baeldung.com/kafka-exactly-once
  2. 配置生产者配置:特别是buffer.memory
秋阳荣
2023-03-14

我是Kafka流的新手。我的理解是,为了将窗口保持为1小时,我将 commit.interval.ms 也设置为1小时。这是正确的做法吗?

提交间隔与您的处理逻辑无关。

您可能希望研究一下< code>suppress()运算符。下面的block post也可能有所帮助:

  • https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
  • https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers

Kafka Streams的处理模型是连续的,默认情况下会发送连续的结果更新。这就是为什么基本上每个输入消息都会得到一条输出消息,因为处理输入消息会修改结果。

 类似资料:
  • 我试图在不和谐的情况下每隔x秒发送一条消息。js机器人。我知道如何做到这一点,但我遇到的问题是,即使我启用了slowmode,它也会发送垃圾邮件。我怎样才能解决这个问题?

  • 问题内容: 我想从中选择记录,但是,每天都有很多价值,因为它们每15分钟保存一个值,所以我只希望每天有第一个或最后一个值。 表架构: 错误的SQL: 如果可能的话,我想一口气… 非常感谢。 编辑: 我的意思是,我每天都有一个查询,但是我只想从c中进行单个查询 EDIT2: 我有 64,260 条记录,执行和响应该查询需要很长时间,有时还会超时。 问题答案: 要获得每个日期的第一个条目,您可以做

  • 我的问题是,我无法让我的机器人显示成员作者、日期和消息。以下是错误: 忽略异常在on_messageTraceback(最近的调用最后):文件"C:\Program Files(x86)\Python38-32\lib\site-包\discord\client.py",第270行,在_run_event等待coro(*args,**kwargs)类型错误:on_message()缺少1个必需的位

  • 我有两张桌子叫酒店和描述。描述表包含每个酒店的描述。 在hotels表中,id和city_id一起构成主键(id本身不是主键) 在描述表中,每个酒店最多可以有8种描述类型(一般、位置、房间、外部、愉悦、包含、大堂) 我想使用酒店表和描述表更新主表。对于Hotel表中的每一条记录,我应该在主表中插入一行。当我执行左外部联接时,Hotel表中的每个记录都会产生多个记录,因为一个Hotel有0个或更多的

  • 我们有一个Spring Boot应用程序,用于在另一个组件上执行负载测试。我们每分钟最多需要发送35000条JMS消息,因此我使用调度器每分钟运行一次任务。 问题是当我保持低强度时,它会设法在指定的时间间隔(一分钟)内发送消息。但是当强度很高时,发送消息块需要超过1分钟。对以下实现有任何建议吗? 调度程序类 用于发送消息的类

  • 问题内容: 我一直在开发Android应用程序,我每小时需要执行1个任务。我使用以下代码: 它对我有用,但是我的客户告诉我该任务只能工作1次,而不能工作1个小时。我在哪里弄错了?请告诉我。谢谢。 问题答案: 根据您的代码,ALARM_PERIOD为1000L,作为重复间隔。因此,我怀疑警报会在每1000毫秒内触发一次。 如果您设置每小时的重复间隔,则应为3600000L。并请注意,如果电话重新启动