当前位置: 首页 > 知识库问答 >
问题:

更改提要处理器库不支持更改提要处理器选项提要延迟/检查点频率

容磊
2023-03-14

我按照这个示例代码(https://github . com/Azure/Azure-document db-changeedprocessor-dot net # example)注册一个观察者来处理cosmos db集合中的变更提要。我使用一个实用程序在cosmos db集合中创建新文档(比如在一个for循环中创建400个文档)。我正在使用30秒的FeedPollDelay。但CFP自由党似乎并不认同这一点。即使在源轮询延迟间隔过期之前,ProcessChangesAsync方法也会被重复调用。在第一批中,检索到大约60个文档,在第二批中检索到大约20个文档,在第三批中检索到大约100个文档。

        DocumentCollectionInfo feedCollectionInfo = new DocumentCollectionInfo()
        {
            DatabaseName = databaseName,
            CollectionName = monitoredCollectionName,
            Uri = new Uri(uri),
            MasterKey = masterKey
        };

        DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo()
        {
            DatabaseName = databaseName,
            CollectionName = leaseCollectionName,
            Uri = new Uri(uri),
            MasterKey = masterKey
        };

        ChangeFeedProcessorOptions feedProcessorOptions = new ChangeFeedProcessorOptions()
        {
            FeedPollDelay = TimeSpan.FromSeconds(30)
            //LeasePrefix = Guid.NewGuid().ToString(),
            //MaxItemCount = 100
        };
        ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder();
        processor = await builder
            .WithHostName(hostName)
            .WithFeedCollection(feedCollectionInfo)
            .WithLeaseCollection(leaseCollectionInfo)
            .WithProcessorOptions(feedProcessorOptions)
            .WithObserver<LiveWorkItemChangeFeedObserver>()
            .BuildAsync();

        await processor.StartAsync();

在第一批中接收60个文档是可以的。但我希望在提要轮询延迟(30秒)间隔到期后,第二批将在单个批次中调用其余340个文档。

但是ProcessChangesAsync方法经常被触发,并且这个选项没有得到遵守。

共有1个答案

邵劲
2023-03-14

当更改提要处理器读取更改提要并且没有发现新的更改时,使用FeedPollDelay,而不是在每个批处理之间。

示例流程:

  1. CFP民意调查的变化,发现X。
  2. 进程更改异步与 X 一起调用
  3. 进程更改异步完成后,CFP 会立即轮询更改,找到 Y。
  4. 进程更改异步使用 Y 调用。
  5. 在进程更改异步完成后,CFP 会立即轮询更改,什么也没找到,等待源便提。
  6. CFP 民意调查的变化,发现 Z.
  7. 进程更改异步使用 Z 调用
  8. 在进程更改异步完成后,CFP 会立即轮询更改,什么也没找到,等待源便提。
  9. 等。。。。
 类似资料:
  • 嘿,我们已经用Cosmos DB和Mongo API使用change feed处理器库6个月了,但现在当我们尝试连接到change处理器库时,我们得到了一个文档异常和mesage 文档已经改变,现在说它不支持它,但我有一个工作数据库说它支持。

  • 我正在编写一些代码,使用Cosmos作为我的存储来实现事件源。我的初始文档成功写入集合。然后,我设置了一个Azure函数,该函数在该集合的提要发生变化时触发,并将项目复制到另一个集合。 我的问题是,虽然如果我在本地调试函数应用程序,这一切都可以正常工作(更改通过并无问题地处理),但函数在作为函数应用程序发布后不会触发。该函数存在,但总执行计数始终为0。就像该函数没有在计时器上运行并检查提要。我的函

  • top看到单个CPU 100%时,就是垂直扩展的时候了。如果需要让CPU使用率最大化,可以配置Redis实例数对应CPU数, Redis实例数对应端口数(8核Cpu, 8个实例, 8个端口), 以提高并发。单机测试时, 单条数据在200字节, 测试的结果为8~9万tps。(未实测)。 另外,对于命令的复杂度一定要关注。

  • 我们正在使用Cosmos DB更改提要来解决我们的一些需求。想了解是否有任何重试机制在更改提要可用。例如,如果我得到一批10个文件,第三个由于某种原因无法处理,我是否可以只重放第三个消息? 有三种消耗更改提要的方式:

  • 如果我正在读写本地文件,那么对远程数据库服务器的更新相对昂贵。如果增加[chunk-size],内存使用量就会上升。 提交频率对编写本地文件并没有太大的影响,所以对我来说,元数据更新才是一个问题。该步骤是可重新启动的,因此从技术上讲,我不需要记录中间提交计数。 对于JobRepository,我可以只使用map或内存数据库,但我需要其他信息,例如持久化的开始/结束时间,而且这个问题只涉及一个步骤。

  • 我试图在Reactor顶部设计一个管道框架。 在每个阶段(不考虑第一个和最后一个阶段),我们都有转换对象的任务(即字符串到其长度或url到其HTML内容等)。举个例子: 您可以看到中间层有3个任务,每个任务将一个X对象转换为一个Y对象(顺便说一句,它始终是一个完全连接的层) 我的问题/困境:我的第一个想法是,我所需要的是通量。merge(),然后将其连接到每个订阅者。例如: 另一种选择是放置处理器