我已经建立了一个实验性的Kafka环境,有3个代理和一个有3个分区的主题,我有一个生产者和一个消费者。我想为特定使用者修改分区的偏移量。我在kafka文档中读到,kafka中的使用者提交/获取API可以提交特定的偏移量或获取使用者读取的最新偏移量。以下是API的链接:
但是,我已经生成了一些消息,我的使用者已经使用了这些消息并输出每个已读消息的偏移量。
如果有人能帮上忙,我将不胜感激。我想知道我的代码的哪一部分是错的。或者可能API有问题。请不要犹豫,提出任何有用的意见。我的代码与我提供的链接中的代码完全相同。但是,如果你需要看我的代码,请告诉我把它放在这里。
Kafka版本是0.10.2.0
主题:“TestPIC3”
………………………………。
使用者配置:
props.put("group.id", "test");
props.put("client.id", "MyConsumer");
public class KafkaOffsetManage {
public static void main(String[] args) {
BlockingChannel channel = new BlockingChannel("localhost", 9095,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
final String MY_GROUP = "test";
final String MY_CLIENTID = "MyConsumer";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
System.out.println("+++++++++++++++++++++++++++");
System.out.println(metadataResponse.errorCode());
if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
Broker offsetManager = metadataResponse.coordinator();
// if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
System.out.println("Connected to Offset Manager");
System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port());
} else {
// retry (after backoff)
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
//partitions.add(testPartition1);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
MY_GROUP,
partitions,
(short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
correlationId,
MY_CLIENTID);
try {
channel.send(fetchRequest.underlying());
OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
channel.disconnect();
// Go to step 1 and retry the offset fetch
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
// retry the offset fetch (after backoff)
} else {
long retrievedOffset = result.offset();
String retrievedMetadata = result.metadata();
System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
System.out.println(retrievedMetadata);
System.out.println(result.toString());
}
}
catch (Exception e) {
channel.disconnect();
// Go to step 1 and then retry offset fetch after backoff
}
}
}
+++++++++++++++++++++++++++
0
Connected to Offset Manager
user-virtual-machine, Port:9093
------------------------
The retrieved offset is:-1
OffsetMetadataAndError[-1,,3]
Process finished with exit code 0
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
谢谢,
这种情况发生是因为偏移量过期。Kafka有两个参数控制着这种行为。首先是“__consumer_offsets”主题的“retention.ms”设置。它应该等于-1来禁用该主题内记录的过期。我假设使用Kafka版本1.1.x。使用命令检查主题配置:
$ ./kafka-configs.sh --entity-type topics \
--entity-name __consumer_offsets \
--zookeeper localhost:2181 \
--describe
Configs for topic '__consumer_offsets' are compression.type=producer,cleanup.policy=compact,min.insync.replicas=2,segment.bytes=104857600,retention.ms=-1,unclean.leader.election.enable=false
如果不满足配置设置,请使用命令进行更改:
$ ./kafka-configs.sh --entity-type topics \
--entity-name __consumer_offsets \
--zookeeper localhost:2181 \
--alter \
--add-config retention.ms=-1
假设设置了保留策略,接下来需要检查主题中是否有任何提交的消息。默认情况下,Kafka不允许阅读内部主题。要更改此行为,请创建一个具有使用者设置的文件:
$ echo exclude.internal.topics=false > consumer.properties
$ ./kafka-console-consumer.sh --consumer.config consumer.properties \
--from-beginning \
--topic __consumer_offsets \
--zookeeper localhost:2181 \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
[test_client,Test.Purposes,2]::[OffsetMetadata[13,NO_METADATA],CommitTime 1534165245681,ExpirationTime 1534251645681]
[test_client,Test.Purposes,0]::[OffsetMetadata[14,NO_METADATA],CommitTime 1534165245776,ExpirationTime 1534251645776]
[test_client,Test.Purposes,1]::[OffsetMetadata[8,NO_METADATA],CommitTime 1534165690946,ExpirationTime 1534252090946]
ExpirationTime = CommitTime + offsets.retention.minutes
$ date -d @1534165245
Mon Aug 13 16:00:45 UTC 2018
$ date -d @1534251645
Tue Aug 14 16:00:45 UTC 2018
正好是24小时。
因此,解决不正确偏移量问题的方法是增加“offsets.retention.minutes”设置,记住这会影响代理内存的使用,当系统中有许多死亡的使用者组时,还会定期提交未更改的偏移量以增加过期时间。
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow
对于Java Kafka消费者函数,它要求传入和。但是,我认为这个seek方法将为我的使用者获取订阅的TopicPartitions的集合。 这是我试图处理的例子。 使用者A订阅主题“test-topic”分区1和2。当调用时,我从每个分区读取消息。我处理了一些消息,但我的应用程序得到了一个异常。我不调用。现在,我要倒回到在上次中检索到的偏移量,并尝试重新处理它们。那么我该怎么做呢?我是否需要检查
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。