我创建了一个简单的生产者-消费者应用程序,使用自定义序列化器和反序列化器。
在我生成的Message类中添加了一个新方法之后,使用者开始在反序列化时被堆栈。我的生产者使用的是新类(有新方法),消费者使用的是旧类(没有方法)。
public enum MessageType{
A,
B,
C,
}
public class Message implements Serializable {
protected long id;
protected MessageType type;
public Message(int id , MessageType type) {
this.id=id;
this.type=type;
}
public class MessageA extends Message{
private String name;
public MessageA(int id, String name) {
super(id,MessageType.A);
this.name = name;
}
public class MessageSerializer implements Serializer<Message> {
@Override
public byte[] serialize(String topic, Message msg) {
return SerializationUtils.serialize(msg);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public void close() {}
}
public class MessageDeserializer implements Deserializer<Message> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public Message deserialize(String topic, byte[] data) {
if(data == null)
return null;
log.info("Deserializing msg");
Message msg= (Message) SerializationUtils.deserialize(data);
if(msg== null){
log.warn("Deserialization failed, got null as a result.");
return null;
}
log.info("Deserialization complete");
return msg;
}
@Override
public void close() {}
}
JSONSerializer/Deserializer可以处理这些类型的修复吗?如果我要使用JSONSerialzier,它应该只关心类的模式,对吗?
如果您使用JSONSerializer
,它不应受到对方法的更改的影响--只应受到对数据字段的更改的影响。其他序列化器可以序列化包括方法在内的整个对象。
我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。 生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。 我们看到的文章很少,建议是这样做: 我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认