当前位置: 首页 > 知识库问答 >
问题:

Springboot cloud Stream with Kafka

袁炳
2023-03-14

我正在尝试使用Kafka使用Spring Boot云流设置一个项目。我设法构建了一个简单的示例,其中侦听器从一个主题获取消息,并在处理后将输出发送到另一个主题。

我的侦听器和频道配置如下:

@Component
public class FileEventListener {
    private FileEventProcessorService fileEventProcessorService;

    @Autowired
    public FileEventListener(FileEventProcessorService fileEventProcessorService) {
        this.fileEventProcessorService = fileEventProcessorService;
    }

    @StreamListener(target = FileEventStreams.INPUT)
    public void handleLine(@Payload(required = false) String jsonData) {
        this.fileEventProcessorService.process(jsonData);
    }
}

public interface FileEventStreams {
    String INPUT = "file_events";
    String OUTPUT = "raw_lines";

    @Input(INPUT)
    SubscribableChannel inboundFileEventChannel();

    @Output(OUTPUT)
    MessageChannel outboundRawLinesChannel();
}

此示例的问题在于,当服务启动时,它不会检查主题中已存在的消息,它只处理启动后发送的那些消息。我对 Springboot 流和 kafka 很陌生,但对于我所读到的内容,这种行为可能与我正在使用订阅频道的事实相对应。例如,我尝试使用队列通道来查看它是如何工作的,但我发现了以下异常:

Error creating bean with name ... nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.springframework.integration.channel.QueueChannel among registered factories: channelFactory,messageSourceFactory

所以,我的问题是:

  1. 如果我想在应用程序启动后处理主题中存在的所有消息(而且消息也只由一个使用者处理),那么我就走对了
  2. 即使QueueChannel不是实现1中所述行为的正确选择。)我需要向我的项目添加什么才能使用这种类型的频道

谢谢

共有1个答案

黄跃
2023-03-14

>

  • 添加spring.cloud.stream.bindings.file_events.group=foo

    • 默认情况下,匿名组仅从主题的末尾开始使用,与组的绑定从开头开始使用。

    不能将可轮询通道用于绑定,它必须是订阅通道

  •  类似资料:

    相关问答

    相关文章

    相关阅读