4.5 高阶API-TalosConsumer
TalosConsumer API
如前所述,TalosConsumer是消费数据的高阶API,它可以自动的进行re-balance,commit等操作;
TalosConsumer没有暴露接口给用户,只要实例化就自动work,消息拉取后的处理逻辑是通过注册的回调接口MessageProcessor来完成的, 详细说明如下;
消息被消费后需要进行commit操作,记录消费的checkpoint,关于checkpoint,Talos提供三种场景的Best Practice,详细如下:
MessageProcessor 接口
MessageProcessor接口是用来实现消息的消费处理逻辑,用户需要实现这个接口;在TalosConsumer中,每个Partition对应一个线程来fetch数据,同时每个Partition都对应一个MessageProcessor实例,该MessageProcessor只处理当前Partition拉取的数据,可以看到,它提供了3个接口:
public void init(TopicAndPartition topicAndPartition, long startMessageOffset);
public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer);
public void shutdown(MessageCheckpointer messageCheckpointer);
一般来讲,用户只需要实现process
接口即可,其他接口函数体可以为空;
TalosConsumer可以自动进行消息的commit
操作,也支持用户自行控制commit
,通过process
接口的实现,TalosConsumer为用户提供了灵活的使用场景,大致如下:
A. 系统默认根据一定策略自行commit,具体策略可参见配置说明中的[场景3];这种情况下用户只需要实现自己的消息处理逻辑就好了,不需要关心消息消费到哪里了,checkpoint到哪里了;
B. 用户使用`MessageCheckpoint.checkpoint()`接口自行控制commit的时机;
C. 用户使用`MessageCheckpoint.checkpoint(long commitOffset)`接口自行控制commit的时机和具体offset
系统默认的是A场景,如果用户需要使用后两种场景,则需要将checkpoint.auto.commit
的开关关掉,具体见配置说明的[场景2]
;
Checkpoint 管理的 Best Practice
下面就上述三种场景的Best Practice进行描述
A. Auto Commit
如图,Auto Commit是系统默认的方式,这种场景下,用户只需要在process函数中实现消息的处理逻辑就好了,不需要关心何时commit,系统的commit策略请参见配置说明的[场景3];
可以看出,在这种场景下,系统可能调用了多次process
接口后才调用一次checkpoint进行commit
,记录当前消费的offset位置;
B. Using MessageCheckpoint.checkpoint()
在这种场景下,用户可以自己控制何时进行checkpoint,一般来讲用户希望自己能在process消息被调用后立即对当前这批进行commit或者某种确定的condition成立的条件下进行commit,这两种情况我们建议用户在process接口中调用这个接口:MessageCheckpoint.checkpoint()
如图,第一种场景是用户在每次process被调用时都进行checkpoint;第二种场景是用户在某种期望的condition条件成立后进行commit;
请注意:
不要忘了将
galaxy.talos.consumer.checkpoint.auto.commit
置为false;当调用
checkpoint()
时,commit的offset一定是此次process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer)中messages这个list中最后一条Message的Offset
;
C. Using MessageCheckpoint.checkpoint(long commitOffset)
这种场景一般是用户对于单条消息的处理逻辑比较重,且希望处理一条就commit一下;
如图,用户处理每一条Message后都调用commit,此时需要指定当前这条Message的Offset进行commit;
请注意:
不要忘了将
galaxy.talos.consumer.checkpoint.auto.commit
置为false;用户调用
checkpoint(long commitOffset)
时,这个commitOffset
的范围一定是(lastCommitOffset, messages.get(messages.size()-1).getMessageOffset()]
;
TalosConsumer 配置说明
必需配置项
Name | Description | Default |
---|---|---|
galaxy.talos.service.endpoint | 指定Talos Server的URI,可以配置http和https,相关集群对应的URI请见集群信息 | -- |
可选配置项 & 场景 (针对High Level TalosConsumer)
[场景1] 用户希望拉取数据再快一点/慢一点
- galaxy.talos.consumer.fetch.interval.ms
该配置指定consumer中每个partition拉取数据的时间间隔/频率
Default: 200ms
- galaxy.talos.consumer.max.fetch.records
该配置指定Consumer中每个partition一次拉取数据的batch大小最大值(条目数)
Default: 1000
[场景2] 用户选择是否自己控制消息的commit操作
- galaxy.talos.consumer.checkpoint.auto.commit
该配置指定系统是否自动commit消费的数据,默认是true,表示系统自动进行checkpoint的commit操作;如果用户想要自己进行commit,则将该配置置为false;
Default: true
[场景3] 当用户选择让Consumer自动进行commit的时候,控制commit的频率;下述条件满足任何一个即刻进行commit
- galaxy.talos.consumer.commit.offset.record.fetched.num
该配置指定了consumer在消费了多少条记录后commit一次offset
Default: 10000
- galaxy.talos.consumer.commit.offset.interval.milli
从时间维度上配置commit的频率,上次commit与下一次commit之间的interval
Default: 5000ms
[场景4] 当拉取消息时出现 MESSAGE_OFFSET_OUT_OF_RANGE, 用户希望的处理方式
- galaxy.talos.consumer.out.of.range.reset.latest.offset
该配置指定当用户拉取消息时offset出现越界了是否从该partition的最新offset开始拉取,默认为false,即出现越界则从当前Partition的有效StartOffset开始读取;如果设置为true则从该partition的EndOffset开始读取。
什么时候会出现 MESSAGE_OFFSET_OUT_OF_RANGE?举个例子,假设用户消息保存1天,用户使用TalosConsumer消费了一部分数据,然后将程序停了;过了1天或几天后,用户重新启动TalosConsumer,默认是从LastCommitOffset开始读取(可通过场景5进行改变),但是由于LastCommitOffset指向的数据已经过期,此时TalosConsumer会根据该项配置决定从哪里开始消费数据。
Default: false
[场景5] 用户启动TalosConsumer的时候想要重置开始读取消息的Offset
- galaxy.talos.consumer.start.reset.offset.value
该项配置有两个取值:-1,-2;分别表示从当前Partition的StartOffset和EndOffset开始读取消息,默认取值是StartOffset
Default: -1
- galaxy.talos.consumer.start.whether.reset.offset
该项配置表示TalosConsumer启动时是否重置offset,默认是false,即不重置
Default: false
【注意】请用户仔细理解这两项配置,程序判断基于的规则是:当用户第一次启动Consumer时(即LastCommitOffset不存在)或者'galaxy.talos.consumer.start.whether.reset.offset'为true时,程序根据'galaxy.talos.consumer.start.reset.offset.value'的值开始读取offset,否则从LastCommitOffset开始读取; 这样会产生如下几种场景:
1) 默认情况下第一次启动Consumer,程序从StartOffset开始读取;如果用户想在第一次启动时从EndOffset开始读取,只需要将配置'galaxy.talos.consumer.start.reset.offset.value'置为'-2'即可
2) 默认情况下Consumer重启(非第一次启动)时,程序是从LastCommitOffset开始读取;如果用户想要在重启时从StartOffset或EndOffset开始读取,则需要同时配置上面两项,将'galaxy.talos.consumer.start.whether.reset.offset'设置为'true',另一项配置为相应的值(-1/-2);