初始化
/**
 * Initializes the DataHub connection.
 *
 * <p>Reads endpoint, access id and access key from {@code dataHubCfg}, then builds the
 * client and stores it in {@code datahubClient}. The access key is a credential and is
 * intentionally never written to the log.
 */
protected void initDatahub() {
    String endpoint = dataHubCfg.getEndpoint();
    String accessId = dataHubCfg.getAccessId();
    String accessKey = dataHubCfg.getAccessKey();
    // Log only non-secret connection info; the access key must not leak into log files.
    log.info(">>>>>>>datahub info=endpoint:{},accessId:{}", endpoint, accessId);
    // Create the DataHubClient instance.
    datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(new DatahubConfig(endpoint,
            // The trailing boolean enables binary transfer (supported by the server since version 2.12).
            // On Apsara Stack (private cloud) deployments, try setting it to false if errors occur.
            new AliyunAccount(accessId, accessKey), true))
            // HttpConfig is optional; defaults are used when it is not set.
            // LZ4 compression of network traffic is recommended for reading/writing data.
            .setHttpConfig(new HttpConfig().setCompressType(HttpConfig.CompressType.LZ4)
                    .setConnTimeout(10000))
            .build();
}
注册
/**
 * Registers a subscription on a topic and starts consuming it.
 *
 * <p>Looks up the topic schema, creates a new subscription, opens a subscription session
 * for shards "0" and "1", and then hands shard "0" over to {@code consumeData}.
 *
 * @param project   DataHub project name
 * @param topic     DataHub topic name
 * @param tableName table name passed through to the data handler
 */
protected void regSub(String project, String topic, String tableName) {
    log.info(">>>>>>>> register project:{},topic:{},tableName:{}", project, topic, tableName);

    // Fetch the record schema of the topic; it is needed later to decode records.
    RecordSchema topicSchema = datahubClient.getTopic(project, topic).getRecordSchema();
    log.info(">>>>>>schama:{}", topicSchema);

    // Create the subscription and keep its id for the session and offset commits.
    String subscriptionId = datahubClient
            .createSubscription(project, topic, Constant.subscribtionComment)
            .getSubId();
    log.info(">>>>create subscription successful ,subId:{}", subscriptionId);

    // The session covers both shards, but only the first one is consumed here.
    String consumedShard = "0";
    List<String> sessionShards = Arrays.asList(consumedShard, "1");
    SubscriptionOffset startOffset = datahubClient
            .openSubscriptionSession(project, topic, subscriptionId, sessionShards)
            .getOffsets()
            .get(consumedShard);

    consumeData(project, topic, tableName, consumedShard, sessionShards, topicSchema, subscriptionId,
            startOffset);
}
消费
/**
 * Consumes the records of one shard in a polling loop and periodically commits the
 * subscription offset.
 *
 * <p>The loop ends when the subscription goes offline, when the subscription session is
 * invalidated, or when the consuming thread is interrupted. Other failures are logged and
 * the loop keeps polling.
 *
 * @param project            DataHub project name
 * @param topic              DataHub topic name
 * @param tableName          table name passed to {@code dealData} and used in log messages
 * @param shardId            id of the shard to read from
 * @param shardIds           shard ids used when re-reading the subscription offset
 * @param schema             record schema of the topic
 * @param subId              subscription id used for offset lookups and commits
 * @param subscriptionOffset offset of {@code shardId} obtained when the session was opened
 */
private void consumeData(String project, String topic, String tableName, String shardId, List<String> shardIds,
        RecordSchema schema, String subId, SubscriptionOffset subscriptionOffset) {
    // 1. Resolve the start cursor. If nothing was consumed yet, or the stored position has
    //    expired, start from the oldest record that is still within the topic's lifecycle.
    String cursor = null;
    // sequence < 0 means nothing has been consumed yet.
    if (subscriptionOffset.getSequence() < 0) {
        cursor = datahubClient.getCursor(project, topic, shardId, CursorType.OLDEST).getCursor();
    } else {
        // Continue with the record following the last consumed one.
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            // Seeking by SEQUENCE may fail with SeekOutOfRange when the data at that position has expired.
            cursor = datahubClient.getCursor(project, topic, shardId, CursorType.SEQUENCE, nextSequence)
                    .getCursor();
        } catch (SeekOutOfRangeException e) {
            // Fall back to the oldest record still available.
            cursor = datahubClient.getCursor(project, topic, shardId, CursorType.OLDEST).getCursor();
        }
    }

    // 2. Read records (TUPLE data) and commit the offset once every 1000 records.
    long recordCount = 0L;
    // Number of records fetched per request.
    int fetchNum = 10;
    while (true) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(project, topic, shardId, schema, cursor,
                    fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // No data available: wait a second before polling again.
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                // Process the record.
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                dealData(data, tableName);
                // Record the position only after the record was handled successfully.
                ++recordCount;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                if (recordCount % 1000 == 0) {
                    // Commit the offset.
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(project, topic, subId, offsetMap);
                    log.info("commit offset successful");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
            // Offline: the subscription was taken offline. SessionInvalid: the subscription is being
            // consumed by another client at the same time. Retrying cannot succeed, so stop consuming
            // instead of spinning in a tight error loop.
            log.error("handler table {} error ", tableName, e);
            return;
        } catch (SubscriptionOffsetResetException e) {
            // The offset was reset: re-read the subscription offset. This handles a reset by sequence;
            // a reset by timestamp would need CursorType.SYSTEM_TIME to obtain the cursor.
            log.error("handler table {} error ", tableName, e);
            subscriptionOffset = datahubClient.getSubscriptionOffset(project, topic, subId, shardIds).getOffsets()
                    .get(shardId);
            long nextSequence = subscriptionOffset.getSequence() + 1;
            cursor = datahubClient.getCursor(project, topic, shardId, CursorType.SEQUENCE, nextSequence)
                    .getCursor();
        } catch (DatahubClientException e) {
            // Other client errors: resume from the last committed offset.
            log.error("handler table {} error ", tableName, e);
            subscriptionOffset = datahubClient.getSubscriptionOffset(project, topic, subId, shardIds).getOffsets()
                    .get(shardId);
            long nextSequence = subscriptionOffset.getSequence() + 1;
            cursor = datahubClient.getCursor(project, topic, shardId, CursorType.SEQUENCE, nextSequence)
                    .getCursor();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop, so the owner of this thread can shut the consumer down.
            Thread.currentThread().interrupt();
            log.info("consumer for table {} interrupted, stop consuming", tableName);
            return;
        } catch (Exception e) {
            // Best effort: log and retry from the current cursor.
            log.error("handler table {} error ", tableName, e);
        }
    }
}