当前位置: 首页 > 知识库问答 >
问题:

有没有办法在Spring应用程序中手动启动/停止Kafka消费者?

毕宇
2023-03-14

现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是

  • 在不使用消费者的情况下启动API

关于如何手动管理消费过程的任何建议?

技术细节:

  • @KafkaListener用于创建侦听器

共有1个答案

终波涛
2023-03-14

如果您的应用程序中配置了多个消费者,则必须使用唯一密钥来区分他们
下面的示例考虑多消费者配置。

消费者配置示例:

consumers:
      "fun-consumer-key":
        topic:  fun.topic-1
        consumerProcessOnStart: org.kafka.ProcessConsumedMessage
        consumerProperties:
          "[bootstrap.servers]": localhost:9092,localhost:9093
          // Other consumer configs

消费者工厂侦听器:

@Component
@Slf4j
public class ConsumerFactoryListener<K, V> implements Listener<K, V> {

  @Override
  public void consumerAdded(final String id, final Consumer<K, V> consumer) {
    //
  }

  @Override
  public void consumerRemoved(final String id, final Consumer<K, V> consumer) {
    //
  }

}

用于保存使用者对象的AppPropertiesConfig:

@UtilityClass
public class AppPropertiesConfig {

  private static Map<String, Object> configConsumers = new ConcurrentHashMap<>();

  public static Map<String, Object> getConfigConsumers() {
    return consumerMap;
  }
}

消息监听器:Ack

    public class AutoAckMessageListener<K, V> extends
        implements MessageListener<K, V> {
    
      private final ProcessConsumedMessage<K, V> consumeMessage;
    
      
      @Override
      public void onMessage(ConsumerRecord<K, V> consumerRecord) {
        onMessageConsume(consumerRecord, consumeMessage);
      }
    }

    public class AckMessageListener<K, V> extends
        implements AcknowledgingMessageListener<K, V> {
    
      private final ProcessConsumedMessage<K, V> consumeMessage;
    
      
      @Override
      public void onMessage(ConsumerRecord<K, V> consumerRecord, final Acknowledgment acknowledgment) {
        onMessageConsume(consumerRecord, consumeMessage);
        acknowledgment.acknowledge();
      }
    }

  // You can put this method in an abstract class and both listener classes can extend this Abstract class with onMessageConsume method
  public void onMessageConsume(ConsumerRecord<K, V> consumerRecord,
      final ProcessConsumedMessage<K, V> consumeMessage) throws InterruptedException {
    
    // Your custom processing implementation
    consumeMessage.process(consumerRecord.key(), consumerRecord.value());
  }

初始化消息侦听器:

  public MessageListener getListener(String className) {
    final ProcessConsumedMessage consumeMessage = (ProcessConsumedMessage) getClassFromName();
    MessageListener listener;
    if (isAutoCommitEnabled) {
      listener = new AutoAckMessageListener(consumeMessage);
    } else {
      listener = new AckMessageListener(consumeMessage);
    }
    return listener;
  }

启动消费者:

public void startConsumer(final String key, final String topic,
          final Object messageListener, final Map<String, Object> consumerProperties) {
        
        
        // Check already created, start and return
        ConcurrentMessageListenerContainer<K, V> container =
            (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
            .getConfigConsumers().get(key);  // key - "fun-consumer-key"
        if (container != null) {
          if (!container.isRunning()) {
           
            container.start();
          }
          return;
        }
        
        final DefaultKafkaConsumerFactory<K, V> factory = new DefaultKafkaConsumerFactory<>(
            consumerProperties);
        factory.addListener(consumerFactoryListener);
    
        final ConcurrentKafkaListenerContainerFactory<K, V> containerFactory
            = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(factory);
    
        final ContainerProperties containerProperties = containerFactory.getContainerProperties();
        containerProperties.setPollTimeout(pollTimeout);
    
        // auto-commit??
        if (!isAutoCommitEnabled) {
          containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
        }

        containerFactory.setErrorHandler(getErrorHandler(<some retry configurations object>));
        // create the container
        container = containerFactory.createContainer(topic);
        container.setupMessageListener(messageListener);
        // start
        container.start();
        AppPropertiesConfig.getConfigConsumers().put(key, container);
      }
    
      private SeekToCurrentErrorHandler getErrorHandler() {
          // Provide your error handler. Ex: SeekToCurrentErrorHandler
      }

阻止消费者:

  public void stopConsumer(final String key) {
    if (StringUtils.isBlank(key)) {
      return;
    }
    final ConcurrentMessageListenerContainer<K, V> container
        = (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
        .getConfigConsumers().get(key);
    if (container == null || !container.isRunning()) {
      throw new Exception();
    }
    try {
      container.stop();
    } catch (Exception e) {
      // log
      throw e;
    } finally {
      AppPropertiesConfig.getConfigConsumers().remove(key);
    }
  }
 类似资料:
  • 我正在开发一个Spring Boot应用程序,它使用以Kafka主题为源的Spring集成流。我们的集成流程开始使用一个包含带有spring framework . cloud . stream . annotation . input和Output注释的SubscribableChannels的接口。这些被配置为通过spring . Cloud . stream . Kafka . bindin

  • 我正在尝试运行一个简单的Spring Boot Kafka应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现这个教程,但当我启动应用程序时,会发生以下情况: 我可以在控制台中写入,但消费者没有收到任何消息。 这是我的SpringApplication类: application.yml: 消费者类、生产者类及其配置类与教程中所写的相同。< br >在我的server.properties

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢

  • 例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。

  • 我相信这三种类型的确认由于生产者属性仅限于领导者和生产者,我希望生产者在消费者通过kafka broker消费来自存储/队列的消息时收到具体的消息。还请纠正我,如果我在制作人的“acks”属性上有错误,它的默认值是“-1”,它确认所有副本是否已接收/存储消息,但它是否与消费者有关,或者我们是否可以在消费者提交且Kafka向制作人发送确认时创建一个桥梁?

  • 我在我的工作区中使用STS IDE运行了几个Spring Boot应用程序,在我对其中一个项目进行maven更新后,每个项目都在应用程序启动过程后立即停止。我甚至创建了一个最小的例子,只是为了开始一些事情,同样的事情发生了。 这是我的pom.xml 即使是那些入门示例也会在启动后立即停止。我会非常感谢这里的一些帮助。 编辑:正如Alexandru Marina在评论中所说,我使用的是快照而不是稳定