当前位置: 首页 > 知识库问答 >
问题:

Kafka-生产到一个主题,然后从该主题消费后?

况承福
2023-03-14

我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。

使用Active MQ,您可以执行以下操作:

  • 将100条消息提交到队列中

我试着在Kafka做同样的事情

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
    public static final String MY_GROUP_ID = "my-group-id";
    public static final String TOPIC = "topic";

    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Before
    public void before() {
        kafka.start();
    }

    @After
    public void after() {
        kafka.close();
    }

    @Test
    public void testPipes() throws ExecutionException, InterruptedException {

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        consumerProps.put("group.id", MY_GROUP_ID);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        ExecutorService es = Executors.newCachedThreadPool();
        Future consumerFuture = es.submit(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
                    }
                }
            } catch (Exception e) {
                LOG.error("Consumer error", e);
            }
        });

        Thread.sleep(10000); // NOTICE! if you remove this, the consumer will not receive the messages. because the consumer won't be registered yet before the messages come rolling on in.

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Future producerFuture = es.submit(() -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                int counter = 0;

                while (counter <= 100) {
                    System.out.println("Sent " + counter);
                    String msg = "Message " + counter;
                    producer.send(new ProducerRecord<>(TOPIC, msg));
                    counter++;
                }

            } catch (Exception e) {
                LOG.error("Failed to send message by the producer", e);
            }
        });

        producerFuture.get();
        consumerFuture.get();
    }
}

如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。

谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?

共有1个答案

咸亦
2023-03-14

在您的消费者配置中,您需要添加auto.offset.reset=最早或在订阅后调用see kToStart

否则,它从主题末尾开始读取。换句话说,如果您在生产者之后启动消费者,它将在所有存量数据之后开始读取。

 类似资料:
  • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

  • 我试着把这个理论与缩放工人做比较。 但是,使用版本1.2.1时,storm Kafka spout在多个不同的拓扑中的行为并不像我预期的那样。 为单个主题的所有拓扑中的kafka spout使用者设置一个公共client.id和group.id,每个拓扑仍然订阅所有可用的分区和重复的元组,并在重新提交已提交的元组时抛出错误。 如果有人能解释一下 Kafka喷口的这种行为的实现逻辑是什么? 有解决此

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性

  • 我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。