thread.sleep(1000)
来模拟的)。发现
在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。
问题
我如何使消费者从上次消费的消息的索引+1继续。
thread.sleep(1000)
来模拟的)。发现
使用者从一开始就使用所有的消息,忽略上次使用的消息。
期望值
@Configuration
@EnableKafka
public class KafkaConsumerConfig
{
@Bean
public Map<String, Object> consumerConfigs()
{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-group");
return props;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory()
{
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(100);
return factory;
}
}
KafKaProducerConfig
@Configuration
public class KafkaProducerConfig
{
@Bean
public ProducerFactory<Integer, String> producerFactory()
{
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs()
{
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate()
{
return new KafkaTemplate<>(producerFactory());
}
}
Kafka生产者
@Component
public class KafkaProducer
{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String message, final String topicName)
{
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>()
{
@Override
public void onSuccess(SendResult<String, String> result)
{
System.out.println("Sent message=[" + message
+ "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex)
{
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
}
Kafka孔苏梅尔
@Component
public class KafkaConsumer
{
@KafkaListener(id = "basic", topics = "test-1", clientIdPrefix = "test-prefix-id", autoStartup = "true", concurrency = "3")
public void multipleTransactionNotification(@Payload final String message)
{
try
{
Thread.sleep(1000);
System.out.println(message);
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
测试应用程序
@SpringBootApplication
public class TestApplication
{
public static void main(String[] args)
{
SpringApplication.run(TestApplication.class, args);
}
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx, KafkaProducer producer, NIPKafkaConsumer consumer) {
return args -> {
for (int i = 0; i < 100_000; i++)
{
producer.sendMessage("New Message-" + i, "test-1");
}
};
}
}
Spring Kafka执行kafkaconsumer.poll()
来获取记录,然后为每个记录调用侦听器。
因此,如果批处理包含100条记录,则使用实际的KafKalistener进行整个批处理将需要100秒。
使用者提交偏移量以知道从何处恢复。
默认行为是每5秒自动提交一次偏移量。我的猜测是你在消费者做出补偿之前就杀死了它,这就是为什么它从一开始就重新开始的原因。
请参阅enable.auto.commit
和auto.commit.interval.ms
Kafka使用者配置。
也可以手动提交偏移量。
我正在处理一个kafka用例,在这个用例中,我需要在生产者和消费者端具有事务性语义...我可以使用kafka transaction API 0.11将事务性消息发布到kafka集群,但在消费者端,我面临着一个问题...我在属性文件中设置了但我不能使用它...我可以看到消息被使用但这不是希望的... 生产者代码 ProducerTX.Properties 消费者 感谢你的帮助..谢谢
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
我如何将电话限制在每5秒一次。注意:只能修改reallySlowApi。 编辑:我知道,但是如果Api变得更慢,它就不能解决问题。我需要使用的最佳方式。
我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?