4.4 高阶API-TalosProducer
TalosProducer API
addUserMessage(List<Message> messageList
)
参数:messageList, 函数接受一个Message的List,用户可以对每条Message设置PartitionKey(optional),指示消息发送到哪个partition,如果不指定,系统会随机指定partition
返回值:空,消息发送的返回结果是通过用户注册的Callback
异常:ProducerNotActiveException, 该异常表明producer还没有初始化完成,较低概率发生,如发生重试下就可
TalosProducer 配置说明
如下是TalosProducer的配置说明:
必需配置项
Name | Description | Default |
---|---|---|
galaxy.talos.service.endpoint | 指定Talos Server的URI,可以配置http和https,相关集群对应的URI请见集群信息 | -- |
可选配置项 & 场景 (针对High Level TalosProducer)
[场景1] Client 内存不是很大,使用Producer时无法缓存过多的数据,希望缓存的数据少一点;或者希望缓存的数据增大;
- galaxy.talos.producer.max.buffered.message.number
TalosProducer内存中可以buffer的总消息数目,如果总数量超过这个配置数就会暂时block接口
addUserMessage
,直到剩余数量小于这个数值Default: 1000000
- galaxy.talos.producer.max.buffered.message.bytes
同
galaxy.talos.producer.max.buffered.message.number
,从总体字节数目来限制总buffer的消息字节Default: 500MB
[场景2] Producer 会缓存数据,积攒batch进行发送,希望改变buffer的一些机制(例如希望发送的快一点/希望积攒的batch大一点等),下面任何一个条件满足,batch都会被发送
- galaxy.talos.producer.max.buffered.milli.secs
该配置限制了消息被发送到某个partition之前在client内存中最多被缓存的时间,超过这个时间,就会被发送出去。一方面尽可能的积攒一批batch的消息,另一方面也不让消息堆积太长时间而没有被发送出去
Default: 200ms
- galaxy.talos.producer.max.put.message.number
Producer每次从Buffer中取出消息发送给单个partition的最大消息条目数
Default: 2000
- galaxy.talos.producer.max.put.message.bytes
Producer每次从Buffer中取出消息发送给单个partition的最大字节数
Default: 4MB
[场景3] 用户希望处理回调的线程数目多一些/少一些
- galaxy.talos.producer.thread.pool.size
该配置用来指定producer内部用以做messageCallback的线程数目,即用户发送消息后执行回调函数的线程数
Default: 16
[场景4] 用户Message没有配置PartitionKey,希望对Dispatcher进行控制
- galaxy.talos.producer.update.partition.id.interval.milli
当用户在Message中不指定partitionKey的时候, TalosProducer中的dispatcher会分配给当前轮询到的partition id,该配置指定轮询partition id的时间间隔
Default: 100ms
- galaxy.talos.producer.update.partition.msgnumber
Dispatcher轮询partition id的另一个维度,如果上一次push到某个partition的消息数量超过该配置的阈值,partition id轮询到下一个
Default: 1000
[场景5] 用户选择消息的压缩类型; 注:一般情况下不需要对压缩类型进行修改,如有需要请与我们联系
- galaxy.talos.producer.compression.type
消息的压缩类型,可选择的类型有:"NONE", "SNAPPY" 和 "GZIP"
Default: SNAPPY