当前位置: 首页 > 知识库问答 >
问题:

使用KCL 2下特定碎片的记录。x(运动)

翟承志
2023-03-14

我在Kinesis流中的一些特定分片下有一组记录。我正在使用KCL 2. x消费者从kinesis中消费记录,但问题是消费者正在从流中所有可用的分片中获取我的记录。所以有没有什么方法可以在配置configBuilder对象或KCL消费者时指定分片或它们的ID,以便只消费来自指定分片的记录。

示例代码:

configsBuilder = new ConfigsBuilder(
        applicationName,
        streamName,
        kinesisAsyncClient,
        dynamoDbClient,
        cloudWatchClient,
        workerID,
        new RecordProcessorFactory());

scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configBuilder.retrievalConfig()
    );

    // start the kinesis records consumer.
    schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    schedulerThread.start();

提前感谢!

共有1个答案

周楷
2023-03-14

KCL 2。x提供了一个ShardPrioritization界面,允许对碎片进行优先级排序或过滤:

/**
 * Provides logic to prioritize or filter shards before their execution.
 */
public interface ShardPrioritization {

    /**
     * Returns new list of shards ordered based on their priority.
     * Resulted list may have fewer shards compared to original list
     * 
     * @param original
     *            list of shards needed to be prioritized
     * @return new list that contains only shards that should be processed
     */
    List<ShardInfo> prioritize(List<ShardInfo> original);
}

也就是说,您可以提供ShardPrioritify实现,它将只留下与您相关的分片。

之后,只需在协调器配置中指定您的优先级:

configsBuilder.coordinatorConfig
          .shardPrioritization(new CustomShardsPrioritixation())
 类似资料:
  • 我有一个亚马逊运动流,由多个碎片组成。碎片的数量,因此消费者的数量,不是一个常数。 有一种不常见的事件类型,我希望广播给流中的每个消费者。 制作人是否有办法播放一张唱片,即发现碎片并将唱片放在每个碎片上?

  • 我想创建一个控制台附加器,显示一些日志信息,并打印出一个特定的http标头,类似于: 我创建了一个如下所示的< code>logback-spring.xml文件,但是“my-header”只是空白的。 我知道使用< code>logback-access可以访问HTTP请求/响应属性,但是当我尝试设置编码器类时,我无法使用任何经典的logback转换词: 上面的日志给出了以下错误: 如何访问请求

  • 由于不推荐使用TabActivity,我需要找到一种方法来使用片段。在我知道它如何工作之前,我已经使用了碎片,但我需要一个指南来创建我的标签主机与碎片活动。我在互联网上找到了几个例子,它们都是关于将片段放入标签的容器中的。

  • 我正在使用log4j进行日志记录。我有一个场景,我必须为不同的严重性使用单独的日志记录。例如,对于软件包foo,我必须在控制台中打印具有严重性ERROR的消息,而我必须在日志文件中打印具有严重性WARN的消息。我如何配置我的log4j.xml相同。

  • 我可以使用一个片段作为一个活动吗?我已经创建了一个片段,但我希望它有像活动一样的功能,所以我使用片段扩展碎片活动。然而,我有一个带有碎片的导航抽屉。当我更改为“扩展碎片活动”时,我的代码有问题?请给我指路。

  • 如果在创建Kinesis数据流时,我指定了碎片的数量,比如说10个,并且每次我放入记录时,我都会给它分配一个随机分区键,如下所示: kinesis将如何决定将记录放入某个碎片中,如果唯一分区键的数量大于碎片的数量,会发生什么情况?