当前位置: 首页 > 知识库问答 >
问题:

使用自定义MessageConverter创建消息时出现StreamBridge问题

施博文
2023-03-14

我们正在将我们的事件系统迁移到函数式编程模型。我们遵循了下一个“指南”,它对消费者非常有效,但使用StreamBridge的生产者没有正确创建消息。

我们有下一个错误:

java.lang.ClassCastException: class com.streamdemo.domain.Event cannot be cast to class java.lang.String (com.streamdemo.domain.Event is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

我们正在使用< code > 2021 . 0 . 0 spring-cloud版本。

通过简单的配置:

spring.cloud.stream:
  bindings:
    demo-channel-out-0:
      producer:
        use-native-encoding: true
      content-type: application/json

这是我们的自定义MessageConver,它用于我们所有的微服务,因此,保持消息的格式很重要:

public class StreamMessageConverter extends AbstractMessageConverter {

  private static final Logger log = LoggerFactory.getLogger(StreamMessageConverter.class);
  private static final MimeType JSON_MIME_TYPE = MimeType.valueOf("application/json");
  private final Class<?> clazz;
  private final ObjectMapper objectMapper;

  public StreamMessageConverter(ObjectMapper objectMapper) {
    this(objectMapper, Object.class);
  }

  public StreamMessageConverter(ObjectMapper objectMapper, Class<?> clazz) {
    super(JSON_MIME_TYPE);
    this.objectMapper = objectMapper;
    this.clazz = clazz;
  }

  protected boolean supports(Class<?> clazz) {
    return clazz.equals(clazz);
  }

  @Nullable
  protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
    Object payload = message.getPayload();

    try {
      return payload instanceof byte[] ? this.objectMapper.readValue((byte[]) payload, targetClass)
          : this.objectMapper.readValue((String) payload, targetClass);
    } catch (IOException var6) {
      log.info("Unable to read json payload as object", var6);
      return null;
    }
  }

  @Nullable
  protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
    try {
      return this.objectMapper.writeValueAsString(payload).getBytes("UTF-8");
    } catch (IOException var5) {
      log.info("Unable to write payload as json", var5);
      return null;
    }
  }
}

我们还使用< code>StreamBridge来生成事件:

@Service
public class MessageBroker {

  private final StreamBridge source;

  public MessageBroker(StreamBridge source) {
    this.source = source;
  }

  public void send(Event event) {
    source.send("demo-channel-out-0", event);
  }
}

当<code>源代码。send方法被调用,它在内部调用StreamMessageConverter。convertFromInternal当我们认为它不应该这样做时。在以前的模型(@EnableBinding,@StreamListener)中,只有StreamMessageConverter。convertToInternal被调用,以便将消息序列化并发送到Kafka主题。

我们创建了这个演示项目,您可以在其中重现错误。

共有1个答案

胡致远
2023-03-14

所以发生的事情是正确的。StreamBridge依赖MessageCon的原因是因为我们定义了一个即时传递函数(函数

您的MessageConver似乎是在考虑某些期望的情况下编写的,这是不正确的,也从来没有正确过。您假设如果有效负载不是byte[]它是String。即使使用较旧的编程模型,也有有效负载可能是这两种类型以外的类型的情况。因此,我建议将以下代码块修复为正确的IF语句

return payload instanceof byte[] ? this.objectMapper.readValue((byte[]) payload, targetClass)
          : this.objectMapper.readValue((String) payload, targetClass);

。。。以确保它能处理另一种情况。

也就是说,你的转换器没有做任何特别的事情,特别是在当前的编程模型中。所以从技术上来说,要解决你的问题,你只需要将其全部删除。

 类似资料:
  • 用例-有一个带有消息的主题(空,元数据)。我需要从主题创建一个Ktable,其键(metadata.entity_id)和值为metadata。这个表稍后将被用来与具有相同键的流进行连接。 一旦我将消息推送到主题-METADATA_TOPIC。这会导致以下错误。我在这里遗漏了什么吗?kafka-stream 2.2.0

  • 问题内容: 我可以看到使用表单时如何向字段添加错误消息,但是模型表单呢? 这是我的测试模型: 我的模型表格: 在字段中的错误信息:和是: 这是必填栏 如何以模型形式进行更改? 问题答案: 对于简单的情况,你可以指定自定义错误消息

  • 简介 此消息 用来接收 用户自定义TOPIC消息 发送过来的事件。 消息体 ChannelMessageBean 例子 Kotlin @Subscribe(threadMode = ThreadMode.MAIN) fun onReceiveCustomMessage(customEvent: ChannelMessageBean) { // TODO } ChannelMessageB

  • 当我们通过KSQL创建AVRO消息并试图使用Kafka Connect来消费这些消息时,会发生一些奇怪的事情。有点上下文: 源数据一个第三方提供商在我们的一个Kafka集群上以JSON的形式生成数据(到目前为止,还算不错)。我们实际上看到了数据。 数据转换由于我们的内部系统要求在AVRO中对数据进行编码,我们创建了一个KSQL集群,通过在KSQL中创建以下流将传入数据转换为AVRO: 随着偏移量的

  • 问题 如何定义NotFound消息和其他消息? 解法 import web urls = (...) app = web.application(urls, globals()) def notfound(): return web.notfound("Sorry, the page you were looking for was not found.") # You c

  • 我使用的是jboss KeyCloak1.5最终版本。我开发了与keycloak属性和我的用户企业数据库接口的自定义用户联邦提供程序。 我的需要是发送到用户登录界面自定义的错误消息,基于特定的特定错误与我的遗留用户数据库。