问题内容: 我已经学习apache kafka一个月了。但是,我现在陷入了困境。我的用例是,我有两个或多个在不同计算机上运行的使用者进程。我进行了一些测试,在其中我在kafka服务器中发布了10,000条消息。然后,在处理这些消息时,我杀死了一个使用者进程并重新启动了它。消费者正在将处理后的消息写入文件中。因此,使用结束后,文件显示了超过1万条消息。因此,某些消息是重复的。 在使用者过程中,我已禁
问题内容: 我正在尝试使用Avro来读取和写入Kafka的邮件。有没有人有使用Avro二进制编码器对将放入消息队列中的数据进行编码/解码的示例? 我需要的是Avro而不是Kafka。或者,也许我应该考虑其他解决方案?基本上,我试图在空间方面找到一种更有效的JSON解决方案。刚刚提到了Avro,因为它可以比JSON紧凑。 问题答案: 我终于想起要询问Kafka邮件列表,并得到以下答复,效果很好。 是
问题内容: 想要使用高级消费者API实现延迟的消费者 大意: 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。 auto.commit.enable = false(将在每个消息处理之后显式提交) 消费一条消息 检查消息时间戳,并检查是否经过了足够的时间 处理消息(此操作将永不失败) 提交1个偏移 有关此实现的一些担忧: 提交每个偏移量可能会使ZK变慢 Consu
问题内容: 我编写了一个单一的Kafka使用者(使用Spring Kafka),该使用者从单个主题中读取内容,并且是使用者组的一部分。消耗完一条消息后,它将执行所有下游操作,并移至下一个消息偏移。我将其打包为WAR文件,并且我的部署管道将其推送到单个实例。使用部署管道,我可以将该工件部署到部署池中的多个实例。 但是,当我希望多个消费者作为基础架构的一部分时,我无法理解以下内容: 实际上,我可以在部
问题内容: 在搜索如何通过API创建Kafka主题时,我在Scala中找到了以下示例: 最后一个arg 显然是Scala对象。我不清楚如何使该示例在Java中工作。 这篇文章如何在Clojure中创建Scala对象的问题在Clojure中提出了相同的问题,答案是: 我认为Java中的翻译成: 但是,当我尝试使用该方法(或其他任何数量的变体)时,它们都无法编译。 编译错误是: 我正在使用kafka_
问题内容: 我有一个kafka stream应用程序,等待有关topic的记录被发布。它将接收json数据,并根据我想将该流推送到不同主题的键的值来确定。 这是我的流应用程序代码: 在此代码中,我要检查操作类型,然后根据需要将流推送到相关主题中。 我该如何实现? 编辑: 我已将代码更新为: 问题答案: 您可以使用方法来拆分流。此方法使用谓词将源流分成几个流。 以下代码取自kafka-streams
问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
问题内容: 在kerberosed环境中向kafka主题发送消息时出错。我们在hdp 2.3上有集群 我遵循了这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer- with-kerberos/ 但是对于发送消息,我必须先显式地执行kinit,然后才能将消息发送到kafka主题。我试图通过java类进行编织,但是
问题内容: 我在Java中有一个简单的Kafka Consumer,带有以下代码 发布消息后,将成功读取数据,但是当它返回检查它时。hasNext(),它将保持待处理状态,再也不会返回。 什么会拖延这个? m_stream是通过以下方式获得的KafkaStream: 问题答案: 解决方案是添加属性 “ consumer.timeout.ms” 现在,当达到超时时,将引发ConsumerTimeou
问题内容: 当我尝试使用各自的架构在数据上运行带有Avro的Kafka Consumer时 ,它返回错误“ AvroRuntimeException:格式错误的数据。长度为负:-40”。我看到其他人也遇到了类似的问题,即将字节数组转换为json,Avro写入和读取以及Kafka Avro Binary * coder。我还引用了这个Consumer Group Example实例,该实例对我们都有
问题内容: 由于增加了头的记录(ProducerRecord&ConsumerRecord)在卡夫卡0.11,是有可能处理与卡夫卡流的话题时,得到这些标题?当调用类似on的方法时,它提供记录的和的参数,但是我看不到访问的方法。如果我们可以刚好超过s ,那就太好了。 例如 这样的事情会工作: 问题答案: 自Streams API 2.0版以来,可以访问记录头。(有关详细信息,请参见KIP-244。)
问题内容: 与使用控制台脚本保护的Kafka通信时出现问题。Kafka受保护,监听器受保护,机制由。 我做了什么:我尝试使用kafka脚本之一列出一些数据: 但是我明白了 命令失败,这是可以理解的,因为它由sasl保护。 因此,我尝试了如何向该命令添加客户端用户名/密码。首先,我尝试运行脚本,我曾经添加必要的文件。我很快发现我无法直接添加文件,我需要使用文件,所以我做到了。 我的属性文件(请记住,
问题内容: 我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。我们如何获取主题中的消息数量? 问题答案: 从消费者的角度来看,想到此的唯一方法是实际消费消息并计数。 Kafka代理公开了自启动以来收到的消息数量的JMX计数器,但是您不知道已经清除了其中的多少。 在最常见的情况下,最好将Kafka中的消息视为无限流,而获得当前磁盘上保留的离散值并不重要。此外,在与
问题内容: 我需要在kafka-0.8.2.2.3中删除一个主题。我已使用以下命令删除该主题: 该命令已成功执行,但是当我运行命令以列出主题时,我可以看到该主题仍然存在,并且显示 标记为“删除” 。 当我创建主题DummyTopic时,它会输出异常,该主题已存在,下面是堆栈跟踪: 请让我知道如何删除该主题。 问题答案: 从0.8.2.x版本开始支持删除主题。您必须首先在所有代理上启用主题删除(设置