当前位置: 首页 > 工具软件 > dynamic-tp > 使用案例 >

Dynamic-tp通知告警职责链

郎玮
2023-12-01

建造职责链

public class NotifyFilterBuilder {

    private NotifyFilterBuilder() { }

    public static InvokerChain<BaseNotifyCtx> getAlarmInvokerChain() {
        val filters = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);
        Collection<NotifyFilter> alarmFilters = Lists.newArrayList(filters.values());
        alarmFilters.add(new AlarmBaseFilter());
        alarmFilters = alarmFilters.stream()
                .filter(x -> x.supports(NotifyTypeEnum.ALARM))
                .sorted(Comparator.comparing(Filter::getOrder))
                .collect(Collectors.toList());
        // 构造ALARM_FILTER_CHAIN链
        return InvokerChainFactory.buildInvokerChain(new AlarmInvoker(), alarmFilters.toArray(new NotifyFilter[0]));
    }
}

构建头节点

public final class InvokerChainFactory {

    private InvokerChainFactory() { }

    @SafeVarargs
    public static<T> InvokerChain<T> buildInvokerChain(Invoker<T> target, Filter<T>... filters) {

        InvokerChain<T> invokerChain = new InvokerChain<>();
        Invoker<T> last = target;
        for (int i = filters.length - 1; i >= 0; i--) {
            Invoker<T> next = last;
            Filter<T> filter = filters[i];
            last = context -> filter.doFilter(context, next);
        }
        invokerChain.setHead(last);
        return invokerChain;
    }
}

执行报警方法调用如下

public static void doAlarm(ExecutorWrapper executorWrapper, NotifyItemEnum notifyItemEnum) {
    // 根据告警类型获取告警项配置,一个线程池可以配置多个NotifyItem,这里需要过滤
    NotifyHelper.getNotifyItem(executorWrapper, notifyItemEnum).ifPresent(notifyItem -> {
        // 执行责任链
        val alarmCtx = new AlarmCtx(executorWrapper, notifyItem);
        ALARM_INVOKER_CHAIN.proceed(alarmCtx);
    });
}

注意其中filter的掉用会使用next.invoke(context)

@Slf4j
public class NoticeBaseFilter implements NotifyFilter {

    @Override
    public void doFilter(BaseNotifyCtx context, Invoker<BaseNotifyCtx> nextInvoker) {

        val executorWrapper = context.getExecutorWrapper();
        val notifyItem = context.getNotifyItem();
        if (Objects.isNull(notifyItem) || !satisfyBaseCondition(notifyItem, executorWrapper)) {
            log.debug("DynamicTp notify, no platforms configured or notification is not enabled, threadPoolName: {}",
                    executorWrapper.getThreadPoolName());
            return;
        }
        nextInvoker.invoke(context);
    }

详见dynamic-tp源码讲解

 类似资料: