4.3 低阶API-SimpleProducer/SimpleConsumer
Simple Producer API
SimpleProducer是为用户提供一个同步的put接口,用户使用时需要指定partitionId,每个SimpleProducer对应一个partition,这里建议用户使用同步接口时最好发送batch,故接口参数为List类型;
putMessage(List<Message> messageList
)
参数:messageList, 函数接受一个Message的List,发送给用户初始化SimpleProducer时所指定的partition;
返回值:boolean,函数返回putMessage成功与否;
Simple Consumer API
SimpleProducer是消费数据的同步接口,用户使用simple consumer需要指定partition,指定startOffset获取数据:
请注意:如果使用simple consumer,用户需要自己处理consumer对于partition的分配,consumer的failover问题;同时,如果需要记录消费的offset,也需要自己进行commit。
fetchMessage(long startOffset
)
参数:startOffset,指定从哪里开始拉取数据,包含startOffset
返回值:List<MessageAndOffset>
,返回一个list,成员是MessageAndOffset对象实例
异常:同下一个接口
fetchMessage(long startOffset
, int maxFetchedNumber
)
参数:
- startOffset,指定从哪里开始拉取数据,包含startOffset
- maxFetchednumber,指定一次拉取batch最大的消息条数,如果不配值,即使用上面的接口,则默认去consumer config中的配置
GALAXY_TALOS_CONSUMER_MAX_FETCH_RECORDS
返回值:List<MessageAndOffset>
,返回一个list,成员是MessageAndOffset对象实例
异常:
TException, 从server端返回的错误,可能是
PARTITION_NOT_SERVING
,MESSAGE_OFFSET_OUT_OF_RANGE
,或者其他的异常,具体可看栈信息,然后根据不同的情况不同处理,例如如果是PARTITION_NOT_SERVING
,可以sleep来delay一会;如果是MESSAGE_OFFSET_OUT_OF_RANGE
则需要修改startOffset;IOException,可能是压缩出错,具体看栈信息