模块架构
micrometer-core
annotation
aop
instrument [插桩]
binder [绑定]
cache [针对缓存的度量采集,ehcache guava jcache等]
commonspool2
db [postgresql等度量采集]
grpc[grpc客服端服务端度量采集]
http[Java_servlet度量采集]
httpcomponents[Apache http-client度量采集]
hystrix
server
jetty
jpa
jvm
kafka
logging
mongodb
okhttp3
system
tomcat
示例一Tomcat
- 通过bindTo注册prometheus
- spring-actuator会自动创建TomcatMetrics
- TomcatMetrics通过MBean技术完成指标采集
public class TomcatMetrics implements MeterBinder, AutoCloseable {
private static final String JMX_DOMAIN_EMBEDDED = "Tomcat";
private static final String JMX_DOMAIN_STANDALONE = "Catalina";
private static final String OBJECT_NAME_SERVER_SUFFIX = ":type=Server";
private static final String OBJECT_NAME_SERVER_EMBEDDED = JMX_DOMAIN_EMBEDDED + OBJECT_NAME_SERVER_SUFFIX;
private static final String OBJECT_NAME_SERVER_STANDALONE = JMX_DOMAIN_STANDALONE + OBJECT_NAME_SERVER_SUFFIX;
@Nullable
private final Manager manager;
private final MBeanServer mBeanServer;
private final Iterable<Tag> tags;
private final Set<NotificationListener> notificationListeners = ConcurrentHashMap.newKeySet();
private volatile String jmxDomain;
public TomcatMetrics(@Nullable Manager manager, Iterable<Tag> tags) {
this(manager, tags, getMBeanServer());
}
public TomcatMetrics(@Nullable Manager manager, Iterable<Tag> tags, MBeanServer mBeanServer) {
this.manager = manager;
this.tags = tags;
this.mBeanServer = mBeanServer;
if (manager != null) {
this.jmxDomain = manager.getContext().getDomain();
}
}
public static void monitor(MeterRegistry registry, @Nullable Manager manager, String... tags) {
monitor(registry, manager, Tags.of(tags));
}
public static void monitor(MeterRegistry registry, @Nullable Manager manager, Iterable<Tag> tags) {
new TomcatMetrics(manager, tags).bindTo(registry);
}
public static MBeanServer getMBeanServer() {
List<MBeanServer> mBeanServers = MBeanServerFactory.findMBeanServer(null);
if (!mBeanServers.isEmpty()) {
return mBeanServers.get(0);
}
return ManagementFactory.getPlatformMBeanServer();
}
@Override
public void bindTo(MeterRegistry registry) {
// 请求相关度量注册MeterRegistry[先注册micrometer,PrometheusMeterRegistry,后注册prometheus的CollectorRegistry]
registerGlobalRequestMetrics(registry);
registerServletMetrics(registry);
registerCacheMetrics(registry);
registerThreadPoolMetrics(registry);
registerSessionMetrics(registry);
}
private void registerSessionMetrics(MeterRegistry registry) {
if (manager == null) {
// If the binder is created but unable to find the session manager don't
// register those metrics
return;
}
Gauge.builder("tomcat.sessions.active.max", manager, Manager::getMaxActive).tags(tags)
.baseUnit(BaseUnits.SESSIONS).register(registry);
Gauge.builder("tomcat.sessions.active.current", manager, Manager::getActiveSessions).tags(tags)
.baseUnit(BaseUnits.SESSIONS).register(registry);
FunctionCounter.builder("tomcat.sessions.created", manager, Manager::getSessionCounter).tags(tags)
.baseUnit(BaseUnits.SESSIONS).register(registry);
FunctionCounter.builder("tomcat.sessions.expired", manager, Manager::getExpiredSessions).tags(tags)
.baseUnit(BaseUnits.SESSIONS).register(registry);
FunctionCounter.builder("tomcat.sessions.rejected", manager, Manager::getRejectedSessions).tags(tags)
.baseUnit(BaseUnits.SESSIONS).register(registry);
TimeGauge.builder("tomcat.sessions.alive.max", manager, TimeUnit.SECONDS, Manager::getSessionMaxAliveTime)
.tags(tags).register(registry);
}
private void registerThreadPoolMetrics(MeterRegistry registry) {
registerMetricsEventually(":type=ThreadPool,name=*", (name, allTags) -> {
Gauge.builder("tomcat.threads.config.max", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "maxThreads"))).tags(allTags).baseUnit(BaseUnits.THREADS)
.register(registry);
Gauge.builder("tomcat.threads.busy", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "currentThreadsBusy"))).tags(allTags)
.baseUnit(BaseUnits.THREADS).register(registry);
Gauge.builder("tomcat.threads.current", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "currentThreadCount"))).tags(allTags)
.baseUnit(BaseUnits.THREADS).register(registry);
Gauge.builder("tomcat.connections.current", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "connectionCount"))).tags(allTags)
.baseUnit(BaseUnits.CONNECTIONS).register(registry);
Gauge.builder("tomcat.connections.keepalive.current", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "keepAliveCount"))).tags(allTags)
.baseUnit(BaseUnits.CONNECTIONS).register(registry);
Gauge.builder("tomcat.connections.config.max", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "maxConnections"))).tags(allTags)
.baseUnit(BaseUnits.CONNECTIONS).register(registry);
});
}
private void registerCacheMetrics(MeterRegistry registry) {
registerMetricsEventually(":type=StringCache", (name, allTags) -> {
FunctionCounter
.builder("tomcat.cache.access", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "accessCount")))
.tags(allTags).register(registry);
FunctionCounter
.builder("tomcat.cache.hit", mBeanServer, s -> safeDouble(() -> s.getAttribute(name, "hitCount")))
.tags(allTags).register(registry);
});
}
private void registerServletMetrics(MeterRegistry registry) {
registerMetricsEventually(":j2eeType=Servlet,name=*,*", (name, allTags) -> {
FunctionCounter.builder("tomcat.servlet.error", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "errorCount"))).tags(allTags).register(registry);
FunctionTimer
.builder("tomcat.servlet.request", mBeanServer,
s -> safeLong(() -> s.getAttribute(name, "requestCount")),
s -> safeDouble(() -> s.getAttribute(name, "processingTime")), TimeUnit.MILLISECONDS)
.tags(allTags).register(registry);
TimeGauge.builder("tomcat.servlet.request.max", mBeanServer, TimeUnit.MILLISECONDS,
s -> safeDouble(() -> s.getAttribute(name, "maxTime"))).tags(allTags).register(registry);
});
}
private void registerGlobalRequestMetrics(MeterRegistry registry) {
registerMetricsEventually(":type=GlobalRequestProcessor,name=*", (name, allTags) -> {
FunctionCounter
.builder("tomcat.global.sent", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "bytesSent")))
.tags(allTags).baseUnit(BaseUnits.BYTES).register(registry);
FunctionCounter
.builder("tomcat.global.received", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "bytesReceived")))
.tags(allTags).baseUnit(BaseUnits.BYTES).register(registry);
FunctionCounter.builder("tomcat.global.error", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "errorCount"))).tags(allTags).register(registry);
FunctionTimer
.builder("tomcat.global.request", mBeanServer,
s -> safeLong(() -> s.getAttribute(name, "requestCount")),
s -> safeDouble(() -> s.getAttribute(name, "processingTime")), TimeUnit.MILLISECONDS)
.tags(allTags).register(registry);
TimeGauge.builder("tomcat.global.request.max", mBeanServer, TimeUnit.MILLISECONDS,
s -> safeDouble(() -> s.getAttribute(name, "maxTime"))).tags(allTags).register(registry);
});
}
}
示例一Grpc
- micrometer针对无法自动集成到spring的度量,一般都会在实现类上给出使用方法
- 可以看到Grpc就是通过拦截器这个扩展点,完成对grpc的度量信息采集
- 同时也通过newResponseCounterFor等方法完成相关度量的创建以及绑定prometheus的CollectorRegistry
/**
* A gRPC server interceptor that will collect metrics using the given
* {@link MeterRegistry}.
*
* <p>
* <b>Usage:</b>
* </p>
*
* <pre>
* Server server = ServerBuilder.forPort(8080)
* .intercept(new MetricCollectingServerInterceptor(meterRegistry))
* .build();
*
* server.start()
* </pre>
*
* @author Daniel Theuke (daniel.theuke@heuboe.de)
* @since 1.7.0
*/
public class MetricCollectingServerInterceptor extends AbstractMetricCollectingInterceptor
implements ServerInterceptor {
public MetricCollectingServerInterceptor(final MeterRegistry registry) {
super(registry);
}
public MetricCollectingServerInterceptor(final MeterRegistry registry,
final UnaryOperator<Counter.Builder> counterCustomizer, final UnaryOperator<Timer.Builder> timerCustomizer,
final Code... eagerInitializedCodes) {
super(registry, counterCustomizer, timerCustomizer, eagerInitializedCodes);
}
public void preregisterService(final BindableService service) {
preregisterService(service.bindService());
}
public void preregisterService(final ServerServiceDefinition serviceDefinition) {
preregisterService(serviceDefinition.getServiceDescriptor());
}
@Override
protected Counter newRequestCounterFor(final MethodDescriptor<?, ?> method) {
return this.counterCustomizer.apply(prepareCounterFor(method, METRIC_NAME_SERVER_REQUESTS_RECEIVED,
"The total number of requests received")).register(this.registry);
}
@Override
protected Counter newResponseCounterFor(final MethodDescriptor<?, ?> method) {
return this.counterCustomizer.apply(
prepareCounterFor(method, METRIC_NAME_SERVER_RESPONSES_SENT, "The total number of responses sent"))
.register(this.registry);
}
@Override
protected Function<Code, Timer> newTimerFunction(final MethodDescriptor<?, ?> method) {
return asTimerFunction(() -> this.timerCustomizer.apply(prepareTimerFor(method,
METRIC_NAME_SERVER_PROCESSING_DURATION, "The total time taken for the server to complete the call")));
}
@Override
public <Q, A> ServerCall.Listener<Q> interceptCall(final ServerCall<Q, A> call, final Metadata requestHeaders,
final ServerCallHandler<Q, A> next) {
final MetricSet metrics = metricsFor(call.getMethodDescriptor());
final Consumer<Status.Code> responseStatusTiming = metrics.newProcessingDurationTiming(this.registry);
final MetricCollectingServerCall<Q, A> monitoringCall = new MetricCollectingServerCall<>(call,
metrics.getResponseCounter());
return new MetricCollectingServerCallListener<>(next.startCall(monitoringCall, requestHeaders),
metrics.getRequestCounter(), monitoringCall::getResponseCode, responseStatusTiming);
}
}
总结
- micrometer-core针对常用中间件予以实现
- 一般没有直接初始化的中间件,其代码注释都提供了使用方式,比如上文的grpc