当前位置: 首页 > 工具软件 > sawmill > 使用案例 >

logzio/sawmill 改造增加switch功能

商正诚
2023-12-01

logzio/sawmill 作为Json Pipeline式处理工具,早期版本每个节点都是processor的,非常易用,高性能。

近期增加了if statement功能。却没有多分支类似Switch Case功能。这里提供一个在if statment实现原理基础上的Switch Case的实现方式。

首先logzio/sawmill 执行分成两大步骤,解析json串初始化Pipeline,其次在Executor中将json与pipeline传入按pipeline节点逐一处理。

 pipeline = new Pipeline.Factory().create("{\"steps\": [" + config + "]}");

 groupId = "group4";

 String data = "{\"tag\":\"c\",\"m\":1}";
 Doc doc = null;

        try {
            doc = new Doc(objectMapper.readValue(data, HashMap.class));
        } catch (IOException e) {
            e.printStackTrace();
        }
pipelineExecutor.execute(pipeline, doc);

config是我自定义的用模版搞得一个对应json。

分析  new Pipeline.Factory().create()方法逐层进入,找到对应类ExecutionStepDefinitionParser

public class ExecutionStepDefinitionParser {
    public static List<ExecutionStepDefinition> parse(List<Map<String, Object>> configMapList) {
        if (configMapList == null) return null;

        return configMapList.stream().map(ExecutionStepDefinitionParser::parse).collect(Collectors.toList());
    }

    private static ExecutionStepDefinition parse(Map<String, Object> configMap) {
        String type = JsonUtils.getTheOnlyKeyFrom(configMap);
        Map<String, Object> executionStepConfig = JsonUtils.getMap(configMap, type, true);

        if (type.equals("if")) {
            return parseConditional(executionStepConfig);
        }
        return parseProcessor(type, executionStepConfig);
    }

    private static ConditionalExecutionStepDefinition parseConditional(Map<String, Object> config) {
        Map<String, Object> condition = JsonUtils.getMap(config, "condition", true);
        List<Map<String, Object>> onTrue = JsonUtils.getList(config, "then", true);
        List<Map<String, Object>> onFalse = JsonUtils.getList(config, "else", false);

        ConditionDefinition conditionDefinition = ConditionDefinitionParser.parse(condition);
        List<ExecutionStepDefinition> onTrueDefinitions = parse(onTrue);
        List<ExecutionStepDefinition> onFalseDefinitions = parse(onFalse);
        return new ConditionalExecutionStepDefinition(conditionDefinition, onTrueDefinitions, onFalseDefinitions);
    }

    private static ProcessorExecutionStepDefinition parseProcessor(String processorType, Map<String, Object> config) {
        String name = JsonUtils.getString(config, "name", false);
        Map<String, Object> processorConfig = JsonUtils.getMap(config, "config", true);
        ProcessorDefinition processorDefinition = new ProcessorDefinition(processorType, processorConfig);

        List<Map<String, Object>> onFailure = JsonUtils.getList(config, "onFailure", false);
        List<ExecutionStepDefinition> onFailureExecutionStepDefinitions = parse(onFailure);

        List<Map<String, Object>> onSuccess = JsonUtils.getList(config, "onSuccess", false);
        List<ExecutionStepDefinition> onSuccessExecutionStepDefinitions = parse(onSuccess);

        return new ProcessorExecutionStepDefinition(processorDefinition, name, onFailureExecutionStepDefinitions, onSuccessExecutionStepDefinitions);
    }
}

比较明显的发现if statment实现逻辑,和condition的作用。

public class ConditionalExecutionStepDefinition implements ExecutionStepDefinition {
    private ConditionDefinition condition;
    private List<ExecutionStepDefinition> onTrue;
    private Optional<List<ExecutionStepDefinition>> onFalse;

    public ConditionalExecutionStepDefinition(ConditionDefinition condition, List<ExecutionStepDefinition> onTrue, List<ExecutionStepDefinition> onFalse) {
        this.condition = condition;
        this.onTrue = onTrue;
        this.onFalse = Optional.ofNullable(onFalse);
    }

    public ConditionDefinition getConditionDefinition() {
        return condition;
    }

    public List<ExecutionStepDefinition> getOnTrue() {
        return onTrue;
    }

    public Optional<List<ExecutionStepDefinition>> getOnFalse() {
        return onFalse;
    }
}

简单来说condition表达式成立则走onTrue List,失败则走onFalse List。

二、分析需求

所谓的多分支Switch case 若按Java的理解Switch只有一个condition,case对结果进行对应,对应上则走对应逻辑。

但由于目前logzio/sawmill已经实现的confition都是boolean的,若重新构建一种condition则需做很多重复工作(看起来只是返回非boolean的,但是语法等需要重复的实现)。

所以我将需求想象为每个case都有一个condition.condition成立则走对应逻辑,不成立则下一个case。
 

当需求分析到如上,我想到了责任链,通过维护case list的index来保证链式调用,当condition成立直接去执行对应的processor列表。

代码改造:修改类ExecutionStepDefinitionParser



/**
 * @program bdp-parent
 * @description: 此类的作用:1、解析Switch case的层次结构
 *                          2、使NewConditionalExecutionStepDefinition 产生作用
 * @author: back
 * @create: 2019/09/02 15:00
 */
public class ExecutionStepDefinitionParser {



    public static List<ExecutionStepDefinition> parse(List<Map<String, Object>> configMapList) {
        if (configMapList == null) return null;
        return configMapList.stream().map(NewExecutionStepDefinitionParser::parse).collect(Collectors.toList());
    }
    private static ExecutionStepDefinition parse(Map<String, Object> configMap) {
        String type = JsonUtils.getTheOnlyKeyFrom(configMap);

        if(type.equals("switch")){
            List<Map<String, Object>> executionStepConfig = JsonUtils.getList(configMap, type, true);
            return parseSwitchConditional(executionStepConfig);
        }
        Map<String, Object> executionStepConfig = JsonUtils.getMap(configMap, type, true);
        if (type.equals("if")) {
            return parseConditional(executionStepConfig);
        }
        return parseProcessor(type, executionStepConfig);
    }
    /**
     * case 的list流经此处
     * @param caseList
     * @return
     */
    private static ConditionalExecutionStepDefinition parseSwitchConditional(List<Map<String, Object>> caseList) {
        NewConditionalExecutionStepDefinition condition1 =
                new  NewConditionalExecutionStepDefinition(null,null,null);
        List<ConditionalExecutionStepDefinition> conditionalExecutionStepDefinitions
                = new ArrayList<ConditionalExecutionStepDefinition>();
        for (Map<String, Object> map : caseList){
            Map<String, Object> aCase = JsonUtils.getMap(map, "case", true);
            Map<String, Object> condition = JsonUtils.getMap(aCase, "condition", true);
            List<Map<String, Object>> onTrue = JsonUtils.getList(aCase, "then", true);
            ConditionDefinition conditionDefinition = ConditionDefinitionParser.parse(condition);
            List<ExecutionStepDefinition> onTrueDefinitions = parse(onTrue);
            ConditionalExecutionStepDefinition steps =
                    new ConditionalExecutionStepDefinition(conditionDefinition, onTrueDefinitions, null);
            conditionalExecutionStepDefinitions.add(steps);
        }
        condition1.setCaseConditionList(conditionalExecutionStepDefinitions);
        return condition1;
    }

    private static ConditionalExecutionStepDefinition parseConditional(Map<String, Object> config) {
        Map<String, Object> condition = JsonUtils.getMap(config, "condition", true);
        List<Map<String, Object>> onTrue = JsonUtils.getList(config, "then", true);
        List<Map<String, Object>> onFalse = JsonUtils.getList(config, "else", false);

        ConditionDefinition conditionDefinition = ConditionDefinitionParser.parse(condition);
        List<ExecutionStepDefinition> onTrueDefinitions = parse(onTrue);
        List<ExecutionStepDefinition> onFalseDefinitions = parse(onFalse);
        return new ConditionalExecutionStepDefinition(conditionDefinition, onTrueDefinitions, onFalseDefinitions);
    }

    private static ProcessorExecutionStepDefinition parseProcessor(String processorType, Map<String, Object> config) {
        String name = JsonUtils.getString(config, "name", false);
        Map<String, Object> processorConfig = JsonUtils.getMap(config, "config", true);
        ProcessorDefinition processorDefinition = new ProcessorDefinition(processorType, processorConfig);

        List<Map<String, Object>> onFailure = JsonUtils.getList(config, "onFailure", false);
        List<ExecutionStepDefinition> onFailureExecutionStepDefinitions = parse(onFailure);

        List<Map<String, Object>> onSuccess = JsonUtils.getList(config, "onSuccess", false);
        List<ExecutionStepDefinition> onSuccessExecutionStepDefinitions = parse(onSuccess);

        return new ProcessorExecutionStepDefinition(processorDefinition, name, onFailureExecutionStepDefinitions, onSuccessExecutionStepDefinitions);
    }
}

新增类:

/**
 * @program bdp-parent
 * @description:
 *       变相的责任链 来使onfalse时重新执行下一个case的链条,维护index来保证链条顺序
 *
 * @author: back
 * @create: 2019/09/02 16:24
 */
public class NewConditionalExecutionStepDefinition extends ConditionalExecutionStepDefinition {

    private List<ConditionalExecutionStepDefinition> steps =
                new LinkedList<ConditionalExecutionStepDefinition>();

    private AtomicInteger index = new AtomicInteger(0);

    private List<ExecutionStepDefinition> defaultStep = null;

    public NewConditionalExecutionStepDefinition(ConditionDefinition condition, List<ExecutionStepDefinition> onTrue, List<ExecutionStepDefinition> onFalse) {
        super(condition, onTrue, onFalse);
    }

    public void setCaseConditionList(List<ConditionalExecutionStepDefinition> steps) {
        this.steps = steps;
    }

    public void setDefaultStep(List<ExecutionStepDefinition> defaultStep ){
        this.defaultStep = defaultStep;
    }

    public ConditionDefinition getConditionDefinition() {
        return this.steps.get(index.get()).getConditionDefinition();
    }


    public List<ExecutionStepDefinition> getOnTrue() {
        return this.steps.get(index.get()).getOnTrue();
    }

    public Optional<List<ExecutionStepDefinition>> getOnFalse() {

        int oldIndex = index.get();
        if(oldIndex >= steps.size()-1)
            return Optional.empty();
        index.set(index.get()+1);
        Integer currentIndex = index.get();

        ExecutionStepDefinition a = this;
        return Optional.of(new ArrayList<ExecutionStepDefinition>(){
            {
                add(a);
            }
        });
        //List<ExecutionStepDefinition> executionStepDefinitions = new ArrayList<ExecutionStepDefinition>();
//        executionStepDefinitions.add(this);
//        Optional<List<ExecutionStepDefinition>> executionStepDefinitions1 = Optional.of(executionStepDefinitions);
        //return executionStepDefinitions1;
    }
}

改造完成,经测试达到效果。

附带一份改造后的使用语法:condition的种类请看具体实现类。这里只是简单展示。count是本地扩展的processor可以随笔改成你自己的processor的list

{
   "switch": [
         {"case": {
            "condition": {
                "hasValue": {
                         "field": "tag"
                         "possibleValues": "a"
                }
            },
            "then": [{
                      "count":{"config": {}}
                    }]
          }
         },
         {"case": {
            "condition": {
                "hasValue": {
                         "field": "tag"
                         "possibleValues": "b"
                }
            },
            "then": [{
                      "count":{"config": {}}
                    }]
          }
         }
    ]
}

 

 类似资料: