我已经启动了我的zookeeper和Kafka服务器。我开始制作Kafka,它发送10条主题为“xxx”的消息。然后阻止了我的Kafka制作人。现在我开始使用Kafka,并订阅了主题“xxx”。我的消费者使用我的Kafka制作人发送的10条消息,这10条消息现在没有运行。我需要我的Kafka使用者只能使用来自运行Kafka服务器的消息。有没有办法做到这一点?以下是我的消费者财产。
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "100");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
请创建新主题并保留属性ConsumerConfig。自动偏移重置配置为“最新”。确保不提交偏移量。i、 我们不应该使用commitSync()
默认情况下,接收者从每个分配分区的最后提交偏移开始使用记录。如果提交的偏移量不可用,则使用为Kafka消费配置的偏移量重置策略消费者配置#AUTO_OFFSET_RESET_CONFIG将开始偏移量设置为分区上最早或最新的偏移量。
我认为在你的情况下,你正在提交偏移量,或者有一个提交偏移量可用于给定的主题。
设置以下属性:
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
它告诉消费者只阅读最新的消息,即消费者启动后发布的消息。
在本公司的最后一个项目中:客户提出身份验证等请求,应用程序第一层得到客户请求并在Kafka上生成消息,核心服务消费该消息后向银行服务提出rest请求,得到响应后在Kafka上生成响应消息,应用程序第一层将消息传递给客户。是真的Kafka用例,还是去掉第一层和Kafka,在客户端和核心之间使用rest服务更好。谢谢
我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?
我想建立一个单一的Spring Boot应用程序,同时做多种不同的任务。我在互联网上做了研究,但我找不到任何出路。我来详细说说。我希望每隔一段时间启动一次作业,例如一天一次。我可以用Spring石英来做。我也想在一个专用的互联网地址上听信息。消息将来自Apache Kafka平台。因此,我想将Kafka集成用于Spring框架。它实际上是否适用(始终监听消息并按时执行计划的作业)
如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何
我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol