当前位置: 首页 > 工具软件 > DataHub > 使用案例 >

DataHub 消费

欧阳俊晖
2023-12-01

初始化

	/**
	 * Initializes the DataHub connection.
	 * <p>
	 * Reads the endpoint and credentials from {@code dataHubCfg}, builds a client and stores it
	 * in {@code datahubClient}. The access key is deliberately not logged because it is a secret.
	 */
	protected void initDatahub() {
		String endpoint = dataHubCfg.getEndpoint();
		String accessId = dataHubCfg.getAccessId();
		String accessKey = dataHubCfg.getAccessKey();

		// Never write the access key to the log: anyone with log access could reuse it.
		log.info(">>>>>>>datahub info=endpoint:{},accessId:{}", endpoint, accessId);

		// Create the DataHubClient instance.
		datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(new DatahubConfig(endpoint,
				// Last argument: whether to enable binary transfer (supported by the server since 2.12).
				new AliyunAccount(accessId, accessKey), true))
				// If errors occur on a private-cloud deployment, try setting the flag above to false.
				// HttpConfig is optional; defaults are used when it is not set.
				.setHttpConfig(new HttpConfig().setCompressType(HttpConfig.CompressType.LZ4) // LZ4 compression is recommended for reads/writes
						.setConnTimeout(10000)) // connect timeout; presumably milliseconds - confirm against the SDK
				.build();
	}

注册

/**
	 * Registers a subscription on a topic and starts consuming it.
	 * <p>
	 * Fetches the topic's record schema, creates a new subscription, opens a subscription
	 * session and hands the resulting offset to {@code consumeData}. Because
	 * {@code consumeData} loops indefinitely, this call normally does not return.
	 * <p>
	 * NOTE(review): the session is opened for shards "0" and "1" but only shard "0" is
	 * consumed, and a new subscription is created on every call - confirm both are intended.
	 *
	 * @param project   DataHub project name
	 * @param topic     DataHub topic name
	 * @param tableName table name passed through to the consumer
	 */
	protected void regSub(String project, String topic, String tableName) {
		log.info(">>>>>>>> register project:{},topic:{},tableName:{}", project, topic, tableName);

		// Shard that is actually read, and the shards the session is opened for.
		final String consumedShard = "0";
		final List<String> sessionShards = Arrays.asList("0", "1");

		RecordSchema topicSchema = datahubClient.getTopic(project, topic).getRecordSchema();
		log.info(">>>>>>schama:{}", topicSchema);

		String subscriptionId = datahubClient
				.createSubscription(project, topic, Constant.subscribtionComment)
				.getSubId();
		log.info(">>>>create subscription successful ,subId:{}", subscriptionId);

		// Opening the session yields the current offset of every requested shard.
		SubscriptionOffset startOffset = datahubClient
				.openSubscriptionSession(project, topic, subscriptionId, sessionShards)
				.getOffsets()
				.get(consumedShard);

		consumeData(project, topic, tableName, consumedShard, sessionShards, topicSchema, subscriptionId,
				startOffset);
	}

消费

/**
	 * Consumes records from one shard of a subscribed topic until the subscription becomes
	 * unusable or the thread is interrupted.
	 * <p>
	 * Each record is passed to {@code dealData}; the subscription offset is committed once
	 * every 1000 processed records. The method returns when the subscription goes offline,
	 * the session is invalidated, or the consuming thread is interrupted.
	 *
	 * @param project            DataHub project name
	 * @param topic              DataHub topic name
	 * @param tableName          table name handed to {@code dealData} and used in log messages
	 * @param shardId            shard to read from
	 * @param shardIds           shards used when re-reading the subscription offset
	 * @param schema             record schema of the topic
	 * @param subId              subscription id
	 * @param subscriptionOffset offset obtained when the subscription session was opened
	 */
	private void consumeData(String project, String topic, String tableName, String shardId, List<String> shardIds,
			RecordSchema schema, String subId, SubscriptionOffset subscriptionOffset) {
		// 1. Resolve the starting cursor from the current offset.
		String cursor = locateCursor(project, topic, shardId, subscriptionOffset);

		// 2. Read records and persist the offset. Tuple data is read here and the offset is
		// committed once every 1000 records.
		long recordCount = 0L;
		// Fetch 10 records per request.
		int fetchNum = 10;
		while (true) {
			try {
				GetRecordsResult getRecordsResult = datahubClient.getRecords(project, topic, shardId, schema, cursor,
						fetchNum);
				if (getRecordsResult.getRecordCount() <= 0) {
					// No data yet: wait before polling again.
					Thread.sleep(1000);
					continue;
				}
				for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
					// Consume the record.
					TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
					dealData(data, tableName);
					// The record is processed: advance the in-memory offset.
					++recordCount;
					subscriptionOffset.setSequence(recordEntry.getSequence());
					subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
					if (recordCount % 1000 == 0) {
						// Commit the offset.
						Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
						offsetMap.put(shardId, subscriptionOffset);
						datahubClient.commitSubscriptionOffset(project, topic, subId, offsetMap);
						log.info("commit offset successful");
					}
				}
				cursor = getRecordsResult.getNextCursor();
			} catch (InterruptedException e) {
				// Restore the interrupt flag and stop so the owning thread/executor can shut down.
				Thread.currentThread().interrupt();
				log.warn("handler table {} interrupted, stop consuming", tableName);
				return;
			} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
				// Terminal: the subscription is offline, or it is being consumed by another
				// client at the same time. Retrying cannot succeed, so stop consuming.
				log.error("handler table {} error ", tableName, e);
				return;
			} catch (SubscriptionOffsetResetException e) {
				// The offset was reset: reload it and seek again. A sequence-based reset is
				// assumed here; a timestamp-based reset would need CursorType.SYSTEM_TIME.
				log.error("handler table {} error ", tableName, e);
				subscriptionOffset = datahubClient.getSubscriptionOffset(project, topic, subId, shardIds).getOffsets()
						.get(shardId);
				cursor = locateCursor(project, topic, shardId, subscriptionOffset);
			} catch (DatahubClientException e) {
				// Other client-side failure: resume from the last committed offset.
				// NOTE(review): a failure of the calls below propagates out of this method.
				log.error("handler table {} error ", tableName, e);
				subscriptionOffset = datahubClient.getSubscriptionOffset(project, topic, subId, shardIds).getOffsets()
						.get(shardId);
				cursor = locateCursor(project, topic, shardId, subscriptionOffset);
			} catch (Exception e) {
				// Any other failure (for example from dealData): log it and retry from the
				// current cursor.
				log.error("handler table {} error ", tableName, e);
			}
		}
	}

	/**
	 * Resolves the cursor of the next record to read for the given offset.
	 * <p>
	 * A negative sequence means nothing has been consumed yet, so the oldest available record
	 * is used. Otherwise the record after the offset's sequence is used, falling back to the
	 * oldest available record when that position is out of range (its data has expired).
	 */
	private String locateCursor(String project, String topic, String shardId, SubscriptionOffset offset) {
		if (offset.getSequence() < 0) {
			return datahubClient.getCursor(project, topic, shardId, CursorType.OLDEST).getCursor();
		}
		long nextSequence = offset.getSequence() + 1;
		try {
			return datahubClient.getCursor(project, topic, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
		} catch (SeekOutOfRangeException e) {
			return datahubClient.getCursor(project, topic, shardId, CursorType.OLDEST).getCursor();
		}
	}

 类似资料: