当前位置: 首页 > 知识库问答 >
问题:

在每次发送命令后从Kafka制作人那里获得响应

陈和裕
2023-03-14

我正在使用spring boot Kafka向一个主题发送消息。我的要求是从表中增量读取数据,并根据日期时间字段将其发布到主题中,因为这是一个预定的过程,所以我需要在每次成功向Kafka发送消息后存储每条消息的日期时间字段。

有什么建议吗?最好的方法是什么?我相信我不能对这样的事情使用异步回调,因为我需要在每次调用生产者后更新变量。

此外,由于基础设施限制,我无法使用Kafka连接。

共有1个答案

黎浩然
2023-03-14
java prettyprint-override">ListenableFuture<SendResult<String, String>> future = template.send(...);
SendResult<String, String> sendResult = future.get(10, TimeUnit.SECONDS);
Long timestamp = sendResult.getProducerRecord().timestamp();

如果发送失败,get将抛出一个异常。

 类似资料:
  • 我知道Kafka制作人会将消息分批处理。每个批属于一个特定的分区。 我的问题是 生产者是否知道每个批次属于哪个分区? 生产者是否知道每个分区的代理地址? 当生产者发送请求时,每个请求包含多个批次还是只包含一个属于目标分区的批次。 如果生产者发送多个批次,接收kafka服务器是否将批次重传到目标分区。

  • 我在用Kafka。 我有10k个jsons列表, 我该怎么做呢? 谢谢

  • 我使用spring框架和有3个代理集群的kafka。我发现使用者没有使用某些消息(假设在所有发送消息中使用0.01%),所以在生产者代码中,我记录了API返回的消息偏移量: 我使用返回偏移量来查询所有分区中的kafka主题,但它没有找到消息(我测试了与消费者使用的和他们在kafka中的消息相关的其他偏移量),问题是什么,我如何确保该消息发送到kafka? 我还在producer中使用了

  • 我们正在创建一个不和谐机器人。 问题是控制台中没有错误,但不和谐聊天也是空的。所以完全没有回应。你能帮我说说我的错误是什么以及如何解决这个问题吗 祝你一切顺利,尼克斯

  • 我正在建立一个社交网络,在那里,用户将拥有一个类似推特的流,记录他们关注的人的所有帖子。 什么是最好的方式来查询这个与Laravel雄辩? 我有三张桌子

  • 在Spark streaming中,我会在日志到达时获取日志。但我希望在一次传递中获得至少N个日志。如何才能实现呢? 从这个答案来看,Kafka似乎有这样一种效用,但在Spark中似乎并不存在,使之成为可能。