我想在远程位置检查Kafka消费者的连接。 可以确定是否将使用者分配给分区。 在远程位置,我可以从Kafka代理获得有关该主题的详细信息。 但是消费者能否保证消费者能够收到消费者与主题分区匹配的消息?
在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制
我正试图让Kafka的消费者消费一次<我的要求是: 从Topic读取数据 处理数据[涉及调用另一个API] 将响应写回Kafka 我想知道在这种情况下是否可以完全一次? 我知道用例满足Kafka流API,但我想从生产者/消费者API中知道?此外,如果让我们说在处理数据后,消费者因某种原因失败(处理应该只完成一次),那么处理此类情况的最佳方法是什么? 此类情况可以有任何延续/检查点吗? 我知道Kaf
我以前从没用过Kafka。我有两个测试程序访问本地Kafka实例:一个读卡器和一个写卡器。我试图调整我的制作人、消费者和Kafka服务器设置,以获得特定的行为。 我的作者: 我的本地kafka上存在三个主题:政策管理器-100、政策管理器-200、政策管理器-300。它们每个都只有1个分区,以确保所有消息都按kafka收到它们的时间进行排序。我的作者将随机选择其中一个主题并发布一条由一个数字组成的
我被要求评估RabbitMQ而不是Kafka,但我发现很难找到一种比Kafka更适合消息队列的情况。有人知道消息队列在吞吐量、持久性、延迟或易用性方面更适合的用例吗?
我的应用程序使用一台机器上运行的Kafka服务器上的消息,然后将它们转发给另一台在其他实例上运行的远程Kafka服务器。在我将应用程序部署到Cloud Foundry并向第一台Kafka服务器发送消息后,应用程序按预期工作。消息被消费并转发到远程Kafka。 然而,在这之后,我在Cloud Foundry(以及在我的本地机器上以较慢的速度)中得到了下面的无限循环异常: StackTrace: 我的
我目前正在使用SSL身份验证配置Apache Kafka,在启动服务时遇到错误。代理程序似乎启动正确(似乎发生了领导人选举等),但一旦开始执行任何集群操作,我就会在日志中不断看到下面的错误。 尝试重新创建密钥和信任存储,尝试从代理间侦听器中删除SSL(这会导致匿名主体,我不想授予对任何资源的访问权)。 要解释我的配置: 使用SSL principal builder运行Kafka 2.2 我设置了
目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子: 我创建了一个接口IWebsocketClientEndpoint 以及实现上述接口的类: WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。 目标是:如何在Kafka连接结构中调整我的websocket结构?我
在启动应用程序时,Kafka流出现了奇怪的错误 结果,关于失败流的错误:
我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展类并将其传递到中来解决这个问题。 我是这样做的- 我基于github上的这段代码 但是,当我运行
使用kafka-stream0.10.0.0,我在转发消息时定期在StreamTask中看到空指针异常。它在10%到50%的调用之间变化。NPE发生在这个方法中: 似乎在某些情况下,thisNode字段为空。知道是什么导致了这种情况吗?堆栈跟踪在下面。
我有一个kafka streams拓扑,它从输入主题中读取更新某些状态,并确定状态条目是否需要保留在状态存储中,或者可以删除。如果可以删除,它将被删除,否则我有一个标点器,每10秒运行一次,并使状态存储中的项目过期。 我最近发现标点符号在同一个流线程上运行,并且可能会阻塞流的处理。 我可以使用哪些模式在单独的线程池中执行标点符号内部的逻辑以避免阻塞流处理? 谢谢你的帮助。
在stream应用程序内的单个任务中,以下两种方法是否独立运行(这意味着当方法“process”处理来自上游源的传入消息时,方法“标点”也可以基于指定的时间表和作为标点类型的WALL_CLOCK_TIME并行运行?)或者它们是否共享同一个线程,因此它是在给定时间运行的线程,如果是这样的话,如果进程方法不断从上游源获取消息,标点符号方法是否永远不会被调用? > ProcessorContext。调度
我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题
假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?