4.4 高阶API-TalosProducer

优质
小牛编辑
133浏览
2023-12-01

TalosProducer API

addUserMessage(List<Message> messageList)

参数:messageList, 函数接受一个Message的List,用户可以对每条Message设置PartitionKey(optional),指示消息发送到哪个partition,如果不指定,系统会随机指定partition

返回值:空,消息发送的返回结果是通过用户注册的Callback

异常:ProducerNotActiveException, 该异常表明producer还没有初始化完成,较低概率发生,如发生重试下就可

TalosProducer 配置说明

如下是TalosProducer的配置说明:

必需配置项
NameDescriptionDefault
galaxy.talos.service.endpoint指定Talos Server的URI,可以配置http和https,相关集群对应的URI请见集群信息--
可选配置项 & 场景 (针对High Level TalosProducer)
[场景1] Client 内存不是很大,使用Producer时无法缓存过多的数据,希望缓存的数据少一点;或者希望缓存的数据增大;
  • galaxy.talos.producer.max.buffered.message.number

    TalosProducer内存中可以buffer的总消息数目,如果总数量超过这个配置数就会暂时block接口addUserMessage,直到剩余数量小于这个数值

    Default: 1000000

  • galaxy.talos.producer.max.buffered.message.bytes

    galaxy.talos.producer.max.buffered.message.number,从总体字节数目来限制总buffer的消息字节

    Default: 500MB

[场景2] Producer 会缓存数据,积攒batch进行发送,希望改变buffer的一些机制(例如希望发送的快一点/希望积攒的batch大一点等),下面任何一个条件满足,batch都会被发送
  • galaxy.talos.producer.max.buffered.milli.secs

    该配置限制了消息被发送到某个partition之前在client内存中最多被缓存的时间,超过这个时间,就会被发送出去。一方面尽可能的积攒一批batch的消息,另一方面也不让消息堆积太长时间而没有被发送出去

    Default: 200ms

  • galaxy.talos.producer.max.put.message.number

    Producer每次从Buffer中取出消息发送给单个partition的最大消息条目数

    Default: 2000

  • galaxy.talos.producer.max.put.message.bytes

    Producer每次从Buffer中取出消息发送给单个partition的最大字节数

    Default: 4MB

[场景3] 用户希望处理回调的线程数目多一些/少一些
  • galaxy.talos.producer.thread.pool.size

    该配置用来指定producer内部用以做messageCallback的线程数目,即用户发送消息后执行回调函数的线程数

    Default: 16

[场景4] 用户Message没有配置PartitionKey,希望对Dispatcher进行控制
  • galaxy.talos.producer.update.partition.id.interval.milli

    当用户在Message中不指定partitionKey的时候, TalosProducer中的dispatcher会分配给当前轮询到的partition id,该配置指定轮询partition id的时间间隔

    Default: 100ms

  • galaxy.talos.producer.update.partition.msgnumber

    Dispatcher轮询partition id的另一个维度,如果上一次push到某个partition的消息数量超过该配置的阈值,partition id轮询到下一个

    Default: 1000

[场景5] 用户选择消息的压缩类型; 注:一般情况下不需要对压缩类型进行修改,如有需要请与我们联系
  • galaxy.talos.producer.compression.type

    消息的压缩类型,可选择的类型有:"NONE", "SNAPPY" 和 "GZIP"

    Default: SNAPPY