我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。
我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。
使用transformValues和StoreBuilder(正如这里推荐的那样)对我也不起作用,因为场景不同。我使用transformValue功能实现了一些不同的功能,但每次检索到的前一个值都是主题的开头,而不是实际的前一个:
@Override
public String transform(Long key, String value) {
String prevValue = state.get(key);
log.info("{} => {}", key, value) ;
if (prevValue != null) {
Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);
log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());
dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());
String newDto = new Gson().toJson(dto);
log.info("New value {}", newDto);
return newDto;
} else {
state.put(key, value);
}
return value;
}
当我使用流处理主题时,如何从主题中获取之前的消息?任何帮助或建议都将不胜感激。
问候。
问题不仅仅是您只在状态存储中存储每个键的第一个值吗?如果在每个后续消息中,您始终希望看到前一条消息,那么您需要始终将当前消息存储在状态存储中,作为最后一步,对于exmaple:
@Override
public String transform(Long key, String value) {
String prevValue = state.get(key);
log.info("{} => {}", key, value) ;
if (prevValue != null) {
Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);
log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());
dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());
String newDto = new Gson().toJson(dto);
log.info("New value {}", newDto);
return newDto;
}
// Always update the state store:
state.put(key, value);
return value;
}
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
我有一个 kafka 消费者类,它有一个主主题侦听器和一个 DLQ 侦听器。当主主题监听器无法处理消费者记录时,根据我的 bean 工厂,记录被推送到 DLQ 主题中。因此,DLQ 成功处理了该消息。但是,当我重新启动使用者应用程序时,我看到 DLQ 处理的消息再次被主主题侦听器使用,尽管它已成功处理。有人可以帮助我如何防止主要主题重新使用DLQ处理的消息吗?提前感谢您! Kafka·Consum
我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。 下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码: 对于金融主题也是如此:
我是Kafka流处理器的新手,接触到了“拓扑”的关键概念。 我创建了源处理器,它从如下“源主题”中读取: 上面的代码片段将创建(如果我的理解正确的话)一个名为“source”的源流处理器,并将侦听Kafka主题“source topic”。 我没有为这个“SOURCE”流处理器编写任何代码,它是如何从kafka主题中获取消息的?它是由kafka stream API本身照顾的“特殊”类型的流处理器
我遵循这篇文档来实现上述场景。 那么,有没有人可以建议我如何一次使用多个订阅者从主题中读取消息。
我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。 我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。 我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。 这是意料之中的行为还是我走错了路?