To keep the Flume processes of the log collection system stable and to fix problems quickly when they occur, the Flume processes need to be monitored. Flume currently offers several types of metrics reporting: JMX Reporting, Ganglia Reporting, JSON Reporting, Custom Reporting, and so on.
This article implements custom metrics reporting via Custom Reporting. The code itself is not complicated, but there is very little material online about the implementation details of Custom Reporting or about how to call Flume's monitoring interfaces, so this write-up is published for anyone who may need it.
The metrics of each Flume component are actively pushed to our in-house monitoring platform, Hubble3. With alert policies configured in Hubble3, any sudden increase or decrease in the volume of data a Flume process transfers triggers an alert, so developers can discover and fix problems in time.
The core call for reading Flume's monitoring data is: Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
Note: Flume does not appear to expose per-component throughput over a unit of time (its counters are cumulative), so an array is used to cache the readings of the most recent 12 minutes, from which the ten-minute traffic is computed and reported to the monitoring platform.
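To make the shape of that map concrete, here is a minimal sketch (the MetricsDump class and its printing logic are illustrative only; the map is only populated when the code runs inside a live Flume agent JVM, for example from a MonitorService implementation like the one shown later). The component keys follow the same "SOURCE.r1" / "CHANNEL.c1" / "SINK.k1" pattern as Flume's JSON Reporting output, and counters such as EventDrainSuccessCount are cumulative, which is why the ten-minute figure has to be derived by subtracting an earlier sample, as the array cache in the full code does.
// Minimal sketch: dump every component's metrics; only returns data when called
// inside a running Flume agent JVM (e.g. from a MonitorService implementation).
import org.apache.flume.instrumentation.util.JMXPollUtil;

import java.util.Map;

public class MetricsDump {

    public static void dumpAll() {
        Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
        for (Map.Entry<String, Map<String, String>> component : metricsMap.entrySet()) {
            // component keys look like "SOURCE.r1", "CHANNEL.c1", "SINK.k1"
            for (Map.Entry<String, String> metric : component.getValue().entrySet()) {
                // e.g. "EventDrainSuccessCount" -> "12345" (all values come back as strings)
                System.out.println(component.getKey() + "." + metric.getKey() + " = " + metric.getValue());
            }
        }
    }
}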
Complete code:
1. Create a Java project and add the Flume-related dependencies.
pom.xml of the Maven project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.momo.game.data</groupId>
<artifactId>custom-flume</artifactId>
<version>1.6.0</version>
<name>Custom Flume Kafka Sink</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
<scope>provided</scope>
</dependency>
<!-- hubble3报警相关依赖-->
<dependency>
<groupId>com.immomo.hubble</groupId>
<artifactId>hubble-client</artifactId>
<version>3.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.immomo.env</groupId>
<artifactId>momo-env</artifactId>
<version>1.0.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
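With the Flume artifacts marked as provided (the agent supplies them at runtime), the assembly plugin's jar-with-dependencies descriptor together with appendAssemblyId=false makes mvn package produce a single jar under the plain artifact name that bundles the Hubble client. A possible build-and-deploy flow; $FLUME_HOME and the lib directory are example locations, not part of the original setup:
mvn clean package
# target/custom-flume-1.6.0.jar now contains the reporter plus its non-provided dependencies
cp target/custom-flume-1.6.0.jar $FLUME_HOME/lib/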
2. Hubble3Reporting.java
Note: the code regroups and reformats the Flume process's monitoring metrics in HubbleMessageSend() and sends them to the monitoring platform, including the volume of data transferred over roughly the last ten minutes.
package com.momo.game.data.utils;
import com.immomo.env.MomoEnv;
import com.immomo.hubble.client.HubbleClient;
import com.immomo.hubble.client.HubbleClientFactory;
import com.immomo.hubble.client.common.MonitorSource;
import com.immomo.hubble.client.monitor.GaugeMonitor;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * @author jonathon
 * @date 2019-03-11
 * @desc Custom Flume metrics reporting to Hubble3
 */
public class Hubble3Reporting implements MonitorService {

    private static final Logger logger = LoggerFactory.getLogger(Hubble3Reporting.class);

    public static String appKey;
    public static String processName;
    // the metric used for the custom per-interval statistic
    private static String customKey = "EventDrainSuccessCount";
    // cache of the last 12 one-minute samples of the cumulative counter
    private static Double[] twelveArray = new Double[12];
    // number of events the agent drained successfully during the last minsUnit minutes
    private Double minsEventsCount = 0D;
    private int minsUnit = 10;
    @Override
    public void start() {
        logger.info("Custom metrics reporting started.");
        // initialize the sliding-window cache
        for (int i = 0; i < twelveArray.length; i++) {
            twelveArray[i] = 0D;
        }
        // run the reporting loop in a daemon thread so that start() returns and does not block Flume's lifecycle
        Thread reporter = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // collect the metrics and push them to Hubble3
                    HubbleMessageSend();
                    try {
                        // report once per minute
                        Thread.sleep(60 * 1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "hubble3-reporter");
        reporter.setDaemon(true);
        reporter.start();
    }
    @Override
    public void stop() {
        logger.info("Custom metrics reporting stopped.");
    }

    @Override
    public void configure(Context context) {
        logger.info("Initializing custom metrics reporting configuration.");
        String appKeyPath = System.getProperty("appkey_path");
        System.setProperty("momo.app.file", appKeyPath);
        logger.info("Location of app.yaml: " + System.getProperty("momo.app.file"));
        appKey = MomoEnv.appKey();
        logger.info("Flume alerting appKey: " + appKey);
        processName = System.getProperty("flume_process_name");
        logger.info("Flume process name: " + processName);
    }
    public void HubbleMessageSend() {
        try {
            String[] ipHost = getIpAndHost();
            HubbleClient business_client = HubbleClientFactory.getHubbleClientBySource(MonitorSource.BUSINESS);
            // metrics of every Flume component: component name -> (metric name -> value)
            Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
            Map<String, String> tags = new HashMap<String, String>();
            Iterator iter = metricsMap.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry entry = (Map.Entry) iter.next();
                Object key = entry.getKey();
                Map<String, String> val = (Map<String, String>) entry.getValue();
                Iterator valIter = val.entrySet().iterator();
                while (valIter.hasNext()) {
                    Map.Entry valEntry = (Map.Entry) valIter.next();
                    Object vKey = valEntry.getKey();
                    Double vValue = 0D;
                    try {
                        vValue = Double.parseDouble(valEntry.getValue().toString());
                        String hubAction = ("Flume_" + processName + "_" + ipHost[1]).replace(".", "_");
                        String hubIndicator = (key.toString() + "_" + vKey.toString()).replace(".", "_");
                        GaugeMonitor cMonitor = business_client.newGauge(hubAction, hubIndicator, tags);
                        cMonitor.set(vValue);
                        logger.info("Reported to Hubble3. action: " + hubAction + " indicator: " + hubIndicator + " tags: " + tags.toString() + " value: " + vValue);
                        // custom per-interval statistic for EventDrainSuccessCount
                        if (vKey.equals(customKey)) {
                            minStatistic(hubAction, hubIndicator, vValue);
                            GaugeMonitor cMonitor2 = business_client.newGauge(hubAction, "CUSTOM_" + hubIndicator, tags);
                            cMonitor2.set(minsEventsCount);
                            logger.info("Reported to Hubble3. action: " + hubAction + " indicator: CUSTOM_" + hubIndicator + " tags: " + tags.toString() + " value: " + minsEventsCount);
                        }
                    } catch (Exception e) {
                        if (e.toString().contains("NumberFormatException")) {
                            // non-numeric metric values (e.g. the Type attribute) are skipped
                            logger.warn("Metric value is not numeric: " + e.getMessage());
                        } else {
                            logger.error("Error while reporting a metric:", e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error while sending data to Hubble3:", e);
        }
    }
    private Double minStatistic(String hubAction, String hubIndicator, Double vValue) {
        // shift the window: twelveArray[i] holds the cumulative counter value sampled i minutes ago
        for (int i = twelveArray.length - 1; i > 0; i--) {
            twelveArray[i] = twelveArray[i - 1];
        }
        twelveArray[0] = vValue;
        // events drained during the last minsUnit minutes = current counter - counter minsUnit minutes ago
        minsEventsCount = vValue - twelveArray[minsUnit];
        return minsEventsCount;
    }

    public String[] getIpAndHost() {
        InetAddress addr = null;
        try {
            addr = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            logger.error("Failed to resolve local ip/hostname:", e);
        }
        if (addr == null) {
            // fall back to placeholders if the local address cannot be resolved
            return new String[]{"unknown", "unknown"};
        }
        // local ip address
        String ip = addr.getHostAddress();
        // local host name
        String hostName = addr.getHostName();
        return new String[]{ip, hostName};
    }
}
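To wire the reporter into an agent, Flume's Custom Reporting hook is the flume.monitoring.type property, which takes the fully qualified class name of a MonitorService implementation (see the user guide linked below). A possible launch command, assuming the assembled jar is already on the agent's classpath; the agent name, config file paths, app.yaml location and process name are placeholders for the appkey_path and flume_process_name system properties this class reads:
bin/flume-ng agent -n agent1 -c conf -f conf/flume-conf.properties \
  -Dflume.monitoring.type=com.momo.game.data.utils.Hubble3Reporting \
  -Dappkey_path=/path/to/app.yaml \
  -Dflume_process_name=game_log_agent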
References:
Flume user guide: http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html
Flume source on GitHub: https://github.com/apache/flume (the source of the other built-in monitoring types can be used as additional examples)