当前位置: 首页 > 知识库问答 >
问题:

spring集成在bean之间发布订阅

尹晟
2023-03-14

谢谢你提前阅读。在我的主要方法中,我有一个出版频道

@Bean(name = "feeSchedule")
public SubscribableChannel getMessageChannel() {
    return new PublishSubscribeChannel();
}

在执行长时间运行流程的服务中,它创建了一个收费计划,我将频道注入其中

@Service
public class FeeScheduleCompareServiceImpl implements FeeScheduleCompareService {

    @Autowired
    MessageChannel outChannel;

    public List<FeeScheduleUpdate> compareFeeSchedules(String oldStudyId) {
    List<FeeScheduleUpdate> sortedResultList = longMethod(oldStudyId);
    outChannel.send(MessageBuilder.withPayload(sortedResultList).build());
        return sortedResultList;
    } 
}

这就是我正在努力解决的问题。我想使用CompletableFuture并在另一个Springbean中获得未来事件的有效负载。我需要一个未来来从消息返回有效负载。我想我想创建一个ServiceActivator作为消息的endpoint,但正如我所说的,我需要它来返回未来a的有效负载。

@org.springframework.stereotype.Service
public class SFCCCompareServiceImpl implements SFCCCompareService {
     @Autowired
    private SubscribableChannel outChannel;

     @Override
    public List<SFCCCompareDTO> compareSFCC(String state, int service){
    ArrayList<SFCCCompareDTO> returnList = new ArrayList<SFCCCompareDTO>();
    CompletableFuture<List<FeeScheduleUpdate>> fa =  CompletableFuture.supplyAsync( () ->
            {  //block A   WHAT GOES HERE?!?!
                    outChannel.subscribe()
            }
    );
    CompletableFuture<List<StateFeeCodeClassification>> fb =  CompletableFuture.supplyAsync( () ->
            {  
                  return this.stateFeeCodeClassificationRepository.findAll();       
            }
    );
    CompletableFuture<List<SFCCCompareDTO>> fc = fa.thenCombine(fb,(a,b) ->{
        //block C
        //get in this block when both A & B are complete
        Object theList = b.stream().forEach(new Consumer<StateFeeCodeClassification>() {
            @Override
            public void accept(StateFeeCodeClassification stateFeeCodeClassification) {
                a.stream().forEach(new Consumer<FeeScheduleUpdate>() {
                    @Override
                    public void accept(FeeScheduleUpdate feeScheduleUpdate) {
                        returnList new SFCCCompareDTO();
                    }
                });
            }
        }).collect(Collectors.toList());
        return theList;
    });
    fc.join();
    return returnList;
}

}

我想会有一个服务激活器,比如:

@MessageEndpoint
public class UpdatesHandler implements MessageHandler{

    @ServiceActivator(requiresReply = "true")
    public List<FeeScheduleUpdate> getUpdates(Message m){
        return (List<FeeScheduleUpdate>) m.getPayload();
    }
}

共有1个答案

娄森
2023-03-14

你的问题不清楚,但我会尽力帮你提供一些信息。

Spring集成不提供CompletableFuture支持,但它提供了异步处理和回复。

有关详细信息,请参阅异步网关。另请参见异步服务激活器。

outChannel。subscribe()应该附带MessageHandler回调。

 类似资料:
  • 我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息: 在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用: 我在弄清楚如何使用DSL在pubSubChannel中实际执行部分时遇到了问题。到目前为止,我已经尝试过: 有什么指示吗?

  • 我正在尝试实现Spring与MQTT的集成。我正在使用Mosquitto作为MQTT经纪人。并在下面的链接中提供了文档的引用。我已经创建了一个项目并添加了所有所需的jar文件。当我执行时。 当我通过MQTT代理发布消息时,我收到以下错误。 添加@SpringBootApplication注释后,请找到下面的堆栈跟踪

  • 我有一个用修饰的类,它应该重写这个: bean用于设置外部文件的路径,该文件包含在启动期间必须读取的注册代码。它用于类中: 在尝试调试时,我在每个方法以及test config类的构造函数中设置了一个断点。的构造函数断点被命中,因此我知道我的测试配置类实例化了,但是该类的方法从未被命中。相反,正常的类的方法被点击,中的总是而不是预期的。 不知道为什么会这样。任何想法都将不胜感激。

  • 简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息

  • 我对Spring Integration是新手。我尝试使用文件拆分器将消息从文件中拆分出来,然后使用.aggregate()构建单个消息并发送到输出通道。我有标记为true,因此apply-sequence现在默认为false。我已经使用EnricHeaders将correlationId设置为常量“1”。我在设置realease策略时遇到了困难,因为我没有在序列结束上保持。下面是我的代码的外观。

  • 主要内容:发布/订阅流程,常用命令汇总,基本命令应用Redis PubSub 模块又称发布订阅者模式,是一种消息传递系统,实现了消息多播功能。发布者(即发送方)发送消息,订阅者(即接收方)接收消息,而用来传递消息的链路则被称为  channel。在 Redis 中,一个客户端可以订阅任意数量的 channel(可译为频道)。 消息多播:生产者生产一次消息,中间件负责将消息复制到多个消息队列中,每个消息队列由相应的消费组进行消费,这是分布式系统常用的