我有一个配置了spring kafka的Springboot应用程序,我想处理听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,将重试2次,然后将消息记录到错误文件中。我有两种方法可以遵循:- 第一种方法(使用带有DeadLetterPublishingRecoverer的SeekTocurInterrorHandler):- 但为此,我们需要添加主题(一个新的.
我对使用创建的主题有异议。在我的应用程序中,我有以下顺序: 创建具有1个分区的新主题,使用将复制因子设置为1 等待结果 立即向新创建的主题发送新消息(通常操作2和3之间的时间约为200毫秒)。 我的代码如下: 制作人有时看不到创建的主题,并抛出以下异常: [Producer clientId=Producer-1]获取相关id为5的元数据时出错:{targetTopic=UNKNOWN_TOPIC
可能是Kafka的复制品——该服务器不是该主题分区的领导者,但没有公认的答案,也没有明确的解决方案。 我有一个简单的java程序来向Kafka传达信息: 我得到了以下例外: 当我尝试使用时,我得到以下错误: 当我描述我的主题时,我有以下信息: 我试着创建一个新的主题,并按照《快速入门指南》中提到的那样生成消息,然后上述步骤都很有效。 我应该在或producer configuration(生产者配
我正在使用以下配置的批处理侦听,但我的消息反序列化错误: 在yml中: 使用上面的方法,我轮询了5条消息,但收到了超过100条消息,当我选中它时,将列表中的一条消息反序列化为多条消息。 我检查了我的投票配置没有工作。有人能给我建议解决办法吗 以下是我的日志:
我对Kafka2.6.0中的消息大小配置有点困惑。但让我们讲一个故事: 我们正在使用由3个节点组成的Kafka集群。到目前为止,消息的标准配置。“zstd压缩”被激活。 相关的代理配置很简单: 此时,生产者配置也很简单: 现在我们想把一个8Mbyte的消息放到一个特定的主题中。这些数据的压缩大小只有200 KB。 如果我将这些数据放入主题中,会出现以下错误: 所以我改变了生产者配置如下: 现在制作
Kafka producer正在发送.gz文件,但无法在消费者端解压缩和读取文件。获取错误为“IOError:不是gzipped文件” producer-bin/kafka-console-producer.sh--broker-list localhost:9092-topic Airport<~/downloads/stocks.json.gz 消费者- 使用者出错-
我刚接触Kafka,正在为我的新应用程序尝试一些小用例。用例基本上是Kafka制作人- 当消费时(步骤2),下面是步骤的顺序...1.消费者。轮询(1.0)1. a.产生多个主题(多个水槽代理正在监听)1.b。产生。轮询()2。每25个msgs刷新()3。提交()每个msgs(asynchCommit=false) 问题1:这个动作顺序对吗!?! 问题2:这会导致数据丢失吗?因为刷新是每25毫秒一
我从以下链接获得所有信息: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaConsumer.html 当我们运行consumer时,我们没有从consumer端得到任何通知。请给我一点主意。
我想知道Kafka中信息的压缩大小。 我使用kafka 1.1.0和java kafka connect 1.1.0从我的制作人向主题发送消息。 如果消息对我的制作人来说太大,我会得到一个 消息序列化时为xxx字节,大于使用max.request配置的最大请求大小。大小配置。 设置最大请求。将大小设置为合适的值将导致来自代理的错误消息作为消息。代理配置中的max.bytes也必须相应地进行调整。不
我使用新的API创建了一个kafka消费者(http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaConsumer.html) 当前,使用者正在从最小的偏移量开始读取主题中的消息。我想重写这个以读取最新的偏移量。有什么关于如何做到这一点的指示吗?
我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦
null 但问题是,如果主题是动态创建的(我的意思是说在使用者代码启动之后),它将不起作用,但API说它将支持动态主题创建。这里是供你参考的链接。 Kafka版本:0.9.0.1 https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaConsumer.html 任何帮助都是非常感
我有一个Spring Boot应用程序,可以使用和。我有一位制片人。 我想写JUnit上面没有任何嘲弄类。我尝试了,但我不确定如何将其连接到我的应用程序定义的kafka代理,所以当我发送主题消息时,消费者(其中存在)应该选择消息并处理它。 有了我也得到了下面的错误。 有人能告诉我如何在不模仿任何类的情况下为我的Kafka制作人编写Junit,它应该用真实的类进行测试。
我需要一些帮助来理解我如何能够提出一个解决方案使用Spring boot、Kafka、Resilence4J来实现来自我的Kafka消费者的微服务调用。假设微服务关闭了,那么我需要使用断路器模式通知我的Kafka消费者停止获取消息/事件,直到微服务启动并运行。
我是Kafka的新手,在尝试一个示例场景时,Kafka生产者以JSON格式向消费者发送用户详细信息。我访问过类似的问题,但我无法得到我需要的答案。 如果我在终端中运行任何一个生产者或消费者,在spring boot中运行另一个生产者或消费者,我不会面临任何问题。错误发生在无限循环中(当生产者和消费者都从不同的spring boot项目启动时): 我在下面提到了消费者配置中的反序列化和受信任包: 我