因为 qsql-server 会根据 runnerType 来选择执行引擎;如果不按引擎类型分别配置双数据源并分别处理结果,在使用 Spark 引擎时就会出现强制类型转换错误(ClassCastException)。
// Dispatch result collection by execution engine. Each engine returns a
// differently-typed result, so casting without this dispatch throws a
// ClassCastException (see note above the snippet).
switch (runnerType) {
case DEFAULT:
    // DEFAULT behaves exactly like JDBC — fall through instead of
    // duplicating the call.
case JDBC:
    resultSet = getJDBCResultSet(h, runner, sql);
    break;
case SPARK:
    resultSet = getSparkResultSet(h, runner, sql);
    break;
case FLINK:
    resultSet = getFlinkResultSet(h, runner, sql);
    break;
}
JDBC 情况下收集结果的写法:
// JDBC path: the runner's collect() yields a plain java.sql.ResultSet.
// (Fixed: removed the stray duplicate semicolon.)
ResultSet resultSet = (ResultSet) runner.sql(sql).collect();
Spark 情况下收集结果的写法:
// Spark path: collect() returns the schema attributes (key) paired with the
// collected rows (value).
// NOTE(review): unchecked cast — assumes the Spark runner always returns this
// entry shape; running this branch against a JDBC runner would throw
// ClassCastException. Confirm against the runner implementation.
Map.Entry<List<Attribute>, List<GenericRowWithSchema>> sparkData = (Entry<List<Attribute>, List<GenericRowWithSchema>>) runner
.sql(sql).collect();
修改ColumnMetaData的构造函数中label的赋值。
com.qihoo.qsql.server.QuicksqlServerMeta
/**
 * Converts data collected from the Spark runner into a {@link QueryResult}.
 *
 * <p>The column label in each {@link ColumnMetaData} is set to the column
 * name instead of the previous default of {@code null}: MyBatis prefers
 * {@code getColumnLabel()} during result mapping, and a null label later
 * causes a NullPointerException in
 * {@code org.apache.ibatis.executor.resultset.ResultSetWrapper}.
 *
 * @param object the collected Spark result, expected to be a
 *               {@code Map.Entry<List<Attribute>, List<GenericRowWithSchema>>};
 *               may be {@code null}
 * @return the query result; empty when {@code object} is {@code null}
 * @throws Exception if the collected data carries no schema attributes
 */
public QueryResult executeQuery(Object object) throws Exception {
    if (object == null) {
        // Nothing was collected: return an empty result instead of failing.
        return new QueryResult(new ArrayList<>(), new ArrayList<>());
    }
    Map.Entry<List<Attribute>, List<GenericRowWithSchema>> sparkData =
            (Entry<List<Attribute>, List<GenericRowWithSchema>>) object;
    // A cast of a non-null reference can never yield null, so only the
    // schema attributes (the entry key) need validation here.
    if (CollectionUtils.isEmpty(sparkData.getKey())) {
        throw new SparkException("collect data error");
    }
    List<Attribute> attributes = sparkData.getKey();
    List<GenericRowWithSchema> value = sparkData.getValue();
    List<Object> data = new ArrayList<>();
    List<ColumnMetaData> meta = new ArrayList<>();
    // Guard against a null row list: an empty data set is a valid result.
    if (value != null) {
        for (GenericRowWithSchema row : value) {
            data.add(row.values());
        }
    }
    // Label defaults to the column name (not null) — quicksql does not
    // handle aliases in the Spark path, and MyBatis picks the label first.
    for (Attribute attribute : attributes) {
        ScalarType columnType = getColumnType(attribute.dataType());
        meta.add(new ColumnMetaData(0, false, true, false, false,
            1, true, -1, (String) attribute.name(), attribute.name(), (String) null, -1, -1, (String) null, (String) null,
            columnType, true, false, false, columnType.columnClassName()));
    }
    return new QueryResult(meta, data);
}
原因:MyBatis 在进行结果集映射时会优先选用列别名(label),见下面引用的源码:
org.apache.ibatis.executor.resultset.ResultSetWrapper
// Quoted MyBatis source (org.apache.ibatis.executor.resultset.ResultSetWrapper),
// shown to explain why the server must populate a non-null column label.
public ResultSetWrapper(ResultSet rs, Configuration configuration) throws SQLException {
super();
this.typeHandlerRegistry = configuration.getTypeHandlerRegistry();
this.resultSet = rs;
final ResultSetMetaData metaData = rs.getMetaData();
final int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
// If the server side is not fixed, getColumnLabel(i) returns null here,
// so a null entry gets stored into columnNames.
columnNames.add(configuration.isUseColumnLabel() ? metaData.getColumnLabel(i) : metaData.getColumnName(i));
jdbcTypes.add(JdbcType.forCode(metaData.getColumnType(i)));
classNames.add(metaData.getColumnClassName(i));
}
}
// Quoted MyBatis source: where the null column label ultimately blows up.
private void loadMappedAndUnmappedColumnNames(ResultMap resultMap, String columnPrefix) throws SQLException {
List<String> mappedColumnNames = new ArrayList<>();
List<String> unmappedColumnNames = new ArrayList<>();
final String upperColumnPrefix = columnPrefix == null ? null : columnPrefix.toUpperCase(Locale.ENGLISH);
final Set<String> mappedColumns = prependPrefixes(resultMap.getMappedColumns(), upperColumnPrefix);
for (String columnName : columnNames) {
// A NullPointerException is thrown here when columnNames contains the
// null label stored by the constructor above.
final String upperColumnName = columnName.toUpperCase(Locale.ENGLISH);
if (mappedColumns.contains(upperColumnName)) {
mappedColumnNames.add(upperColumnName);
} else {
unmappedColumnNames.add(columnName);
}
}