我目前正在与Kafka和Flink合作,我有kafka在我的本地PC上运行,我创建了一个正在消费的主题。
桌面\kafka\bin\windows
有没有办法进一步了解这条消息的细节?比如说时间?钥匙我查看了Kafka的文档,但没有找到关于这个主题的内容
使用以下命令:
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_name \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true --property print.value=true \
--property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
使用开箱即用的控制台消费者(我使用的是Kafka 0.9.0.1),您只能使用不同的格式打印消息的密钥和值。要打印密钥,请设置属性print。key=true
。
还有另一个属性键。分隔符
默认为“\t”(一个选项卡),您也可以更改为任何您想要的内容。
要设置这些属性,您可以创建一个配置文件并使用--consumer。配置
您也可以实现自己的格式化程序,并将其与
--formatter
选项一起使用,但仍然只有键和值,因为这是MessageFormatter特性提供的(请参阅下面的writeTo)。
trait MessageFormatter {
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
def init(props: Properties) {}
def close() {}
}
例如:
./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning
key-p1
key-p2
key-p3
null-4
null-8
null-0
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我
我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi