当前位置: 首页 > 知识库问答 >
问题:

PolledProcessor在Spring云数据流上的问题

李宜然
2023-03-14

我正在使用PolledProcessor提出一个spring云数据流处理器。我遵循了下面的示例https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers。下面是我的代码。我将带有源管道的流部署到这个处理器(源polled-processor)到scdf,并让源发布一些消息。我确认处理器每秒轮询一次来自scdf rabbitmq的消息,但是结果总是false。我去了scdf rabbitmq控制台,我看到这些消息都在队列中。因此,处理器不轮询消息,尽管它一直在代码中轮询。我还看到队列没有消费者。看起来scdf没有将此处理器绑定到队列。知道为什么吗?

public interface PolledProcessor {
    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();
}

@SpringBootApplication
@EnableBinding(PolledProcessor.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
        return args -> {
            while (true) {
                boolean result = source.poll(dest::send);
                Thread.sleep(1000);
            }
        };
    }
}

下面是源和处理器之间队列的状态

共有1个答案

柳飞飙
2023-03-14

测试了一个Spring Cloud Stream应用程序,没有任何问题:

@SpringBootApplication
@EnableBinding(Polled.class)
public class So69383266Application {

    public static void main(String[] args) {
        SpringApplication.run(So69383266Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(PollableMessageSource source) {
        return args -> {
            while (true) {
                boolean result = source.poll(System.out::println);
                System.out.println(result);
                Thread.sleep(1000);
            }
        };
    }

}

interface Polled {

    @Input
    PollableMessageSource source();

}
false
GenericMessage [payload=byte[6], headers={...
true
false

我建议您在amqpmessageSource.doReceive()中设置一个断点,看看发生了什么。

编辑

@Bean
public ApplicationRunner runner(PollableMessageSource source) {
    return args -> {
        while (true) {
            DirectFieldAccessor dfa = new DirectFieldAccessor(source);
            log.info(dfa.getPropertyValue("source.h.advised.targetSource.target.queue").toString());
            boolean result = source.poll(System.out::println);
            System.out.println(result);
            Thread.sleep(1000);
        }
    };
}
 类似资料:
  • 我曾经使用过SpringCloudDataFlow、rabbitmq和kafka,但我想知道是否可以使用GooglePub/sub安装scdf。 我不想创建一个流(新的应用程序spring cloud stream),将源或接收器连接到gcp,我希望google pub/sub over spring cloud data flow server用作中间消息代理。 有什么建议吗?

  • 我有一个现有的过程,我正试图转换成SCDF实现。目前的流程是, HTTP接收器(接收HTTP POST数据)->RabbitMQ->MQ接收器服务->处理/转换->DB接收器

  • 我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?

  • 我想使用Cloud Dataflow,PubSub和Bigquery将tableRow写入PubSub消息,然后将它们写入Bigquery。我希望表名、项目id和数据集id是动态的。 我在internet上看到下面的代码,我不明白如何传递数据行参数。 先谢谢你,盖尔

  • 我在这个网站上用docker compose启动了Spring云数据流。 https://dataflow.spring.io/docs/installation/local/docker/ 我创建了3个应用程序,源,处理器 我跑了 当我运行docker compose-f时/docker编写。yml-f/docker创作普罗米修斯。yml,所有我的容器都按照docker compose中的指定启