我使用的是一个Java应用程序,它让消费者开始阅读Kafka主题。每次我需要启动消费者应用程序时,我都必须使用cmd中的命令启动Zookeeper和Kafka服务器。是否可以用小型Java程序启动/停止它们?非常感谢。
我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.
我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请
我一直在Kafka消费者方面面临下面的异常。令人惊讶的是,这个问题不一致,旧版本的代码(具有完全相同的配置,但有一些新的不相关功能)按预期工作。有人能帮助确定是什么导致了这种情况吗? 我的应用程序使用以下内容: 自定义侦听器类com。我的公司。听众。Kafka巴奇列斯特纳 附加查询:即使设置了,异常堆栈跟踪仍然包含我省略的完整有效负载。知道为什么吗? 提前感谢! 更新: KafkaBatchLis
我的消费者配置有kafka批处理侦听器配置和@KafkaListener消耗消息列表。我有一个消费者拦截器,我想为每条记录设置唯一ID,并将其值存储在映射诊断上下文(MDC)中。如果我的kafka侦听器消耗单个消息,则唯一ID是正确的。但是我的kafka侦听器消耗消息列表,因此MDC. get(“id”)只获取最后一个值。如何处理它?我的拦截器;
Spring Boot environment侦听kafka主题(@KafkaListener/@StreamListener)将侦听器工厂配置为以批处理模式运行: 或通过应用程序。属性: 如何配置框架,以便给定两个数字:N和T,它将尝试为侦听器获取N条记录,但不会等待超过T秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Sour
背景是: 我想在收到kafka消息时触发websocket客户端。对于两个websocket服务器实例,这会产生一个问题,即kafka分区不必与服务于确切websocket客户端的服务器实例相关联。 Kafka主题中已经填充了具有多个分区的消息。我对重置偏移量和真正的“提交日志作为真相的来源”不感兴趣。对于这种情况,kafka只用于将触发器传递给web套接字客户端 我需要的: 我的意图是绕过同一个
假设在Kafka中,我有一个主题“A”的4个分区,并且我有20个消费者组“AC”的消费者。我不需要任何排序,但我想通过扩展我的消费者实例来更快地处理消息。请注意,所有消息都是独立的,可以独立处理。 我查看了消费者配置分区。分配策略,但不确定是否可以根据消息可用性实现消费者到分区的动态分配。
我在我的微服务中定义了一个Kafka消费者。我已经部署了我的应用程序的5个实例。我已将ConvoltKafkaListenerContainerFactory中的并发参数设置为2。这是否意味着每个应用程序实例有2个消费者实例,或者我连接的整个主题有2个消费者实例?
我有多个制作人,可以向一个Kafka主题发送多种类型的事件。 我有一个消费者,它必须消费所有类型的消息。每种类型的消息都有不同的逻辑。 但在这种情况下,所有消息都指向此方法,不仅是EventOne 如果我实现了两种方法(对于每种类型的消息),那么所有消息都只能使用一种方法。 如果我像这样实现监听器: 然后我得到一个例外:org。springframework。Kafka。KafkaListener
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
我正在为Spring kafka项目实施Exceptive处理,我有自己的DeadLetterPublishingRecoverer,它处理Kafka侦听器中发生的异常,整个流程是完美的,即当我抛出逻辑中的任何异常时,我可以根据框架实现将其记录并发送到DLQ主题。消费者记录出现问题,如果我更改消费者记录值中的任何内容,相同的消费者记录将发布到DLQ主题,更改实际上是错误的,我想记录原始消息。 ..
我的Spring Cloud应用程序中有3个不同的流。每天有数十万条记录,然而,每天大约有3到4条记录消失。我在日志中看到了它们,但并没有完成所有的连接。 代码: 少数几个不起作用的日志之一: 21年9月1日上午5:48:15.863 e2e64c0faa9e 05:48:15.863[订单-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5
我是Kafka新手,我正在使用Kafka1.0。 我使用拉取模式读取kafka消息,也就是说,我定期查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。 我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)
我们使用的是spring集成kafka版本3.1.2。RELEASE and int kafka:消息驱动的通道适配器,用于使用来自远程kafka主题的消息。生产者发送加密消息,我们使用反序列化器解密实际消息。我们可以使用主题中发布的所有消息。我们将自动提交用作false。我们想知道在成功处理消息后如何从我们的服务提交或确认消息。有人能帮助我们如何提交从消息驱动通道读取的消息并提供一些参考实现吗?