我正在编写一个Kafka Streams应用程序。它执行以下步骤“1)消耗输入数据2)在1小时窗口内根据新密钥消重记录3)重新选择密钥4)在1小时窗口内计数密钥5)发送到下游。
我是Kafka流的新手。我的理解是,为了将窗口保持为1小时,我将 commit.interval.ms
也设置为1小时。这是正确的做法吗?
一旦我部署了我的应用程序与真实的流量,它似乎应用程序不断发送消息,而我认为它每小时只会发送一堆消息?
感谢任何帮助!!
我的配置:
commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000
cache.max.bytes.buffering = 10485760
// dedupe by new key per window(1hr)
stream = inputStream
.selectKey(... )
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
// only keep the latest event for each customized key
.reduce((event1, event2) -> event2)
.toStream()
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
.reduce((event1, event2) -> {
long count1 = event1.getCount();
long count2 = event2.getCount();
event2.setCount(count1 + count2);
return event2;
})
.toStream()
.to(OUTPUT_TOPIC);
我是Kafka流的新手。我的理解是,为了将窗口保持为1小时,我将 commit.interval.ms 也设置为1小时。这是正确的做法吗?
提交间隔与您的处理逻辑无关。
您可能希望研究一下< code>suppress()运算符。下面的block post也可能有所帮助:
Kafka Streams的处理模型是连续的,默认情况下会发送连续的结果更新。这就是为什么基本上每个输入消息都会得到一条输出消息,因为处理输入消息会修改结果。
我试图在不和谐的情况下每隔x秒发送一条消息。js机器人。我知道如何做到这一点,但我遇到的问题是,即使我启用了slowmode,它也会发送垃圾邮件。我怎样才能解决这个问题?
问题内容: 我想从中选择记录,但是,每天都有很多价值,因为它们每15分钟保存一个值,所以我只希望每天有第一个或最后一个值。 表架构: 错误的SQL: 如果可能的话,我想一口气… 非常感谢。 编辑: 我的意思是,我每天都有一个查询,但是我只想从c中进行单个查询 EDIT2: 我有 64,260 条记录,执行和响应该查询需要很长时间,有时还会超时。 问题答案: 要获得每个日期的第一个条目,您可以做
我的问题是,我无法让我的机器人显示成员作者、日期和消息。以下是错误: 忽略异常在on_messageTraceback(最近的调用最后):文件"C:\Program Files(x86)\Python38-32\lib\site-包\discord\client.py",第270行,在_run_event等待coro(*args,**kwargs)类型错误:on_message()缺少1个必需的位
我有两张桌子叫酒店和描述。描述表包含每个酒店的描述。 在hotels表中,id和city_id一起构成主键(id本身不是主键) 在描述表中,每个酒店最多可以有8种描述类型(一般、位置、房间、外部、愉悦、包含、大堂) 我想使用酒店表和描述表更新主表。对于Hotel表中的每一条记录,我应该在主表中插入一行。当我执行左外部联接时,Hotel表中的每个记录都会产生多个记录,因为一个Hotel有0个或更多的
我们有一个Spring Boot应用程序,用于在另一个组件上执行负载测试。我们每分钟最多需要发送35000条JMS消息,因此我使用调度器每分钟运行一次任务。 问题是当我保持低强度时,它会设法在指定的时间间隔(一分钟)内发送消息。但是当强度很高时,发送消息块需要超过1分钟。对以下实现有任何建议吗? 调度程序类 用于发送消息的类
问题内容: 我一直在开发Android应用程序,我每小时需要执行1个任务。我使用以下代码: 它对我有用,但是我的客户告诉我该任务只能工作1次,而不能工作1个小时。我在哪里弄错了?请告诉我。谢谢。 问题答案: 根据您的代码,ALARM_PERIOD为1000L,作为重复间隔。因此,我怀疑警报会在每1000毫秒内触发一次。 如果您设置每小时的重复间隔,则应为3600000L。并请注意,如果电话重新启动