使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen
试图弄清楚我是否可以使用spring kafka和spring kafka测试为@KafkaListener编写单元测试。 我的听众课。 我的测试类别: 我的测试配置类: 有什么简单的方法可以做到这一点吗? 或者我应该以其他方式测试@KafkaListener?在单元测试中,如何确保在Kafka中收到新消息时调用@KafkaListener。
我有一个主题a,有12个分区。我在一个集群中有3个Kafka经纪人。对于主题A,每个代理有4个分区。我没有创建任何副本,因为我不关心恢复能力。 我有一个简单的Java消费者使用kafka客户端库。我在属性中提到了以下内容 消费者记录和打印记录的代码更多,工作正常。我在主题中有12条消息,我通过“kafka-run-class.shkafka.admin.消费者组命令”验证了每个分区中都有一条消息。
我知道每个分区分配给一个Kafka消费者(在消费者组内),但一个Kafka消费者可以同时使用多个分区。如果每个用户都有一个到分区的开放连接,那么我可以想象每个用户都有成千上万个打开的连接。如果这是真的,那么在决定分区数量时,这似乎是需要注意的,不是吗?
我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?
如果我需要使用特定消费组的最新提交偏移量(用于从Spark结构化流开始偏移),我应该使用什么。 我的代码显示已弃用。 官方文件: 偏移量和使用者位置Kafka为分区中的每个记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,并将接下来接收偏移量为5的记录。实际上,与消费者的用户相关的位置有两个概念:消费者的位
我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动
如果我在这个问题中使用最上面的答案,请清除Kafka队列并更改日志。保持ms,在我的主机意外重启后,此更改是否会持续?
我遇到了以下关于从生产者同步发送。我知道上下文生产者中的异步机制 在此将来调用get()将阻塞,直到相关请求完成,然后返回记录的元数据或引发发送记录时发生的任何异常。 什么是真正的意思相关联的请求完成,我是相当这不是指完整的请求,但在什么程度上这个短语是指?直到经纪人?直到生产者等使用的缓冲区...? 当ack=all与同步生产者和异步生产者一起使用时,它有什么不同?两个场景都被阻塞以进行确认?
我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点
从这篇文章https://www.confluent.io/blog/transactions-apache-kafka/ 使用为至少一次交付语义配置的vanilla Kafka生产者和消费者,流处理应用程序可能会以以下方式完全丢失一次处理语义: 制片人。由于内部重试,send()可能导致重复写入消息B。这是由幂等生产者解决的,而不是本文其余部分的重点 2.我们可能会重新处理输入消息A,导致重复的
Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?
我需要更改Kafka设置中的值序列化器/反序列化器(出于测试目的,我一直在使用IntegerSerializer/IntegerDeserializer)。使用javaapi,它完全按照预期工作;但是,当使用控制台工具时,它似乎无法正常工作。 我所做的所有故障排除都让我得出了一个结论:Kafka控制台制作人似乎忽略了序列化程序选项。我尝试了和,并将其设置为带有参数。 它不仅不能将数据序列化为整数,
我已经启动了我的zookeeper和Kafka服务器。我开始制作Kafka,它发送10条主题为“xxx”的消息。然后阻止了我的Kafka制作人。现在我开始使用Kafka,并订阅了主题“xxx”。我的消费者使用我的Kafka制作人发送的10条消息,这10条消息现在没有运行。我需要我的Kafka使用者只能使用来自运行Kafka服务器的消息。有没有办法做到这一点?以下是我的消费者财产。
我有以下代码 消费者订阅的主题会不断收到记录。有时,消费者会因处理步骤而崩溃。然后,当使用者重新启动时,我希望它从主题的最新偏移量开始使用(即,忽略在使用者关闭时发布到主题的记录)。我认为方法可以确保这一点。然而,这种方法似乎毫无效果。消费者从其崩溃的偏移量开始消费。 什么是正确的方式使用? 编辑:使用以下配置创建消费者