当前位置: 首页 > 知识库问答 >
问题:

使用KafkaStreams处理主题时,如何从主题中获取上一条消息

安高义
2023-03-14

我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。

我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。

使用transformValues和StoreBuilder(正如这里推荐的那样)对我也不起作用,因为场景不同。我使用transformValue功能实现了一些不同的功能,但每次检索到的前一个值都是主题的开头,而不是实际的前一个:

@Override
public String transform(Long key, String value) {
    String prevValue = state.get(key);
    log.info("{} => {}", key, value) ;
    if (prevValue != null) {
        Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
        Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);

        log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());

        dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());

        String newDto = new Gson().toJson(dto);
        log.info("New value {}", newDto);
        return newDto;
    } else {
        state.put(key, value);
    }
    return value;
}

当我使用流处理主题时,如何从主题中获取之前的消息?任何帮助或建议都将不胜感激。

问候。

共有1个答案

戴原
2023-03-14

问题不仅仅是您只在状态存储中存储每个键的第一个值吗?如果在每个后续消息中,您始终希望看到前一条消息,那么您需要始终将当前消息存储在状态存储中,作为最后一步,对于exmaple:

@Override
public String transform(Long key, String value) {
    String prevValue = state.get(key);
    log.info("{} => {}", key, value) ;
    if (prevValue != null) {
        Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
        Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);

        log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());

        dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());

        String newDto = new Gson().toJson(dto);
        log.info("New value {}", newDto);
        return newDto;
    }

    // Always update the state store:
    state.put(key, value);
    return value;
}
 类似资料:
  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

  • 我有一个 kafka 消费者类,它有一个主主题侦听器和一个 DLQ 侦听器。当主主题监听器无法处理消费者记录时,根据我的 bean 工厂,记录被推送到 DLQ 主题中。因此,DLQ 成功处理了该消息。但是,当我重新启动使用者应用程序时,我看到 DLQ 处理的消息再次被主主题侦听器使用,尽管它已成功处理。有人可以帮助我如何防止主要主题重新使用DLQ处理的消息吗?提前感谢您! Kafka·Consum

  • 我有一个基于Spring boot的KStreams应用程序,我在其中加入跨多个主题的数据。当一个主题出现延迟时,处理情况的最佳实践是什么?我读过一些链接,比如如何管理Kafka KStream到KStream窗口连接?和其他人。 下面是我的示例代码(Spring Boot应用程序),用于为两个主题--雇员和财务--生成模拟数据。下面是员工主题的代码: 对于金融主题也是如此:

  • 我是Kafka流处理器的新手,接触到了“拓扑”的关键概念。 我创建了源处理器,它从如下“源主题”中读取: 上面的代码片段将创建(如果我的理解正确的话)一个名为“source”的源流处理器,并将侦听Kafka主题“source topic”。 我没有为这个“SOURCE”流处理器编写任何代码,它是如何从kafka主题中获取消息的?它是由kafka stream API本身照顾的“特殊”类型的流处理器

  • 我遵循这篇文档来实现上述场景。 那么,有没有人可以建议我如何一次使用多个订阅者从主题中读取消息。

  • 我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。 我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。 我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。 这是意料之中的行为还是我走错了路?