在使用SpringCloudStream的流处理应用程序中,我获取一个输入流(键入一个整数),并在其上调用selectKey
,以创建一个具有相同值但具有不同键(字符串)的新主题。输入主题中包含正确JSON格式的记录,例如:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
问题是,流处理应用程序创建的主题的值
是包含JSON的字符串,而不是正确的JSON,即:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
代码如下:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
stream.selectKey { _, value -> value.publicId }
上面的函数所做的是消耗输入流,并生成一个输出流(被发送到输出
)。该输出流与输入流具有相同的值,但只是一个不同的键。(在这种情况下,键来自值的公共ID
属性。)
应用程序。yml
如下:
spring.cloud.stream:
bindings:
input:
destination: input-topic
output:
destination: output-output
kafka:
streams:
binder:
application-id: test-app-id-1
bindings:
input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
我缺什么了吗?这实际上是一个问题,还是JSON可以作为字符串存储在Spring Cloud Stream生成的消息中?
其他我尝试过但没有产生影响的事情:
spring.cloud.stream.bindings.output.content-type
设置为 application/json
映射
而不是选择键
这意味着您将公共ID:"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a"
作为字符串而不是POJO发送。
如果这是您要发送的内容,则应使用StringSerde而不是JsonSerde
。
编辑
我刚刚用一个Java应用程序测试了它,它像预期的那样工作...
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {
public static void main(String[] args) {
SpringApplication.run(So58538297Application.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
return stream.selectKey((key, value) -> value.getBar());
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
ObjectMapper mapper = new ObjectMapper();
return args -> {
template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
};
}
@KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("out:" + in + ", key:" + key);
}
@KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
public void in(String in) {
System.out.println("in:" + in);
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
}
}
和
spring.application.name=so58538297
spring.kafka.consumer.auto-offset-reset=earliest
和
in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz
使用。但仍不确定为什么上面的方法不起作用。 不管怎样,多亏了之前有人建议这个答案。:)
问题内容: 我想检查a是否仅包含数字。我用这个: 但意识到它也允许和。基本上,我要确保只能包含数字,而不能包含其他字符。由于和都是数字,所以不是正确的方法。也许我需要一个正则表达式?有小费吗? 问题答案: 怎么样
问题内容: 这个想法是读取String并确认它不包含任何数字字符。因此,“ smith23”之类的内容将不被接受。 问题答案: 你想要什么?速度还是简单?为了提高速度,请选择基于循环的方法。为简单起见,请使用一种基于内衬RegEx的方法。 速度 简单
我能够通过使用下面这个问题的注释提供的代码来解决这个问题。所有其他帖子都是有效的! 我使用的有用的东西来自第一个评论。虽然提供的所有示例代码似乎也是有效的!
问题内容: 我正在尝试使用以下命令选择MySQL中仅包含字母数字字符的所有行: 但是,它将返回所有行,而不管它们包含非字母数字字符的事实。 问题答案: 试试这个代码: 这样可以确保所有字符都匹配。
我正在编写一个程序,将二进制数转换为十进制数,我只是一个新手程序员。我一直试图确定输入的字符串是二进制数,这意味着它只包含1和0。到目前为止,我的代码是: 也就是说,无论输入什么(然后退出),即使输入了二进制数,程序都会输出“您没有输入二进制数”。我确信我的if语句是错误的或我的字符声明是错误的,但是我已经尝试了我个人知道的一切来纠正这个问题,但我在网上找不到任何帮助。任何帮助都将不胜感激。