当前位置: 首页 > 知识库问答 >
问题:

我可以使用Kafka流读写不同类型的消息吗?

高嘉树
2023-03-14

我正在编写一个使用Kafka流的应用程序。它从主题A读取,进行一些转换,然后写入主题B。在转换期间,值按键分组,因此输出键、值类型不同于输入值类型。Kafka流使用特定类型的Serdes(例如String serdes序列化和反序列化字符串)进行序列化和反序列化,因此在数据转换后它将无法工作。如何在Streams API中定义不同的序列化器和反序列化器?

共有1个答案

长孙鸿振
2023-03-14

你当然可以

当您创建流、调用 groupBy 或将输出写入某个主题时,可以提供 Serde序列化。例:

Serde<String> stringSerde = Serdes.String();
Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
Produced<String, YourCustomItem> produced = Produced.with(stringSerde, new JsonSerde<>(YourCustomItem.class));

KStream<String, String> kStream = streamsBuilder.stream("sourceTopicName", consumed);
KStream<String, YourCustomItem> transformedKStream = kStream.mapValues((key, value) -> new YourCustomItem());
transformedKStream.to("destinationTopicName", produced);

transformedKStream.groupByKey(Serialized.with(Serdes.String(), new JsonSerde<>(YourCustomItem.class)));

其中JsonSerde来自spring kafkadependency。或者您可以使用以下Serde

Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
 类似资料:
  • 我们计划编写一个Kafka消费者(java),它读取Kafka队列以执行消息中的操作。

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?

  • 我希望只使用一次语义,但我不想和消费者一起阅读信息。我宁愿读Kafka的留言。如果我加上处理。保证=恰好一次流配置,将恰好一次语义保留?

  • 假设我有一个抽象类“车辆”和另外两个继承自名为“汽车”和“自行车”的车辆类的子类。 所以我可以随机创建一系列汽车和自行车对象: 并将它们全部添加到Arraylist: 是否有可能将这些对象写入单个文本文件,并根据特定的对象类型(汽车、自行车)从文本文件读取回Arraylist,而无需为每种类型维护不同的文本文件

  • 让我们假设Spring Cloud Stream应用程序从创建。它对事件感兴趣。一旦到达,它将对其进行处理并生成一个输出事件到相同的。 我面临的问题是,由于Kafka流应用程序从/向同一个主题读写,所以它试图处理自己的写操作,这是没有意义的。 如何防止此应用程序处理它生成的事件? 更新:正如Artem Bilan和sobychako指出的,我曾考虑过使用,但有些细节让我怀疑如何处理: OrderC

  • 我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?