
Seamlessly integrating Saiku with Kylin

游高杰
2023-12-01

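Saiku manages the data sources connected to the system by adding schemas and datasources, and provides a UI as an intuitive way to analyze the data. The UI generates MDX; Mondrian connects to the data source, parses the MDX, and executes the query.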

Kylin provides OLAP capability over large-scale data. Connecting Saiku to Kylin makes it convenient to query Kylin through Saiku's friendly UI.

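As for connecting Saiku to Kylin, the GitHub project https://github.com/mustangore/kylin-mondrian-interaction patches Mondrian with a KylinDialect so that Kylin can be added as a data source. There is also Youzan's integration of Kylin, Mondrian, and Saiku. With the Mondrian patch above applied and the Kylin JDBC jar added, you can write a schema in Saiku that defines a cube and then query it.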

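That integration still requires configuring the data source and writing the schema by hand, which feels tedious. Instead, Saiku's code can be modified to fetch the project and cube information from Kylin, convert it into a schema according to a set of rules, and manage the result as a data source. This connects Saiku to Kylin directly and seamlessly.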

Without further ado, here is the code:

In saiku-beans.xml, Saiku defines a RepositoryDatasourceManager that manages the datasources already added to the system. By default it loads two sources, foodmart and earthquake, as shown below:

<bean id="repositoryDsManager" class="org.saiku.service.datasource.RepositoryDatasourceManager" init-method="load" destroy-method="unload">
        <property name="userService" ref="userServiceBean"/>
        <property name="configurationpath" value="${repoconfig}"/>
        <property name="datadir" value="${repodata}"/>
        <property name="foodmartdir" value="${foodmartrepo}"/>
        <property name="foodmartschema" value="${foodmartschema}"/>
        <property name="foodmarturl" value="${foodmarturl}"/>
        <property name="earthquakeDir" value="${earthquakerepo}"/>
        <property name="earthquakeSchema" value="${earthquakeschema}"/>
        <property name="earthquakeUrl" value="${earthquakeurl}"/>
        <property name="repoPasswordProvider" ref ="repoPasswordProviderBean"/>
        <property name="defaultRole" value="${default.role}"/>
        <property name="externalPropertiesFile" value="${external.properties.file}"/>
        <!-- If you change the repoPassword set this property for at least 1 restart to update the old repo password-->
        <!--<property name="oldRepoPassword" value="sa!kuanalyt!cs"/>-->
    </bean>

We want the cubes in Kylin to be loaded automatically as data sources, so we need to define a new datasource manager that loads and manages them.

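Define public class KylinResourceDatasourceManager implements IDatasourceManager. Saiku's datasource manager records the information of every added data source in a virtual file system, so our datasource manager starts the virtual file system in the same way. To avoid having to handle cases such as cube updates, it re-initializes everything on every startup: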

        // On startup, delete all schemas and re-initialize; for the deletion code, see RepositoryDatasourceManager
        deleteAllSchema();
        deleteAllSource();
Next, load the cubes from Kylin:

            List<ProjectInstance> projects = null;
            try {
                // Call Kylin's RESTful API to fetch all projects
                projects = restService.httpGet("projects", new TypeReference<List<ProjectInstance>>() {
                }, null);
            } catch (Exception e) {
                // Without the project list no cube can be loaded, so report the failure instead of swallowing it
                e.printStackTrace();
            }
            if (projects != null) {
                // Iterate over the projects and fetch every cube
                for (ProjectInstance project : projects) {
                    List<CubeInstance> cubes = getCubes(project.getName());
                    for (CubeInstance cubeInstance : cubes) {
                        String newCubeName = project.getName() + "#" + cubeInstance.getName();
                        // Store the cube information in the datasources map, keyed by "project#cube"
                        datasources.put(newCubeName, getSaikuDatasource(newCubeName));
                    }
                }
            }
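The loading code packs the project name and the cube name into one key of the form "project#cube", and the same key is later split apart again. A minimal sketch of a helper that makes this convention explicit is shown here. The class name CubeKey and its methods are hypothetical and not part of Saiku or Kylin, and the sketch assumes that project names do not contain "#".

```java
/** Hypothetical helper for the "project#cube" key convention; not part of Saiku or Kylin. */
final class CubeKey {
    static final String SEPARATOR = "#";

    final String project;
    final String cube;

    CubeKey(String project, String cube) {
        this.project = project;
        this.cube = cube;
    }

    /** Parses "project#cube"; returns null when the separator is missing or either side is empty. */
    static CubeKey parse(String key) {
        if (key == null) {
            return null;
        }
        int idx = key.indexOf(SEPARATOR);
        if (idx <= 0 || idx == key.length() - 1) {
            return null;
        }
        String project = key.substring(0, idx).trim();
        String cube = key.substring(idx + 1).trim();
        if (project.isEmpty() || cube.isEmpty()) {
            return null;
        }
        return new CubeKey(project, cube);
    }

    /** Rebuilds the map key used for the datasources map. */
    String toKey() {
        return project + SEPARATOR + cube;
    }

    /** Path of the generated schema file: the key with '#' replaced by '.'. */
    String schemaPath() {
        return "/datasources/" + project + "." + cube + ".xml";
    }
}
```

For example, CubeKey.parse("learn_kylin#kylin_sales_cube").schemaPath() yields /datasources/learn_kylin.kylin_sales_cube.xml; the project and cube names in this example are made up for illustration.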

    private SaikuDatasource getSaikuDatasource(String datasourceName) {
        if (datasourceName.contains("#")) {
            // The name has the form "project#cube"
            String[] nameParts = datasourceName.split("#");
            String project = nameParts[0].trim();
            String cubeName = nameParts[1].trim();

            CubeDesc[] cubeDescs = null;
            try {
                // Fetch the detailed description of the cube
                cubeDescs = restService.httpGet("cube_desc/" + cubeName, new TypeReference<CubeDesc[]>() {
                }, null);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (cubeDescs == null || cubeDescs.length != 1) {
                // Without exactly one cube description there is no model name to look up
                return null;
            }
            List<DataModelDesc> modelDescs = null;
            try {
                // Fetch the model that the cube is built on
                modelDescs = restService.httpGet("models", new TypeReference<List<DataModelDesc>>() {
                }, new BasicNameValuePair("modelName", cubeDescs[0].getModelName()), new BasicNameValuePair("projectName", project));
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (modelDescs != null && !modelDescs.isEmpty()) {
                String schemaPath = "/datasources/" + datasourceName.replace("#", ".") + ".xml";
                try {
                    // Generate the schema and add it to the virtual file system
                    addSchema(SchemaUtil.createSchema(datasourceName, cubeDescs[0], modelDescs.get(0)), schemaPath, datasourceName);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                Properties properties = new Properties();
                // Datasource information passed to Mondrian
                properties.put("location", "jdbc:mondrian:Jdbc=jdbc:kylin://" + kylinUrl + "/" + project + ";JdbcDrivers=" + KYLINE_DRIVER + ";Catalog=mondrian://" + schemaPath);
                properties.put("driver", "mondrian.olap4j.MondrianOlap4jDriver");
                properties.put("username", userName);
                properties.put("password", passWord);
                properties.put("security.enabled", false);
                properties.put("advanced", false);
                return new SaikuDatasource(cubeName, SaikuDatasource.Type.OLAP, properties);
            }
        }
        return null;
    }
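The "location" property above is the part that ties Mondrian, the Kylin JDBC driver, and the generated schema together. The following is a small sketch of how that string is assembled, pulled out into a function so it can be checked on its own. The class name KylinMondrianUrl is hypothetical. The driver class org.apache.kylin.jdbc.Driver is an assumption about what the KYLINE_DRIVER constant holds, since the original does not show its value, and the host, port, and project in the usage note are made up.

```java
/** Hypothetical helper that assembles the Mondrian connection string for a Kylin project. */
final class KylinMondrianUrl {
    // Assumption: the Kylin JDBC driver class; the original keeps it in KYLINE_DRIVER without showing the value.
    static final String KYLIN_DRIVER = "org.apache.kylin.jdbc.Driver";

    /**
     * @param kylinHostPort host and port of the Kylin server, e.g. "localhost:7070"
     * @param project       Kylin project name, used as the JDBC database part
     * @param schemaPath    path of the schema in Saiku's virtual file system, starting with "/"
     */
    static String build(String kylinHostPort, String project, String schemaPath) {
        return "jdbc:mondrian:Jdbc=jdbc:kylin://" + kylinHostPort + "/" + project
                + ";JdbcDrivers=" + KYLIN_DRIVER
                + ";Catalog=mondrian://" + schemaPath;
    }
}
```

Calling KylinMondrianUrl.build("localhost:7070", "learn_kylin", "/datasources/learn_kylin.kylin_sales_cube.xml") gives a string with three semicolon-separated parts: the Kylin JDBC URL, the driver class, and the catalog pointing at the generated schema file.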
public class SchemaUtil {

    private static String newLine = "\r\n";

    public static String createSchema(String dataSourceName, CubeDesc cubeDesc, DataModelDesc modelDesc) {
        StringBuffer sb = new StringBuffer();
        sb = appendSchema(sb, dataSourceName, cubeDesc, modelDesc);
//        System.out.println("********************************" + sb.toString());
        return sb.toString();
    }

    public static StringBuffer appendSchema(StringBuffer sb, String dataSourceName, CubeDesc cubeDesc, DataModelDesc modelDesc) {
        sb.append("<?xml version='1.0'?>").append(newLine)
                .append("<Schema name='" + dataSourceName.split("#")[0].trim() + "' metamodelVersion='4.0'>")
//                .append("<Schema name='" + dataSourceName + "' metamodelVersion='4.0'>")
                .append(newLine);
        sb = appendTable(sb, cubeDesc.getDimensions());
        sb = appendDimension(sb, cubeDesc.getDimensions(), modelDesc);
        sb = appendCube(sb, dataSourceName, cubeDesc, modelDesc);
        sb.append("</Schema>").append(newLine);
        return sb;
    }

    public static StringBuffer appendTable(StringBuffer sb, List<DimensionDesc> dimensionDescList) {
//        Map<String, List<String>> table2Column = new HashMap<String, List<String>>();
        Set<String> tables = getTables(dimensionDescList);
        sb.append("<PhysicalSchema>").append(newLine);
        for (String key : tables) {
            sb.append("<Table name='" + key + "'/>").append(newLine);
        }
        sb.append("</PhysicalSchema>").append(newLine);
        return sb;
    }

    public static Map<String, JoinDesc> getJoinDesc(DataModelDesc modelDesc){
        Map<String, JoinDesc> joinDescMap = new HashMap<String, JoinDesc>();
        for(LookupDesc lookupDesc : modelDesc.getLookups()){
            if(!joinDescMap.containsKey(dealTableName(lookupDesc.getTable())))
                joinDescMap.put(dealTableName(lookupDesc.getTable()), lookupDesc.getJoin());
        }
        return joinDescMap;
    }

    public static StringBuffer appendDimension(StringBuffer sb, List<DimensionDesc> dimensionDescList, DataModelDesc modelDesc) {
        StringBuffer hierSb = new StringBuffer();
        for (DimensionDesc dimensionDesc : dimensionDescList) {
            String table = dealTableName(dimensionDesc.getTable());
            Map<String, JoinDesc> joinDescMap = getJoinDesc(modelDesc);
            Set<String> columns = getColumns(dimensionDesc);
            if(joinDescMap.containsKey(table))
                sb.append("<Dimension name='" + dimensionDesc.getName() + "' key='" + joinDescMap.get(table).getPrimaryKey()[0] + "' table='" + table  + "'>").append(newLine);
            else
                sb.append("<Dimension name='" + dimensionDesc.getName() + "' key='" + columns.iterator().next() + "' table='" + table  + "'>").append(newLine);
            hierSb.append("<Hierarchies>").append(newLine);
            sb.append("<Attributes>").append(newLine);
            for(String column : columns){
                // add Attributes to stringbuffer
                sb = addAttribute(sb, column);

                if(joinDescMap.containsKey(table)){
                    int index = getForeignKeyIndex(column,joinDescMap.get(table));
                    if(index != -1)
                        hierSb = addJoinHierarchy(hierSb, index, table, joinDescMap.get(table), dealTableName(modelDesc.getFactTable()));
                    else
                        // add Hierarchy to stringbuffer
                        hierSb = addHierarchy(hierSb, column);
                }else
                    // add Hierarchy to stringbuffer
                    hierSb = addHierarchy(hierSb, column);
            }
            if(joinDescMap.containsKey(table)){
                for(String primaryKey : joinDescMap.get(table).getPrimaryKey()){
                    if(!columns.contains(primaryKey))
                        sb = addAttribute(sb, primaryKey);
                }
            }
            sb.append("</Attributes>").append(newLine);
            hierSb.append("</Hierarchies>").append(newLine);
            sb.append(hierSb);
            hierSb.delete(0, hierSb.length());
            sb.append("</Dimension>").append(newLine);
        }
        return sb;
    }

    public static Set<String> getColumns(DimensionDesc dimensionDesc){
        Set<String> columns = new HashSet<String>();
        if (dimensionDesc.getColumn() != null || dimensionDesc.getDerived() != null) {
            if(dimensionDesc.getColumn() != null) {
                columns.add(dimensionDesc.getColumn());
            }
            if (dimensionDesc.getDerived() != null) {
                for (String derived : dimensionDesc.getDerived()) {
                    columns.add(derived);
                }
            }
        } else {
            columns.add(dimensionDesc.getName());
        }
        return columns;
    }

    public static StringBuffer addAttribute(StringBuffer sb, String attr) {
        sb.append("<Attribute hasHierarchy='false' levelType='Regular' name='" + attr + "'>").append(newLine)
                .append("<Key>").append(newLine)
                .append("<Column name='" + attr + "'/>").append(newLine)
                .append("</Key>").append(newLine)
                .append("</Attribute>").append(newLine);
        return sb;
    }

    public static StringBuffer addHierarchy(StringBuffer sb, String attr) {
        sb.append("<Hierarchy  name='" + attr + "' hasAll='true'>").append(newLine)
                .append("<Level attribute='" + attr + "'/>").append(newLine)
                .append("</Hierarchy>").append(newLine);
        return sb;
    }

    public static StringBuffer addJoinHierarchy(StringBuffer sb, int index, String table, JoinDesc joinDesc, String factTable){
        sb.append("<Hierarchy hasAll='true' name='"+ joinDesc.getPrimaryKey()[index] +"'>").append(newLine)
        .append("<Join leftKey='"+ joinDesc.getPrimaryKey()[index] +"' rightKey='"+  joinDesc.getForeignKey()[index]   +"'>").append(newLine)
        .append("<Table name='"+ factTable +"'/>").append(newLine)
        .append("<Table name='"+ table +"'/>").append(newLine)
        .append("<RelationOrJoin type='"+ joinDesc.getType() +"' />").append(newLine)
        .append("</Join>").append(newLine)
        .append("<Level attribute='" + joinDesc.getForeignKey()[index] + "'/>").append(newLine)
        .append("</Hierarchy>").append(newLine);
        return sb;
    }

    public static int getForeignKeyIndex(String attr, JoinDesc joinDesc){
        for(int i=0; i<joinDesc.getPrimaryKey().length; i++){
            if(joinDesc.getPrimaryKey()[i].equals(attr)){
                return i;
            }
        }
        return -1;
    }


    public static String dealTableName(String tableName){
        if(tableName.contains("."))
            return tableName.split("\\.")[1];
        else
            return tableName;
    }

    public static StringBuffer appendCube(StringBuffer sb, String cubeName, CubeDesc cubeDesc, DataModelDesc modelDesc) {
        sb.append("<Cube name='" + cubeName.split("#")[1].trim() + "'>").append(newLine);
        sb = addCubeDimension(sb, cubeDesc.getDimensions());
        sb.append("<MeasureGroups>").append(newLine);
        // A single measure group on the model's fact table holds all measures
        sb.append("<MeasureGroup table='" + dealTableName(modelDesc.getFactTable()) + "'>").append(newLine);
        sb = addDimensionLink(sb, cubeDesc.getDimensions(), modelDesc);
        sb.append("<Measures>").append(newLine);
        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
            sb = addMeasure(sb, measureDesc, getColumn(cubeDesc));
        }
        sb.append("</Measures>").append(newLine);
        sb.append("</MeasureGroup>").append(newLine);
        sb.append("</MeasureGroups>").append(newLine);
        sb.append("</Cube>").append(newLine);
        return sb;
    }

    public static StringBuffer addCubeDimension(StringBuffer sb, List<DimensionDesc> dimensionDescs) {
        sb.append("<Dimensions>").append(newLine);
        for (DimensionDesc dimensionDesc : dimensionDescs) {
            sb.append("<Dimension source='" + dimensionDesc.getName() + "' visible='true'/>").append(newLine);
        }
        sb.append("</Dimensions>").append(newLine);
        return sb;
    }

    public static StringBuffer addDimensionLink(StringBuffer sb, List<DimensionDesc> dimensionDescs, DataModelDesc modelDesc){
        sb.append("<DimensionLinks>" ).append(newLine);
        for(DimensionDesc dimensionDesc : dimensionDescs) {
            if(dimensionDesc.getTable().contains(modelDesc.getFactTable())) {
                sb.append("<FactLink dimension='" + dimensionDesc.getName() + "'/>").append(newLine);
            }else{
                LookupDesc[] lookupDescs = modelDesc.getLookups();
                for(LookupDesc lookupDesc : lookupDescs){
                    if(dimensionDesc.getTable().contains(lookupDesc.getTable())){
                        for(String primaryKey : lookupDesc.getJoin().getPrimaryKey())
                            sb.append("<ForeignKeyLink dimension='" + dimensionDesc.getName() + "' foreignKeyColumn='"+  primaryKey +"'/>").append(newLine);
                    }
                }
            }
        }
        sb.append(" </DimensionLinks>").append(newLine);
        return sb;
    }

    public static StringBuffer addMeasure(StringBuffer sb, MeasureDesc measureDesc, String defaultColumn) {
        FunctionDesc functionDesc = measureDesc.getFunction();
        String aggregator = functionDesc.getExpression().trim().toLowerCase();
        // Mondrian has no count_distinct aggregator; its equivalent is distinct-count
        if (aggregator.equals("count_distinct")) {
            aggregator = "distinct-count";
        }
        // A parameter value of "1" is a constant rather than a column, so fall back to the default column
        String parameterValue = functionDesc.getParameter().getValue();
        String column = "1".equals(parameterValue) ? defaultColumn : parameterValue;
        sb.append("<Measure aggregator='" + aggregator + "' column='" + column + "' name='" + measureDesc.getName() + "' visible='true'/>")
                .append(newLine);
        return sb;
    }

    public static Set<String> getTables(List<DimensionDesc> dimensionDescList){
        Set<String> tables = new HashSet<String>();
        for (DimensionDesc dimensionDesc : dimensionDescList) {
            String table = dealTableName(dimensionDesc.getTable());
            if (!tables.contains(table)) {
                tables.add(table);
            }
        }
        return tables;
    }

    public static String getColumn(CubeDesc cubeDesc){
        RowKeyDesc rowKey = cubeDesc.getRowkey();
        return rowKey.getRowKeyColumns()[0].getColumn();
    }

}
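The measure conversion in SchemaUtil lowercases Kylin's function expression and renames count_distinct to distinct-count, passing every other name through unchanged. A slightly stricter variant is sketched below under the assumption that only the aggregators Mondrian documents (sum, count, min, max, avg, distinct-count) should reach the schema. The class name AggregatorMapper is hypothetical, and returning null for anything else is a design choice of this sketch, not the behavior of the code above.

```java
import java.util.Locale;

/** Hypothetical mapper from Kylin function expressions to Mondrian aggregator names. */
final class AggregatorMapper {

    /** Returns the Mondrian aggregator name, or null when the expression has no known equivalent. */
    static String toMondrian(String kylinExpression) {
        if (kylinExpression == null) {
            return null;
        }
        // Locale.ROOT keeps the lowercasing independent of the JVM's default locale
        String name = kylinExpression.trim().toLowerCase(Locale.ROOT);
        switch (name) {
            case "count_distinct":
                return "distinct-count";
            case "sum":
            case "count":
            case "min":
            case "max":
            case "avg":
                return name;
            default:
                // Unknown expressions are rejected so the caller can skip the measure
                return null;
        }
    }
}
```

A caller could skip any measure for which toMondrian returns null instead of writing an aggregator name that Mondrian would reject when loading the schema.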
Then, during initialization, a separate thread is started that checks whether new cubes have been added and, if so, adds them:

if (projects != null) {
    for (ProjectInstance project : projects) {
        List<CubeInstance> cubes = getCubes(project.getName());
        for (CubeInstance cubeInstance : cubes) {
            String newCubeName = project.getName() + "#" + cubeInstance.getName();
            // Only register cubes that are not known yet
            if (!datasources.containsKey(newCubeName)) {
                datasources.put(newCubeName, getSaikuDatasource(newCubeName));
            }
        }
    }
}
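The snippet above shows one polling pass but not the thread that runs it. One possible way to schedule it is sketched here with a ScheduledExecutorService from the JDK. The class CubePoller and its constructor parameters are hypothetical: keySupplier stands in for the Kylin REST calls that list "project#cube" keys, and loader stands in for getSaikuDatasource. The sketch uses a ConcurrentHashMap because the map is written by the polling thread and read by others; since that map rejects null values, a datasource that fails to load is simply not registered and is retried on the next pass.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical poller that registers newly created cubes; D is the datasource type. */
final class CubePoller<D> {
    private final Map<String, D> datasources = new ConcurrentHashMap<>();
    private final Supplier<List<String>> keySupplier; // stands in for the Kylin REST calls
    private final Function<String, D> loader;         // stands in for getSaikuDatasource
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "kylin-cube-poller");
                t.setDaemon(true); // do not keep the JVM alive just for polling
                return t;
            });

    CubePoller(Supplier<List<String>> keySupplier, Function<String, D> loader) {
        this.keySupplier = keySupplier;
        this.loader = loader;
    }

    /** Runs one polling pass and returns how many cubes were newly registered. */
    int refreshOnce() {
        int added = 0;
        for (String key : keySupplier.get()) {
            if (!datasources.containsKey(key)) {
                D datasource = loader.apply(key);
                if (datasource != null && datasources.putIfAbsent(key, datasource) == null) {
                    added++;
                }
            }
        }
        return added;
    }

    /** Schedules refreshOnce to run repeatedly with the given delay between passes. */
    void start(long periodSeconds) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refreshOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so report it and continue
                e.printStackTrace();
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    Map<String, D> datasources() {
        return datasources;
    }
}
```

In the datasource manager, start could be called at the end of load and stop from unload, so that the polling thread follows the lifecycle of the bean.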
With that, you can happily query Kylin's cubes through Saiku.

Related projects:

saiku 3.8.8  https://github.com/OSBI/saiku

mondrian 4.4 https://github.com/pentaho/mondrian

kylin 1.5.3 https://github.com/apache/kylin

