我观察到,运动流中存在一些记录,但KCL消费者应用程序尚未收到这些记录。发生这种情况的原因是什么?之前和之后的记录都很好。
所有异常都在我的应用程序中消耗,KCL不会收到。所以http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#w1ab1c11c11c15b9的情况也没有发生。
一些细节:打开分片数:4关闭分片数:3运行的工作人员数:5
我从我的Kinesis producer获取了序列号,并在aws cli中使用AT\u sequence\u number作为碎片迭代器类型直接获取了记录,我可以看到这些记录。
因此,流中有它们,但KCL应用程序没有收到它们。
查看运动响应中“millis_behind_latest”的值。如果值为~86399000
当您使用shard\u迭代器检索记录时,该记录已不在流中,因为已超过记录的保留期。因此,您会得到一个空的结果,因为最旧的记录已经过期,不再存在于数据流中。因此,shard\u迭代器现在指向磁盘中的一个空白空间。
当这样的事情发生时,取“next_shard_iterator”的值并使用get_records再次获取运动数据记录。也许数据没有存储在并发/连续内存内存块中,因此我们在数据检索之间得到空结果。
继续取“next_shard_iterator”的值并使用get_records,直到“millis_behind_latest”的值为0。
您肯定会将所有数据添加到运动流中。
希望这个答案有帮助。:)
我有一个 Kafka 应用程序,我一直在使用它 kafka-console-consumer.sh 使用消息,如下所示: 它提供了我通过Kafka消费者给Kafka经纪人写的所有消息,没有任何遗漏。 最近,我将该应用程序部署在另一个环境中,因为某些原因,zookeperhost无法访问。所以我使用的是kafka简单的消费者外壳。sh,如下所示: 但是有了这个,我看到很少的消息(大约5000个中有2
我是一个新的Kafka和使用Apache kafka消费者读取消息从生产者。但当我停下来开始一段时间。之间产生的所有消息都将丢失。如何处理这种情况。我正在使用这些属性“auto.offset.reset”、“latest”和“enable.auto.commit”、“false”。 这是我正在使用的代码。任何帮助都是感激的。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在尝试使用ReplyingKafkaTemplate,偶尔会看到下面的消息。 没有待处理的回复:消费者记录(主题=请求-回复-主题,分区=8,偏移量=1,创建时间=1544653843269,序列化密钥大小=-1,序列化值大小=1609,标题=记录标题(标题=[记录标题(键=kafka_correlationId,值=[-14,65,21,-118,70,-94,72,87,-113,-91,
你好,我正在使用Spring云流编写一个Kafka消费者生产者。在我的消费者内部,我将数据保存到数据库中,如果数据库出现故障,我将手动退出应用程序。重新启动应用程序后,如果数据库仍然关闭,则应用程序将再次停止。现在,如果我第三次重新启动应用程序,中间间隔(两次失败)收到的消息丢失,kafka 消费者会获取最新消息,也会跳过我退出代码的消息。 入站和出站通道绑定器接口 服务等级- 1)生产者服务 2
我有一个使用Kinesis客户端库(KCL)编写的Kinesis消费者。此使用者在假定的IAM角色下运行。 我从文档中了解到: KCL使用应用程序名称创建一个DynamoDB表,并使用该表维护应用程序的状态信息(例如检查点和工作分片映射)。每个应用程序都有自己的DynamoDB表。有关详细信息,请参阅跟踪Amazon Kinesis数据流应用程序状态。 当然,我需要将dynamodb:Create