在新的SpringBoot(2.0.2)中,通过RabbitBinder在源/处理器/接收器之间发送哈希图的支持似乎有所改变。
公共父pom。所有模块的xml如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>demo.stream</groupId>
<artifactId>demo-stream-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<modules>
<module>source-module</module>
<module>processor-module</module>
<module>sink-module</module>
</modules>
<packaging>pom</packaging>
<name>demo-stream</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath />
<!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RC2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
明确规定如下:
如果出站通道上没有设置内容类型属性,Spring Cloud Stream将使用基于Kryo序列化框架的序列化器序列化有效负载。在目标位置反序列化消息需要有效负载类出现在接收方的类路径上。
根据这些规则,它可以使用标准java类型,如“HashMap”,而不需要自定义消息转换器。
无法使此简单代码在源代码和处理器之间正常工作:
源配置和代码:
spring:
application:
name: test
cloud:
config:
uri: http://blade1:8888
name: scdf-tester
stream:
bindings:
output:
#content-type: 'application/x-java-serialized-object'
#content-type: 'application/json'
content-type:
destination: demo-stream-source-output
@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {
@Autowired
private Source channels;
//@InboundChannelAdapter(Source.OUTPUT)
@Scheduled(fixedRate = 2000)
public MessageSource<Map<String, Object>> timerMessageSource() {
Map<String, Object> mapa = new HashMap<>();
mapa.put("string", "string");
mapa.put("string", "string");
mapa.put("long", 1212121L);
mapa.put("integer", 1212121);
Map<String, Object> mapaInner = new HashMap<>();
mapaInner.put("string", "string");
mapaInner.put("string", "string");
mapaInner.put("long", 1212121L);
mapaInner.put("integer", 1212121);
mapa.put("innerMapa", mapaInner);
channels.output().send(MessageBuilder.withPayload(mapa).build());
}
}
从调试器可以清楚地看到,有效负载被转换为JSON字符串(ApplicationJsonMessageMarshallingConverter),而内容类型头被设置为“application/JSON”。
虽然在某些情况下这是可以接受和合法的,但这并不是预期的结果。应为kryo序列化哈希映射为字节数组。
调试器输出如下:
this = {AbstractMessageChannel$ChannelInterceptorList@7877}
logger = {LogFactory$Log4jLog@7884}
interceptors = {CopyOnWriteArrayList@8890} size = 1
0 = {MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor@8991}
messageConverter = {CompositeMessageConverter@9043} "CompositeMessageConverter[converters=[org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@2863f519, org.springframework.cloud.stream.converter.TupleJsonMessageConverter@e9dffa1, org.springframework.messaging.converter.ByteArrayMessageConverter@cc64ad, org.springframework.cloud.stream.converter.ObjectStringMessageConverter@97df9ef, org.springframework.cloud.stream.converter.JavaSerializationMessageConverter@3e011ee4, org.springframework.cloud.stream.converter.KryoMessageConverter@5f808e72, org.springframework.cloud.stream.converter.JsonUnmarshallingConverter@3c0a7023]]"
this$0 = {MessageConverterConfigurer@9044}
mimeType = null
MessageConverterConfigurer$AbstractContentTypeInterceptor.this$0 = {MessageConverterConfigurer@9044}
size = 1
message = {GenericMessage@9169} "GenericMessage [payload=byte[117], headers={contentType=application/json;charset=UTF-8, id=e1b9b79e-5501-0ebe-178f-59ef5538192c, timestamp=1528647360089}]"
payload = {byte[117]@9373} {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
headers = {MessageHeaders@9374} size = 3
0 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12201} "contentType" -> "application/json;charset=UTF-8"
1 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12202} "id" -> "e1b9b79e-5501-0ebe-178f-59ef5538192c"
2 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12203} "timestamp" -> "1528647360089"
channel = {DirectChannel@6940} "output"
interceptorStack = {ArrayDeque@8833} size = 1
处理器配置和代码:
spring:
application:
name: test
cloud:
config:
uri: http://blade1:8888
name: scdf-tester
stream:
bindings:
output:
#content-type: 'application/x-java-object'
destination: demo-stream-processor-output
input:
content-type: 'application/json;type=java.util.Map'
destination: demo-stream-source-output
处理器案例1:在“StreamListener”中,当Map用作param时,“best effort”逻辑给出一个没有有效负载的标头
接收消息:class org。springframework。消息传递。消息标题
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Map handle(final Map message) {
LOG.info("recv message: {} {}", message.getClass(),message);
//do some transformations...
message.put("transformer_says","hello simon..");
return message;
}
日志输出如下:
2018-06-10 18:55:51 [demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg-1] INFO demostream.modules.DemoProcessor - recv message: class org.springframework.messaging.MessageHeaders {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg, amqp_redelivered=false, id=671bc2d8-aaba-5ddb-b02b-f92a9dba3e0f, amqp_consumerTag=amq.ctag-ZnNk0O3vNs0yhOPKMTM3Fg, contentType=application/json;charset=UTF-8, timestamp=1528649751852}
当然,这里有预期的例外:
原因:java。lang.UnsupportedOperationException:MessageHeaders在组织中是不可变的。springframework。消息传递。MessageHeaders。put(MessageHeaders.java:249)
PROCESSOR CASE 2:在“StreamListener”中,当Message用作参数时,“尽最大努力”逻辑给出了map的JSON表示,而不是原始HashMap
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object handle(Message<?> message) {
LOG.info("recv message: {}",message);
String jsonMap = new String((byte[]) message.getPayload());
LOG.info("jsonMap : {}", jsonMap);
return jsonMap;
}
日志输出如下:
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO demostream.modules.DemoProcessor - recv message: GenericMessage [payload=byte[117], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=38, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ, amqp_redelivered=false, id=271ad47e-7b36-c01c-0c19-b3201297ddd7, amqp_consumerTag=amq.ctag-QIVzqanCxiNF-HXUTUVn7Q, contentType=application/json;charset=UTF-8, timestamp=1528649563755}]
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO demostream.modules.DemoProcessor - jsonMap : {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
以前的版本(1.5.9)和相关的流云依赖关系可以正常工作。
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>1.5.9.RELEASE</version>
<relativePath></relativePath>
</parent>
请你给一些关于这种行为的建议。
假设我做错了,简单的问题是:
如何使用Kryo ser/der发送HashMap有效负载,在我看来这比JSON序列化更好(更少的进程间开销)
谢谢你!
向伊凡致意
在源/处理器之间进行kryo二进制序列化/反序列化的解决方案是使用以下配置:
本指令:
output.content-type: application/x-java-object
触发kryo二进制序列化(否则JSON是默认的)。
来源(配置和代码)
spring:
cloud:
stream:
bindings:
output:
destination: source-output
content-type: application/x-java-object
@EnableBinding(Source.class)
public class SourceTester {
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000"))
public Message<HashMap<String,Object>> source() {
HashMap<String,Object> mapa = new HashMap<>();
mapa.put("foo","bar");
mapa.put("bar",1337);
return MessageBuilder.withPayload(mapa).build();
}
}
或者另一种方法:
@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {
@Autowired
private Source channels;
@Scheduled(fixedRate = 5000)
public MessageSource<Map<String, Object>> timerMessageSource() {
HashMap<String,Object> mapa = new HashMap<>();
mapa.put("foo","bar");
mapa.put("bar",1337);
channels.output().send(MessageBuilder.withPayload(mapa).build());
}
}
处理器(配置和代码)
spring:
cloud:
stream:
bindings:
input:
destination: source-output
output:
destination: processor-output
content-type: application/x-java-object
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public class ProcessorTester {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
protected Message<HashMap> process(Message<HashMap> input){
input.getPayload().put("Processor", "was here");
return input;
}
}
结论:
需要具有“类型参数HashMap”和Message的处理器处理程序(转换器)来自动触发byte[]到HashMap的kryo反序列化:
protected Message<HashMap> process(Message<HashMap> input)
如果禁用输出内容类型指令(源模块或任何其他生产者)
spring:
cloud:
stream:
bindings:
output:
destination: source-output
###content-type: application/x-java-object
默认序列化是HashMap到JSON字符串。
顺致敬意,
伊凡
当用户退出我的应用程序并且我不再希望他接收到设备通知时,我如何处理这种情况。 我试过了 但我仍然会收到设备注册id的通知。 我还确保这是我应该删除的令牌: 或者干脆)。 我还尝试了,但下一次调用时,我收到空值(在第二次尝试时有效)。 我想,在之后,我可以立即再次调用,但这看起来像是一个黑客。还有一个答案是不应该这样做,但它建议删除显然不起作用的令牌。 那么,正确的处理方法是什么呢?
有一个webapp,其中每个请求都会消耗各种外部资源。webapp使用请求获取的bean跟踪那些消耗的资源。然后HandlerInterceptor的afterCompletion方法调用TaskExecutor将此信息存储在数据库中。一切都很好,但需要增加带宽消耗作为另一种资源。计算传出响应大小是servlet过滤器的一项典型任务(以及响应包装器和定制流实现)。因此,这项工作已经完成,并且正在发
传送/接收语音消息 所谓的语音消息是指能传送自己录制之声音简讯的机能。无论对方是在在线或脱机接能传送。若要使用语音消息,传送者或接收者的其中一方需先以 Skype 点数或信用卡,支付语音消息的使用费用。 传送语音消息 1. 进入 Skype 的选单画面后选择(联系人)。 2. 选择语音消息希望传送对象的图标后,按下按钮。. 3. 选择[传送语音消息]。 遵循辅助讯息录制语音消息。最多可录制
问题内容: 我正在尝试从某个门户网站获取交易状态,并且在我的Java应用程序中使用了以下chrome设置, 超时从渲染器接收消息:60.000 并且所有待处理的交易都已超时。 会话信息:headless chrome = 68.0.3440.75 驱动程序信息: chromedriver = 2.38 (0) 平台= Linux 2.6.32-696.23.1.el6.x86_64 x86_64)
由于内容脚本在网页而不是扩展程序的上下文中运行,因此它们通常需要某种与扩展程序其余部分进行通信的方式。例如,RSS 阅读器扩展程序可以使用内容脚本来检测页面上 RSS 摘要的存在,然后通知后台页面以显示该页面的操作图标。 扩展及其内容脚本之间的通信使用消息传递来实现。任何一方都可以监听从另一端发送的消息,并在同一通道上进行响应。消息可以包含任何有效的 JSON 对象(空,布尔值,数字,字符串,数组
ms tcp nodelay 描述: 在信差的 TCP 会话上禁用 nagle 算法。 类型: Boolean 是否必需: No 默认值: true ms initial backoff 描述: 出错时重连的初始等待时间。 类型: Double 是否必需: No 默认值: .2 ms max backoff 描述: 出错重连时等待的最大时间。 类型: Double 是否必需: No 默认值: 15