我试图在不使用@Kafkalistener的情况下编写kafka consumer,下面是我用于配置侦听器的代码行:
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "org");
// automatically reset the offset to the earliest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ContainerProperties containerProperties=new ContainerProperties("in.t");
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Consumer receiver() {
return new Consumer();
}
}
在这里,我如何配置topic和listener方法,我的consumer类可以有多个方法。
另外,我想知道在将@kafkalistener与Kafka流一起使用时是否会遇到任何潜在问题。
附言:我不想使用@KafkaListener。
使用Spring Boot和Spring Kafka,可以创建没有@KafkaListener的消费者,并创建额外的显式Kafka bean(ConcurrentMessageListenerContainer等):
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
listener:
type: BATCH
consumer:
clientId: "applicationClientId"
groupId: "applicationGroup"
keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
valueDeserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
maxPollRecords: 50
fetchMinSize: 50
autoOffsetReset: earliest
properties:
spring:
json:
trusted:
packages: "*"
@Configuration
@EnableKafka
@RequiredArgsConstructor //Lombok annotation
public class KafkaConsumerConfig implements KafkaListenerConfigurer {
private final BeanFactory beanFactory;
private final SomethingApplicationService somethingApplicationService
@SneakyThrows //Lombok annotation
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
// The following may be in a loop with different services and topics
val endpoint = new MethodKafkaListenerEndpoint<String, KafkaReceiptRequest>();
endpoint.setBeanFactory(beanFactory);
endpoint.setBean(somethingApplicationService);
endpoint.setMethod(somethingApplicationService.getClass().getDeclaredMethod("processList", List.class));
endpoint.setId("Unique id for this endpoint");
endpoint.setTopics("topic1", ...);
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
registrar.registerEndpoint(endpoint);
}
@Bean
public MessageHandlerMethodFactory messageHandlerMethodFactory() {
return new DefaultMessageHandlerMethodFactory();
}
}
@Service
public class SomethingApplicationService {
public void processList(List<Data> list) {
//processing logic
}
}
@Data //Lombok annotation
public class Data {
private String name;
private String value;
}
>
@kafkalistener不适用于Kafka Streams。它适用于普通的消费者。Kafka Stream可以通过StreamsBuilderFactoryBean
和特定的@Bean
来管理KStream。
如果您不想使用@kafkalistener
,则需要使用手册kafkalistener容器
创建。Kafka的KafkaListenerContainerFactory
可以用于此目的,但仅限于Spring Kafka2.2
之后,而且绝对不能用于Spring Boot。
因此,您别无选择,除非手动创建一个ConnettMessageListenerContainer
。这里已经通过ContainerProperties
注入了目标MessageListener
。对于您自定义的消费者POJO,您需要考虑将其包装到RecordMessagingMessageListenerAdapter中。并且只有最后一个必须被注入到ConnettMessageListenerContainer
中。
这就是@KafkaListener
的底层工作原理。
我有一个Kafka的话题,我正在听。然后将消息内容写入websocket通道,在该通道中我有一个订阅了该通道的SockJS客户机。这很管用。然后我创建了一个新的主题,然后添加了第二个KafKalistener。但是,当调用secong侦听器时,我看到它正在尝试处理/读取与第一个KafkaListener和主题相对应的有效负载,由于它没有被配置为这样做,因此会引发一个MessageConversio
我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点 它工作得非常好。 主要问题是当我想添加另一个注释时,它会触发某些方面的编程 @MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload
我只想了解@kafkaListener的范围是什么,原型还是单例。在单个主题的多个消费者的情况下,返回的是单个实例还是多个实例。在我的情况下,我有多个客户订阅单个主题并获得报告。我只是想知道如果 > 多个客户希望同时查询报告。在我的例子中,我在成功使用消息后关闭容器,但同时如果其他人想要获取报告,则容器应该打开。 如何将作用域更改为与容器的id相关联的原型(如果不是),以便每次都可以生成单独的实例
我使用的是spring-boot 2.3.2。使用spring-kafka->2.5.4发布。release kafka-clients->2.5.0 我有以下简单的监听器 null 如果我使用 然后它就会失败,不再有异常循环
我正在尝试使用SpringKafka将kafka与我的SpringBoot(v2.0.6版本)应用程序集成。现在我想要一个消费者和一个生产者。我让制作人工作得很好,我可以看到通过控制台消费者发送到主题的消息。我无法使用消费者代码,当Kafka主题中出现新消息时,它不会被调用。 这是我的Kafka配置类: 以下是我的pom依赖项: 以及消费者代码: 我正在我的计算机上运行kafka,正如我所说的——
所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?