问题内容: 有没有一种方法可以配置Spring Kafka使用者以跳过无法读取/处理(已损坏)的记录? 我看到一种情况,如果无法反序列化,则消费者将停留在同一记录上。这是消费者抛出的错误。 使用者轮询该主题,并一直循环循环打印相同的错误,直到程序被杀死为止。 在具有以下消费者工厂配置的@KafkaListener中, 问题答案: 您需要:https : //docs.spring.io/sprin
问题内容: 我有一个风暴拓扑来处理来自Kafka的消息,并根据手头的任务在Cassandra中进行HTTP调用/保存。我会尽快处理这些消息。由于来自外部源(例如HTTP)的响应,很少有消息没有得到完全处理。如果HTTP服务器在一段时间后不响应/返回错误消息以重试,我想为重试实现指数补偿机制。我想不出什么主意就能实现它们。我想知道如果还有其他可以容错的解决方案,那么其中哪一个将是更好的解决方案。由于
问题内容: 我有一个带有2个分区的Kafka集群。我一直在寻找一种将分区数增加到3的方法。但是,我不想丢失该主题中的现有消息。我尝试停止Kafka,修改文件以将分区数增加到3,然后重新启动Kafka。但是,这似乎并没有改变任何东西。使用Kafka ,我仍然看到它仅使用2个分区。我正在使用的Kafka版本是0.8.2.2。在0.8.1版中,曾经有一个名为的脚本,我想可能可以解决问题。但是,我在0.8
问题内容: 使用Apache Kafka Java客户端(0.9),我试图使用Kafka Producer类 将一连串的记录发送到代理。 异步发送方法会立即返回一会儿,然后在很短的时间内开始阻塞每个调用。大约三十秒钟后,客户端开始引发异常(TimeoutException),并显示消息 “批处理已过期” 。 在什么情况下会引发此异常? 问题答案: 此异常表示您正在以比发送记录更快的速率对记录进行排
问题内容: 我已经设计了一个Spring Boot REST API ADD和GET方法 卡夫卡听众 在服务层中,我需要返回来自的项目列表。 使用Spring kafka进行REST API的最佳方法是什么? 问题答案: 您需要使用a 将结果返回到rest控制器。 参见ReplyingKafkaTemplate。 2.1.3版引入了KafkaTemplate的子类来提供请求/回复语义。该类名为Re
问题内容: 使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取。我尝试使用GlobalKTable实现此目的,但问题是数据将被覆盖,也无法对其应用聚合。 假设我有一个名为“ data_in”的主题,具有3个分区(P1,P2,P3)。当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“ data_in”的所有分区中读取数据
问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人
问题内容: 我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。 据我了解,对于每个主题
问题内容: 我正在使用具有512兆内存ram的DigiOcean实例,使用kafka出现以下错误。我不是Java熟练的开发人员。如何调整卡夫卡以利用少量的ram。这是一个开发服务器。我不想为一台更大的机器每小时多付钱。 问题答案: 您可以通过编辑来调整JVM堆大小,依此类推: 该参数指定最小堆大小。要使服务器至少启动,请尝试对其进行更改以使用更少的内存。假设您只有512M,则还应该更改最大堆大小(
问题内容: 我目前无法在KSTREAM APP 中 反序列化avro PRIMITIVE密钥 用avro模式编码的密钥(已在模式注册表中注册), 当我使用kafka-avro-console-consumer时,我可以看到密钥已正确反序列化 但是不可能使其在KSTREAM应用程序中工作 密钥的avro模式是主要的: 我已经关注了合流的文档 它对于该值工作得很好,但是该键将是一个字符串,该字符串包含
问题内容: 我正在用Spring Boot编写一个应用程序,所以我要写给Kafka: 然后在我的方法里面: 但是我觉得我只是依靠它来工作,我怎么知道它是否起作用?如果它是异步的,那么返回200代码并希望它能奏效是个好习惯吗?我很困惑。如果没有Kafka,这不会失败吗?不应该提示我捕获异常吗? 问题答案: 是的,如果Kafka不可用,该呼叫将失败,但是如果您异步发送它,则不会通知任何人。您可以指定将
问题内容: 我有一个简单的Java生产者,如下所示 我正在尝试读取以下数据 但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容 然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置 然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。 有人可以让我知道出了什么问
问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:
问题内容: 要开发我的Kafka连接器,我需要添加一个connect-API依赖项。 我应该使用哪一个? 例如mongodb连接器使用来自Maven Central的connect- api 但是来自开发人员指南的链接转到https://packages.confluent.io/maven/org/apache/kafka/connect- api/5.5.0-ccs/ ,此外还有版本。 因此,
问题内容: 我是Kafka的新手,正在使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1 在创建特定主题之后,java中是否有任何方法可以更改/更新特定主题的分区数。 我没有使用Zookeeper创建主题。当发布请求到达时,我的KafkaProducer会自动创建主题。 如果还不够,我还可以提供更多详细信息 问题答案: 是的,有可能。您必须在中访问scala类以添