4.3 低阶API-SimpleProducer/SimpleConsumer

优质
小牛编辑
127浏览
2023-12-01

Simple Producer API

SimpleProducer是为用户提供一个同步的put接口,用户使用时需要指定partitionId,每个SimpleProducer对应一个partition,这里建议用户使用同步接口时最好发送batch,故接口参数为List类型;

putMessage(List<Message> messageList)

参数:messageList, 函数接受一个Message的List,发送给用户初始化SimpleProducer时所指定的partition;

返回值:boolean,函数返回putMessage成功与否;

Simple Consumer API

SimpleProducer是消费数据的同步接口,用户使用simple consumer需要指定partition指定startOffset获取数据:

请注意:如果使用simple consumer,用户需要自己处理consumer对于partition的分配,consumer的failover问题;同时,如果需要记录消费的offset,也需要自己进行commit。

fetchMessage(long startOffset)

参数:startOffset,指定从哪里开始拉取数据,包含startOffset

返回值List<MessageAndOffset>,返回一个list,成员是MessageAndOffset对象实例

异常:同下一个接口

fetchMessage(long startOffset, int maxFetchedNumber)

参数

  • startOffset,指定从哪里开始拉取数据,包含startOffset
  • maxFetchednumber,指定一次拉取batch最大的消息条数,如果不配值,即使用上面的接口,则默认去consumer config中的配置GALAXY_TALOS_CONSUMER_MAX_FETCH_RECORDS

返回值List<MessageAndOffset>,返回一个list,成员是MessageAndOffset对象实例

异常

  • TException, 从server端返回的错误,可能是PARTITION_NOT_SERVINGMESSAGE_OFFSET_OUT_OF_RANGE,或者其他的异常,具体可看栈信息,然后根据不同的情况不同处理,例如如果是PARTITION_NOT_SERVING,可以sleep来delay一会;如果是MESSAGE_OFFSET_OUT_OF_RANGE则需要修改startOffset;

  • IOException,可能是压缩出错,具体看栈信息