核心节点
overlord
middleManager
broke
historical
coordinator
由overlord节点统一管理,负责接收任务,分发任务给middleManager节点,middleManager启动task执行
一、提交任务
http://<OVERLORD_IP>:<port>/druid/indexer/v1/task #提交任务
eg:
curl -X 'POST' -H 'Content-Type:application/json' -d @realtime_task.json xxx:xxx/druid/indexer/v1/task
其中realtime_task.json是任务相关的配置
二、kill任务
http://<OVERLORD_IP>:<port>/druid/indexer/v1/task/{taskId}/shutdown
三、查看任务状态
http://<OVERLORD_IP>:<port>/druid/indexer/v1/task/{taskId}/status
四、管理页面
http://<OVERLORD_IP>:<port>/console.html
五、coordinator节点配置Load规则(realtime data to historical )
打开coordinator 节点管理页面
上图可以看到 historical集群状态(多集群模式)
有集群节点个数,集群存储情况(细化到每个节点)
其中在historical节点配置中,一个druid.server.tier name为一个historical集群,每个datasouce可以配置相应地规则,load到一个或多个historical集群
规则介绍
load:
如图:配置load最近7天的数据
如图:配置drop最近8天的数据
解释:coordinator 会根据以上规则,删除前8天至前7天的数据
历史节点就会保留最近7天的数据,过期则删除
其中,选择interval 自定义区间的数据drop和load规则
六、配置详解
{
"type":"index_realtime”,
#任务类型 overlord集群支持 local(本地) hadoop(hadoop mr) realtime(实时节点)等模式
"resource”:{
#用于高可用性为目
"availabilityGroup":"www_analyze_test_group”,
#用于分组 复制,一般情况可不开
"requiredCapacity”:1
#多少个middleManager去启动
},
"spec": {
"dataSchema" : {
#数据解析规则,非常重要
"dataSource" : "www-analyze-edge”,
#dataSource name
"parser" : {
"type" : "string”,
"parseSpec" : {
"format" : "json”,
#数据源格式
"timestampSpec" : {
"column" : "timestamp”,
#数据源实时列 列名
"format" : "posix”
#数据源时间列类型
},
"dimensionsSpec" : {
"dimensions”:
#dimensions列名 列表
["product","sip","httpcode","domain","metric","province","sizerank","filetyperank","speedrank","timerank","category","idc","isp","hitormiss"],
"dimensionExclusions" :[] ,
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
#metrics列名 列表 及类型配置
"type" : "count",
"name" : "count"
}
, {
"type" : "longSum",
"name" : "hits",
"fieldName" : "hits"
}
, {
"type" : "longSum",
"name" : "bits",
"fieldName" : "bits"
}
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "hour”,
#数据分段规则
"queryGranularity" : "minute”
#数据存储粒度(可用于聚合)
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.8”,
#实时节点读kafka模式,相关配置可参照kafka官网
"consumerProps": {
"zookeeper.connect": "xxxx”,
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "www-edge-check",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest”,
"auto.commit.enable": "true"
},
"feed": “xxx”
#kafka topic name
}
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 5000000,
#数据在内存中最大保留的行数
"intermediatePersistPeriod": "PT2m”,
#内存中数据load到磁盘中的间隔时间,建议在测试时配置一个比较小的值,因为该值是消费则commit的时间间隔 ,如果值较高时,consumerProps有成员加入或退出时,会促发消费者的rebalance操作,此时同组的某些消费者消费了部分数据,还没有commit,就直接rebalance,会导致该部分数据重复读。
"windowPeriod": "PT480m”,
#可以理解为实时节点load到historical节点的时间,建议配置为数据源的延时时间。最好较小。如果配置过大会导致实时节点负载较高,因为historical承担主要的查询功能,该值配置过高会导致实时节点的数据过多,影响性能,甚至会导致实时节点假死
"basePersistDirectory": "\/data0\/tmp\/overlord_realtime\/basePersist_1”,
#本地存放数据的目录
"rejectionPolicy": {
"type": "serverTime"
},
"shardSpec": {
"type": "linear",
"partitionNum": 0
#分区号,可用作复制。broke节点会将查询发至各个实时节点的各个分区,若分区号相同则只会随机选择其中一个分区
}
}
}
}