现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是
关于如何手动管理消费过程的任何建议?
技术细节:
@KafkaListener
用于创建侦听器如果您的应用程序中配置了多个消费者,则必须使用唯一密钥来区分他们
下面的示例考虑多消费者配置。
消费者配置示例:
consumers:
"fun-consumer-key":
topic: fun.topic-1
consumerProcessOnStart: org.kafka.ProcessConsumedMessage
consumerProperties:
"[bootstrap.servers]": localhost:9092,localhost:9093
// Other consumer configs
消费者工厂侦听器:
@Component
@Slf4j
public class ConsumerFactoryListener<K, V> implements Listener<K, V> {
@Override
public void consumerAdded(final String id, final Consumer<K, V> consumer) {
//
}
@Override
public void consumerRemoved(final String id, final Consumer<K, V> consumer) {
//
}
}
用于保存使用者对象的AppPropertiesConfig:
@UtilityClass
public class AppPropertiesConfig {
private static Map<String, Object> configConsumers = new ConcurrentHashMap<>();
public static Map<String, Object> getConfigConsumers() {
return consumerMap;
}
}
消息监听器:Ack
public class AutoAckMessageListener<K, V> extends
implements MessageListener<K, V> {
private final ProcessConsumedMessage<K, V> consumeMessage;
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord) {
onMessageConsume(consumerRecord, consumeMessage);
}
}
public class AckMessageListener<K, V> extends
implements AcknowledgingMessageListener<K, V> {
private final ProcessConsumedMessage<K, V> consumeMessage;
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, final Acknowledgment acknowledgment) {
onMessageConsume(consumerRecord, consumeMessage);
acknowledgment.acknowledge();
}
}
// You can put this method in an abstract class and both listener classes can extend this Abstract class with onMessageConsume method
public void onMessageConsume(ConsumerRecord<K, V> consumerRecord,
final ProcessConsumedMessage<K, V> consumeMessage) throws InterruptedException {
// Your custom processing implementation
consumeMessage.process(consumerRecord.key(), consumerRecord.value());
}
初始化消息侦听器:
public MessageListener getListener(String className) {
final ProcessConsumedMessage consumeMessage = (ProcessConsumedMessage) getClassFromName();
MessageListener listener;
if (isAutoCommitEnabled) {
listener = new AutoAckMessageListener(consumeMessage);
} else {
listener = new AckMessageListener(consumeMessage);
}
return listener;
}
启动消费者:
public void startConsumer(final String key, final String topic,
final Object messageListener, final Map<String, Object> consumerProperties) {
// Check already created, start and return
ConcurrentMessageListenerContainer<K, V> container =
(ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
.getConfigConsumers().get(key); // key - "fun-consumer-key"
if (container != null) {
if (!container.isRunning()) {
container.start();
}
return;
}
final DefaultKafkaConsumerFactory<K, V> factory = new DefaultKafkaConsumerFactory<>(
consumerProperties);
factory.addListener(consumerFactoryListener);
final ConcurrentKafkaListenerContainerFactory<K, V> containerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(factory);
final ContainerProperties containerProperties = containerFactory.getContainerProperties();
containerProperties.setPollTimeout(pollTimeout);
// auto-commit??
if (!isAutoCommitEnabled) {
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
}
containerFactory.setErrorHandler(getErrorHandler(<some retry configurations object>));
// create the container
container = containerFactory.createContainer(topic);
container.setupMessageListener(messageListener);
// start
container.start();
AppPropertiesConfig.getConfigConsumers().put(key, container);
}
private SeekToCurrentErrorHandler getErrorHandler() {
// Provide your error handler. Ex: SeekToCurrentErrorHandler
}
阻止消费者:
public void stopConsumer(final String key) {
if (StringUtils.isBlank(key)) {
return;
}
final ConcurrentMessageListenerContainer<K, V> container
= (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
.getConfigConsumers().get(key);
if (container == null || !container.isRunning()) {
throw new Exception();
}
try {
container.stop();
} catch (Exception e) {
// log
throw e;
} finally {
AppPropertiesConfig.getConfigConsumers().remove(key);
}
}
我正在开发一个Spring Boot应用程序,它使用以Kafka主题为源的Spring集成流。我们的集成流程开始使用一个包含带有spring framework . cloud . stream . annotation . input和Output注释的SubscribableChannels的接口。这些被配置为通过spring . Cloud . stream . Kafka . bindin
我正在尝试运行一个简单的Spring Boot Kafka应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现这个教程,但当我启动应用程序时,会发生以下情况: 我可以在控制台中写入,但消费者没有收到任何消息。 这是我的SpringApplication类: application.yml: 消费者类、生产者类及其配置类与教程中所写的相同。< br >在我的server.properties
我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢
例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。
我相信这三种类型的确认由于生产者属性仅限于领导者和生产者,我希望生产者在消费者通过kafka broker消费来自存储/队列的消息时收到具体的消息。还请纠正我,如果我在制作人的“acks”属性上有错误,它的默认值是“-1”,它确认所有副本是否已接收/存储消息,但它是否与消费者有关,或者我们是否可以在消费者提交且Kafka向制作人发送确认时创建一个桥梁?
我在我的工作区中使用STS IDE运行了几个Spring Boot应用程序,在我对其中一个项目进行maven更新后,每个项目都在应用程序启动过程后立即停止。我甚至创建了一个最小的例子,只是为了开始一些事情,同样的事情发生了。 这是我的pom.xml 即使是那些入门示例也会在启动后立即停止。我会非常感谢这里的一些帮助。 编辑:正如Alexandru Marina在评论中所说,我使用的是快照而不是稳定