谢谢你提前阅读。在我的主要方法中,我有一个出版频道
@Bean(name = "feeSchedule")
public SubscribableChannel getMessageChannel() {
return new PublishSubscribeChannel();
}
在执行长时间运行流程的服务中,它创建了一个收费计划,我将频道注入其中
@Service
public class FeeScheduleCompareServiceImpl implements FeeScheduleCompareService {
@Autowired
MessageChannel outChannel;
public List<FeeScheduleUpdate> compareFeeSchedules(String oldStudyId) {
List<FeeScheduleUpdate> sortedResultList = longMethod(oldStudyId);
outChannel.send(MessageBuilder.withPayload(sortedResultList).build());
return sortedResultList;
}
}
这就是我正在努力解决的问题。我想使用CompletableFuture并在另一个Springbean中获得未来事件的有效负载。我需要一个未来来从消息返回有效负载。我想我想创建一个ServiceActivator作为消息的endpoint,但正如我所说的,我需要它来返回未来a的有效负载。
@org.springframework.stereotype.Service
public class SFCCCompareServiceImpl implements SFCCCompareService {
@Autowired
private SubscribableChannel outChannel;
@Override
public List<SFCCCompareDTO> compareSFCC(String state, int service){
ArrayList<SFCCCompareDTO> returnList = new ArrayList<SFCCCompareDTO>();
CompletableFuture<List<FeeScheduleUpdate>> fa = CompletableFuture.supplyAsync( () ->
{ //block A WHAT GOES HERE?!?!
outChannel.subscribe()
}
);
CompletableFuture<List<StateFeeCodeClassification>> fb = CompletableFuture.supplyAsync( () ->
{
return this.stateFeeCodeClassificationRepository.findAll();
}
);
CompletableFuture<List<SFCCCompareDTO>> fc = fa.thenCombine(fb,(a,b) ->{
//block C
//get in this block when both A & B are complete
Object theList = b.stream().forEach(new Consumer<StateFeeCodeClassification>() {
@Override
public void accept(StateFeeCodeClassification stateFeeCodeClassification) {
a.stream().forEach(new Consumer<FeeScheduleUpdate>() {
@Override
public void accept(FeeScheduleUpdate feeScheduleUpdate) {
returnList new SFCCCompareDTO();
}
});
}
}).collect(Collectors.toList());
return theList;
});
fc.join();
return returnList;
}
}
我想会有一个服务激活器,比如:
@MessageEndpoint
public class UpdatesHandler implements MessageHandler{
@ServiceActivator(requiresReply = "true")
public List<FeeScheduleUpdate> getUpdates(Message m){
return (List<FeeScheduleUpdate>) m.getPayload();
}
}
你的问题不清楚,但我会尽力帮你提供一些信息。
Spring集成不提供CompletableFuture
支持,但它提供了异步处理和回复。
有关详细信息,请参阅异步网关。另请参见异步服务激活器。
outChannel。subscribe()
应该附带MessageHandler
回调。
我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息: 在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用: 我在弄清楚如何使用DSL在pubSubChannel中实际执行部分时遇到了问题。到目前为止,我已经尝试过: 有什么指示吗?
我正在尝试实现Spring与MQTT的集成。我正在使用Mosquitto作为MQTT经纪人。并在下面的链接中提供了文档的引用。我已经创建了一个项目并添加了所有所需的jar文件。当我执行时。 当我通过MQTT代理发布消息时,我收到以下错误。 添加@SpringBootApplication注释后,请找到下面的堆栈跟踪
我有一个用修饰的类,它应该重写这个: bean用于设置外部文件的路径,该文件包含在启动期间必须读取的注册代码。它用于类中: 在尝试调试时,我在每个方法以及test config类的构造函数中设置了一个断点。的构造函数断点被命中,因此我知道我的测试配置类实例化了,但是该类的方法从未被命中。相反,正常的类的方法被点击,中的总是而不是预期的。 不知道为什么会这样。任何想法都将不胜感激。
简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息
我对Spring Integration是新手。我尝试使用文件拆分器将消息从文件中拆分出来,然后使用.aggregate()构建单个消息并发送到输出通道。我有标记为true,因此apply-sequence现在默认为false。我已经使用EnricHeaders将correlationId设置为常量“1”。我在设置realease策略时遇到了困难,因为我没有在序列结束上保持。下面是我的代码的外观。
主要内容:发布/订阅流程,常用命令汇总,基本命令应用Redis PubSub 模块又称发布订阅者模式,是一种消息传递系统,实现了消息多播功能。发布者(即发送方)发送消息,订阅者(即接收方)接收消息,而用来传递消息的链路则被称为 channel。在 Redis 中,一个客户端可以订阅任意数量的 channel(可译为频道)。 消息多播:生产者生产一次消息,中间件负责将消息复制到多个消息队列中,每个消息队列由相应的消费组进行消费,这是分布式系统常用的