当前位置: 首页 > 知识库问答 >
问题:

在KCL Java library for AWS Kinesis的情况下,如何使用requestShutdown和shutdown进行优雅的关闭

商绍元
2023-03-14

我正在尝试使用Java for AWS Kinesis的KCL库的新功能,通过注册shutdown hook来优雅地关闭所有记录处理器,然后优雅地停止工作进程。新的库提供了一个新的接口,需要实现记录处理器。但是它是如何被调用的呢?

首先尝试调用worker.requestShutdown(),然后worker.shutdown(),它是有效的。但是使用它有任何预期的方式吗?那么使用两者有什么用,以及它的好处?

共有1个答案

禄和宜
2023-03-14

正如您可能知道的那样,当您创建Worker时,它

1) 在dynamodb中创建使用者偏移表

2)创建租约,安排租约接受者和租约续订者在配置的时间间隔

如果您有两个分区,那么在您的同一个Dynamodb表中将有两条记录,这意味着分区需要租约。

如。

{
  "checkpoint": "TRIM_HORIZON",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 38,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}

{
  "checkpoint": "49570828493343584144205257440727957974505808096533676050",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 40,
  "leaseKey": "shardId-000000000001",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}
  • 租赁协调员ScheduledExecutorService(称为leaseCoordinatorThreadPool)负责获取和续订租赁的时间表

3)然后,对于流中的每个分区,Worker创建一个内部分区消费者,它实际上获取事件,并分派到您的RecordProcencer#进程记录。请参阅ProcessTask#调用

4) 对于您的问题,您必须将您的IRecordProcessorFactory impl注册给工作人员,这将为每个分区消费者提供一个处理器工厂impl。

参见此处的示例,这可能会有所帮助

KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
 "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
            .withKinesisClientConfig(getHttpConfiguration())
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream

Worker consumerWorker = new Worker.Builder()
            .recordProcessorFactory(new DavidsEventProcessorFactory())
            .config(streamConfig)
            .dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration())))
            .build();


public class DavidsEventProcessorFactory implements IRecordProcessorFactory {

    private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    @Override
    public IRecordProcessor createProcessor() {
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {

    private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    //TODO add consumername ?

    private String partitionId;

    private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;

    public KinesisEventPartitionProcessor() {
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        recordsInput.getRecords().forEach(nativeEvent -> {
            String eventPayload = new String(nativeEvent.getData().array());
            logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload);

            //update offset after configured amount of retries
            try {
                recordsInput.getCheckpointer().checkpoint();
                logger.debug("Persisted the consumer offset to {} for partition {}",
                        nativeEvent.getSequenceNumber(), partitionId);
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
                e.printStackTrace();
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
                e.printStackTrace();
            }
        });
    }

    @Override
    public void shutdown(ShutdownInput shutdownReason) {
        logger.debug("Shutting down event processor for {}", partitionId);

        if(shutdownReason.getShutdownReason() == RE_PARTITIONING) {
            try {
                shutdownReason.getCheckpointer().checkpoint();
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
                e.printStackTrace();
            } catch (ShutdownException e) {
                logger.error("Consumer Shutting down", e);
                e.printStackTrace();
            }
        }
    }

}

//然后启动消费者

consumerWorker.run();

现在,当您想要停止您的Consumer实例(Worker)时,您不需要对每个分区Consumer进行太多处理,一旦您要求Worker关闭,它将由Worker负责。

>

  • 关闭后,它会要求负责续订和获取租约的leaseCoordinatorThreadPool停止,并等待终止。

    另一方面,requestShutdown取消承租人,并将关闭通知分区消费者。

    对于requestShutdown,更重要的一点是,如果您想在记录处理器上得到通知,那么您也可以实现IShutdownNotificationAware。这样,当您的记录处理器(RecordProcessor)正在处理一个事件,但工作进程即将关闭时,在竞争条件下,您应该仍然能够提交偏移量,然后关闭。

    Request Shutdown返回一个Shutdown Future,然后调用worker.shutdown

    您必须在您的RecordProcess上实现以下方法才能收到Request estShutdown的通知,

    class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {
    
       private String partitionId;
    
       // few implementations
    
        @Override
        public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
            logger.debug("Shutdown requested for {}", partitionId);
        }
    
    }
    

    但是,如果您在通知之前丢失了租约,那么它可能不会被调用。

    新库提供了一个需要实现的记录处理器的新接口。但是它是如何被调用的呢?

    • 实现IRecordProcessor工厂和IRecordProcessor
    • 然后将RecordProcessorFactory连接到工作人员

    尝试先调用工作进程。requestShutdown()然后是worker。shutdown()正常工作。但是,这是否是一种有意的使用方式呢?

    您应该使用ask estShutdown()进行优雅的关闭,这将照顾种族条件。它是在kinise-client-1.7.1中引入的

  •  类似资料:
    • 问题内容: 我一直在寻找一种不用使用collections.sort就可以对数组列表进行排序的方法,因为我自己的逻辑有缺陷,而且我遇到了很多麻烦。 我需要对它进行排序,以便可以使用我创建的一种方法,该方法基本上可以执行collections.swap的工作,以便对数组列表进行完全排序。 这是我的代码: 我对此一直很烦恼。抱歉,这是在伤害社区。 问题答案: 我想,你希望下面的算法:在阵列的其余部分发

    • 问题内容: 我正在运行一个wordpress网站,每个PHP进程的使用空间约为200mb至250mb。使用16GB的ram,服务器只能处理大约70个进程。通过将虚拟内存增加到16GB,它可以处理140个。之后,负载不断增加。如果10分钟内有200个连接,则在3Ghz四核xeon处理器上服务器负载将达到20个! 我曾尝试停用所有插件,但这只会使每个进程的PHP内存使用量减少不到10%。suPHP告诉

    • 我使用开关盒获得较大范围:

    • 问题内容: 我正在解决Programming Pearls,第二版,第1列中的问题。其中一个问题涉及编写一个程序,该程序仅使用大约1 MB的内存将文件内容存储为位数组,每个位代表是否或文件中没有7位数字。由于Java是我最熟悉的语言,因此即使作者似乎已经想到了C和C ++,我还是决定使用它。 由于我是为了解决我正在处理的问题而假装有限的内存,因此我想确保读取文件的过程完全没有缓冲。 在我阅读Jav

    • 我在Log4j2上开发了一个包装器类。使用OSGi的声明性服务,我发布了一个定制的记录器服务,使用我自己的记录器接口,包装器类作为实现。包装器类仅用于以编程方式配置记录器,即消息格式 我想打印日志文件中请求的每个日志的源类/文件名和行号。选项%C/%F和%L只打印有关包装器类中我实际调用log方法的位置的信息。 因此,作为一种锻炼,我每次都把新的Throwable作为参数传递,这样我就可以使用布局