Deploying Titan 1.0 on a Cassandra cluster, bulk-loading data with BulkLoader, and querying it

韩彦君
2023-12-01
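I spent about half a month on Titan just to load a graph with 200 million vertices (4 types, 10 properties) and 1 billion edges (3 types). I ran into quite a few pitfalls along the way, but in the end I set up a Cassandra cluster, imported the data into Titan 1.0, and ran queries against it.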

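1 Deploy the Cassandra cluster

If your data set is not very large, for example 10 million vertices and 20 million edges, a single-machine import is probably enough and you do not need a Cassandra cluster. Titan ships with an embedded Cassandra, so ./bin/titan.sh start brings it up directly, and by default the data is stored under db/data/.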

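2 Deploy Titan 1.0

Download Titan 1.0 from the official site and unpack it; it works out of the box. The only prerequisite is JDK 1.8, which can be installed as follows: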

$ sudo apt-get install software-properties-common

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update

$ sudo apt-get install oracle-java8-installer

Then set the environment variable: export JAVA_HOME=/usr/lib/jvm/java-8-oracle

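3 Prepare the vertex and edge data

My data was generated with databene-benerator, in the following formats.

point_format: comma-separated fields, in this order: id, label, property1 to property10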

1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216

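edge_format (adjacency-list form): each line has two parts separated by a single space. The part before the space is source,label; the part after it is a comma-separated list of dist:edge_label entries, one per target vertex.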

1,BankCard 13:recommend,14:terminal,37:recommend

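This means that the vertex with id 1 (of type BankCard; if this type differs from the type given for that vertex in the vertex file, the two are treated as different vertices) has directed edges to the vertices with ids 13, 14, and 37, and the type of each edge is the English word after the colon.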
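To make the edge format concrete, here is a minimal Python sketch that splits one adjacency line into its source vertex and its list of targets. It is only an illustration for checking sample lines outside Titan; the function name parse_adjacency_line is hypothetical and is not part of Titan or TinkerPop.

```python
from typing import List, Tuple


def parse_adjacency_line(line: str) -> Tuple[str, str, List[Tuple[str, str]]]:
    """Split one edge-file line into (source id, source label, [(target id, edge label), ...])."""
    # The single space separates "source,label" from the target list.
    parts = line.rstrip("\n").split(" ")
    src_id, src_label = parts[0].split(",")

    targets = []
    if len(parts) == 2:
        for item in parts[1].split(","):
            if not item:
                continue  # tolerate empty entries such as a trailing comma
            dst_id, edge_label = item.split(":")
            targets.append((dst_id, edge_label))
    return src_id, src_label, targets


if __name__ == "__main__":
    print(parse_adjacency_line("1,BankCard 13:recommend,14:terminal,37:recommend"))
```

A line that contains only the source part, with no space and no targets, yields an empty target list.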

4 Parsing scripts for the vertex and edge data (Groovy files)

Vertex file parser, file name: point.groovy
// ScriptInputFormat calls parse() once per input line and uses the returned vertex.
def parse(line, factory) {
    // Fields in file order: id, label, then the 10 properties.
    def (id, vertextype, orderno, status, apptime, status2, hstyovertime, curtovertime, identify, vlabel, phonenum, bankcardnum) = line.split(/,/).toList()
    def v1 = factory.vertex(id, vertextype)

    // Fields missing at the end of a short line come back as null and are skipped.
    if (orderno != null) { v1.property("orderNo", orderno) }
    if (status != null) { v1.property("status", status) }
    if (apptime != null) { v1.property("appTime", apptime) }
    if (status2 != null) { v1.property("status2", status2) }
    // The two overdue-time fields are converted to integers here.
    if (hstyovertime != null) { v1.property("historyOverdueTime", Integer.valueOf(hstyovertime)) }
    if (curtovertime != null) { v1.property("currentOverdueTime", Integer.valueOf(curtovertime)) }
    if (identify != null) { v1.property("Identification", identify) }
    if (vlabel != null) { v1.property("vlabel", vlabel) }
    if (phonenum != null) { v1.property("phoneNum", phonenum) }
    if (bankcardnum != null) { v1.property("bankCardNum", bankcardnum) }
    // Keep the original id as a property as well.
    v1.property("bulkLoader.vertex.id", id)
    return v1
}
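Because point.groovy passes the two overdue-time fields to Integer.valueOf, a single malformed line can make the parse script throw in the middle of a long load. The following Python sketch shows one way to pre-check a vertex file before copying it to HDFS. It is an assumption of mine rather than part of the original workflow: the names check_vertex_line, FIELD_COUNT, and INT_COLUMNS are hypothetical, and Python's int() only approximates what Integer.valueOf accepts.

```python
from typing import List

FIELD_COUNT = 12       # id, label, and 10 properties
INT_COLUMNS = (6, 7)   # 0-based positions of historyOverdueTime and currentOverdueTime
INT_MIN, INT_MAX = -2**31, 2**31 - 1   # range of a Java Integer


def check_vertex_line(line: str) -> List[str]:
    """Return the problems found in one vertex-file line (empty list if none)."""
    fields = line.rstrip("\n").split(",")
    if len(fields) != FIELD_COUNT:
        return ["expected %d fields, found %d" % (FIELD_COUNT, len(fields))]

    problems = []
    for idx in INT_COLUMNS:
        try:
            value = int(fields[idx])
        except ValueError:
            problems.append("field %d is not an integer: %r" % (idx, fields[idx]))
            continue
        if not INT_MIN <= value <= INT_MAX:
            problems.append("field %d does not fit in 32 bits: %r" % (idx, fields[idx]))
    return problems


if __name__ == "__main__":
    sample = ("1,BankCard,Mf2Ei3Cm9El8Cz0Vt6Iq5Od1Uk3Hk4,Fu6Ph3Ij2Fq6Uh1,"
              "La5Rd7Kv4Ze7Xj8Gg2Cv0Cq,Nh1Ma4Di5Ig2Mc0Zn3Xs8Af0Do0Jp0,2054,594,"
              "Ir5Gw3Hf7Ty4Tc3Gi5Ki4Pn1Ze7Rn2,1,17675474023,225169216")
    print(check_vertex_line(sample))
```

Running it over a sample of the generated file before the real load is cheap compared with restarting a failed Spark job.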

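Edge file parser, file name: edge.groovy
def parse(line, factory) {
    // parts[0] is "source,label"; parts[1], if present, is the target list.
    def parts = line.split(/ /)

    def (src, vertextype) = parts[0].split(/,/).toList()
    def v1 = factory.vertex(src, vertextype)

    if (parts.length == 2) {
        // Each non-empty entry has the form "dist:edge_label".
        parts[1].split(/,/).grep { !it.isEmpty() }.each {
            def (dist, edgetype) = it.split(/:/).toList()
            def v2 = factory.vertex(dist)
            // Directed edge from the source vertex to the target vertex.
            factory.edge(v1, v2, edgetype)
        }
    }
    return v1
}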
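edge.groovy expects all outgoing edges of a source vertex on one line. If the raw edges are instead available as (source id, target id, edge label) triples, which is an assumption since the original data came straight out of the generator, they first have to be grouped by source. A minimal in-memory Python sketch follows; the name to_adjacency_lines is hypothetical, and at the scale of a billion edges a sort-based or distributed grouping would be needed instead.

```python
from collections import OrderedDict
from typing import Dict, Iterable, List, Tuple


def to_adjacency_lines(edges: Iterable[Tuple[str, str, str]],
                       vertex_labels: Dict[str, str]) -> List[str]:
    """Group (source id, target id, edge label) triples into adjacency-format lines."""
    grouped = OrderedDict()  # source id -> list of "target:edge_label" entries
    for src, dst, edge_label in edges:
        grouped.setdefault(src, []).append("%s:%s" % (dst, edge_label))

    # One line per source: "source,label target:edge_label,target:edge_label,..."
    return ["%s,%s %s" % (src, vertex_labels[src], ",".join(targets))
            for src, targets in grouped.items()]


if __name__ == "__main__":
    triples = [("1", "13", "recommend"), ("1", "14", "terminal"), ("1", "37", "recommend")]
    for line in to_adjacency_lines(triples, {"1": "BankCard"}):
        print(line)
```

The source label is looked up from the vertex data so that it matches the vertex file; as noted earlier, a mismatching type would be treated as a different vertex.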

5 Bulk-load the files with BulkLoader, file name: bulkLoader.groovy

path = "/home/ubuntu/titan/data"
graph = TitanFactory.open("conf/titan-cassandra.properties")

mgmt = graph.openManagement()

// define the vertex label
if (!mgmt.containsVertexLabel("ApplyInfo")) {
   ApplyInfo = mgmt.makeVertexLabel('ApplyInfo').make()
} else {
   ApplyInfo = mgmt.getVertexLabel("ApplyInfo")
}

if (!mgmt.containsVertexLabel("Terminal")) {
   Terminal = mgmt.makeVertexLabel('Terminal').make()
} else {
  Terminal = mgmt.getVertexLabel("Terminal")
}

if (!mgmt.containsVertexLabel("BankCard")) {
   BankCard = mgmt.makeVertexLabel('BankCard').make()
} else {
   BankCard = mgmt.getVertexLabel("BankCard")
}

if (!mgmt.containsVertexLabel("Mobile")) {
   Mobile = mgmt.makeVertexLabel('Mobile').make()
} else {
   Mobile = mgmt.getVertexLabel("Mobile")
}

// define the edge label
if (!mgmt.containsEdgeLabel("terminal")) {
    terminal = mgmt.makeEdgeLabel('terminal').multiplicity(MULTI).make()
} else {
    terminal = mgmt.getEdgeLabel("terminal")
}

if (!mgmt.containsEdgeLabel("bankcard")) {
    bankcard = mgmt.makeEdgeLabel('bankcard').multiplicity(MULTI).make()
} else {
    bankcard = mgmt.getEdgeLabel("bankcard")
}

if (!mgmt.containsEdgeLabel("recommend")) {
    recommend = mgmt.makeEdgeLabel('recommend').multiplicity(MULTI).make()
} else {
    recommend = mgmt.getEdgeLabel("recommend")
}

// define the vertex property

// orderNo
if (!mgmt.containsPropertyKey("orderNo")) {
    orderNo = mgmt.makePropertyKey('orderNo').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    orderNo = mgmt.getPropertyKey("orderNo")
}

// status
if (!mgmt.containsPropertyKey("status")) {
    status = mgmt.makePropertyKey('status').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    status = mgmt.getPropertyKey("status")
}

// appTime
if (!mgmt.containsPropertyKey("appTime")) {
    appTime = mgmt.makePropertyKey('appTime').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    appTime = mgmt.getPropertyKey("appTime")
}

// status2
if (!mgmt.containsPropertyKey("status2")) {
    status2 = mgmt.makePropertyKey('status2').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    status2 = mgmt.getPropertyKey("status2")
}

// historyOverdueTime
if (!mgmt.containsPropertyKey("historyOverdueTime")) {
    historyOverdueTime = mgmt.makePropertyKey('historyOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
    historyOverdueTime = mgmt.getPropertyKey("historyOverdueTime")
}

// currentOverdueTime
if (!mgmt.containsPropertyKey("currentOverdueTime")) {
    currentOverdueTime = mgmt.makePropertyKey('currentOverdueTime').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
    currentOverdueTime = mgmt.getPropertyKey("currentOverdueTime")
}

// Identification
if (!mgmt.containsPropertyKey("Identification")) {
    Identification = mgmt.makePropertyKey('Identification').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    Identification = mgmt.getPropertyKey("Identification")
}

// vlabel
if (!mgmt.containsPropertyKey("vlabel")) {
    vlabel = mgmt.makePropertyKey('vlabel').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()
} else {
    vlabel = mgmt.getPropertyKey("vlabel")
}

// phoneNum
if (!mgmt.containsPropertyKey("phoneNum")) {
    phoneNum = mgmt.makePropertyKey('phoneNum').dataType(Long.class).cardinality(Cardinality.SINGLE).make()
} else {
    phoneNum = mgmt.getPropertyKey("phoneNum")
}

// bankCardNum
if (!mgmt.containsPropertyKey("bankCardNum")) {
    bankCardNum = mgmt.makePropertyKey('bankCardNum').dataType(String.class).cardinality(Cardinality.SINGLE).make()
} else {
    bankCardNum = mgmt.getPropertyKey("bankCardNum")
}

// bulkLoader.vertex.id
if (!mgmt.containsPropertyKey("bulkLoader.vertex.id")) {
    blid = mgmt.makePropertyKey("bulkLoader.vertex.id").dataType(Integer.class).make()
} else {
    blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
}

// build index
byPhoneComposite = mgmt.getGraphIndex("byPhoneComposite")
byOrderComposite = mgmt.getGraphIndex("byOrderComposite")
byBulkLoaderVertexId = mgmt.getGraphIndex("byBulkLoaderVertexId")

if (byPhoneComposite == null) {
    byPhoneComposite = mgmt.buildIndex("byPhoneComposite", Vertex.class).addKey(phoneNum).buildCompositeIndex()
}

if (byOrderComposite == null) {
    byOrderComposite = mgmt.buildIndex("byOrderComposite", Vertex.class).addKey(orderNo).buildCompositeIndex()

}


if (byBulkLoaderVertexId == null) {
    byBulkLoaderVertexId = mgmt.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
}
mgmt.commit()

graph.close()


// load vfile
graph = GraphFactory.open('conf/hadoop-graph/hadoop-script.properties')


hdfs.copyFromLocal("${path}/point.v", "point.v")
hdfs.copyFromLocal("${path}/point.groovy", "point.groovy")

graph.configuration.setInputLocation("point.v")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "point.groovy")

blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()

// load efile

hdfs.copyFromLocal("${path}/add100_e.txt", "edge.e")
hdfs.copyFromLocal("${path}/edge.groovy", "edge.groovy")

graph.configuration.setInputLocation("edge.e")
graph.configuration.setProperty("gremlin.hadoop.scriptInputFormat.script", "edge.groovy")


blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()

6 Run the load script

./bin/gremlin.sh ./bulkLoader.groovy 

7 If the data set is very large, a few parameters need to be configured

schema.default = none

storage.batch-loading = true
ids.block-size = 200000

ids.authority.wait-time = 1000

spark.master=local[4]


