当前位置: 首页 > 知识库问答 >
问题:

源/处理器/接收器消息传递(java.util.HashMap)

左丘兴生
2023-03-14

在新的SpringBoot(2.0.2)中,通过RabbitBinder在源/处理器/接收器之间发送哈希图的支持似乎有所改变。

公共父pom。所有模块的xml如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>demo.stream</groupId>
   <artifactId>demo-stream-parent</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <modules>
      <module>source-module</module>
      <module>processor-module</module>
      <module>sink-module</module>
   </modules>
   <packaging>pom</packaging>
   <name>demo-stream</name>
   <description>Demo project for Spring Boot</description>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.0.2.RELEASE</version>
      <relativePath />
      <!-- lookup parent from repository -->
   </parent>
   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
      <spring-cloud.version>Finchley.RC2</spring-cloud.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-config</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-reactive</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
      </dependency>
   </dependencies>
   <dependencyManagement>
      <dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
         </dependency>
      </dependencies>
   </dependencyManagement>
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
   <repositories>
      <repository>
         <id>spring-milestones</id>
         <name>Spring Milestones</name>
         <url>https://repo.spring.io/milestone</url>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
   </repositories>
</project>

明确规定如下:

如果出站通道上没有设置内容类型属性,Spring Cloud Stream将使用基于Kryo序列化框架的序列化器序列化有效负载。在目标位置反序列化消息需要有效负载类出现在接收方的类路径上。

根据这些规则,它可以使用标准java类型,如“HashMap”,而不需要自定义消息转换器。

无法使此简单代码在源代码和处理器之间正常工作:

源配置和代码:

spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-serialized-object'
          #content-type: 'application/json'
          content-type:
          destination: demo-stream-source-output



@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {

    @Autowired
    private Source channels;

    //@InboundChannelAdapter(Source.OUTPUT)
    @Scheduled(fixedRate = 2000)
    public MessageSource<Map<String, Object>> timerMessageSource() {

            Map<String, Object> mapa = new HashMap<>();
            mapa.put("string", "string");
            mapa.put("string", "string");
            mapa.put("long", 1212121L);
            mapa.put("integer", 1212121);
            Map<String, Object> mapaInner = new HashMap<>();
            mapaInner.put("string", "string");
            mapaInner.put("string", "string");
            mapaInner.put("long", 1212121L);
            mapaInner.put("integer", 1212121);

            mapa.put("innerMapa", mapaInner);

            channels.output().send(MessageBuilder.withPayload(mapa).build());
        }

}

从调试器可以清楚地看到,有效负载被转换为JSON字符串(ApplicationJsonMessageMarshallingConverter),而内容类型头被设置为“application/JSON”。

虽然在某些情况下这是可以接受和合法的,但这并不是预期的结果。应为kryo序列化哈希映射为字节数组

调试器输出如下:

this = {AbstractMessageChannel$ChannelInterceptorList@7877} 
 logger = {LogFactory$Log4jLog@7884} 
 interceptors = {CopyOnWriteArrayList@8890}  size = 1
  0 = {MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor@8991} 
   messageConverter = {CompositeMessageConverter@9043} "CompositeMessageConverter[converters=[org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@2863f519, org.springframework.cloud.stream.converter.TupleJsonMessageConverter@e9dffa1, org.springframework.messaging.converter.ByteArrayMessageConverter@cc64ad, org.springframework.cloud.stream.converter.ObjectStringMessageConverter@97df9ef, org.springframework.cloud.stream.converter.JavaSerializationMessageConverter@3e011ee4, org.springframework.cloud.stream.converter.KryoMessageConverter@5f808e72, org.springframework.cloud.stream.converter.JsonUnmarshallingConverter@3c0a7023]]"
   this$0 = {MessageConverterConfigurer@9044} 
   mimeType = null
   MessageConverterConfigurer$AbstractContentTypeInterceptor.this$0 = {MessageConverterConfigurer@9044} 
 size = 1
message = {GenericMessage@9169} "GenericMessage [payload=byte[117], headers={contentType=application/json;charset=UTF-8, id=e1b9b79e-5501-0ebe-178f-59ef5538192c, timestamp=1528647360089}]"
 payload = {byte[117]@9373} {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
 headers = {MessageHeaders@9374}  size = 3
  0 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12201} "contentType" -> "application/json;charset=UTF-8"
  1 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12202} "id" -> "e1b9b79e-5501-0ebe-178f-59ef5538192c"
  2 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12203} "timestamp" -> "1528647360089"
channel = {DirectChannel@6940} "output"
interceptorStack = {ArrayDeque@8833}  size = 1

处理器配置和代码:

spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-object'
          destination: demo-stream-processor-output
        input:
          content-type: 'application/json;type=java.util.Map'
          destination: demo-stream-source-output

处理器案例1:在“StreamListener”中,当Map用作param时,“best effort”逻辑给出一个没有有效负载的标头

接收消息:class org。springframework。消息传递。消息标题

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Map handle(final Map message) {
    LOG.info("recv message: {} {}", message.getClass(),message);
    //do some transformations...
    message.put("transformer_says","hello simon..");
    return message;
}

日志输出如下:

  2018-06-10 18:55:51 [demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg-1] INFO  demostream.modules.DemoProcessor - recv message: class org.springframework.messaging.MessageHeaders {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg, amqp_redelivered=false, id=671bc2d8-aaba-5ddb-b02b-f92a9dba3e0f, amqp_consumerTag=amq.ctag-ZnNk0O3vNs0yhOPKMTM3Fg, contentType=application/json;charset=UTF-8, timestamp=1528649751852}

当然,这里有预期的例外:

原因:java。lang.UnsupportedOperationException:MessageHeaders在组织中是不可变的。springframework。消息传递。MessageHeaders。put(MessageHeaders.java:249)

PROCESSOR CASE 2:在“StreamListener”中,当Message用作参数时,“尽最大努力”逻辑给出了map的JSON表示,而不是原始HashMap

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object handle(Message<?> message) {
    LOG.info("recv message: {}",message);
    String jsonMap = new String((byte[]) message.getPayload());
    LOG.info("jsonMap : {}", jsonMap);
    return jsonMap;
}

日志输出如下:

2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO  demostream.modules.DemoProcessor - recv message: GenericMessage [payload=byte[117], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=38, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ, amqp_redelivered=false, id=271ad47e-7b36-c01c-0c19-b3201297ddd7, amqp_consumerTag=amq.ctag-QIVzqanCxiNF-HXUTUVn7Q, contentType=application/json;charset=UTF-8, timestamp=1528649563755}]
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO  demostream.modules.DemoProcessor - jsonMap : {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}

以前的版本(1.5.9)和相关的流云依赖关系可以正常工作。

  <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>1.5.9.RELEASE</version>
        <relativePath></relativePath>
    </parent>

请你给一些关于这种行为的建议。

假设我做错了,简单的问题是:

如何使用Kryo ser/der发送HashMap有效负载,在我看来这比JSON序列化更好(更少的进程间开销)

谢谢你!

向伊凡致意

共有1个答案

洪光霁
2023-03-14

在源/处理器之间进行kryo二进制序列化/反序列化的解决方案是使用以下配置:

本指令:

output.content-type: application/x-java-object

触发kryo二进制序列化(否则JSON是默认的)。

来源(配置和代码)

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: source-output
          content-type: application/x-java-object



   @EnableBinding(Source.class)
    public class SourceTester {
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000"))
        public Message<HashMap<String,Object>> source() {
            HashMap<String,Object> mapa = new HashMap<>();
            mapa.put("foo","bar");
            mapa.put("bar",1337);
            return MessageBuilder.withPayload(mapa).build();
        }
    }

或者另一种方法:

@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {

    @Autowired
    private Source channels;

    @Scheduled(fixedRate = 5000)
    public MessageSource<Map<String, Object>> timerMessageSource() { 
            HashMap<String,Object> mapa = new HashMap<>();
            mapa.put("foo","bar");
            mapa.put("bar",1337);    
        channels.output().send(MessageBuilder.withPayload(mapa).build());
        }

}

处理器(配置和代码)

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: source-output
            output:
              destination: processor-output
              content-type: application/x-java-object



   @EnableBinding(Processor.class)
    @EnableAutoConfiguration
    public class ProcessorTester {
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        protected Message<HashMap> process(Message<HashMap> input){
            input.getPayload().put("Processor", "was here");
            return input;
        }
    }

结论:

需要具有“类型参数HashMap”和Message的处理器处理程序(转换器)来自动触发byte[]到HashMap的kryo反序列化:

protected Message<HashMap> process(Message<HashMap> input)

如果禁用输出内容类型指令(源模块或任何其他生产者)

 spring:
      cloud:
        stream:
          bindings:
            output:
              destination: source-output
              ###content-type: application/x-java-object

默认序列化是HashMap到JSON字符串。

顺致敬意,

伊凡

 类似资料:
  • 当用户退出我的应用程序并且我不再希望他接收到设备通知时,我如何处理这种情况。 我试过了 但我仍然会收到设备注册id的通知。 我还确保这是我应该删除的令牌: 或者干脆)。 我还尝试了,但下一次调用时,我收到空值(在第二次尝试时有效)。 我想,在之后,我可以立即再次调用,但这看起来像是一个黑客。还有一个答案是不应该这样做,但它建议删除显然不起作用的令牌。 那么,正确的处理方法是什么呢?

  • 有一个webapp,其中每个请求都会消耗各种外部资源。webapp使用请求获取的bean跟踪那些消耗的资源。然后HandlerInterceptor的afterCompletion方法调用TaskExecutor将此信息存储在数据库中。一切都很好,但需要增加带宽消耗作为另一种资源。计算传出响应大小是servlet过滤器的一项典型任务(以及响应包装器和定制流实现)。因此,这项工作已经完成,并且正在发

  • 传送/接收语音消息     所谓的语音消息是指能传送自己录制之声音简讯的机能。无论对方是在在线或脱机接能传送。若要使用语音消息,传送者或接收者的其中一方需先以 Skype 点数或信用卡,支付语音消息的使用费用。 传送语音消息 1. 进入 Skype 的选单画面后选择(联系人)。 2. 选择语音消息希望传送对象的图标后,按下按钮。. 3. 选择[传送语音消息]。 遵循辅助讯息录制语音消息。最多可录制

  • 问题内容: 我正在尝试从某个门户网站获取交易状态,并且在我的Java应用程序中使用了以下chrome设置, 超时从渲染器接收消息:60.000 并且所有待处理的交易都已超时。 会话信息:headless chrome = 68.0.3440.75 驱动程序信息: chromedriver = 2.38 (0) 平台= Linux 2.6.32-696.23.1.el6.x86_64 x86_64)

  • 由于内容脚本在网页而不是扩展程序的上下文中运行,因此它们通常需要某种与扩展程序其余部分进行通信的方式。例如,RSS 阅读器扩展程序可以使用内容脚本来检测页面上 RSS 摘要的存在,然后通知后台页面以显示该页面的操作图标。 扩展及其内容脚本之间的通信使用消息传递来实现。任何一方都可以监听从另一端发送的消息,并在同一通道上进行响应。消息可以包含任何有效的 JSON 对象(空,布尔值,数字,字符串,数组

  • ms tcp nodelay 描述: 在信差的 TCP 会话上禁用 nagle 算法。 类型: Boolean 是否必需: No 默认值: true ms initial backoff 描述: 出错时重连的初始等待时间。 类型: Double 是否必需: No 默认值: .2 ms max backoff 描述: 出错重连时等待的最大时间。 类型: Double 是否必需: No 默认值: 15