stepchain 通用业务流程流水线处理框架。
类似于Commons Chain和Commons Pipeline这样的Java Pipeline Step Chain用于组织复杂处理流程执行的流行技术。
Java Pipeline Step Chain like Apache Commons Chain and Commons Pipeline。 A popular technique for organizing the execution of complex processing flows is the "Chain of Responsibility" pattern。
gitee: https://gitee.com/zengfr/stepchain
github: https://github.com/zengfr/stepchain-spring-boot-starter/
Repositories Central Sonatype Mvnrepository
Feature: 1、支持通用业务job、services子流程无限制拆分。 2、支持业务子流程串行化、业务子流程并行化,可配置化。 3、支持Config业务子流程开启或禁用、配置串行或并行以及并行数的统一配置。 4、支持业务流程以及子流程任意无限嵌套。 5、支持配置中心、缓存、统一数据接口、redis、Es、日志Trace等。 6、支持并行分支,支持条件分支if/else、switch、loop子流程. 7、支持Processor定时调度FixedRate、FixedDelay。 备注:只开源了通用部分(不影响使用),去除了有关框架组件包括:配置中心、缓存中心、数据接口以及业务相关DataMiddle等部分API。
Maven Dependency: Maven(Not Use Spring Boot): <dependency> <groupId>com.github.zengfr.project</groupId> <artifactId>stepchain</artifactId> <version>0.0.7</version> <dependency> Maven(Use Spring Boot): <dependency> <groupId>com.github.zengfr.project</groupId> <artifactId>stepchain-spring-boot-starter</artifactId> <version>0.0.7</version> <dependency> Gradle: compile group: 'com.github.zengfr.project', name: 'stepchain', version: '0.0.7' compile group: 'com.github.zengfr.project', name: 'stepchain-spring-boot-starter', version: '0.0.7'
interface Pipeline ChainBuilder StepBuilder Step Chain javadoc api文档
1、StepChain 的中心思想是什么?如何做到通用的? 答: 1.1、任何业务逻辑处理抽象成1\input输入 2\ processor处理器 3\output输出.中间过程结果产生和组合成dataMiddle。 1.2、任何业务逻辑处理使用多个processor组合执行。 2、StepChain 如何并行和串行执行多个processor? 答: 串行step=pipeline.createStep();step.put(processors);//processors串行执行. 并行step=pipeline.createStep(4);step.put(processors);//processors同时4个并行执行. 3、Stepchain 如何创建processor? 3.1、实现 IProcessor 接口。 3.2、使用IProcessorBuilder: <I> IProcessor<I, Boolean> createProcessor(Predicate<I> predicate); <I> IProcessor<I, Boolean> createProcessor(Consumer<I> consumer); <I, O> IProcessor<I, O> createProcessor(Function<I, O> func); 4、StepChain 如何复用和组合processor? 4.1、使用IChainBuilder、IChain: 4.2、使用IProcessorBuilder: <A, B, C> IProcessor<A, C> createProcessor(IProcessor<A, B> first, IProcessor<B, C> second); <A, B, C, D> IProcessor<A, D> createProcessor(IProcessor<A, B> processor1, IProcessor<B, C> processor2, IProcessor<C, D> processor3); 5、StepChain 如何按条件复用和组合processor? 答: case1、已有trueProcessor\falseProcessor2个 创建 validator 则按条件执行2则之1. IConditionSelectorProcessor<String, Boolean, String> p3 = pipeline.createConditionValidatorProcessor(validator, trueProcessor, falseProcessor); case2、已有processor 创建 validator 创建循环执行体,validator 返回false时终止执行。 IConditionLoopProcessor<String, String> p2 = pipeline.createConditionLoopProcessor(validator, processor); case3、已有processor创建 switch 逻辑,根据selector返回的key执行某1分支branchProcessor如果返回的key不在分支中 则执行默认key对应的分支branchProcessor。 IConditionSelectorProcessor<String, String, String> p1 = pipeline.createConditionSelectorProcessor(selector); p1.setBranch(S key, IProcessor<I, O> processor); p1setDefaultBranch(S key); case4、已有processor创建 if/else if/else 逻辑,根据validator返回的结果与result对比一致则执行分支branchProcessor,如果没有返回一致的 则执行默认分支branchProcessor。 pipeline.createConditionValidatorSelectorProcessor(); public interface IConditionValidatorSelectorProcessor<I,O> extends IProcessor<I, O> { void setBranch(IProcessor<I, Boolean> validator,Boolean result,IProcessor<I, O> processor); void setDefaultBranch(IProcessor<I, O> processor); }
public interface IStep<I> extends IStepProcessor<I> { void put(IStepProcessor<I> processor); void put(IStepProcessor<I>... processorArray); void put(Collection<StepProcessor<I>> processors); void put(IProcessor<I, Boolean> processor); void put(IProcessor<I, Boolean>... processorArray); void put(IChain<I, Boolean> chain); void put(IChain<I, Boolean>... processorArray); void put(Function<I, Boolean> func); void put(Function<I, Boolean>... processorArray); } public interface IChain<A, B> extends IProcessor<A, B> { <C> IChain<A, C> next(IProcessor<B, C> process); <C> IChain<A, C> next(Function<B, C> func); } public interface IChainBuilder { <A, B> IChain<A, B> createChain(Function<A, B> func); <A, B> IChain<A, B> createChain(IProcessor<A, B> processor); <A, B, C> IChain<A, C> createChain(IProcessor<A, B> processor1, IProcessor<B, C> processor2); } public interface IStepBuilder { <T> IStep<T> createStep(); <T> IStep<T> createStep(int parallelCount); <T> IStep<T> createStep(String parallelCountConfigName); }
PipelineTest.java
Demo&Test you can use AbstractProcessor AbstractStepProcessor
import com.github.zengfr.project.stepchain abstract class AbstractProcessor<I, O> implements Processor<I, O>{} abstract class AbstractStepProcessor<A> extends AbstractProcessor<A, Boolean> implements StepProcessor<A>{}
import com.github.zengfr.project.stepchain.Chain; import com.github.zengfr.project.stepchain.Pipeline; import com.github.zengfr.project.stepchain.Step; import com.github.zengfr.project.stepchain.context.ContextBuilder; import com.github.zengfr.project.stepchain.context.UnaryContext; import com.github.zengfr.project.stepchain.test.context.SetProductContext; import com.github.zengfr.project.stepchain.test.context.SetProductDataMiddle; import com.github.zengfr.project.stepchain.test.processor.DiscountProcessor; import com.github.zengfr.project.stepchain.test.processor.FeeProcessor; import com.github.zengfr.project.stepchain.test.processor.IncreaseProcessor; import com.github.zengfr.project.stepchain.test.processor.InitProcessor; import com.github.zengfr.project.stepchain.test.processor.TaxProcessor; public class PipelineTest { public static void testPipeline(IPipeline pipeline) throws Exception { //Demo精简版 只开源了通用部分(不影响使用) SetProductRequest req = new SetProductRequest(); SetProductResponse resp = new SetProductResponse(); SetProductDataMiddle middle = new SetProductDataMiddle(); SetProductContext context = new SetProductContext(req, middle, resp); IStep<SetProductContext> step = pipeline.createStep(); step.put(new InitProcessor()); step.put(new TaxProcessor()); step.put(new FeeProcessor()); step.put(new IncreaseProcessor()); step.put(new DiscountProcessor()); step.put((c) -> { c.middle.Price += 10; return true; }); step.process(context); System.out.println(context.middle.Price); } public static void testPipeline2(IPipeline pipeline) throws Exception { Function<UnaryContext<Integer>, Boolean> func = (context) -> { if (context.context == null) context.context = 1; context.context += 1; return true; }; Function<UnaryContext<Integer>, String> func3 = (context) -> { if (context.context == null) context.context = 1; context.context += 1; return JSON.toJSONString(context.context); }; UnaryContext<Integer> context = pipeline.createContext(12345678); IStep<UnaryContext<Integer>> step = pipeline.createStep(); IStep<UnaryContext<Integer>> step2 = pipeline.createStep(); IChain<UnaryContext<Integer>, Boolean> c2 = pipeline.createChain(func); IChain<UnaryContext<Integer>, String> c3 = pipeline.createChain(func3); Function<String, Integer> func4 = null; Function<Integer, String> func5 = null; Function<String, Boolean> func6 = null; IChain<String,Integer > c4 = pipeline.createChain(func4); IChain<Integer, String> c5 = pipeline.createChain(func5); IChain<String, Boolean> c6 = pipeline.createChain(func6); IChain<UnaryContext<Integer>, Boolean> c7 = c3.next(c4).next(c5).next(c6); step2.put(c2); step2.put(step); step2.put(func); //step2.put(c7); step2.process(context); System.out.println(context.context); } public static void testPipeline3(IPipeline pipeline) throws Exception { IProcessor<String, String> selector = null; IProcessor<String, Boolean> validator = null; IProcessor<String, String> processor = null; IProcessor<String, String> first = null; IProcessor<String, String> second = null; IConditionSelectorProcessor<String, Boolean, String> p3 = pipeline.createConditionValidatorProcessor(validator, first, second); IConditionLoopProcessor<String, String> p2 = pipeline.createConditionLoopProcessor(validator, processor); IConditionSelectorProcessor<String, String, String> p1 = pipeline.createConditionSelectorProcessor(selector); }
@RunWith(SpringRunner.class) @SpringBootTest(classes = StepChainTestApplication.class) public class StepChainSpringBootTest { @Autowired protected IPipeline pipeline; @Test public void testPipeline() throws Exception { PipelineTest.testPipeline(pipeline); } @Test public void testPipeline2() throws Exception { PipelineTest.testPipeline2(pipeline); }
问题内容: 我试图在尽可能短的时间内插入大量(-ish)元素,并且尝试了以下两种选择: 1)流水线: 2)批处理: 我没有注意到任何明显的时差(实际上,我希望批处理方法会更快):对于大约250K的插入,流水处理大约需要7秒,而批处理大约需要8秒。 阅读有关流水线的文档, “使用流水线使我们能够立即将两个请求都发送到网络上,从而消除了大部分延迟。此外,它还有助于减少数据包碎片:单独发送20个请求(等
业务流程 PDF版下载 第一步:第三方服务商注册应用 在如流企业管理后台注册成为第三方服务商, 注册成为第三方服务商之后,可登录如流企业管理后台创建第三方应用。 第二步:企业管理员授权应用 在如流管理后台应用市场找到适用的第三方应用后,管理员安装并授权第三方应用。 系统将展示第三方应用授权页面,管理员根据授权页面的引导,确认授权内容,完成授权操作 。 第三步:第三方服务商提供服务 完成授权后,第三
对于MIPS架构的标准5级管道,并假设一些指令相互依赖,如何将管道气泡插入到以下汇编代码中? 首先我们插入一个气泡,我们 如您所见,当I3暂停时,I4可以继续解码。对不对?下一个 我认为这在MIPS的标准管道中是可能的,但有人说,每当插入气泡时,整个管道都会停顿。如何才能解决这个问题?
我发现JVM只有一个线程池用于并行处理流。我们在一个大的流上有一个I/O阻塞的函数,这导致了与不相关的并行流一起使用的不相关的或者快速的函数的活跃度问题。 stream上没有允许使用备用线程池的方法。 有没有一种简单的方法来避免这个问题,也许是以某种方式指定要使用哪个线程池?
我是apache beam环境中的新手。正在尝试为批处理业务流程安装apache beam管道。 我对批次的定义如下 批次== 作业/子作业之间可能存在依赖关系。 apache波束管道可以用我的自定义批次映射吗??
我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df
主要内容:实例,实例,实例,实例,实例,实例关键词:流水线,乘法器 硬件描述语言的一个突出优点就是指令执行的并行性。多条语句能够在相同时钟周期内并行处理多个信号数据。 但是当数据串行输入时,指令执行的并行性并不能体现出其优势。而且很多时候有些计算并不能在一个或两个时钟周期内执行完毕,如果每次输入的串行数据都需要等待上一次计算执行完毕后才能开启下一次的计算,那效率是相当低的。流水线就是解决多周期下串行数据计算效率低的问题。 流水线 流水线的基
问题内容: 我必须用Java实现HTTP客户端,并且出于我的需要,似乎最有效的方法是实现HTTP管道(按照RFC2616)。 顺便说一句,我想管道POST。(我也不在谈论多路复用。我在谈论流水线,即在接收到任何HTTP请求的响应之前,通过一个连接发送许多请求) 我找不到明确声明其支持流水线的第三方库。但是我可以使用例如Apache HTTPCore 来构建这样的客户端,或者如果需要的话,可以自己构