4.5 高阶API-TalosConsumer

优质
小牛编辑
139浏览
2023-12-01

TalosConsumer API

如前所述,TalosConsumer是消费数据的高阶API,它可以自动的进行re-balance,commit等操作;

TalosConsumer没有暴露接口给用户,只要实例化就自动work,消息拉取后的处理逻辑是通过注册的回调接口MessageProcessor来完成的, 详细说明如下;

消息被消费后需要进行commit操作,记录消费的checkpoint,关于checkpoint,Talos提供三种场景的Best Practice,详细如下:

MessageProcessor 接口

MessageProcessor接口是用来实现消息的消费处理逻辑,用户需要实现这个接口;在TalosConsumer中,每个Partition对应一个线程来fetch数据,同时每个Partition都对应一个MessageProcessor实例,该MessageProcessor只处理当前Partition拉取的数据,可以看到,它提供了3个接口:

  public void init(TopicAndPartition topicAndPartition, long startMessageOffset);
  public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer);
  public void shutdown(MessageCheckpointer messageCheckpointer);

一般来讲,用户只需要实现process接口即可,其他接口函数体可以为空;

TalosConsumer可以自动进行消息的commit操作,也支持用户自行控制commit,通过process接口的实现,TalosConsumer为用户提供了灵活的使用场景,大致如下:

  A. 系统默认根据一定策略自行commit,具体策略可参见配置说明中的[场景3];这种情况下用户只需要实现自己的消息处理逻辑就好了,不需要关心消息消费到哪里了,checkpoint到哪里了;
  B. 用户使用`MessageCheckpoint.checkpoint()`接口自行控制commit的时机;
  C. 用户使用`MessageCheckpoint.checkpoint(long commitOffset)`接口自行控制commit的时机和具体offset

系统默认的是A场景,如果用户需要使用后两种场景,则需要将checkpoint.auto.commit的开关关掉,具体见配置说明的[场景2]

Checkpoint 管理的 Best Practice

下面就上述三种场景的Best Practice进行描述

A. Auto Commit

auto-commit

如图,Auto Commit是系统默认的方式,这种场景下,用户只需要在process函数中实现消息的处理逻辑就好了,不需要关心何时commit,系统的commit策略请参见配置说明的[场景3];

可以看出,在这种场景下,系统可能调用了多次process接口后才调用一次checkpoint进行commit,记录当前消费的offset位置;

B. Using MessageCheckpoint.checkpoint()

在这种场景下,用户可以自己控制何时进行checkpoint,一般来讲用户希望自己能在process消息被调用后立即对当前这批进行commit或者某种确定的condition成立的条件下进行commit,这两种情况我们建议用户在process接口中调用这个接口:MessageCheckpoint.checkpoint()

checkpoint

如图,第一种场景是用户在每次process被调用时都进行checkpoint;第二种场景是用户在某种期望的condition条件成立后进行commit;

请注意

  1. 不要忘了将galaxy.talos.consumer.checkpoint.auto.commit置为false;

  2. 当调用checkpoint()时,commit的offset一定是此次process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer)中messages这个list中最后一条Message的Offset

C. Using MessageCheckpoint.checkpoint(long commitOffset)

这种场景一般是用户对于单条消息的处理逻辑比较重,且希望处理一条就commit一下;

checkpoint-offset

如图,用户处理每一条Message后都调用commit,此时需要指定当前这条Message的Offset进行commit;

请注意

  1. 不要忘了将galaxy.talos.consumer.checkpoint.auto.commit置为false;

  2. 用户调用checkpoint(long commitOffset)时,这个commitOffset的范围一定是 (lastCommitOffset, messages.get(messages.size()-1).getMessageOffset()]

TalosConsumer 配置说明

必需配置项
NameDescriptionDefault
galaxy.talos.service.endpoint指定Talos Server的URI,可以配置http和https,相关集群对应的URI请见集群信息--
可选配置项 & 场景 (针对High Level TalosConsumer)
[场景1] 用户希望拉取数据再快一点/慢一点
  • galaxy.talos.consumer.fetch.interval.ms

    该配置指定consumer中每个partition拉取数据的时间间隔/频率

    Default: 200ms

  • galaxy.talos.consumer.max.fetch.records

    该配置指定Consumer中每个partition一次拉取数据的batch大小最大值(条目数)

    Default: 1000

[场景2] 用户选择是否自己控制消息的commit操作
  • galaxy.talos.consumer.checkpoint.auto.commit

    该配置指定系统是否自动commit消费的数据,默认是true,表示系统自动进行checkpoint的commit操作;如果用户想要自己进行commit,则将该配置置为false;

    Default: true

[场景3] 当用户选择让Consumer自动进行commit的时候,控制commit的频率;下述条件满足任何一个即刻进行commit
  • galaxy.talos.consumer.commit.offset.record.fetched.num

    该配置指定了consumer在消费了多少条记录后commit一次offset

    Default: 10000

  • galaxy.talos.consumer.commit.offset.interval.milli

    从时间维度上配置commit的频率,上次commit与下一次commit之间的interval

    Default: 5000ms

[场景4] 当拉取消息时出现 MESSAGE_OFFSET_OUT_OF_RANGE, 用户希望的处理方式
  • galaxy.talos.consumer.out.of.range.reset.latest.offset

    该配置指定当用户拉取消息时offset出现越界了是否从该partition的最新offset开始拉取,默认为false,即出现越界则从当前Partition的有效StartOffset开始读取;如果设置为true则从该partition的EndOffset开始读取。

    什么时候会出现 MESSAGE_OFFSET_OUT_OF_RANGE?举个例子,假设用户消息保存1天,用户使用TalosConsumer消费了一部分数据,然后将程序停了;过了1天或几天后,用户重新启动TalosConsumer,默认是从LastCommitOffset开始读取(可通过场景5进行改变),但是由于LastCommitOffset指向的数据已经过期,此时TalosConsumer会根据该项配置决定从哪里开始消费数据。

    Default: false

[场景5] 用户启动TalosConsumer的时候想要重置开始读取消息的Offset
  • galaxy.talos.consumer.start.reset.offset.value

    该项配置有两个取值:-1,-2;分别表示从当前Partition的StartOffset和EndOffset开始读取消息,默认取值是StartOffset

    Default: -1

  • galaxy.talos.consumer.start.whether.reset.offset

    该项配置表示TalosConsumer启动时是否重置offset,默认是false,即不重置

    Default: false

    注意】请用户仔细理解这两项配置,程序判断基于的规则是:当用户第一次启动Consumer时(即LastCommitOffset不存在)或者'galaxy.talos.consumer.start.whether.reset.offset'为true时,程序根据'galaxy.talos.consumer.start.reset.offset.value'的值开始读取offset,否则从LastCommitOffset开始读取; 这样会产生如下几种场景:

    1) 默认情况下第一次启动Consumer,程序从StartOffset开始读取;如果用户想在第一次启动时从EndOffset开始读取,只需要将配置'galaxy.talos.consumer.start.reset.offset.value'置为'-2'即可

    2) 默认情况下Consumer重启(非第一次启动)时,程序是从LastCommitOffset开始读取;如果用户想要在重启时从StartOffset或EndOffset开始读取,则需要同时配置上面两项,将'galaxy.talos.consumer.start.whether.reset.offset'设置为'true',另一项配置为相应的值(-1/-2);