当前位置: 首页 > 知识库问答 >
问题:

将消息转换为多个

叶德本
2023-03-14

我正在尝试将单个输入消息转换为多个消息。我有一个带有以下签名的方法:

@Incoming("CH_IN")
@Outgoing("CH_OUT")
Multi<Message<B>> process(Message<A> in) {

}

A类似于:

class A {
    private List<String> ids

    // getters and setters
}

对于A中的每个id,我想创建一个B的实例。我如何做到这一点并处理消息中的的确认?

这是对我所拥有的东西的简化,但我不确定这是否是正确的方式:

@Incoming("CH_IN")
@Outgoing("CH_OUT")
Multi<Message<B>> process(Message<A> in) {
    List<Message<B>> out = new ArrayList<>();

    // Code to iterate ids and create instances of Message<B>
    // In certain cases the out list will be empty

    return Multi.createFrom().iterable(out).on().completion(in::ack);
}

这是确认信息的正确方式吗?


共有1个答案

齐招
2023-03-14
匿名用户

这是一个简化我所拥有的但我不确定这是否是正确的做法

只要你的评论处理部分不需要任何阻塞调用,它就可以工作,但它不是理想的IMHO:

  • 文体上,混合命令式

相反,我主张:

return Multi.createFrom().item(in)
        .flatMap(m -> Uni.createFrom().item(m).repeat().atMost(5)) //Replace with your actual processing
        .on().completion(in::ack);

...它将整个方法保持为一个反应链,并使您能够在需要时将平映射()传递给其他发布者(例如,如果您的处理需要Web调用、数据库调用或其他阻塞IO)。

这是确认信息的正确方式吗?

只要你只想在处理完成后确认,是的。

唯一的缺点是,如果灾难性故障发生在处理过程的一半,可以想象,在确认中的之前可以处理一些故障。唯一其他明智的行为是使用on()。subscribed(),这只会产生相反的问题(中的将被确认,但部分或所有消息可能不会发布到您的传出频道。)哪个更可取取决于您的用例。

 类似资料:
  • 我想填充Javapojo类与mqtt消息有效载荷在一个Springmvc应用程序。我的代码是: MQTT MessagePayload是{“name”:“abc”,“age”:32},但当它在变量MessagePayload中转换为字符串时,它显示为类似{name:abc,age:32}。你看,去掉所有的双引号。当转到try块时,会抛出一个错误com。fasterxml。杰克逊。果心JsonPar

  • 我有Kafka-Connect,我需要将其与REST API集成,该API在使用单输入多数据模型调用时效果最好。 想象一下以下内容: 源主题- Kafka源主题: REST请求: REST响应: Kafka Sink主题: 所以我想在给定的时间范围内转换多个消息。 Kafka-Connect转换(https://docs.confluent.io/current/connect/transform

  • 使用MVC Java编程配置方式时,如果你想替换Spring MVC提供的默认转换器,完全定制自己的HttpMessageConverter,这可以通过覆写configureMessageConverters()方法来实现。如果你只是想定制一下,或者想在默认转换器之外再添加其他的转换器,那么可以通过覆写extendMessageConverters()方法来实现。 下面是一段例子,它使用定制的Ob

  • 我有一个 X509CertificateObject,一个匹配的 RSAPublicKey,并设法创建一个字节数组,其中包含某个消息对象的有效数字证书,也作为字节数组。 不幸的是,我正在构建的系统只接受对象作为输入。如何将我的基本构建块转换为这样一个有效的 对象? 背景:根据这个示例(摘要是SHA512),我正在试用Java Bouncy Castle RSA盲签名,需要将结果输入到标准签名处理中

  • 我有一条路由(Camel 2.17.3),它使用丰富的 DSL 调用Rest服务并将结果聚合到消息正文中。不过,我遇到了序列化问题。这是我正在尝试的。我的路线如下所示: 正如您所看到的,我想使用enrich()DSL调用一些现有服务,并将这些结果聚合起来,形成一个新的消息体,以便进一步处理。我需要将rest调用的结果从Json转换为MyResponse。我想用这个: 但我需要它已经在我的Aggre

  • 而不是使用关系。关系GetRequest作为请求和响应。有什么方法可以将请求/响应转换为POJO? 我见过这个解决方案,但它比我想要的要复杂一些:将协议缓冲区转换为POJO 我正在使用翻新和谷歌协议缓冲区。 我所拥有的: 我想用的是: 关系: 我的请求最终是这样的,请求必须在这里构建...