当前位置: 首页 > 工具软件 > Micrometer > 使用案例 >

Prometheus系列第十一篇—核心之micrometer源码分析—micrometer-core核心实现—tomcat度量采集

饶元章
2023-12-01

模块架构

micrometer-core
  annotation
  aop
  instrument [插桩]
    binder [绑定]
      cache [针对缓存的度量采集,ehcache guava jcache等]
      commonspool2
      db [postgresql等度量采集]
      grpc[grpc客户端服务端度量采集]
      http[Java_servlet度量采集]
      httpcomponents[Apache http-client度量采集]
      hystrix
      server
      jetty
      jpa
      jvm
      kafka
      logging
      mongodb
      okhttp3
      system
      tomcat

示例—Tomcat

  • 通过bindTo将各项度量注册到MeterRegistry(最终由prometheus采集)
  • spring-actuator会自动创建TomcatMetrics
  • TomcatMetrics通过MBean技术完成指标采集

/**
 * {@link MeterBinder} that registers Tomcat metrics with a {@link MeterRegistry}.
 *
 * <p>
 * Session metrics are read directly from the supplied {@link Manager}; request, servlet,
 * cache and thread-pool metrics are read from MBean attributes through the configured
 * {@link MBeanServer}.
 * </p>
 *
 * <p>
 * NOTE(review): this listing is an excerpt. {@code registerMetricsEventually},
 * {@code safeDouble}, {@code safeLong} and the {@code close()} method required by
 * {@link AutoCloseable} are referenced or implied here but not shown.
 * </p>
 */
public class TomcatMetrics implements MeterBinder, AutoCloseable {

    // JMX domain name constant for the "embedded" case (per the constant's name).
    private static final String JMX_DOMAIN_EMBEDDED = "Tomcat";

    // JMX domain name constant for the "standalone" case (per the constant's name).
    private static final String JMX_DOMAIN_STANDALONE = "Catalina";

    // Suffix appended to a JMX domain to form the Server MBean object name.
    private static final String OBJECT_NAME_SERVER_SUFFIX = ":type=Server";

    // Server object names for each domain; not used within this excerpt.
    private static final String OBJECT_NAME_SERVER_EMBEDDED = JMX_DOMAIN_EMBEDDED + OBJECT_NAME_SERVER_SUFFIX;

    private static final String OBJECT_NAME_SERVER_STANDALONE = JMX_DOMAIN_STANDALONE + OBJECT_NAME_SERVER_SUFFIX;

    // Session manager; when null, session metrics are not registered (see registerSessionMetrics).
    @Nullable
    private final Manager manager;

    // MBean server that every MBean-backed meter reads its attributes from.
    private final MBeanServer mBeanServer;

    // Caller-supplied tags; applied directly to the session meters.
    private final Iterable<Tag> tags;

    // NOTE(review): not used in this excerpt; presumably populated by
    // registerMetricsEventually and cleaned up in close() - confirm against the full source.
    private final Set<NotificationListener> notificationListeners = ConcurrentHashMap.newKeySet();

    // Set in the constructor from the manager's context when a manager is supplied.
    private volatile String jmxDomain;

    /**
     * Creates a binder that uses the MBean server returned by {@link #getMBeanServer()}.
     * @param manager session manager, or {@code null} to skip session metrics
     * @param tags tags to apply to the registered meters
     */
    public TomcatMetrics(@Nullable Manager manager, Iterable<Tag> tags) {
        this(manager, tags, getMBeanServer());
    }

    /**
     * Creates a binder that uses an explicit MBean server.
     * @param manager session manager, or {@code null} to skip session metrics
     * @param tags tags to apply to the registered meters
     * @param mBeanServer MBean server to read Tomcat MBean attributes from
     */
    public TomcatMetrics(@Nullable Manager manager, Iterable<Tag> tags, MBeanServer mBeanServer) {
        this.manager = manager;
        this.tags = tags;
        this.mBeanServer = mBeanServer;

        // The JMX domain is only derived here when a manager is available.
        if (manager != null) {
            this.jmxDomain = manager.getContext().getDomain();
        }
    }

    /**
     * Convenience entry point: builds the tags from the given strings and binds a new
     * {@code TomcatMetrics} to the registry.
     * @param registry registry to bind the meters to
     * @param manager session manager, or {@code null}
     * @param tags tag strings, converted with {@code Tags.of(String...)}
     */
    public static void monitor(MeterRegistry registry, @Nullable Manager manager, String... tags) {
        monitor(registry, manager, Tags.of(tags));
    }

    /**
     * Convenience entry point: creates a new {@code TomcatMetrics} and binds it to the
     * registry. The created instance is not returned to the caller.
     * @param registry registry to bind the meters to
     * @param manager session manager, or {@code null}
     * @param tags tags to apply to the registered meters
     */
    public static void monitor(MeterRegistry registry, @Nullable Manager manager, Iterable<Tag> tags) {
        new TomcatMetrics(manager, tags).bindTo(registry);
    }

    /**
     * Locates the MBean server to use.
     * @return the first server returned by {@code MBeanServerFactory.findMBeanServer(null)},
     * or the platform MBean server when that list is empty
     */
    public static MBeanServer getMBeanServer() {
        List<MBeanServer> mBeanServers = MBeanServerFactory.findMBeanServer(null);
        if (!mBeanServers.isEmpty()) {
            return mBeanServers.get(0);
        }
        return ManagementFactory.getPlatformMBeanServer();
    }

    /**
     * Registers all Tomcat meter groups (global request, servlet, cache, thread pool,
     * session) with the given registry.
     * @param registry registry to bind the meters to
     */
    @Override
    public void bindTo(MeterRegistry registry) {
        // Article author's note: the request-related metrics are registered with the MeterRegistry
        // [first with Micrometer's PrometheusMeterRegistry, then with Prometheus' CollectorRegistry].
        registerGlobalRequestMetrics(registry);
        registerServletMetrics(registry);
        registerCacheMetrics(registry);
        registerThreadPoolMetrics(registry);
        registerSessionMetrics(registry);
    }

    /**
     * Registers the {@code tomcat.sessions.*} meters, backed by the session manager.
     * Does nothing when no manager was supplied.
     */
    private void registerSessionMetrics(MeterRegistry registry) {
        if (manager == null) {
            // If the binder is created but unable to find the session manager don't
            // register those metrics
            return;
        }

        Gauge.builder("tomcat.sessions.active.max", manager, Manager::getMaxActive).tags(tags)
                .baseUnit(BaseUnits.SESSIONS).register(registry);

        Gauge.builder("tomcat.sessions.active.current", manager, Manager::getActiveSessions).tags(tags)
                .baseUnit(BaseUnits.SESSIONS).register(registry);

        FunctionCounter.builder("tomcat.sessions.created", manager, Manager::getSessionCounter).tags(tags)
                .baseUnit(BaseUnits.SESSIONS).register(registry);

        FunctionCounter.builder("tomcat.sessions.expired", manager, Manager::getExpiredSessions).tags(tags)
                .baseUnit(BaseUnits.SESSIONS).register(registry);

        FunctionCounter.builder("tomcat.sessions.rejected", manager, Manager::getRejectedSessions).tags(tags)
                .baseUnit(BaseUnits.SESSIONS).register(registry);

        // Value from getSessionMaxAliveTime is declared to the registry as seconds.
        TimeGauge.builder("tomcat.sessions.alive.max", manager, TimeUnit.SECONDS, Manager::getSessionMaxAliveTime)
                .tags(tags).register(registry);
    }

    /**
     * Registers the {@code tomcat.threads.*} and {@code tomcat.connections.*} gauges, each
     * reading one attribute of an MBean matching {@code :type=ThreadPool,name=*}.
     *
     * <p>
     * NOTE(review): {@code registerMetricsEventually} is not shown; presumably it invokes
     * the callback with the matching object name and the combined tags once the MBean is
     * available - confirm against the full source.
     * </p>
     */
    private void registerThreadPoolMetrics(MeterRegistry registry) {
        registerMetricsEventually(":type=ThreadPool,name=*", (name, allTags) -> {
            Gauge.builder("tomcat.threads.config.max", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "maxThreads"))).tags(allTags).baseUnit(BaseUnits.THREADS)
                    .register(registry);

            Gauge.builder("tomcat.threads.busy", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "currentThreadsBusy"))).tags(allTags)
                    .baseUnit(BaseUnits.THREADS).register(registry);

            Gauge.builder("tomcat.threads.current", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "currentThreadCount"))).tags(allTags)
                    .baseUnit(BaseUnits.THREADS).register(registry);

            Gauge.builder("tomcat.connections.current", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "connectionCount"))).tags(allTags)
                    .baseUnit(BaseUnits.CONNECTIONS).register(registry);

            Gauge.builder("tomcat.connections.keepalive.current", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "keepAliveCount"))).tags(allTags)
                    .baseUnit(BaseUnits.CONNECTIONS).register(registry);

            Gauge.builder("tomcat.connections.config.max", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "maxConnections"))).tags(allTags)
                    .baseUnit(BaseUnits.CONNECTIONS).register(registry);
        });
    }

    /**
     * Registers the {@code tomcat.cache.access} and {@code tomcat.cache.hit} counters,
     * read from the {@code accessCount} and {@code hitCount} attributes of the MBean
     * matching {@code :type=StringCache}.
     */
    private void registerCacheMetrics(MeterRegistry registry) {
        registerMetricsEventually(":type=StringCache", (name, allTags) -> {
            FunctionCounter
                    .builder("tomcat.cache.access", mBeanServer,
                            s -> safeDouble(() -> s.getAttribute(name, "accessCount")))
                    .tags(allTags).register(registry);

            FunctionCounter
                    .builder("tomcat.cache.hit", mBeanServer, s -> safeDouble(() -> s.getAttribute(name, "hitCount")))
                    .tags(allTags).register(registry);
        });
    }

    /**
     * Registers the per-servlet meters ({@code tomcat.servlet.error},
     * {@code tomcat.servlet.request}, {@code tomcat.servlet.request.max}) for MBeans
     * matching {@code :j2eeType=Servlet,name=*,*}.
     */
    private void registerServletMetrics(MeterRegistry registry) {
        registerMetricsEventually(":j2eeType=Servlet,name=*,*", (name, allTags) -> {
            FunctionCounter.builder("tomcat.servlet.error", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "errorCount"))).tags(allTags).register(registry);

            // Timer: count comes from requestCount, total time from processingTime (milliseconds).
            FunctionTimer
                    .builder("tomcat.servlet.request", mBeanServer,
                            s -> safeLong(() -> s.getAttribute(name, "requestCount")),
                            s -> safeDouble(() -> s.getAttribute(name, "processingTime")), TimeUnit.MILLISECONDS)
                    .tags(allTags).register(registry);

            TimeGauge.builder("tomcat.servlet.request.max", mBeanServer, TimeUnit.MILLISECONDS,
                    s -> safeDouble(() -> s.getAttribute(name, "maxTime"))).tags(allTags).register(registry);
        });
    }

    /**
     * Registers the {@code tomcat.global.*} meters (bytes sent/received, errors, request
     * timer, max request time) for MBeans matching
     * {@code :type=GlobalRequestProcessor,name=*}.
     */
    private void registerGlobalRequestMetrics(MeterRegistry registry) {
        registerMetricsEventually(":type=GlobalRequestProcessor,name=*", (name, allTags) -> {
            FunctionCounter
                    .builder("tomcat.global.sent", mBeanServer,
                            s -> safeDouble(() -> s.getAttribute(name, "bytesSent")))
                    .tags(allTags).baseUnit(BaseUnits.BYTES).register(registry);

            FunctionCounter
                    .builder("tomcat.global.received", mBeanServer,
                            s -> safeDouble(() -> s.getAttribute(name, "bytesReceived")))
                    .tags(allTags).baseUnit(BaseUnits.BYTES).register(registry);

            FunctionCounter.builder("tomcat.global.error", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "errorCount"))).tags(allTags).register(registry);

            // Timer: count comes from requestCount, total time from processingTime (milliseconds).
            FunctionTimer
                    .builder("tomcat.global.request", mBeanServer,
                            s -> safeLong(() -> s.getAttribute(name, "requestCount")),
                            s -> safeDouble(() -> s.getAttribute(name, "processingTime")), TimeUnit.MILLISECONDS)
                    .tags(allTags).register(registry);

            TimeGauge.builder("tomcat.global.request.max", mBeanServer, TimeUnit.MILLISECONDS,
                    s -> safeDouble(() -> s.getAttribute(name, "maxTime"))).tags(allTags).register(registry);
        });
    }
}

示例—Grpc

  • micrometer针对无法自动集成到spring的度量,一般都会在实现类上给出使用方法
  • 可以看到Grpc就是通过拦截器这个扩展点,完成对grpc的度量信息采集
  • 同时也通过newResponseCounterFor等方法完成相关度量的创建以及绑定prometheus的CollectorRegistry


/**
 * A gRPC server interceptor that will collect metrics using the given
 * {@link MeterRegistry}.
 *
 * <p>
 * <b>Usage:</b>
 * </p>
 *
 * <pre>
 * Server server = ServerBuilder.forPort(8080)
 *         .intercept(new MetricCollectingServerInterceptor(meterRegistry))
 *         .build();
 *
 * server.start();
 * </pre>
 *
 * <p>
 * NOTE(review): {@code counterCustomizer}, {@code timerCustomizer}, {@code registry},
 * {@code metricsFor}, {@code prepareCounterFor}, {@code prepareTimerFor} and
 * {@code asTimerFunction} come from {@link AbstractMetricCollectingInterceptor}, which is
 * not shown in this excerpt.
 * </p>
 *
 * @author Daniel Theuke (daniel.theuke@heuboe.de)
 * @since 1.7.0
 */
public class MetricCollectingServerInterceptor extends AbstractMetricCollectingInterceptor
        implements ServerInterceptor {

    /**
     * Creates an interceptor that records metrics in the given registry.
     * @param registry registry passed to the superclass constructor
     */
    public MetricCollectingServerInterceptor(final MeterRegistry registry) {
        super(registry);
    }

    /**
     * Creates an interceptor with customizers for the counters and timers it builds.
     * All arguments are forwarded unchanged to the superclass constructor.
     * @param registry registry to record metrics in
     * @param counterCustomizer applied to each counter builder before registration
     * @param timerCustomizer applied to each timer builder
     * @param eagerInitializedCodes status codes forwarded to the superclass
     */
    public MetricCollectingServerInterceptor(final MeterRegistry registry,
            final UnaryOperator<Counter.Builder> counterCustomizer, final UnaryOperator<Timer.Builder> timerCustomizer,
            final Code... eagerInitializedCodes) {
        super(registry, counterCustomizer, timerCustomizer, eagerInitializedCodes);
    }

    /**
     * Pre-registers the given service by delegating with its {@code bindService()} result.
     * @param service service whose definition should be pre-registered
     */
    public void preregisterService(final BindableService service) {
        preregisterService(service.bindService());
    }

    /**
     * Pre-registers the given service definition by delegating with its service
     * descriptor (to an overload not shown in this excerpt).
     * @param serviceDefinition definition whose descriptor should be pre-registered
     */
    public void preregisterService(final ServerServiceDefinition serviceDefinition) {
        preregisterService(serviceDefinition.getServiceDescriptor());
    }

    /**
     * Builds, customizes and registers the "requests received" counter for a method.
     */
    @Override
    protected Counter newRequestCounterFor(final MethodDescriptor<?, ?> method) {
        return this.counterCustomizer.apply(prepareCounterFor(method, METRIC_NAME_SERVER_REQUESTS_RECEIVED,
                "The total number of requests received")).register(this.registry);
    }

    /**
     * Builds, customizes and registers the "responses sent" counter for a method.
     */
    @Override
    protected Counter newResponseCounterFor(final MethodDescriptor<?, ?> method) {
        return this.counterCustomizer.apply(
                prepareCounterFor(method, METRIC_NAME_SERVER_RESPONSES_SENT, "The total number of responses sent"))
                .register(this.registry);
    }

    /**
     * Returns a function mapping a status code to the processing-duration timer for a
     * method; the timer builder is supplied lazily and passed through the timer customizer.
     */
    @Override
    protected Function<Code, Timer> newTimerFunction(final MethodDescriptor<?, ?> method) {
        return asTimerFunction(() -> this.timerCustomizer.apply(prepareTimerFor(method,
                METRIC_NAME_SERVER_PROCESSING_DURATION, "The total time taken for the server to complete the call")));
    }

    /**
     * Wraps the call and its listener so that requests, responses and the processing
     * duration are recorded for the invoked method.
     */
    @Override
    public <Q, A> ServerCall.Listener<Q> interceptCall(final ServerCall<Q, A> call, final Metadata requestHeaders,
            final ServerCallHandler<Q, A> next) {

        // Look up the metric set for this method and obtain the duration-timing callback.
        final MetricSet metrics = metricsFor(call.getMethodDescriptor());
        final Consumer<Status.Code> responseStatusTiming = metrics.newProcessingDurationTiming(this.registry);

        // Wrap the outgoing side of the call with the response counter.
        final MetricCollectingServerCall<Q, A> monitoringCall = new MetricCollectingServerCall<>(call,
                metrics.getResponseCounter());

        // Wrap the incoming side; the listener is given the request counter, a supplier of
        // the wrapped call's response code, and the timing callback.
        return new MetricCollectingServerCallListener<>(next.startCall(monitoringCall, requestHeaders),
                metrics.getRequestCounter(), monitoringCall::getResponseCode, responseStatusTiming);
    }
}


总结

  • micrometer-core针对常用中间件予以实现
  • 一般没有直接初始化的中间件,其代码注释都提供了使用方式,比如上文的grpc
 类似资料: