我们有一个Kafka主题,从事务性生产者那里读取消息。我们希望将kafka connect消费者设置为只读提交的消息。
由于我是Kafka的新手,我需要这里的专家帮助我设置它。根据我最初的研究,我明白我需要在我们的kafka属性中设置isolation.level=read_committed。
现在我有以下问题
我们在/confluent/bin文件夹中放置了Kafka外壳脚本。我的理解是,我需要通过调用这些shell脚本来执行这些命令。
如果需要更多信息,请告诉我。
任何帮助都将不胜感激。谢谢!!!
我指的是正确的属性吗
-是
设置后如何测试我的Kafka连接器是否只读取已提交的消息
-你可以简单地用Java编写一个Kafka制作人,在两个不同的事务中发布两条消息(T1中的m1和T2中的m2)。中止事务T1并提交事务T2。在read committed连接器中,应该只看到m2,而不是m1。
通过命令行我可以看到(LSO)最后一个稳定的Offsetid是什么吗
-如果你说的稳定是指上次读取提交的偏移量,我认为这是不可能的。您可以获得最后一个偏移量,如下所述:Kafka主题的每个分区中的提交数和偏移量
我第一次在kafka中使用Node,使用Kafka-Node。使用消息需要调用外部API,这甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,这样,如果一个消费者失败了,另一个将替换它的消费者将收到相同的消息,即它的工作没有完成。 我正在使用Kafka0.10并尝试使用ConsumerGroup。 我想到了在options中设置,并且只在消息的工作完成后提交消息(就像我以前对一些Ja
你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。 你知道可能遗漏了什么吗?非常感谢。 聚甲醛 应用程序YML 消费者阶层 侦听器类 日志 从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些
customProcessor.java 谢谢你的帮助!
我在java中有一个函数,在这个函数中我试图获取未读的消息。例如,如果我在broker中有偏移量为0、1、2的消息,这些消息已经被使用者读取,并且如果我关闭我的使用者一个小时。那时我产生的信息偏移量为3,4,5。之后,当我的消费者启动时,它应该从偏移量3读取消息,而不是从0读取消息。但是,它要么读取所有的消息,要么读取启动Kafka Consumer后产生的消息。我想读那些未读或未提交的消息 我尝
根据我的理解,消费者阅读特定主题的消息,并且消费者客户机将定期提交偏移量。 因此,如果由于某种原因,使用者失败了一个特定的消息,该偏移量将不会被提交,然后您可以返回并重新处理该消息。 是否有任何东西跟踪您刚刚消耗的偏移和您随后提交的偏移?
是否有方法从oracle数据库获取只读JDBC连接。通常,我需要一个jdbc url参数来启用它,比如: 我正在使用薄驱动程序