当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka-消费者:未收到消息

程谭三
2023-03-14

我正在使用以下docker-compose.yaml在docker上运行kafka、zookeeper和kafdrop:

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.2
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - ~/.docker/data/zookeeper:/var/lib/zookeeper

  kafka:
    image: confluentinc/cp-kafka:5.5.3
    hostname: kafka
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "${DOCKERHOST}"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${DOCKERHOST}:9092"
      KAFKA_ZOOKEEPER_CONNECT: "${DOCKERHOST}:2181"
      KAFKA_LOG_DIRS: /kafka/logs
      KAFKA_LOG_RETENTION_MINUTES: 10
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_MESSAGE_MAX_BYTES: 4194304
      KAFKA_MAX_REQUEST_SIZE: 4194304
    depends_on:
      - zookeeper
    volumes:
      - ~/.docker/data/kafka/:/kafka/
    extra_hosts:
      - "dockerhost:$DOCKERHOST"

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
    depends_on:
      - kafka

我有一个具有以下配置的Spring Boot Producer应用程序-kafkatopicconfig.java:

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

}

在我的kafkaProducerConfig.java中,我有以下内容:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.maxRequestSize}")
    private String maxRequestSize;

    @Value(value = "${kafka.batchSize}")
    private String batchSize;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人:

kafkaTemplate.send("test-topic-local", base64string);

在一个完全不同的spring引导应用程序中,我有一个像这样的使用者kafkaconsumerconfig.java:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> testKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory(groupId);
    }

}
@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "topic", containerFactory = "testKafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        log.info(consumerRecord.toString());
    }

}

我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

2021-03-08 17:15:20.218  INFO 4805 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
2021-03-08 17:15:20.237  INFO 4805 --- [  restartedMain] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-03-08 17:15:20.238  INFO 4805 --- [  restartedMain] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.43]
2021-03-08 17:15:20.362  INFO 4805 --- [  restartedMain] o.a.c.c.C.[.[localhost].[/api/v1]        : Initializing Spring embedded WebApplicationContext
2021-03-08 17:15:20.362  INFO 4805 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2939 ms
2021-03-08 17:15:20.974  INFO 4805 --- [  restartedMain] o.hibernate.jpa.internal.util.LogHelper  : HHH000204: Processing PersistenceUnitInfo [name: default]
2021-03-08 17:15:21.210  INFO 4805 --- [  restartedMain] org.hibernate.Version                    : HHH000412: Hibernate ORM core version 5.4.28.Final
2021-03-08 17:15:21.555  INFO 4805 --- [  restartedMain] o.hibernate.annotations.common.Version   : HCANN000001: Hibernate Commons Annotations {5.1.2.Final}
2021-03-08 17:15:21.792  INFO 4805 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-03-08 17:15:22.351  INFO 4805 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-03-08 17:15:22.410  INFO 4805 --- [  restartedMain] org.hibernate.dialect.Dialect            : HHH000400: Using dialect: org.hibernate.dialect.MySQL8Dialect
2021-03-08 17:15:22.938  INFO 4805 --- [  restartedMain] o.h.e.t.j.p.i.JtaPlatformInitiator       : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2021-03-08 17:15:22.964  INFO 4805 --- [  restartedMain] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2021-03-08 17:15:23.125  WARN 4805 --- [  restartedMain] JpaBaseConfiguration$JpaWebConfiguration : spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
2021-03-08 17:15:23.362  INFO 4805 --- [  restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-03-08 17:15:24.007  INFO 4805 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-03-08 17:15:24.017  INFO 4805 --- [  restartedMain] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-03-08 17:15:24.208  INFO 4805 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-group-01-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group-01
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2021-03-08 17:15:24.453  INFO 4805 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-03-08 17:15:24.454  INFO 4805 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-03-08 17:15:24.454  INFO 4805 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1615203924449
2021-03-08 17:15:24.459  INFO 4805 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-01-1, groupId=group-01] Subscribed to topic(s): test-topic-local
2021-03-08 17:15:24.479  INFO 4805 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2021-03-08 17:15:24.538  INFO 4805 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path '/api/v1'
2021-03-08 17:15:24.572  INFO 4805 --- [  restartedMain] c.r.i.t.m.f.FeedConsumerApplication      : Started FeedConsumerApplication in 7.929 seconds (JVM running for 8.652)
2021-03-08 17:15:25.302  INFO 4805 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group-01-1, groupId=group-01] Cluster ID: fq0PTZmwTnKQcCtUDMiQBg

共有1个答案

羊舌昆杰
2023-03-14

在您的发件人中,您有kafkatemplate.send(“topic”,base64string);,基本上您是发布到topic->“topic”,但您的使用者订阅的是topic->“test-topic-local”[这来自您提供的日志。]确保你的消费者正在听你正在制作的主题。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试

  • 我的Spring/java消费者无法访问生产者生成的消息。但是,当我从控制台/终端运行消费者时,它能够接收Spring/java生产者生成的消息。 消费者配置: 监听器配置: Kafka听众: 消费者应用: 测试用例1:通过 我启动了我的Spring/ java生产者并从控制台运行消费者。当我生成消息表单创建者时,我的控制台使用者能够访问该消息。 测试用例2:失败:我启动了spring/java消

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用