scala prettyprint-override"> val kafkaConsumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaConfig.server)
.withGroupId(kafkaConfig.group)
.withProperties(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "100",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
.runWith(Sink.foreach(println))
但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?
我想您需要添加几个配置项:
>
consumerconfig.enable_auto_commit_config->false
,这样作业就不会保存任何偏移量
consumerconfig.auto_offset_reset_config->“最早”
,以便作业从头开始。
我做了一个poc,其中我使用spark流从Kafka读取数据。但我们的组织要么使用ApacheFlink,要么使用Kafka消费者从ApacheKafka读取数据,作为标准流程。所以我需要用Kafka消费者或ApacheFlink替换Kafka流媒体。在我的应用程序用例中,我需要从kafka读取数据,过滤json数据并将字段放入cassandra中,因此建议使用kafka consumer而不是f
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我已经通过Docker compose运行了Kafka和Zookeeper。我可以使用Kafka终端向主题发送/使用消息,也可以通过Conduktor监控所有内容。但不幸的是,我无法通过使用阿尔帕卡连接器的Scala应用程序使用MSG。该应用程序连接到主题,但每当我向主题发送MSG时,都没有任何反应。 只有Kafka和动物园管理员正在通过码头工人组成运行。我直接在主机上运行 Scala 消费者应用
我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?
我知道什么是生产者和消费者。但官方文件显示 < li >它是流媒体平台。 < li >它是企业消息系统。 < li>Kafka具有从数据库和其他系统导入和导出数据的连接器。 这是什么意思? 我知道生产者是向Kafka Broker发送数据的客户端应用程序,消费者也是从Kafka Broker读取数据的客户端应用程序。 但我的问题是,消费者可以将数据推送到Kafka Broker吗? 据我所知,我认
我们有一个非常简单的Kafka Consumer(v 2.6.2)。它是使用者组中唯一的使用者,并且该组是唯一一个阅读主题的组(有6个分区,其中有大约300万个事件)。Broker也是2.6.x版本 由于我们需要实现一个“只有一次”的场景,我们深入研究了一下,如果我们真的只使用一次写入主题的每个事件。不幸的是,我们发现:消费者有时会跳过一个偏移量,有时甚至会跳过一组分区的偏移量。 消费者除了记录之