当前位置: 首页 > 知识库问答 >
问题:

为Azure Cosmos运行多个ChangeFeedProcessor实例

谭玉泽
2023-03-14

我试图运行两个ChangeFeedProcessor实例,它们都指向同一个集合,并在一个Cosmos帐户中使用同一个租约集合。我在这两个实例中都指定了唯一的主机名

我的意图是根据逻辑分区(根据Microsoft文档)将提要负载分布在各个实例之间

当我试图启动第二个实例时,我在控制台中得到以下异常。

com.azure.data.cosmos.internal.changeFeed.implement.exceptionClassifier.classifyClientExceptionClassifier.java:56)com.azure.data.cosmos.internal.partitionprocessorimpl.lambda$run$0(PartitionProcessorimpl.java:115)reactor.core.publisher.monorunnable.block(Monorunnable.java:66)com.azure.data.cosmos.internal.chang在java.util.concurrent.threadPoolExecutor.runworker(threadPoolExecutor.java:1149)在java.util.concurrent.threadPoolExecutor.javer.run(threadPoolExecutor.java:624)在java.lang.thread.run(threadPoolExecutor.javer.run(threadPoolExecutor.java:748)在com.azure.data.cosmos.internal.changeed.implementation.Exception(Ternal.changeFeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)在reactor.core.publisher.monorunnable.block(Monorunnable.java:66)在com.azure.data.cosmos.internal.changeFeed.implement.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl$1.run(PartitionSupervisorImplow:89)在24)在java.lang.thread.run(thread.java:748)在com.azure.data.cosmos.internal.changeFeed.implement.exceptionClassifier.ClassifyClientException(exceptionClassifier.java:56)在com.azure.data.cosmos.internal.changeFeed.implementation.partitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)...etc

我使用了下面的maven依赖项

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.0.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

代码段

  1. 创建ChangeFeedProcessors列表(用于数据库中找到的所有容器)

        //FEED DATABASE
        CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);

        //LEASE DATABASE
        CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);

        //List of Containers in Feed Database
        List<CosmosContainerProperties> containerPropertiesList = null;
        try {
            Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
            List<FeedResponse<CosmosContainerProperties>> list = containers.toStream().collect(Collectors.toList());//Abhishek Optimize
            containerPropertiesList = list.get(0).results();
        }
        catch (Exception e) {
            System.out.println("Fail to query Containers");
            throw new ServiceException("Fail to query Containers");
        }

containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
                //FEED CONTAINER
                String containerName = cosmosContainerProperties.getString("id");
                CosmosContainer feedContainer = feedDatabase.getContainer(containerName);

                //LEASE CONTAINER
                String leaseContainerName = containerName + "-leases";
                CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);

                //Building ChangeFeedProcessor for current Container
                ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
                changeFeedProcessorOptions.startTime(OffsetDateTime.now());

                ChangeFeedProcessor changeFeedProcessor = null;
                try {
                    ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()                           
                            .hostName("Host1")//used Host2 in the other Host
                            .feedContainer(feedContainer)
                            .leaseContainer(leaseContainer)
                            .options(changeFeedProcessorOptions)
                            .handleChanges(docs -> {
                                documentChangeHandler.processChanges(containerName, docs);
                            });
                    changeFeedProcessor = builderDefinition.build();
                }
                catch (Exception e) {
                    System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName);
                }
                resultList.add(changeFeedProcessor);

                System.out.println("processed:  " + leaseContainerName);
            });
public void startChangeFeed() {
        if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
            changeFeedProcessors.parallelStream().forEach(processor->processor.start().block());
        }
        else {
            System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
        }
    }

共有1个答案

丌官哲彦
2023-03-14

从评论来看,该问题与VPN/代理或阻止所需端口范围的东西有关。

直接模式,需要在VPN/代理/防火墙中开放和配置一定的端口范围:

如果无法进行配置,可以切换到网关/HTTP模式。

 类似资料:
  • 自 1.5 后就过时了 在 Hangfire 1.5 之后,您不需要额外的配置来支持多个服务实例处理同一个后台任务,可以跳过本文了。现在使用 GUID 生成服务器标识符,因此所有实例名称都是唯一的。 可以同时在一个程序、机器或多台机器上运行多个服务器实例。每个服务实例使用分布式锁来执行协调逻辑。 在上述情况中,每个Hangfire服务器都有一个唯一的由两部分组成的供默认值标识符。最后一部分是一个程

  • 问题内容: 我在并行计算集群的不同处理器上将Python 3.6脚本作为多个单独的进程运行。多达35个进程同时运行没有问题,但是第36行(及以后)因第二行()上的分段错误而崩溃。有趣的是,第一行不会引起问题。完整的错误消息是: 熊猫和其他一些软件包已安装在虚拟环境中。我已经复制了虚拟环境,因此每个venv中运行的进程不超过24个。例如,上面的错误脚本来自运行在名为的虚拟环境中的脚本。 不论从特定的

  • 我需要在neo4j上存储不同的数据集。我不想在标签中使用相同的实例,因为这可能会影响我想要运行的算法的性能。因此,我正在研究在不同端口上运行多个neo4j实例的方法。 我在这样做时遇到了一些困难,因为网上找到的指南是针对其他版本的neo4j的。谁能帮帮我吗? 我在Windows 10上运行neo4j 3.1.1。 谢谢你!

  • 问题内容: 我想在Centos 7上运行Redis的多个实例。有人可以指出我的正确链接或在此处发布步骤。 我在Google上搜索了该信息,但没有找到任何相关信息。 问题答案: 您可以在单台计算机上使用不同的端口运行Redis的多个实例。如果这与您有关,则可以按照以下步骤操作。 通过安装第一个Redis实例,默认情况下它会监听。 对于第二实例,创建一个新的工作目录 默认的Redis实例用作其工作目录

  • 问题内容: 我正在尝试创建86个task.py实例以同时运行。 问题答案: 等待命令完成。改为使用:

  • 可以运行多个GatewayWorker实例,步骤如下。 假设已有Applications/Chat,想增加Applications/Chat2 1、拷贝Applications/Chat到Applications/Chat2 2、更改Applications/Chat2/start_register.php中的端口,1236改为1237(或者改为其它未被占用端口) 3、更改Applications