spring-boot消费者微服务无法在kafka重新启动后向主题发送消息。
>
消费者和生产者(spring boot微服务)位于同一覆盖网络“Net-Broker”上,因此它们使用服务名“kafka:9092”访问kafka。
一开始一切都很顺利。
然后kafka仅被重新启动,在此之后,消费者不能再从kafka主题发送消息。
由于docker-compose.yml中的一个小更改(例如max_actempts:3=>max_actempts:4),kafka服务将重新启动
docker-compose.yml文件
kafka:
image: wurstmeister/kafka:2.12-2.2.0
depends_on:
- zookeeper
networks:
- net-broker
deploy:
replicas: 1
update_config:
parallelism: 1
delay: 10s
restart_policy:
condition: on-failure
max_attempts: 3
# ports:
# - target: 9094
# published: 9094
# protocol: tcp
# mode: host
environment:
HOSTNAME_COMMAND: "echo ${HOST_IP:-192.168.99.100}"
KAFKA_CREATE_TOPICS: "gnss-topic-${GNSS_TAG}:3:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
BROKER_ID_COMMAND: "echo 101"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
KafkaProducerConfig类
@Bean
public ProducerFactory<String, GNSSPortHolderDTO> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());
// high throughput producer (at the expense of a bit of latency and CPU usage)
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
// serializers
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for gnss-topic-11.2.1-B5607-1: 30030 ms has passed since batch creation plus linger time
gnss_data-access.1.ll948jogpqil@GDN-S-GNSS2 | 2019-05-08 09:42:33.984 INFO 1 --- [ gnss-view-data] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=gnss-view-data] Marking the coordinator fe7091944126:9092 (id: 2147483546 rack: null) dead
我找到了解决这个问题的办法。
只是提醒一下:
旧值(重新启动后不起作用):kafka_advertised_listeners:inside://:9092,outside:/_{HOSTNAME_COMMAND}:9094
新值(重新启动后工作):kafka_advertised_listeners:inside://kafka:9092,outside:/_{HOSTNAME_COMMAND}:9094
因此,修复方法是为内部广告侦听器指定kafka服务名kafka:9092
。
问题是,即使spring boot producer被配置为使用kafka:9092
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka:9092);
生产者实际上使用kafka容器ID进行通信,而不是使用服务名kafka:9092
,因此在kafka重新启动后,创建了一个新容器(新容器ID),但生产者仍然指向旧容器ID
我正在使用BackoffSupervisor策略来创建一个必须处理某些消息的子参与者。我想实现一个非常简单的重启策略,其中在发生异常时: > 子级将失败消息传播给主管 supervisor重新启动子程序并再次发送失败消息。 主管重试3次后放弃 这给出了类似于以下输出的内容: 但是没有来自钩子的日志
我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:
问题内容: 我的日食没有启动,因为我的计算机有点死机了,所以我不得不强制重新启动它。当我不得不重新启动时,Eclipse是打开的,我相信这很可能是原因。我不知道该如何解决。每当我尝试打开它时,它都会告诉我检查工作区中的.log文件,并显示: http://paste.strictfp.com/26579 而且我不知道如何解决它。请帮忙? 问题答案: 您缺少 第125行的 类,您必须重新安装才能解决
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
我希望容器在计算机重新启动后自动启动,所以我使用'--restart=always'标志,但它并没有像我预期的那样运行。当我重新启动系统时,容器没有启动。 docker日志信息 添加docker ps-a message,$docker ps-a CONTAINER ID IMAGE命令CREATED STATUS PORTS NAMES a1f4d5471b0a mysql:8.0“docker
我正在测试Spring Kafka的示例代码。它适用于连接,但不适用于连接。 我已通过成功运行控制台使用者来验证密钥和证书对 kafka 代理有效: 但是我不能使用Spring Boot(2.0.1.RELEASE)和Spring Kafka,使用相同的密钥和证书发送消息。 应用程序.属性 有人成功用SSL配置Spring Boot 2.0 Spring Kafka吗?