Coup de Grace

Easy Series: Reading the Workflow Engine Source

The j-easy series

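This guy is seriously good, though he seems to be the community-driven type...

If nobody files an enhancement request, he builds a toy of roughly 1,000 lines and then stops.

Let's just call him a "paper actualizer".

He's keen on the MIT License, so he really is a nice guy.

Then again, I'm a saint of the WTFPL (side-eye).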


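Workflow engine

Who doesn't hate the BPMN documentation?

Basic abstractions:

There isn't much to say here, though I do want to set out a few scopes as far as scale is concerned.

Basic usage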

class PrintMessageWork implements Work {

    private String message;

    public PrintMessageWork(String message) {
        this.message = message;
    }

    public String getName() {
        return "print message work";
    }

    public WorkReport call() {
        System.out.println(message);
        return new DefaultWorkReport(WorkStatus.COMPLETED);
    }
}


PrintMessageWork work1 = new PrintMessageWork("foo");
PrintMessageWork work2 = new PrintMessageWork("hello");
PrintMessageWork work3 = new PrintMessageWork("world");
PrintMessageWork work4 = new PrintMessageWork("ok");
PrintMessageWork work5 = new PrintMessageWork("nok");

WorkFlow workflow = aNewSequentialFlow() // flow 4
        .execute(aNewRepeatFlow() // flow 1
                    .named("print foo 3 times")
                    .repeat(work1)
                    .times(3)
                    .build())
        .then(aNewConditionalFlow() // flow 3
                .execute(aNewParallelFlow() // flow 2
                            .named("hello world in parallel")
                            .execute(work2, work3)
                            .build())
                .when(WorkReportPredicate.COMPLETED)
                .then(work4)
                .otherwise(work5)
                .build())
        .build();

WorkFlowEngine workFlowEngine = aNewWorkFlowEngine().build();
WorkReport workReport = workFlowEngine.run(workflow);

Implementation

Abstractions
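
The excerpts in this post only show the abstractions indirectly, so here is a minimal sketch of what they appear to be, reconstructed from how PrintMessageWork and ParallelFlowExecutor use them. It is not the library source. The FAILED status and the helpers recording() and sequence() are hypothetical, added only so the sketch runs on its own. The one design point it tries to show is that a flow is itself a Work, which is why flows can be nested the way the usage example nests them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class AbstractionsSketch {

    /** Outcome of a unit of work. FAILED is an assumed counterpart to COMPLETED. */
    enum WorkStatus { COMPLETED, FAILED }

    /** What a unit of work hands back when it finishes. */
    interface WorkReport {
        WorkStatus getStatus();
    }

    /** Simplest report: nothing but a status. */
    static class DefaultWorkReport implements WorkReport {
        private final WorkStatus status;

        DefaultWorkReport(WorkStatus status) {
            this.status = status;
        }

        public WorkStatus getStatus() {
            return status;
        }
    }

    /** A named task. Extending Callable is what lets an ExecutorService submit it. */
    interface Work extends Callable<WorkReport> {
        String getName();

        WorkReport call(); // narrows Callable.call() by dropping "throws Exception"
    }

    /** A flow is itself a Work, so flows can be nested inside other flows. */
    interface WorkFlow extends Work {
    }

    /** Hypothetical helper: a work that appends its name to a log and reports the given status. */
    static Work recording(final String name, final List<String> log, final WorkStatus status) {
        return new Work() {
            public String getName() { return name; }

            public WorkReport call() {
                log.add(name);
                return new DefaultWorkReport(status);
            }
        };
    }

    /** Hypothetical helper: runs works in order and stops at the first failure. */
    static WorkFlow sequence(final String name, final Work... works) {
        return new WorkFlow() {
            public String getName() { return name; }

            public WorkReport call() {
                WorkReport last = new DefaultWorkReport(WorkStatus.COMPLETED);
                for (Work work : works) {
                    last = work.call();
                    if (last.getStatus() == WorkStatus.FAILED) {
                        break;
                    }
                }
                return last;
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Work a = recording("a", log, WorkStatus.COMPLETED);
        Work b = recording("b", log, WorkStatus.FAILED);
        Work c = recording("c", log, WorkStatus.COMPLETED);
        // The inner sequence is passed where a Work is expected: that is the nesting.
        WorkFlow outer = sequence("outer", sequence("inner", a, a), b, c);
        System.out.println(outer.call().getStatus() + " " + log); // prints "FAILED [a, a, b]"
    }
}
```

Everything else (repeat, conditional, parallel) is just another class implementing the same interface with a different call().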

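Implementations

Flows

The implementations that follow are the familiar, crowd-pleasing part; take parallel execution as an arbitrary example.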

public class ParallelFlow extends AbstractWorkFlow {

    private List<Work> works = new ArrayList<>();
    private ParallelFlowExecutor workExecutor;

    ParallelFlow(String name, List<Work> works, ParallelFlowExecutor parallelFlowExecutor) {
        super(name);
        this.works.addAll(works);
        this.workExecutor = parallelFlowExecutor;
    }

    public ParallelFlowReport call() {
        ParallelFlowReport workFlowReport = new ParallelFlowReport();
        List<WorkReport> workReports = workExecutor.executeInParallel(works);
        workFlowReport.addAll(workReports);
        return workFlowReport;
    }
}

class ParallelFlowExecutor {

    /*
     * TODO Making the executor configurable requires to answer the following questions first:
     *
     * 1. If the user provides a custom executor, when should it be shutdown? -> Could be documented so the user shuts it down himself
     * 2. If the user provides a custom executor which is shared by multiple parallel flow, shutting it down here (as currently done) may impact other flows
     * 3. If it is decided to shut down the executor at the end of the parallel flow, the parallel flow could not be re-run (in a repeat flow for example) since the executor will be in an illegal state
     */
    private ExecutorService workExecutor;

    ParallelFlowExecutor() {
        this.workExecutor = Executors.newScheduledThreadPool(
            Runtime.getRuntime().availableProcessors());
    }

    List<WorkReport> executeInParallel(List<Work> works) {
        // re-init in case it has been shut down in a previous run
        //(See question 3).
        if(workExecutor.isShutdown()) {
            workExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        }

        // submit works to be executed in parallel
        Map<Work, Future<WorkReport>> reportFutures = new HashMap<>();
        for (Work work : works) {
            Future<WorkReport> reportFuture = workExecutor.submit(work);
            reportFutures.put(work, reportFuture);
        }

        // poll for work completion
        int finishedWorks = works.size();
        // FIXME polling futures for completion,
        // not sure this is the best way to run callables in parallel and wait them for completion 
        // (use CompletionService??)
        while (finishedWorks > 0) {
            for (Future<WorkReport> future : reportFutures.values()) {
                if (future != null && future.isDone()) {
                    finishedWorks--;
                }
            }
        }

        // gather reports
        List<WorkReport> workReports = new ArrayList<>();
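        for (Map.Entry<Work, Future<WorkReport>> entry : reportFutures.entrySet()) {
            // note: Future.get() throws the checked InterruptedException and ExecutionException;
            // the handling for them is left out of this excerpt
            workReports.add(entry.getValue().get());
        }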
        // because if not, the workflow engine may run forever.. 
        //(See question 2).
        workExecutor.shutdown(); 
        return workReports;
    }
}
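
About that FIXME: as quoted, the polling loop decrements the counter again for every finished future on each pass, so the counter can hit zero before all works are done, and it spins the CPU while waiting. The result is still correct only because the later get() calls block. One way to avoid polling altogether is ExecutorService.invokeAll, which blocks until every task has finished and returns the futures in submission order. The sketch below is my own take under a few assumptions, not the author's fix: the class name InvokeAllSketch is made up, it works on plain Callable<T> rather than Work, and it assumes the caller owns the executor and shuts it down (the first option floated in the TODO), so the same executor can be reused across runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    /**
     * Runs all tasks on the given executor and returns their results in submission order.
     * The executor belongs to the caller and is deliberately NOT shut down here.
     */
    static <T> List<T> executeInParallel(ExecutorService executor, List<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // invokeAll blocks until every task has completed, normally or with an exception
        List<Future<T>> futures = executor.invokeAll(tasks);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            // every future is already done, so get() returns (or throws) immediately
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "hello");
            tasks.add(() -> "world");
            // The same executor serves two runs, which a repeat flow would need.
            System.out.println(executeInParallel(executor, tasks)); // prints "[hello, world]"
            System.out.println(executeInParallel(executor, tasks)); // prints "[hello, world]"
        } finally {
            executor.shutdown(); // the caller decides when the pool goes away
        }
    }
}
```

The trade-off is that the flow no longer cleans up after itself: forget the shutdown() and the non-daemon pool threads keep the JVM alive, which is presumably why the original shuts the pool down at the end of each run.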


done.