我是流媒体代理(如Kafka)的新手,来自排队消息系统(如JMS、Rabbit MQ)。
我从Kafka文档中读到,消息作为记录存储在Kafka分区的偏移量中。消费者从偏移量读取。
消息和记录有什么区别[多个/部分消息是否构成记录?]
当消费者从偏移量读取时,消费者是否有可能读取部分消息?消费者是否需要基于某种逻辑将这些对等消息串起来?
或
1条消息=1条记录=1个偏移量
之所以会出现这个问题,是因为“批量大小”决定了应该向borker发布多少字节的消息。假设有两条消息,message1=100字节,message2=200字节,batchsize设置为150字节。这是否意味着message1中的100字节和message2中的50字节会同时发送给代理?如果是,这两条消息如何存储在偏移量中?
在Kafka中,制作人向主题发送消息或记录(两个术语可以互换使用)。一个主题被划分为一个或多个分区,这些分区分布在Kafka集群中,该集群通常至少由三个代理组成。
消息/记录被发送到前导分区(由单个代理拥有)并与偏移相关联。偏移是单调递增的数字标识符,用于唯一标识主题/分区内的记录,例如存储在记录分区中的第一条消息将具有偏移量0等。
偏移量既用于标识消息在主题/分区中的位置,也用于标识消费者组的位置。
出于优化目的,生产者将对每个分区进行消息批处理。当达到配置的batch.sized
或linger.ms
时,批处理被认为已准备就绪。例如,如果您将batch.size
设置为200KB并且您发送了两条消息(150KB和100KB),它们可能是同一批处理的一部分。但是生产者永远不会将单个消息分割成块。
否,消费者无法读取部分消息。
我有一个问题,假设有一个TOPIC T1,有两个消费者C1和C2属于两个不同的组,电流偏移量是0.我们知道Kafka维护消费者的偏移量。因此,如果 C1 使用消息并且 Offset 变为 1,那么如果 C2 使用消息,它将从 1 偏移量开始,还是从 0 偏移量开始使用消息,会发生什么情况?表示两个不同的消费群体将如何维持抵消? 谢啦
我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event) kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据 而不是: 模式数据数据 我猜这是因为flume需要一个带有{header} {body}
我对非常陌生,我们正在使用。 我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。 我认为我需要使用<code>
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
我有一个SOAP Web服务,它发送一个kafka请求消息,并等待一个kafka响应消息(例如,consumer.poll(10000))。 每次调用web服务时,它都会创建一个新的Kafka生产者和一个新的Kafka消费者。 每次调用web服务时,使用者都会收到相同的消息(例如,具有相同偏移量的消息)。 我使用的是Kafka0.9,启用了自动提交,并且自动提交频率为100毫秒。 更新0001