我将下面的数据发布到kafka并通过Spring集成通道接收并转换为Log对象,我如何使用Spring集成转换器将下面的数据转换为Log对象?感谢这里的任何帮助
'日志(客户端键=字符串,有效负载=字符串)”
这是通道适配器代码
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
kafkaListenerContainer());
kafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);
kafkaMessageDrivenChannelAdapter.setOutputChannel(inputChannel());
return kafkaMessageDrivenChannelAdapter;
}
当我尝试使用下面的方法在服务激活器中进行转换时
ObjectMapper objectMapper = new ObjectMapper();
Log msg = objectMapper.readValue(arg0.getPayload().toString() , Log.class);
它的失败
com.fasterxml.jackson.core.JsonParseException: 无法识别的令牌“日志”:期待(“真”、“假”或“空”)
首先,'Log(clientKey=string, valload=string)'
看起来不像您希望在Spring集成中从后面转换的格式良好的JSON。
另一个问题是,您的dokafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);
。这样您就不需要任何下游转换,集成记录消息监听器
中的消息转换器
将为我们完成转换工作。
然而,我们仍然需要有一个适当的数据能够转换。
您还需要记住,Apache Kafka 有自己的机制来从线路中反序列化字节[]。
有关转换和(反)序列化的更多信息,请参阅 Spring for Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/_reference.html#serdes
对于正确的JSON转换,Spring Integration提供了一个JsonToObjectTransformer
组件,该组件可以与@Transformer
注释一起使用:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-transformation-chapter.html#_common_transformers
更新
@Transformer(inputChannel = "inputChannel", outputChannel = "processChannel")
@Bean
public JsonToObjectTransformer jsonToObjectTransformer() {
return new JsonToObjectTransformer(Log.class);
}
我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。
我配置了一个基于Web服务的入站消息传递网关。我想记录传入的SOAP消息(信封和里面的所有消息)。最好的方法是什么? 我曾尝试使用带有日志通道适配器的有线抽头,但不知道一个好的表达式值来获取实际的SOAP XML。如果入站网关配置为不提取有效负载,则我将SaajSoapMessage视为有效负载,否则将DOMSource视为有效负载。是否有一个表达式将SaajSoapMessage作为XML字符串
我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka
当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该
我正在从事一个spring批处理项目,其用例是:spring批处理作业依赖于SFTP服务器(远程目录)上的文件,因此,一旦文件在SFTP服务器上可用,相应的作业(spring批处理)就应该启动。此外,我不想先开始工作,然后再查找文件,因为这将是基于时间的方法,而不是基于通知的方法。所以我想使用spring集成(sftp入站通道适配器)。作为入站适配器(SFTP)的一部分,一旦我在SFTP服务器的远
我开始在我的项目中使用spring-integration-kafka,我可以生产和消费来自kafka的消息。但是现在,我想为特定的分区生成消息,并从特定的分区消费消息。 例如,我想向分区3生成消息,而消费将只接收来自分区3的消息。 到目前为止,我的主题有8个分区,我可以向特定的分区发送消息,但是我还没有找到配置消费者只接收来自特定分区的消息的方法。 因此,任何关于我应该如何配置消费者与sping