我看到了一些奇怪的行为。我使用Flink 1.12编写了一些Flink处理器,并试图让它们在Amazon EMR上运行。然而,Amazon EMR目前只支持Flink 1.11.2。当我降级时,我莫名其妙地发现水印不再传播。 主题上只有一个分区,并行度设置为1。这里有我遗漏的东西吗?我觉得我有点疯了。 这是Flink 1.12的输出: 这是Flink 1.11的输出: 下面是公开它的集成测试:
我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。 使用Active MQ,您可以执行以下操作: 将100条消息提交到队列中 我试着在Kafka做同样的事情 如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。 谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?
我正在编写一个将处理事件消息的应用程序(发布到主题)。我有多个endpoint应该使用这些消息(和),出于实用的原因,我想将这些endpoint一起部署在一个聚合包中。 有了Spring Cloud Stream,我可以使用Spring。云流动绑定。文件上载已完成。group=元数据读取器,为第一个endpoint设置消费者组;我还想在配额检查组下处理消息,但基于属性的配置只允许每个消息队列绑定一
我有一个azure函数v3,用C#写的,用类库的方法。 < li >该功能由blob存储触发。 < li >该功能正在使用消费计划。 问题是冷启动可能长达30分钟!我已经查阅了这个链接的文档 但是没有关于预期冷起动正时的具体数字。 一个有趣的观察是,如果我导航到门户,并点击刷新按钮: 那么该功能会立即被触发。 这是正常的预期行为吗 您能给我指一下任何明确说明消费计划中冷启动时间为0-30-50分钟
我正在使用spring boot构建一个web应用程序,现在我需要接收实时通知。我正计划使用apache kafka作为这方面的消息代理。要求用户具有不同的角色,并且根据角色,他们应该接收其他用户正在执行的操作的通知。 我设置了一个生产者和消费者,作为消费者,我可以接收发布到一个主题的信息,比如说topic1。 我遇到的问题是,我可以让多个用户收听同一个主题,而每个用户都应该得到发布到该主题的消息
我有一个关于2个代理上的3个分区的主题。(Kafka版本:0.8.1) 使用不同的用户guid(如:FC42B34DD7658503E040970A2C437358)作为分区密钥批量添加消息。(约10K条消息) 在加载消息时,我有一个正在运行的消费者(consumer1),它开始很好地处理消息。 然后我用相同的消费者组ID启动了另一个消费者(consumer2)。 我希望两个消费者都应该分配负载。
我看不出我做错了什么。如有任何帮助,不胜感激。
我对再平衡有些怀疑。现在,我正在手动将分区分配给使用者。因此,根据文件,如果消费者离开/崩溃在一个消费群体中,就不会有再平衡。 假设同一组中有3个分区和3个使用者,每个分区都是手动分配给每个使用者的。一段时间后,第三个消费者倒下了。既然没有再平衡,我可以采取什么措施来确保停机时间最小化?我是否需要更改前两个分区中任何一个的配置,以从第三个分区或其他分区开始使用?
我在 Java 中有以下场景: 1生产者线程将事件对象存储到队列中。阻塞它不是一个选项。它应该始终将每个元素存储在队列的末尾并退出(因此没有有界队列)。 1个消费者线程等待队列中有WINDOW_SIZE数量的事件。然后它应该从队列中检索所有WINDOW_SIZE事件进行处理,但只删除其中的一半(即WINDOW_SIZE/2),重叠率为50%。 我的问题是,您将使用哪个(并发)集合来高效地实现这一点
我试图用阻塞队列实现一些消费者-生产者问题。为了达到某种目的,我决定编写文件搜索工具。 我认为搜索机制是递归工作的,每个新目录都将有新的线程池来提高搜索速度。 我的问题是,我不知道如何实现最终停止打印线程(消费者)的机制——当搜索线程完成工作时。 我试图用一些想法来做到这一点,比如毒丸,但它效果不佳(线程在打印任何结果之前停止)。任何想法我该怎么做? 下面是一些代码: 搜索机制: } 打印机: }
我正在开发一个Spring Boot应用程序,它使用以Kafka主题为源的Spring集成流。我们的集成流程开始使用一个包含带有spring framework . cloud . stream . annotation . input和Output注释的SubscribableChannels的接口。这些被配置为通过spring . Cloud . stream . Kafka . bindin
我并不是在寻找API来完成这个内部实现细节。 我知道最新版本的Kafka在一个特殊的Kafka主题__consumer_offset中为消费者群体存储偏移量。 我的问题是: 这个主题中的数据结构到底是什么? 当一个消费者群体死亡并出现时,Kafka如何在Topic Partitions中查找该消费者群体上次消费的偏移量? 就我的理解而言,Kafka主题不适合查找数据:例如:用于查询,例如: 基本上
我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下
我试图授权用户使用Oauth2从我的Django REST框架API访问一些资源。 大多数关于Oau2和API的答案都涉及使API成为提供者。 但是我打算和很多REST APIs共用一个Oauth2提供者,想不通怎么消费(不是怎么提供Oauth2)。 我不知道用户如何登录提供程序 SSO,然后将其令牌传送到我的消费 API,该 API 必须针对我的提供程序对用户进行身份验证(获取其信息,主要是授权
有人能解释一下为什么下面的测试失败了吗? 我试图进一步简化“不好”的观察结果,但找不到任何可以删除的东西来简化它。 然而,我目前的理解是,它是一个可观察的(不管它是如何构造的),应该发出一个值,然后完成。然后,我们基于该可观察对象制作两个类似的对象实例,并在那些使用可观察对象的对象上调用一个方法,记下已经这样做了,然后返回Observable.empty()。 有人能解释为什么使用这个可观察的会导