UseCase—希望在外部服务出现可重试异常时暂停Spring KafkaListener几秒钟,并希望从最早的未提交偏移量恢复。
问题我有-下面是实现。
1) 没有外寻用法-在恢复Spring之后,kafkalistener正在挑选进入主题分区的最新消息。这不符合目的(从上次提交的偏移量到最近的偏移量之间的消息丢失)
2) 使用seek-我不知道如何处理kafkaconsumer
源代码
消费者中的Lisener方法
@KafkaListener(topics = "${kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
public void onReceiving(@Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
try {
Event event = translate(consumerRecord);
someService.processEvent(event, consumerRecord);
commitOffset(acknowledgment)
} catch(ConsumerException e) {
//DO NOT commit offset
}
}
private void commitOffset(Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
}
Service
public void processEvent(Event event, ConsumerRecord<String, String> consumerRecord) {
try {
//call an external API to get realTime event details
//Have a retry on this client
BusinessEntity businessEntity = externalServiceClient.get(event);
//process the Entity
anotherService.process(businessEntity);
} catch(RetryableException re) {
//feign.RetryableException
//we are using feign declarative clients
consumerErrorHandler.handle(re, consumerRecord);
}
}
错误处理程序--
public class ConsumerErrorHandler implements ErrorHandler {
@Autowired
private final KafkaListenerEndpointRegistry registry;
//org.springframework.core.task.SimpleAsyncTaskExecutor
@Autowrired
private final Executor executor;
@Autowired
private Consumer<String, String> kafkaConsumer;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
//Trying to delegate this to a new Async thread.
executor.execute(() -> {
registry.getListenerContainers().forEach(container -> {
if ((!container.isContainerPaused() || !container.isPauseRequested())) {
log.info("STOPPING_CONSUMER on error");
Optional<TopicPartition> topicPartition = container.getAssignedPartitions().stream().filter(a -> a.partition() == data.partition()).findFirst();
container.pause();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("BEFORE_RESUME");
log.info("SEEK CONSUMER before RESUME to this offset: "+data.offset());
topicPartition.ifPresent(a ->
{
log.info("Seek from the current position: " + data.offset());
kafkaConsumer.seek(a, data.offset());
});
container.resume();
log.info("RESUMING_CONSUMER after seek");
topicPartition.ifPresent(a -> {
log.info("CONSUMER is up NOW ??");
});
}
});
});
}
}
消费者配置
private Map<String, Object> consumerConfigs() {
Map<String, Object> confMap = new HashMap<>();
confMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, pubSubServers);
confMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
confMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
confMap.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupIdConfig);
confMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "50000");
confMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000");
confMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
confMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());
if (this.securityProtocol.equalsIgnoreCase(SSL)) {
confMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);
confMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
this.getClass().getResource(clientTrustStoreLocation).getPath());
confMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTrustStorePassword);
confMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
this.getClass().getResource(this.clientKeyStoreLocation).getPath());
confMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeyStorePassword);
confMap.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword);
confMap.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,null);
}
return confMap;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency("1");
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setConsumerTaskExecutor(taskExecutor());
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(consumerErrorHandler);
factory.setRetryTemplate(retryTemplate());
return factory;
}
@Bean
public AsyncListenableTaskExecutor taskExecutor() {
return createTaskExecutor("1");
}
private RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
private BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(1000);
return policy;
}
private RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy();
policy.setMaxAttempts("1");
return policy;
}
使用一个消费者数据库。
不能在另一个线程上执行搜索。请参阅
KafkaConsumer
javadocs-它不是线程安全的。
您还必须为其他主题/分区查找任何剩余记录(除非您只有一个主题/分区)。
最后,在容器暂停之前,您不得退出错误处理程序-否则将发生竞争,消费者可能会在暂停之前执行另一次
poll()
。
有关如何执行此类操作的示例,请参阅
。SeekToMONtErrorHandler
和ContainerStoppingErrorHandler
。必须在另一个线程上调用Stop()
以避免死锁,但您可以prence()
容器在消费者线程上(它只是设置一个标志,这样消费者将在下一个轮询()
之前暂停()
要resume()
容器,请使用ApplicationListener
或@EventListener
侦听暂停容器的容器空闲事件(设置idleEventVal
以获取这些事件)。
我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。
问题内容: 我有一个基本的Swing UI,带有一个标记为“播放”的按钮。按下按钮后,标签变为“暂停”。现在,当按下按钮时,它变为“继续”。 在“播放”中,我将实例化并执行一个SwingWorker。我想要的是能够暂停该线程(不要取消该线程),并根据上述按钮按下来恢复它。但是,我不想在doInBackground()中求助于Thread.sleep()。这似乎有点骇人听闻。有什么方法可以阻止运行d
我要做的是暂停< code>KafkaConsumer,如果在使用消息的过程中出现错误。 这是我写的 然后我写了一个REST服务来恢复消费者 现在,我有两个问题。第一个问题:当我打电话给消费者时。来自<code>@KafkaListener</code>注释方法的pause()会发生什么?消费者立即暂停,或者我可以接收到同一主题分区的其他偏移量上的其他消息。例如,我有偏移量为3的“message1
问题内容: 我声明我已阅读有关线程的内容,但从未使用过。所以我问你:) 我有两个线程:和,其中管理GUI和逻辑。 我将从开始。 然后在绘制GUI时,我将其暂停,以等待到达X点的run方法。 当到达X点进入run方法时,我暂停并继续。 并共享一些变量来管理GUI和逻辑… 我可以做吗?如果是,如何?:) 问题答案: 使用和方法: -使当前线程等待,直到另一个线程调用 该对象的方法。 -唤醒正在该对象的
有些情况下,例如爬取大的站点,我们希望能暂停爬取,之后再恢复运行。 Scrapy通过如下工具支持这个功能: 一个把调度请求保存在磁盘的调度器 一个把访问请求保存在磁盘的副本过滤器[duplicates filter] 一个能持续保持爬虫状态(键/值对)的扩展 Job 路径 要启用持久化支持,你只需要通过 JOBDIR 设置 job directory 选项。这个路径将会存储 所有的请求数据来保持一
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?