logzio/sawmill 作为Json Pipeline式处理工具,早期版本每个节点都是processor的,非常易用,高性能。
近期增加了if statement功能。却没有多分支类似Switch Case功能。这里提供一个在if statment实现原理基础上的Switch Case的实现方式。
首先logzio/sawmill 执行分成两大步骤,解析json串初始化Pipeline,其次在Executor中将json与pipeline传入按pipeline节点逐一处理。
pipeline = new Pipeline.Factory().create("{\"steps\": [" + config + "]}");
groupId = "group4";
String data = "{\"tag\":\"c\",\"m\":1}";
Doc doc = null;
try {
doc = new Doc(objectMapper.readValue(data, HashMap.class));
} catch (IOException e) {
e.printStackTrace();
}
pipelineExecutor.execute(pipeline, doc);
config是我自定义的用模版搞得一个对应json。
分析 new Pipeline.Factory().create()方法逐层进入,找到对应类ExecutionStepDefinitionParser
public class ExecutionStepDefinitionParser {
public static List<ExecutionStepDefinition> parse(List<Map<String, Object>> configMapList) {
if (configMapList == null) return null;
return configMapList.stream().map(ExecutionStepDefinitionParser::parse).collect(Collectors.toList());
}
private static ExecutionStepDefinition parse(Map<String, Object> configMap) {
String type = JsonUtils.getTheOnlyKeyFrom(configMap);
Map<String, Object> executionStepConfig = JsonUtils.getMap(configMap, type, true);
if (type.equals("if")) {
return parseConditional(executionStepConfig);
}
return parseProcessor(type, executionStepConfig);
}
private static ConditionalExecutionStepDefinition parseConditional(Map<String, Object> config) {
Map<String, Object> condition = JsonUtils.getMap(config, "condition", true);
List<Map<String, Object>> onTrue = JsonUtils.getList(config, "then", true);
List<Map<String, Object>> onFalse = JsonUtils.getList(config, "else", false);
ConditionDefinition conditionDefinition = ConditionDefinitionParser.parse(condition);
List<ExecutionStepDefinition> onTrueDefinitions = parse(onTrue);
List<ExecutionStepDefinition> onFalseDefinitions = parse(onFalse);
return new ConditionalExecutionStepDefinition(conditionDefinition, onTrueDefinitions, onFalseDefinitions);
}
private static ProcessorExecutionStepDefinition parseProcessor(String processorType, Map<String, Object> config) {
String name = JsonUtils.getString(config, "name", false);
Map<String, Object> processorConfig = JsonUtils.getMap(config, "config", true);
ProcessorDefinition processorDefinition = new ProcessorDefinition(processorType, processorConfig);
List<Map<String, Object>> onFailure = JsonUtils.getList(config, "onFailure", false);
List<ExecutionStepDefinition> onFailureExecutionStepDefinitions = parse(onFailure);
List<Map<String, Object>> onSuccess = JsonUtils.getList(config, "onSuccess", false);
List<ExecutionStepDefinition> onSuccessExecutionStepDefinitions = parse(onSuccess);
return new ProcessorExecutionStepDefinition(processorDefinition, name, onFailureExecutionStepDefinitions, onSuccessExecutionStepDefinitions);
}
}
比较明显的发现if statment实现逻辑,和condition的作用。
public class ConditionalExecutionStepDefinition implements ExecutionStepDefinition {
private ConditionDefinition condition;
private List<ExecutionStepDefinition> onTrue;
private Optional<List<ExecutionStepDefinition>> onFalse;
public ConditionalExecutionStepDefinition(ConditionDefinition condition, List<ExecutionStepDefinition> onTrue, List<ExecutionStepDefinition> onFalse) {
this.condition = condition;
this.onTrue = onTrue;
this.onFalse = Optional.ofNullable(onFalse);
}
public ConditionDefinition getConditionDefinition() {
return condition;
}
public List<ExecutionStepDefinition> getOnTrue() {
return onTrue;
}
public Optional<List<ExecutionStepDefinition>> getOnFalse() {
return onFalse;
}
}
简单来说condition表达式成立则走onTrue List,失败则走onFalse List。
二、分析需求
所谓的多分支Switch case 若按Java的理解Switch只有一个condition,case对结果进行对应,对应上则走对应逻辑。
但由于目前logzio/sawmill已经实现的confition都是boolean的,若重新构建一种condition则需做很多重复工作(看起来只是返回非boolean的,但是语法等需要重复的实现)。
所以我将需求想象为每个case都有一个condition.condition成立则走对应逻辑,不成立则下一个case。
当需求分析到如上,我想到了责任链,通过维护case list的index来保证链式调用,当condition成立直接去执行对应的processor列表。
代码改造:修改类ExecutionStepDefinitionParser
/**
* @program bdp-parent
* @description: 此类的作用:1、解析Switch case的层次结构
* 2、使NewConditionalExecutionStepDefinition 产生作用
* @author: back
* @create: 2019/09/02 15:00
*/
public class ExecutionStepDefinitionParser {
public static List<ExecutionStepDefinition> parse(List<Map<String, Object>> configMapList) {
if (configMapList == null) return null;
return configMapList.stream().map(NewExecutionStepDefinitionParser::parse).collect(Collectors.toList());
}
private static ExecutionStepDefinition parse(Map<String, Object> configMap) {
String type = JsonUtils.getTheOnlyKeyFrom(configMap);
if(type.equals("switch")){
List<Map<String, Object>> executionStepConfig = JsonUtils.getList(configMap, type, true);
return parseSwitchConditional(executionStepConfig);
}
Map<String, Object> executionStepConfig = JsonUtils.getMap(configMap, type, true);
if (type.equals("if")) {
return parseConditional(executionStepConfig);
}
return parseProcessor(type, executionStepConfig);
}
/**
* case 的list流经此处
* @param caseList
* @return
*/
private static ConditionalExecutionStepDefinition parseSwitchConditional(List<Map<String, Object>> caseList) {
NewConditionalExecutionStepDefinition condition1 =
new NewConditionalExecutionStepDefinition(null,null,null);
List<ConditionalExecutionStepDefinition> conditionalExecutionStepDefinitions
= new ArrayList<ConditionalExecutionStepDefinition>();
for (Map<String, Object> map : caseList){
Map<String, Object> aCase = JsonUtils.getMap(map, "case", true);
Map<String, Object> condition = JsonUtils.getMap(aCase, "condition", true);
List<Map<String, Object>> onTrue = JsonUtils.getList(aCase, "then", true);
ConditionDefinition conditionDefinition = ConditionDefinitionParser.parse(condition);
List<ExecutionStepDefinition> onTrueDefinitions = parse(onTrue);
ConditionalExecutionStepDefinition steps =
new ConditionalExecutionStepDefinition(conditionDefinition, onTrueDefinitions, null);
conditionalExecutionStepDefinitions.add(steps);
}
condition1.setCaseConditionList(conditionalExecutionStepDefinitions);
return condition1;
}
private static ConditionalExecutionStepDefinition parseConditional(Map<String, Object> config) {
Map<String, Object> condition = JsonUtils.getMap(config, "condition", true);
List<Map<String, Object>> onTrue = JsonUtils.getList(config, "then", true);
List<Map<String, Object>> onFalse = JsonUtils.getList(config, "else", false);
ConditionDefinition conditionDefinition = ConditionDefinitionParser.parse(condition);
List<ExecutionStepDefinition> onTrueDefinitions = parse(onTrue);
List<ExecutionStepDefinition> onFalseDefinitions = parse(onFalse);
return new ConditionalExecutionStepDefinition(conditionDefinition, onTrueDefinitions, onFalseDefinitions);
}
private static ProcessorExecutionStepDefinition parseProcessor(String processorType, Map<String, Object> config) {
String name = JsonUtils.getString(config, "name", false);
Map<String, Object> processorConfig = JsonUtils.getMap(config, "config", true);
ProcessorDefinition processorDefinition = new ProcessorDefinition(processorType, processorConfig);
List<Map<String, Object>> onFailure = JsonUtils.getList(config, "onFailure", false);
List<ExecutionStepDefinition> onFailureExecutionStepDefinitions = parse(onFailure);
List<Map<String, Object>> onSuccess = JsonUtils.getList(config, "onSuccess", false);
List<ExecutionStepDefinition> onSuccessExecutionStepDefinitions = parse(onSuccess);
return new ProcessorExecutionStepDefinition(processorDefinition, name, onFailureExecutionStepDefinitions, onSuccessExecutionStepDefinitions);
}
}
新增类:
/**
* @program bdp-parent
* @description:
* 变相的责任链 来使onfalse时重新执行下一个case的链条,维护index来保证链条顺序
*
* @author: back
* @create: 2019/09/02 16:24
*/
public class NewConditionalExecutionStepDefinition extends ConditionalExecutionStepDefinition {
private List<ConditionalExecutionStepDefinition> steps =
new LinkedList<ConditionalExecutionStepDefinition>();
private AtomicInteger index = new AtomicInteger(0);
private List<ExecutionStepDefinition> defaultStep = null;
public NewConditionalExecutionStepDefinition(ConditionDefinition condition, List<ExecutionStepDefinition> onTrue, List<ExecutionStepDefinition> onFalse) {
super(condition, onTrue, onFalse);
}
public void setCaseConditionList(List<ConditionalExecutionStepDefinition> steps) {
this.steps = steps;
}
public void setDefaultStep(List<ExecutionStepDefinition> defaultStep ){
this.defaultStep = defaultStep;
}
public ConditionDefinition getConditionDefinition() {
return this.steps.get(index.get()).getConditionDefinition();
}
public List<ExecutionStepDefinition> getOnTrue() {
return this.steps.get(index.get()).getOnTrue();
}
public Optional<List<ExecutionStepDefinition>> getOnFalse() {
int oldIndex = index.get();
if(oldIndex >= steps.size()-1)
return Optional.empty();
index.set(index.get()+1);
Integer currentIndex = index.get();
ExecutionStepDefinition a = this;
return Optional.of(new ArrayList<ExecutionStepDefinition>(){
{
add(a);
}
});
//List<ExecutionStepDefinition> executionStepDefinitions = new ArrayList<ExecutionStepDefinition>();
// executionStepDefinitions.add(this);
// Optional<List<ExecutionStepDefinition>> executionStepDefinitions1 = Optional.of(executionStepDefinitions);
//return executionStepDefinitions1;
}
}
改造完成,经测试达到效果。
附带一份改造后的使用语法:condition的种类请看具体实现类。这里只是简单展示。count是本地扩展的processor可以随笔改成你自己的processor的list
{
"switch": [
{"case": {
"condition": {
"hasValue": {
"field": "tag"
"possibleValues": "a"
}
},
"then": [{
"count":{"config": {}}
}]
}
},
{"case": {
"condition": {
"hasValue": {
"field": "tag"
"possibleValues": "b"
}
},
"then": [{
"count":{"config": {}}
}]
}
}
]
}