当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring集成转换器处理在 Kafka 中接收的消息

狄飞尘
2023-03-14

我将下面的数据发布到kafka并通过Spring集成通道接收并转换为Log对象,我如何使用Spring集成转换器将下面的数据转换为Log对象?感谢这里的任何帮助

'日志(客户端键=字符串,有效负载=字符串)”

这是通道适配器代码

@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
            kafkaListenerContainer());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(inputChannel());
    return kafkaMessageDrivenChannelAdapter;
}

当我尝试使用下面的方法在服务激活器中进行转换时

 ObjectMapper objectMapper = new ObjectMapper();
 Log msg = objectMapper.readValue(arg0.getPayload().toString() , Log.class);

它的失败

com.fasterxml.jackson.core.JsonParseException: 无法识别的令牌“日志”:期待(“真”、“假”或“空”)

共有1个答案

司徒啸
2023-03-14

首先,'Log(clientKey=string, valload=string)'看起来不像您希望在Spring集成中从后面转换的格式良好的JSON。

另一个问题是,您的dokafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);。这样您就不需要任何下游转换,集成记录消息监听器中的消息转换器将为我们完成转换工作。

然而,我们仍然需要有一个适当的数据能够转换。

您还需要记住,Apache Kafka 有自己的机制来从线路中反序列化字节[]。有关转换和(反)序列化的更多信息,请参阅 Spring for Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/_reference.html#serdes

对于正确的JSON转换,Spring Integration提供了一个JsonToObjectTransformer组件,该组件可以与@Transformer注释一起使用:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-transformation-chapter.html#_common_transformers

更新

@Transformer(inputChannel = "inputChannel", outputChannel = "processChannel")
@Bean
public JsonToObjectTransformer jsonToObjectTransformer() {
   return new JsonToObjectTransformer(Log.class);
}
 类似资料:
  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。

  • 我配置了一个基于Web服务的入站消息传递网关。我想记录传入的SOAP消息(信封和里面的所有消息)。最好的方法是什么? 我曾尝试使用带有日志通道适配器的有线抽头,但不知道一个好的表达式值来获取实际的SOAP XML。如果入站网关配置为不提取有效负载,则我将SaajSoapMessage视为有效负载,否则将DOMSource视为有效负载。是否有一个表达式将SaajSoapMessage作为XML字符串

  • 我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka

  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我正在从事一个spring批处理项目,其用例是:spring批处理作业依赖于SFTP服务器(远程目录)上的文件,因此,一旦文件在SFTP服务器上可用,相应的作业(spring批处理)就应该启动。此外,我不想先开始工作,然后再查找文件,因为这将是基于时间的方法,而不是基于通知的方法。所以我想使用spring集成(sftp入站通道适配器)。作为入站适配器(SFTP)的一部分,一旦我在SFTP服务器的远

  • 我开始在我的项目中使用spring-integration-kafka,我可以生产和消费来自kafka的消息。但是现在,我想为特定的分区生成消息,并从特定的分区消费消息。 例如,我想向分区3生成消息,而消费将只接收来自分区3的消息。 到目前为止,我的主题有8个分区,我可以向特定的分区发送消息,但是我还没有找到配置消费者只接收来自特定分区的消息的方法。 因此,任何关于我应该如何配置消费者与sping