我正在设计一个ApacheStorm拓扑(使用streamparse),它由一个喷口(ApacheKafka喷口)和一个具有并行性的螺栓构建
螺栓分批读取信息。如果批量成功完成,我手动提交apache kafka偏移。
当mysql上的螺栓插入失败时,我不会在Kafka中提交偏移量,但是一些消息已经在喷口发送到螺栓的消息队列中。
应该删除队列中已经存在的消息,因为我无法在不丢失先前失败消息的情况下推进kafka偏移量。
streamparse中是否有方法在启动时清除队列中已经存在的所有消息或使其失败?
我不知道streamparse,但我得到的印象是,您希望将元组捆绑起来,并将它们作为批处理编写。假设您已写入偏移量10。现在,您的螺栓收到偏移量11-15,批处理写入失败。偏移量15-20已排队,您不希望立即处理它们,因为这样会无序处理批次。
这种理解正确吗?
首先,我将放弃手动提交偏移量。你应该让壶嘴来处理。假设您使用的是storm kafka client
,则可以将其配置为在确认相应的元组和所有前面的元组后仅提交偏移量。
您可能应该做的是在螺栓中跟踪(或者更好的是,在您的数据库中)失败批次中的最高偏移量。然后,当您的Bolt未能写入偏移量11-15时,您可以使用偏移量使Bolt失败每个元组
此解决方案假设您不在喷口和写入器螺栓之间对消息流进行重新排序,因此消息按照它们发出的顺序到达螺栓。
我有一个kafkaspout,两个bolt用于处理数据,两个bolt用于在mongodb中存储处理过的数据 我正在使用apache flux创建拓扑,在那里我将数据从Kafka读取到spout。一切都运行得很好,但每次运行拓扑时,它都从一开始就处理kafka中的所有消息。并且一旦它处理了所有的消息,它就不会等待更多的消息和崩溃。 我如何使Storm拓扑只处理最新的消息。 这是我的拓扑文件.yaml
我想了解为什么我不能在LinearLayoutManager上使用scrollToPositionWithOffset方法?请看图片了解我的意思: 一点背景: 图像中的第一行(带有)是滚动RecyclerView以使位置(在本例中为50)可见-这通常意味着所选位置显示在可见的RecyclerView的底部(其中位置50在“滚动”后首先可见)。而我总是想在顶部展示它。根据我的研究,使用这个方法似乎是
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?
我用Kafka壶嘴来消费信息。但是,如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka壶嘴给了我们从哪里消费的时间戳,但我怎么知道时间戳呢?
我一直在使用IDE(Eclipse)测试Storm拓扑的远程提交。我成功地将简单的storm拓扑上传到了远程storm集群,但奇怪的是,当我检查storm UI以确定远程提交的拓扑是否正常工作时,我在UI中看到了just_acker bolt,但其他bolt和spout却不在那里。之后,我从命令行手动提交了拓扑,并再次检查了Storm UI,它正常工作,没有问题。我一直在找原因,但没有找到。我在下