我们有一个要求,即给定Kafka分区的消息跨越组成消费者组的集群中的所有节点,应该总是一次执行一条消息,没有重叠。它们被处理(稍微)无序是可以容忍的,但是不允许时间重叠。 在重新平衡期间,我们如何才能安全——例如,假设我们自动缩放我们的消费者,并为同一个消费者组启动一个新消费者——那么新消费者将不得不接管同一个消费者组中现有消费者的分区。 对于一个特定的分区P,让我们假设使用者c1以前处理过分区P
随着JavaKafka消费者寻求()它需要我们传入TopicPartion和Offest。但是,我认为这个查找方法会为我的消费者收集订阅的Topic分区。 这是我试图处理的例子。 消费者A订阅了主题“测试主题”分区1和分区2。调用时,我从每个分区读取消息。我处理一些消息,但我的应用程序出现异常。我不调用。现在,我想倒带到上一次中检索到的偏移量,并尝试重新处理它们。那我该怎么做呢?我是否需要检查每个
我已经用Storm构建了一个示例拓扑,使用Kafka作为源。这是一个我需要解决的问题。 每次我杀死一个拓扑并重新启动它时,该拓扑都从一开始就开始处理。 假设Topology处理了主题X中的消息A,然后我终止了该拓扑。 现在,当我再次提交拓扑时,消息A仍然存在,主题X再次被处理。 有没有一个解决方案,也许某种偏移管理来处理这种情况。
我有几个问题。1)我们能不能在k8集群之外以上述方式访问k8集群之外的Kafka?如果是这样,我在某种程度上做错了吗?2)如果端口是打开的,那不是意味着代理是可用的吗? 如有任何帮助,不胜感激。谢谢
什么是port和targetport? 是否为每个代理设置LoadBalancer服务? 这些多个代理是否映射到cloud LB的单个公共IP地址? K8S/Cloud之外的服务如何访问单个代理?通过使用?或者使用?。还有,这里用的是哪个端口?还是? 如何在Kafka Broker的属性中指定此配置?对于k8s集群内部和外部的服务,As端口可能不同。 请帮忙。
kubectl对kafka的描述也显示了暴露的节点 我有一个出版商二进制文件,将一些信息发送到Kafka。由于我有一个3节点集群部署,我使用我的主节点IP和Kafka节点端口(30092)与Kafka连接。 但是我的二进制文件正在获得错误。我无法理解为什么即使在nodePort到targetPort转换成功后,它还是被拒绝。随着进一步的调试,我在kafka日志中看到了以下调试日志:
我正在尝试为Kafka Connect REST API(2.11-2.1.0)配置SSL。 问题所在 我尝试了两种配置(工人配置): 带有前缀 < li >并且不带< code>listeners.https.前缀 两种配置都启动正常,并在尝试连接到https://localhost:9000时显示以下异常: 在日志中,我看到SslContextWorks是使用任何密钥库创建的,但使用密码: 因
我面临的情况是,多个开发人员正在开发特定的kafka源或接收器连接器配置。连接器通过kafka connect REST API进行创建、更新、删除等操作。必须保证 开发人员从不删除或修改其他人的连接器。 开发人员只能管理特定连接器的例如“streams.kafka.connect.sink.Neo4jSinkConnector” 有什么方法可以配置Kafka,将Kafka-Connect RES
在标准/自定义kafkaconnect接收器中,我们如何指定它应该只使用来自kafka主题的read_comitted消息。我可以在这里看到配置,但看不到任何选项(除非这是默认行为)。谢了。https://docs . confluent . io/current/installation/configuration/connect/sink-connect-configs . html
我正在使用Kafka-Connect API实现一个自定义源连接器,可用于轮询REST-API并将JSON响应沉入Kafka主题。 现在我想知道如何实现SourceTask的轮询间隔,JDBC连接器如何提供一个轮询间隔。在某个地方,我必须将线程设置为睡眠状态,但是我必须在哪里执行此操作?
这个问题似乎不是关于特定的编程问题、软件算法或主要由程序员使用的软件工具。如果您认为这个问题在另一个Stack Exchange网站上是主题,您可以留下评论来解释这个问题在哪里可以得到回答。 我们构建了一个定制的Kafka Connect sink,它反过来调用一个远程REST API。我如何将背压传播到Kafka Connect基础设施,以便在远程系统比内部使用者向put()传递消息慢的情况下,
如上所述,我目前正在设置一个Kafka Connect Sink,将数据从Kafka传输到Google云存储中。 然而,一切都进展顺利——它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只将新产生的消息下沉到GCS,而不是来自Kafka的已经存在的消息。我已经尝试删除kafka连接存储/偏移主题,创建一个新的连接器名称等。但是,它总是从最新的偏移量开始。 如果无论如何要为Kafka Conn
我正在使用弹性搜索Kafka连接在独立模式下。我不困惑使用哪种配置来启动Kafka连接并从最后一个故障点开始。 例如,生产者将继续推动记录进入Kafka和消费者,因为弹性搜索接收器连接器正在消费,现在我的由于某种原因我的消费者下降了,但我的骄傲将继续推动信息进入Kafka。现在,当我修复了ES sink连接器端的问题后,如果我重新启动ES sink连接器,它应该从上次故障中选择,而不是从开始或最近
我试图为我的Kafka Connect Sink指定一个主题分区。特别是,我正在使用DataStax Apache Kafka连接器。 有大量与为 Kafka 使用者指定主题分区相关的文档和资源,例如: https://kafka-tutorials.confluent.io/kafka-console-consumer-read-specific-offsets-partitions/kafka
我已经在 2 个节点上部署了 Kafka connect (2.0.0) 集群,这是 Hortonworks 数据平台 3.1.4 的一部分。它被成功部署,kafka 连接也启动了。我能够使用 REST 命令通过 GET 方法列出连接器。 问题是,当我试图在集群模式下使用POST方法创建连接器时,它无法创建连接器,或者如果它确实创建了连接器,则无法创建任务。我在日志中看到以下错误 经过一些跟踪和错