当前位置: 首页 > 知识库问答 >
问题:

java中的Kafka消费者不消费消息

袁鸿雪
2023-03-14

我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。

consumer.java

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;



public class KafkaConsumer extends  Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = " AATest";
    ConsumerConnector consumerConnector;


    public static void main(String[] argv) throws UnsupportedEncodingException {
        KafkaConsumer KafkaConsumer = new KafkaConsumer();
        KafkaConsumer.start();
    }

    public KafkaConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect","10.200.208.59:2181");
        properties.put("group.id","test-group");      
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        System.out.println(stream);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(it.hasNext())
            System.out.println("from it");
            System.out.println(new String(it.next().message()));

    }

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for(MessageAndOffset messageAndOffset: messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}

当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行

[2015-04-30 15:57:31,284] INFO Accepted socket connection from /10.200.208.59:51780 (org.apache.zookeeper.
server.NIOServerCnxnFactory)
[2015-04-30 15:57:31,284] INFO Client attempting to establish new session at /10.200.208.59:51780 (org.apa
che.zookeeper.server.ZooKeeperServer)
[2015-04-30 15:57:31,315] INFO Established session 0x14d09cebce30007 with negotiated timeout 6000 for clie
nt /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)

此外,当我运行指向 AATest 主题的单独控制台使用者时,我将获取生产者为该主题生成的所有数据

消费者和代理在同一台机器上,而生产者在不同的机器上。这其实类似于这个问题。但是经历它能帮助我。请帮助我。

共有3个答案

郦磊
2023-03-14

也可能有这样一种情况,当新消费者添加到同一个组id时,kafka需要很长时间来重新平衡消费者组。检查kafka日志,看看启动消费者后组是否重新平衡

阎德宇
2023-03-14

在我们的例子中,我们通过以下步骤解决了我们的问题:

我们发现的第一件事是,KafkaProducer 有一个名为“重试”的配置,其默认值表示“不重试”。此外,KafkaProducer 的 send 方法是异步的,无需调用 send 方法结果的 get 方法。这样,就不能保证在不重试的情况下将生成的消息传递到相应的代理。因此,您必须稍微增加它,或者可以使用 KafkaProducer 的幂等性或事务模式。

第二个案例是关于Kafka和动物园管理员的版本。我们选择了Kafka和Zookeeper 3.4.4的1.0.0版本。特别是,Kafka 1.0.0与Zookeepher的连接存在问题。如果Kafka由于意外的异常而失去了与Zookeeper的连接,那么它将失去尚未同步的分区的领导权。关于这个问题有一个bug主题:https://issues.apache.org/jira/browse/KAFKA-2729在Kafka日志中找到了相应的日志,表明上面的主题中存在相同的问题后,我们将Kafka代理版本升级到1.1.0。

同样重要的一点是要注意,小规模的分区(如100或更少)会增加生产者的吞吐量,因此如果没有足够的消费者,那么可用的消费者就会陷入线程卡在延迟消息的结果上(我们以分钟为单位测量延迟,大约10-15分钟)。因此,您需要根据可用资源正确平衡和配置应用程序的分区大小和线程数。

麹学文
2023-03-14

不同的答案,但在我的例子中,它碰巧是消费者的初始偏移量(auto.offset.reset)。因此,设置auto.offset.reset=最早的解决了我的场景中的问题。这是因为我先发布事件,然后再启动消费者。

默认情况下,使用者只使用启动后发布的事件,因为< code > auto . offset . reset = latest 默认情况下。

例如。消费者属性

bootstrap.servers=localhost:9092
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
class KafkaEventConsumerSpecs extends FunSuite {

  case class TestEvent(eventOffset: Long, hashValue: Long, created: Date, testField: String) extends BaseEvent

  test("given an event in the event-store, consumes an event") {

    EmbeddedKafka.start()

    //PRODUCE
    val event = TestEvent(0l, 0l, new Date(), "data")
    val config = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/producer.properties"))
      }
    }
    val producer = new KafkaProducer[String, String](config)

    val persistedEvent = producer.send(new ProducerRecord(event.getClass.getSimpleName, event.toString))

    assert(persistedEvent.get().offset() == 0)
    assert(persistedEvent.get().checksum() != 0)

    //CONSUME
    val consumerConfig = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/consumer.properties"))
        put("group.id", "consumers_testEventsGroup")
        put("client.id", "testEventConsumer")
      }
    }

    assert(consumerConfig.getProperty("group.id") == "consumers_testEventsGroup")

    val kafkaConsumer = new KafkaConsumer[String, String](consumerConfig)

    assert(kafkaConsumer.listTopics().asScala.map(_._1).toList == List("TestEvent"))

    kafkaConsumer.subscribe(Collections.singletonList("TestEvent"))

    val events = kafkaConsumer.poll(1000)
    assert(events.count() == 1)

    EmbeddedKafka.stop()
  }
}

但是如果消费者首先启动然后发布,消费者应该能够消费事件,而不需要将auto.offset.reset设置为最早

https://kafka.apache.org/documentation/#consumerconfigs

 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了