当前位置: 首页 > 知识库问答 >
问题:

如何将Kafka事务生产者与具有多个模式记录的模式注册表一起使用?

曹理
2023-03-14

我正在研究Kafka支持的事务生产者和这两个链接中描述的精确一次处理: 1)https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/2)https://www.confluent.io/blog/transactions-apache-kafka/

对于用户可以轮询记录(例如主题A)、处理记录并发布到多个输出主题(例如B和C)和更改日志主题的流式处理场景,这似乎是一个非常优雅的解决方案。如果正确使用producer的事务API,则这可以按照链接中所述的原子方式发生。

不幸的是,Kafka生产者的定义包括键和值的类型。IProducer

如果我们在主题中对各种记录有多个模式定义,我们就有一个问题,我们需要为每个模式定义一个生产者。如果有这个选择,我们就不能有跨多个输出接收器主题的原子事务。

除非我遗漏了什么,否则Kafka对制片人的事务性支持似乎非常有限。在生产中,在schema Registry中定义schema以处理不断发展的schema以及向前和向后的兼容性是非常实用的。

我想使用IProducer

提前谢谢


共有1个答案

胡新
2023-03-14

如果有不同类型的数据,可以编写一个通用序列化程序,根据主题自动序列化给定的数据项。

在ISerializer中,您可以使用以下方法

byte[] Serialize(T data, SerializationContext context)

SerializationContext包含主题属性。(参考)

或者,您也可以使用标题来存储有关如何序列化标题的一些重要信息。

我不知道。NET,但我假设您可以围绕要生成的每种类型编写一个包装器类,并且您的实际对象(数据)将是该类中可以获得的属性。

然后,可以根据其类型、主题名称或标头中的某些信息序列化实际对象。

 类似资料:
  • 关于Avro序列化的消息是如何被Kafka和Schema Registry处理的,我想要了解的是,从这篇文章中,我了解到模式ID存储在每个消息中的可预测位置,因此我们似乎可以在同一个主题中拥有不同模式的消息,并且能够找到正确的模式,并基于此成功地反序列化它们。另一方面,我看到许多人似乎在使用“一个模式附加到一个主题”的表述,但这意味着每个主题都有一个模式。 那么哪一个是对的呢?我是否可以利用模式注

  • 在Martin Fowler的书中,我读到了和模式。 作者提到,将identityMap放在UnitOfWork内部是一个好主意。但怎么做呢? 据我所知,受会话限制,但作者没有提到 每个unitOfWork实例需要多少个IdentityMap实例? 如果我们有两个并发请求呢?

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我试图针对同一个kafka和zookeeper集群装载两个kafka模式注册表实例。但谢马一家的情况越来越复杂。当运行这两个注册表时,如果我使用api“kafka schema registry”注册一个架构,它似乎是在“schema registry ui other”中创建的,而不是像预期的那样显示在“kafka schema registry ui”中。我的配置是: 不可能有两个分离的模式注

  • 我试图从REST代理发布json模式,但遇到异常 curl-k-x post-h“content-type:application/vnd.schemaregistry.v1+json”--数据“{”schema“:”{“type”:“object”,“properties”:{“firstname”:{“type”:“string”},“lastname”:{“type”:“string”},“