我有一个微服务应用程序,它是使用camel kafka开发的,用于消费来自kafka的消息。我用4个并发实例运行这个服务,其中每个实例都有1个消费者,他们消费的主题有20个分区。 在维护期间(每天凌晨1-2点),应用程序将通过停止消费者来暂停消费。 当应用程序暂停消费时,kafka重新平衡发生,一些实例将在停止消费之前分配更多分区。 e、 由instanceA(partition1-5)使用的g分
给定主题名、分区号和偏移量,我如何从该主题中读取一条记录? 在基于Sprng引导的应用程序中,我使用Kafka导入业务数据。导入记录被发送到导入队列,并被一个或多个业务模块使用。即使消费者未能从记录中导入数据以继续从以下记录中导入数据,记录也会始终得到确认。 稍后,用户(在他/她修复了一些相关的业务数据后)可以决定重新发送一个或多个失败(但已确认)的导入记录。 每个记录的偏移量、分区号和主题名都存
注:使用kafka_2.11-0.9.0.1 我创建了一个Kafka主题,名为:
注:使用的Kafka版本:Kafka2.11-0.9.0.1 我有一个名为测试Kafka的主题,它是三个分区和一个复制因子,这个主题在每个分区中都有一些字符串数据,即键和值对。 当我试图通过消费者api获取记录时,消费者。轮询函数不获取任何记录并给出记录。count=0,但是,当我使用相同的逻辑时,只有一个单分区,那么它工作良好并获取记录。 我以为Kafka会在内部处理多个分区,并从每个分区中逐个
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka
我有一个场景,我将迭代一组从数据库中获取的记录,在获取后,我将迭代这些记录,并将每个记录推送到Kafka主题。现在让我们假设我已经检索了10条记录,在迭代中我推送了前5条记录,在第6条记录上有一些异常,我想还原推送到主题中的消息。这类似于数据库事务性。我们能在Kafka中实现原子性吗? 谢谢
我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如
当前实施情况 我有一个Spring批量工作,写一个Kafka主题。我从数据库中读取记录,转换它们,然后写入Kafka主题。 对现有作业的新更改 我应该再写一个审计主题和主要主题。 对于从数据库中读取的每个记录,我正在将一条消息(例如类Abc类型)写入主主题,对于同一条记录,我假设将另一个实体类类型的消息写入审计主题。 问题陈述 目前,我正在使用不同的KakfaTemplate来写入这两个主题,但问
我正在尝试实现Spring kafka消费者,它需要在处理事件时出现某个异常后暂停(例如:在将事件信息存储到DB时,DB已关闭)。 我们如何在spring boot-2.3.8(spring kafka)中使用Resilience4j断路器方法来处理这种情况 寻找一些消费者暂停和恢复的例子。 在Kafka,listerner只是想捕捉解析错误。如果出现5个以上的解析错误,则需要停止侦听器。但我不确
包org.apache.kafka.connect.transforms.predicates; https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/topicnamematches.java https:
我正在使用Kafka连接分布。命令是:bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties 工作人员配置为: Kafka连接重新开始没有错误! java代码如下: 奇怪的事情发生了。我从kafka-logs中获取数据,但在hdfs中没有数据(没有主题目录)。我尝试connector命令: 出什
我已经设置了Spark Structured Streaming(Spark2.3.2)来阅读Kafka(2.0.0)。如果消息在Spark streaming作业启动之前就进入了主题,我无法从主题的开始消费。这是Spark streaming的预期行为吗?它忽略了Spark streaming作业初始运行之前产生的Kafka消息(即使带有.选项(“StratingOffSets”,“Earlis
我正在使用OpaqueTridentKafkaSpout来消费来自Kafka的消息。下面是代码。我忽略了配置,因为这会导致同一kafka消息在多个批处理中到达。 当Kafka喷口开始时,我得到以下错误一次,但之后运行平稳。
我使用storm0.9.4和storm-kafka:0.9.0-wip16a-scala292作为从kafka0.7读取的依赖项。 我们的Kafka保留政策是7天。 我从经纪人的最新偏移量开始读取。
这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。 Storm版本-9.3 Storm-Kafka版本-9.3 Kafka版本-0.8.2-beta 堆栈跟踪: Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):