当前位置: 首页 > 知识库问答 >
问题:

使用Alpakka连接器的多个使用者线程

能烨华
2023-03-14
public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> source() {
Source finalSource = Source.empty();
        for (int index = 0; index < consumerConfig.getNoOfConsumers(); index++) {
            finalSource = finalSource.merge(Consumer.committableSource(consumerSettings, subscription));
        }
return finalSource;
}

共有1个答案

黄浩涆
2023-03-14

你凭什么相信你需要更多的线程?此外,您还希望跨多个流共享单个Kafka消费者客户端实例。

您不应该将来自多个consumer.committableSource元素合并到一个流中,因为它不能用于批提交。

多次运行相同的流设置会解决您的需求吗?

 类似资料:
  • 下面的代码片段是从JoinedStreams的javadoc复制的 这两个流仅基于一个键(通过< code>t =计算)进行连接 我会问我如何基于多个键进行连接,例如,one.a = two.a和

  • 我有一个任务来创建一个独立的java应用程序来做以下事情: 解析一个。csv文件。(这将有大约300万条记录) 对于每条记录,在几个DB表中插入约15行 如果出现错误,则输出。csv记录是否已成功处理 我目前的设计思路是: 读这本书。csv文件(这一步还没有考虑太多) 每个线程都有逻辑: 创建数据库连接

  • 假设我们有一个AWS FIFO SQS队列和两个消息生产者A和B。每条消息都发送了一个等于生产者名称的组ID。换句话说,生产者A将组ID“A”添加到每条消息中,生产者B将组ID“B”添加到每条消息中。我们还有3个消费者X、Y和Z正在使用可见性超时的消息。让我们假设队列中有5条消息——3条来自生产者A的消息和2条来自生产者B的消息。见下图 考虑到给定的条件,我们将有以下工作流程: 其中一个消费者,例

  • 我正在尝试使用条件连接多个表,但遇到了一些问题,请帮助我:我有一个sql查询,如: 这三个表:截止时间(has country)、国家、交易所(has country)是3个实体类。 我如何使用hibernate标准像这样加入,我下面的代码仍然不完整:

  • 为了实现这一点,我使用了队列/线程池机制。最初,我创建一个固定数量线程的池,并有一个队列datastructure来存储客户机地址。这个队列在所有线程之间共享,因此我使用“互斥”来锁定/解锁这个队列。在主服务器线程中,我创建一个套接字,将其绑定到全局端口/地址,然后在“recvfrom”调用上阻止服务器。任何希望与服务器通信的客户端都会向侦听全局端口地址的主线程服务器发送“HI”消息。主服务器线程

  • 我有一个使用KCL 2. x从Kinesis消费记录的应用程序,不同流分片中存在的数据格式不同,我想使用具有不同配置的不同KCL消费者单独处理它们。例如,如果我在Kinesis流中有3个分片,我将为不同的分片生成3个具有不同配置的不同消费者,即每个分片1个消费者。当我创建多个消费者时,我面临的问题是,如果我通过java Code配置3个不同的消费者,那么他们中的任何一个都在所有分片上出现租赁锁,而