当前位置: 首页 > 知识库问答 >
问题:

Spring boot producer在kafka重新启动后无法发送任何消息

年健
2023-03-14

spring-boot消费者微服务无法在kafka重新启动后向主题发送消息。

>

  • 消费者和生产者(spring boot微服务)位于同一覆盖网络“Net-Broker”上,因此它们使用服务名“kafka:9092”访问kafka。

    一开始一切都很顺利。

    然后kafka仅被重新启动,在此之后,消费者不能再从kafka主题发送消息。

    由于docker-compose.yml中的一个小更改(例如max_actempts:3=>max_actempts:4),kafka服务将重新启动

    docker-compose.yml文件

    kafka:
        image: wurstmeister/kafka:2.12-2.2.0
        depends_on:
          - zookeeper
        networks:
          - net-broker
        deploy:
          replicas: 1
          update_config:
            parallelism: 1
            delay: 10s
          restart_policy:
            condition: on-failure
            max_attempts: 3
        # ports:
        #   - target: 9094
        #     published: 9094
        #     protocol: tcp
        #     mode: host
        environment:
          HOSTNAME_COMMAND: "echo ${HOST_IP:-192.168.99.100}"
          KAFKA_CREATE_TOPICS: "gnss-topic-${GNSS_TAG}:3:1"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
          KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
          KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
          BROKER_ID_COMMAND: "echo 101"
          KAFKA_LOG_DIRS: "/kafka/kafka-logs"
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
          - kafka-data:/kafka
    
    

    KafkaProducerConfig类

    @Bean
      public ProducerFactory<String, GNSSPortHolderDTO> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
    
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());
    
        // high throughput producer (at the expense of a bit of latency and CPU usage)
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
    
        // serializers
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
        return new DefaultKafkaProducerFactory<>(configProps);
      }
    
    org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for gnss-topic-11.2.1-B5607-1: 30030 ms has passed since batch creation plus linger time
    
    gnss_data-access.1.ll948jogpqil@GDN-S-GNSS2    | 2019-05-08 09:42:33.984  INFO 1 --- [ gnss-view-data] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=gnss-view-data] Marking the coordinator fe7091944126:9092 (id: 2147483546 rack: null) dead
    
  • 共有1个答案

    微生新翰
    2023-03-14

    我找到了解决这个问题的办法。

    只是提醒一下:

    • 只能从Docker Net-Broker的覆盖虚拟网络访问Kafka。
    • 出于安全原因,不应从Docker主机IP访问Kafka

    旧值(重新启动后不起作用):kafka_advertised_listeners:inside://:9092,outside:/_{HOSTNAME_COMMAND}:9094

    新值(重新启动后工作):kafka_advertised_listeners:inside://kafka:9092,outside:/_{HOSTNAME_COMMAND}:9094

    因此,修复方法是为内部广告侦听器指定kafka服务名kafka:9092

    问题是,即使spring boot producer被配置为使用kafka:9092

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka:9092);  
    

    生产者实际上使用kafka容器ID进行通信,而不是使用服务名kafka:9092,因此在kafka重新启动后,创建了一个新容器(新容器ID),但生产者仍然指向旧容器ID

     类似资料:
    • 我正在使用BackoffSupervisor策略来创建一个必须处理某些消息的子参与者。我想实现一个非常简单的重启策略,其中在发生异常时: > 子级将失败消息传播给主管 supervisor重新启动子程序并再次发送失败消息。 主管重试3次后放弃 这给出了类似于以下输出的内容: 但是没有来自钩子的日志

    • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

    • 问题内容: 我的日食没有启动,因为我的计算机有点死机了,所以我不得不强制重新启动它。当我不得不重新启动时,Eclipse是打开的,我相信这很可能是原因。我不知道该如何解决。每当我尝试打开它时,它都会告诉我检查工作区中的.log文件,并显示: http://paste.strictfp.com/26579 而且我不知道如何解决它。请帮忙? 问题答案: 您缺少 第125行的 类,您必须重新安装才能解决

    • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

    • 我希望容器在计算机重新启动后自动启动,所以我使用'--restart=always'标志,但它并没有像我预期的那样运行。当我重新启动系统时,容器没有启动。 docker日志信息 添加docker ps-a message,$docker ps-a CONTAINER ID IMAGE命令CREATED STATUS PORTS NAMES a1f4d5471b0a mysql:8.0“docker

    • 我正在测试Spring Kafka的示例代码。它适用于连接,但不适用于连接。 我已通过成功运行控制台使用者来验证密钥和证书对 kafka 代理有效: 但是我不能使用Spring Boot(2.0.1.RELEASE)和Spring Kafka,使用相同的密钥和证书发送消息。 应用程序.属性 有人成功用SSL配置Spring Boot 2.0 Spring Kafka吗?