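Saiku manages the data sources connected to it by registering a schema and a datasource for each one, and provides a UI for analyzing the data visually. The UI generates MDX; Mondrian connects to the data source, parses the MDX, and executes the query.
Kylin provides OLAP over large-scale data. Connecting Saiku to Kylin lets you query Kylin conveniently through Saiku's friendly interface.
On integrating Saiku with Kylin: the GitHub project https://github.com/mustangore/kylin-mondrian-interaction patches Mondrian to add a KylinDialect, so that Kylin can be used as a data source. There is also Youzan's write-up on integrating Kylin, Mondrian, and Saiku. With the Mondrian patch applied and the Kylin JDBC jar added, you can write a schema in Saiku that defines a cube and then query it.
That integration still requires configuring the data source and writing the schema by hand, which is tedious. Instead, Saiku's code can be modified to fetch the project and cube information from Kylin, convert it into a schema according to a set of rules, and manage the result as a data source. That connects Saiku to Kylin directly, with no manual steps.
Without further ado, here is the code:
Saiku defines a RepositoryDatasourceManager in saiku-beans.xml to manage the datasources already added to the system. By default it loads the foodmart and earthquake sources, as follows: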
<bean id="repositoryDsManager" class="org.saiku.service.datasource.RepositoryDatasourceManager" init-method="load" destroy-method="unload">
    <property name="userService" ref="userServiceBean"/>
    <property name="configurationpath" value="${repoconfig}"/>
    <property name="datadir" value="${repodata}"/>
    <property name="foodmartdir" value="${foodmartrepo}"/>
    <property name="foodmartschema" value="${foodmartschema}"/>
    <property name="foodmarturl" value="${foodmarturl}"/>
    <property name="earthquakeDir" value="${earthquakerepo}"/>
    <property name="earthquakeSchema" value="${earthquakeschema}"/>
    <property name="earthquakeUrl" value="${earthquakeurl}"/>
    <property name="repoPasswordProvider" ref="repoPasswordProviderBean"/>
    <property name="defaultRole" value="${default.role}"/>
    <property name="externalPropertiesFile" value="${external.properties.file}"/>
    <!-- If you change the repoPassword set this property for at least 1 restart to update the old repo password-->
    <!--<property name="oldRepoPassword" value="sa!kuanalyt!cs"/>-->
</bean>
Define public class KylinResourceDatasourceManager implements IDatasourceManager. Saiku's datasource manager records the information about each added data source in a virtual file system, so our datasource manager starts the virtual file system in the same way. To avoid having to handle cases such as cube updates, it reinitializes everything on every startup:
// On startup, delete all schemas and reinitialize; for the deletion code, see RepositoryDatasourceManager
deleteAllSchema();
deleteAllSource();
Next, load the cubes from Kylin:
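List<ProjectInstance> projects = null;
try {
    // Call Kylin's REST API to fetch all projects
    projects = restService.httpGet("projects", new TypeReference<List<ProjectInstance>>() {
    }, null);
} catch (Exception e) {
    // Report the failure instead of swallowing it; otherwise an empty datasource list is hard to diagnose
    e.printStackTrace();
}
if (projects != null) {
    // Iterate over the projects and collect every cube
    for (ProjectInstance project : projects) {
        List<CubeInstance> cubes = getCubes(project.getName());
        for (CubeInstance cubeInstance : cubes) {
            String newCubeName = project.getName() + "#" + cubeInstance.getName();
            // Store the cube's datasource in the datasources map;
            // getSaikuDatasource returns null on failure, so skip those
            SaikuDatasource datasource = getSaikuDatasource(newCubeName);
            if (datasource != null) {
                datasources.put(newCubeName, datasource);
            }
        }
    }
}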
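The `restService.httpGet` helper is not shown in this post. As a rough idea of what it has to do, here is a minimal sketch of building such a request with the JDK's `java.net.http` (Java 11+), assuming Kylin serves its REST API under `/kylin/api` and accepts HTTP Basic authentication. The class name `KylinRequests` and its parameters are made up for illustration and are not part of Saiku or Kylin.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of Saiku or Kylin.
final class KylinRequests {
    private KylinRequests() {}

    /** Builds a GET request for a path under the Kylin REST root, e.g. "projects". */
    static HttpRequest get(String baseUrl, String path, String user, String password) {
        // HTTP Basic authentication: base64("user:password")
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                // Assumption: the REST API is mounted at /kylin/api
                .uri(URI.create(baseUrl + "/kylin/api/" + path))
                .header("Authorization", "Basic " + token)
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```

The request would then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and the JSON body deserialized with Jackson, which is presumably what the `TypeReference` argument above is for. The host and credentials passed in are placeholders for your own deployment.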
private SaikuDatasource getSaikuDatasource(String datasourceName) {
    if (datasourceName.contains("#")) {
        String cubeName = datasourceName.split("#")[1].trim();
        CubeDesc[] cubeDescs = null;
        try {
            // Fetch the cube's descriptor
            cubeDescs = restService.httpGet("cube_desc/" + cubeName, new TypeReference<CubeDesc[]>() {
            }, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (cubeDescs == null || cubeDescs.length != 1) {
            // Without exactly one descriptor there is no model name to look up
            return null;
        }
        List<DataModelDesc> modelDescs = null;
        try {
            // Fetch the model that the cube is built on
            modelDescs = restService.httpGet("models", new TypeReference<List<DataModelDesc>>() {
            }, new BasicNameValuePair("modelName", cubeDescs[0].getModelName()), new BasicNameValuePair("projectName", datasourceName.split("#")[0].trim()));
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (modelDescs != null && !modelDescs.isEmpty()) {
            try {
                // Generate the schema and add it to the virtual file system
                addSchema(SchemaUtil.createSchema(datasourceName, cubeDescs[0], modelDescs.get(0)), "/datasources/" + datasourceName.replace("#", ".") + ".xml", datasourceName);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // The "#" check above guarantees that a project prefix exists
            String project = datasourceName.split("#")[0].trim();
            Properties properties = new Properties();
            // Connection details handed to Mondrian
            properties.put("location", "jdbc:mondrian:Jdbc=jdbc:kylin://" + kylinUrl + "/" + project + ";JdbcDrivers=" + KYLINE_DRIVER + ";Catalog=mondrian:///datasources/" + datasourceName.replace("#", ".") + ".xml");
            properties.put("driver", "mondrian.olap4j.MondrianOlap4jDriver");
            properties.put("username", userName);
            properties.put("password", passWord);
            properties.put("security.enabled", false);
            properties.put("advanced", false);
            return new SaikuDatasource(cubeName, SaikuDatasource.Type.OLAP, properties);
        }
    }
    return null;
}
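The `project#cube` key is split and re-joined in several places above. A small value class can keep that convention in one spot. The sketch below mirrors the string concatenation in `getSaikuDatasource`; the class `KylinDatasourceKey` and its method names are hypothetical, and the driver class name is passed in rather than assumed.

```java
// Hypothetical helper that centralizes the "project#cube" naming convention.
final class KylinDatasourceKey {
    final String project;
    final String cube;

    private KylinDatasourceKey(String project, String cube) {
        this.project = project;
        this.cube = cube;
    }

    /** Parses "project#cube"; rejects keys with a missing project or cube part. */
    static KylinDatasourceKey parse(String key) {
        int sep = key.indexOf('#');
        if (sep <= 0 || sep == key.length() - 1) {
            throw new IllegalArgumentException("Expected project#cube but got: " + key);
        }
        return new KylinDatasourceKey(key.substring(0, sep).trim(), key.substring(sep + 1).trim());
    }

    /** Path of the generated schema inside the virtual file system. */
    String schemaPath() {
        return "/datasources/" + project + "." + cube + ".xml";
    }

    /** Mondrian connect string that points at Kylin over JDBC and at the generated schema. */
    String mondrianUrl(String kylinHostPort, String jdbcDriverClass) {
        return "jdbc:mondrian:Jdbc=jdbc:kylin://" + kylinHostPort + "/" + project
                + ";JdbcDrivers=" + jdbcDriverClass
                + ";Catalog=mondrian://" + schemaPath();
    }
}
```

For example, `KylinDatasourceKey.parse("learn_kylin#kylin_sales_cube").schemaPath()` yields `/datasources/learn_kylin.kylin_sales_cube.xml`, the same path the code above builds with `replace("#", ".")`.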
public class SchemaUtil {
    private static String newLine = "\r\n";

    public static String createSchema(String dataSourceName, CubeDesc cubeDesc, DataModelDesc modelDesc) {
        StringBuffer sb = new StringBuffer();
        sb = appendSchema(sb, dataSourceName, cubeDesc, modelDesc);
        // System.out.println("********************************" + sb.toString());
        return sb.toString();
    }

    public static StringBuffer appendSchema(StringBuffer sb, String dataSourceName, CubeDesc cubeDesc, DataModelDesc modelDesc) {
        sb.append("<?xml version='1.0'?>").append(newLine)
                .append("<Schema name='" + dataSourceName.split("#")[0].trim() + "' metamodelVersion='4.0'>")
                // .append("<Schema name='" + dataSourceName + "' metamodelVersion='4.0'>")
                .append(newLine);
        sb = appendTable(sb, cubeDesc.getDimensions());
        sb = appendDimension(sb, cubeDesc.getDimensions(), modelDesc);
        sb = appendCube(sb, dataSourceName, cubeDesc, modelDesc);
        sb.append("</Schema>").append(newLine);
        return sb;
    }

    public static StringBuffer appendTable(StringBuffer sb, List<DimensionDesc> dimensionDescList) {
        // Map<String, List<String>> table2Column = new HashMap<String, List<String>>();
        Set<String> tables = getTables(dimensionDescList);
        sb.append("<PhysicalSchema>").append(newLine);
        for (String key : tables) {
            sb.append("<Table name='" + key + "'/>").append(newLine);
        }
        sb.append("</PhysicalSchema>").append(newLine);
        return sb;
    }

    public static Map<String, JoinDesc> getJoinDesc(DataModelDesc modelDesc){
        Map<String, JoinDesc> joinDescMap = new HashMap<String, JoinDesc>();
        for(LookupDesc lookupDesc : modelDesc.getLookups()){
            if(!joinDescMap.containsKey(dealTableName(lookupDesc.getTable())))
                joinDescMap.put(dealTableName(lookupDesc.getTable()), lookupDesc.getJoin());
        }
        return joinDescMap;
    }

    public static StringBuffer appendDimension(StringBuffer sb, List<DimensionDesc> dimensionDescList, DataModelDesc modelDesc) {
        StringBuffer hierSb = new StringBuffer();
        for (DimensionDesc dimensionDesc : dimensionDescList) {
            String table = dealTableName(dimensionDesc.getTable());
            Map<String, JoinDesc> joinDescMap = getJoinDesc(modelDesc);
            Set<String> columns = getColumns(dimensionDesc);
            if(joinDescMap.containsKey(table))
                sb.append("<Dimension name='" + dimensionDesc.getName() + "' key='" + joinDescMap.get(table).getPrimaryKey()[0] + "' table='" + table + "'>").append(newLine);
            else
                sb.append("<Dimension name='" + dimensionDesc.getName() + "' key='" + columns.iterator().next() + "' table='" + table + "'>").append(newLine);
            hierSb.append("<Hierarchies>").append(newLine);
            sb.append("<Attributes>").append(newLine);
            for(String column : columns){
                // add Attributes to stringbuffer
                sb = addAttribute(sb, column);
                if(joinDescMap.containsKey(table)){
                    int index = getForeignKeyIndex(column,joinDescMap.get(table));
                    if(index != -1)
                        hierSb = addJoinHierarchy(hierSb, index, table, joinDescMap.get(table), dealTableName(modelDesc.getFactTable()));
                    else
                        // add Hierarchy to stringbuffer
                        hierSb = addHierarchy(hierSb, column);
                }else
                    // add Hierarchy to stringbuffer
                    hierSb = addHierarchy(hierSb, column);
            }
            if(joinDescMap.containsKey(table)){
                for(String primaryKey : joinDescMap.get(table).getPrimaryKey()){
                    if(!columns.contains(primaryKey))
                        sb = addAttribute(sb, primaryKey);
                }
            }
            sb.append("</Attributes>").append(newLine);
            hierSb.append("</Hierarchies>").append(newLine);
            sb.append(hierSb);
            hierSb.delete(0, hierSb.length());
            sb.append("</Dimension>").append(newLine);
        }
        return sb;
    }

    public static Set<String> getColumns(DimensionDesc dimensionDesc){
        Set<String> columns = new HashSet<String>();
        if (dimensionDesc.getColumn() != null || dimensionDesc.getDerived() != null) {
            if(dimensionDesc.getColumn() != null) {
                // for (String column : dimensionDesc.getColumn()) {
                columns.add(dimensionDesc.getColumn());
                // }
            }
            if (dimensionDesc.getDerived() != null) {
                for (String derived : dimensionDesc.getDerived()) {
                    columns.add(derived);
                }
            }
        } else {
            // Neither a column nor derived columns: fall back to the dimension name
            columns.add(dimensionDesc.getName());
        }
        return columns;
    }

    public static StringBuffer addAttribute(StringBuffer sb, String attr) {
        sb.append("<Attribute hasHierarchy='false' levelType='Regular' name='" + attr + "'>").append(newLine)
                .append("<Key>").append(newLine)
                .append("<Column name='" + attr + "'/>").append(newLine)
                .append("</Key>").append(newLine)
                .append("</Attribute>").append(newLine);
        return sb;
    }

    public static StringBuffer addHierarchy(StringBuffer sb, String attr) {
        sb.append("<Hierarchy name='" + attr + "' hasAll='true'>").append(newLine)
                .append("<Level attribute='" + attr + "'/>").append(newLine)
                .append("</Hierarchy>").append(newLine);
        return sb;
    }

    public static StringBuffer addJoinHierarchy(StringBuffer sb, int index, String table, JoinDesc joinDesc, String factTable){
        sb.append("<Hierarchy hasAll='true' name='"+ joinDesc.getPrimaryKey()[index] +"'>").append(newLine)
                .append("<Join leftKey='"+ joinDesc.getPrimaryKey()[index] +"' rightKey='"+ joinDesc.getForeignKey()[index] +"'>").append(newLine)
                .append("<Table name='"+ factTable +"'/>").append(newLine)
                .append("<Table name='"+ table +"'/>").append(newLine)
                .append("<RelationOrJoin type='"+ joinDesc.getType() +"' />").append(newLine)
                .append("</Join>").append(newLine)
                .append("<Level attribute='" + joinDesc.getForeignKey()[index] + "'/>").append(newLine)
                .append("</Hierarchy>").append(newLine);
        return sb;
    }

    // Despite the name, this matches attr against the join's primary-key columns;
    // the returned position is then used to index both getPrimaryKey() and getForeignKey().
    public static int getForeignKeyIndex(String attr, JoinDesc joinDesc){
        for(int i=0; i<joinDesc.getPrimaryKey().length; i++){
            if(joinDesc.getPrimaryKey()[i].equals(attr)){
                return i;
            }
        }
        return -1;
    }

    // Strips the database prefix, e.g. "DEFAULT.KYLIN_SALES" becomes "KYLIN_SALES"
    public static String dealTableName(String tableName){
        if(tableName.contains("."))
            return tableName.split("\\.")[1];
        else
            return tableName;
    }

    public static StringBuffer appendCube(StringBuffer sb, String cubeName, CubeDesc cubeDesc, DataModelDesc modelDesc) {
        sb.append("<Cube name='" + cubeName.split("#")[1].trim() + "'>").append(newLine);
        sb = addCubeDimension(sb, cubeDesc.getDimensions());
        sb.append("<MeasureGroups>").append(newLine);
        // Set<String> tables = getTables(cubeDesc.getDimensions());
        // for(String table : tables) {
        sb.append("<MeasureGroup table='" + dealTableName(modelDesc.getFactTable()) + "'>").append(newLine);
        sb = addDimensionLink(sb, cubeDesc.getDimensions(), modelDesc);
        // if(table.equals(modelDesc.getFactTable().trim())) {
        sb.append("<Measures>").append(newLine);
        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
            // sb.append("<MeasureGroup>").append(newLine);
            sb = addMeasure(sb, measureDesc, getColumn(cubeDesc));
        }
        sb.append("</Measures>").append(newLine);
        // }
        sb.append("</MeasureGroup>").append(newLine);
        // }
        sb.append("</MeasureGroups>").append(newLine);
        sb.append("</Cube>").append(newLine);
        return sb;
    }

    public static StringBuffer addCubeDimension(StringBuffer sb, List<DimensionDesc> dimensionDescs) {
        sb.append("<Dimensions>").append(newLine);
        for (DimensionDesc dimensionDesc : dimensionDescs) {
            sb.append("<Dimension source='" + dimensionDesc.getName() + "' visible='true'/>").append(newLine);
        }
        sb.append("</Dimensions>").append(newLine);
        return sb;
    }

    public static StringBuffer addDimensionLink(StringBuffer sb, List<DimensionDesc> dimensionDescs, DataModelDesc modelDesc){
        sb.append("<DimensionLinks>" ).append(newLine);
        for(DimensionDesc dimensionDesc : dimensionDescs) {
            if(dimensionDesc.getTable().contains(modelDesc.getFactTable())) {
                sb.append("<FactLink dimension='" + dimensionDesc.getName() + "'/>").append(newLine);
            }else{
                LookupDesc[] lookupDescs = modelDesc.getLookups();
                for(LookupDesc lookupDesc : lookupDescs){
                    if(dimensionDesc.getTable().contains(lookupDesc.getTable())){
                        for(String primaryKey : lookupDesc.getJoin().getPrimaryKey())
                            sb.append("<ForeignKeyLink dimension='" + dimensionDesc.getName() + "' foreignKeyColumn='"+ primaryKey +"'/>").append(newLine);
                    }
                }
            }
        }
        sb.append(" </DimensionLinks>").append(newLine);
        return sb;
    }

    public static StringBuffer addMeasure(StringBuffer sb, MeasureDesc measureDesc, String defaultColumn) {
        FunctionDesc functionDesc = measureDesc.getFunction();
        String aggregator = functionDesc.getExpression().trim().toLowerCase();
        // Mondrian names this aggregator distinct-count
        if(aggregator.equals("count_distinct")){
            aggregator = "distinct-count";
        }
        if(functionDesc.getParameter().getValue().equals("1")) {
            // A constant parameter such as COUNT(1) has no column, so fall back to the default column
            sb.append("<Measure aggregator='" + aggregator + "' column='" + defaultColumn + "' name='" + measureDesc.getName() + "' visible='true'/>")
                    .append(newLine);
        }
        else
            sb.append("<Measure aggregator='" + aggregator + "' column='" + functionDesc.getParameter().getValue() + "' name='" + measureDesc.getName() + "' visible='true'/>")
                    .append(newLine);
        return sb;
    }

    public static Set<String> getTables(List<DimensionDesc> dimensionDescList){
        Set<String> tables = new HashSet<String>();
        for (DimensionDesc dimensionDesc : dimensionDescList) {
            // Set.add ignores duplicates, so no contains() check is needed
            tables.add(dealTableName(dimensionDesc.getTable()));
        }
        return tables;
    }

    // Uses the first rowkey column as the default measure column
    public static String getColumn(CubeDesc cubeDesc){
        RowKeyDesc rowKey = cubeDesc.getRowkey();
        return rowKey.getRowKeyColumns()[0].getColumn();
    }
}
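One thing SchemaUtil does not do is escape the names it concatenates into XML attributes, so a project, table, or column name containing `&`, `<`, or a single quote would produce a malformed schema. Whether such names can occur in your Kylin metadata is something to check; if they can, a small escaping step would cover it. The sketch below is one way to do that, and the class `XmlAttr` is a made-up name, not something from Saiku, Mondrian, or Kylin.

```java
// Hypothetical helper for escaping values placed inside XML attributes.
final class XmlAttr {
    private XmlAttr() {}

    /** Replaces the five characters that are special in XML with their predefined entities. */
    static String escape(String value) {
        StringBuilder out = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '&':  out.append("&amp;");  break;
                case '<':  out.append("&lt;");   break;
                case '>':  out.append("&gt;");   break;
                case '\'': out.append("&apos;"); break;
                case '"':  out.append("&quot;"); break;
                default:   out.append(c);
            }
        }
        return out.toString();
    }

    /** Same shape as the Table element written by appendTable, with the name escaped. */
    static String tableElement(String name) {
        return "<Table name='" + escape(name) + "'/>";
    }
}
```

In SchemaUtil this would mean wrapping each concatenated name, for example `"<Table name='" + XmlAttr.escape(key) + "'/>"`.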
Then, during initialization, start a separate thread that checks for newly added cubes and registers any it finds:
if (projects != null) {
    for (ProjectInstance project : projects) {
        List<CubeInstance> cubes = getCubes(project.getName());
        for (CubeInstance cubeInstance : cubes) {
            String newCubeName = project.getName() + "#" + cubeInstance.getName();
            if (!datasources.containsKey(newCubeName)) {
                // Skip cubes that failed to load so the next pass can retry them
                SaikuDatasource datasource = getSaikuDatasource(newCubeName);
                if (datasource != null) {
                    datasources.put(newCubeName, datasource);
                }
            }
        }
    }
}
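The post does not show how the background thread itself is started. One possible shape, sketched with the JDK's `ScheduledExecutorService`, is below. The class `CubeWatcher`, the two callbacks, and the polling period are all assumptions for illustration; in the real manager the callbacks would wrap the Kylin REST calls and `getSaikuDatasource`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical poller: registers datasources for cube keys it has not seen yet.
final class CubeWatcher<D> {
    // Concurrent map, because the poller thread and request threads both touch it
    private final Map<String, D> datasources = new ConcurrentHashMap<>();
    private final Supplier<List<String>> listCubeKeys; // e.g. all "project#cube" keys from Kylin
    private final Function<String, D> loader;          // e.g. key -> getSaikuDatasource(key)

    CubeWatcher(Supplier<List<String>> listCubeKeys, Function<String, D> loader) {
        this.listCubeKeys = listCubeKeys;
        this.loader = loader;
    }

    /** One polling pass; returns how many new datasources were registered. */
    int refresh() {
        int added = 0;
        for (String key : listCubeKeys.get()) {
            if (!datasources.containsKey(key)) {
                D datasource = loader.apply(key);
                // A null result means loading failed; leave the key out so the next pass retries
                if (datasource != null && datasources.putIfAbsent(key, datasource) == null) {
                    added++;
                }
            }
        }
        return added;
    }

    /** Runs refresh() repeatedly on a daemon thread, waiting periodSeconds between passes. */
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "kylin-cube-watcher");
            t.setDaemon(true); // do not keep the JVM alive on shutdown
            return t;
        });
        executor.scheduleWithFixedDelay(() -> {
            try {
                refresh();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs, so log and continue
                e.printStackTrace();
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }

    Map<String, D> view() {
        return Collections.unmodifiableMap(datasources);
    }
}
```

The returned executor should be shut down in the manager's unload path. Note that this only picks up new cubes; cubes that were dropped or rebuilt in Kylin are still handled by the full reinitialization at startup described earlier.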
With that in place, you can happily query Kylin cubes through Saiku.
Related projects:
saiku 3.8.8 https://github.com/OSBI/saiku
mondrian 4.4 https://github.com/pentaho/mondrian
kylin 1.5.3 https://github.com/apache/kylin