用户工具

站点工具


code:java:project-loom

Project Loom 体验

Project Loom 的目标是为了让Java 并发代码的编写、调试、调优、维护更加简单,给Java平台引入 Fiber 作为轻量级线程实现。

概念

Fiber(纤程)由两个部分组成:Continuation 和 Scheduler。

Continuation

粗糙地理解一下,continuation (延续)表示程序接下来要做的事情,也就是保存了当前运行的上下文(运行栈)以及接下来要执行的指令等。

delimited continuation (定界延续)可以有返回值,能转换成使用函数来表达,在 Project Loom 里定义了 java.lang.Continuation 来表示一个 delimited continuation。

最重要的一点,continuation 允许用户代码控制 suspend 和 resume,这也是轻量级线程主动让出执行权和恢复执行栈的基础。

Scheduler

调度器用于调度 continuation 的执行,在 Project Loom 里默认使用了 ForkJoinPool,但也允许针对不同的应用场景由用户自定义执行器实现。

设计

理解了几个基本概念以后,再稍微看一下 Fiber 这套架构的大致实现(open jdk 14 ea,所以内容仍然在调整中):

ContinuationContinuationScope scope;Runnable target;yield(ContinuationScope scope);run();ContinuationScopeString name;FiberVExecutor scheduler; // 调度器FiberScope scope; // 关联的 FiberScopeThread carrierThread; // 执行线程Continuation cont; // 关联的 ContinuationRunnable runContinuation; // 调度执行的逻辑short state; // Fiber 状态Object result; // 执行结果awaitTermination();cancel();join();FiberScopeThread owner;FiberScope parent;FiberScope open();Fiber<V> schedule(Runnable task);Fiber<V> schedule(Executor scheduler, Runnable task);Fiber<V> schedule(Callable<? extends V> task);Fiber<V> schedule(Executor scheduler, Callable<? extends V> task);ThreadFiber<?> fiber;FiberScope scope;Continuation cont;

  1. 从底层是增加了 Continuation 的抽象,通过 run 方法执行或者恢复执行,通过静态的 yield 方法来让出执行权,这个一般用户不需要关注。
  2. 上层提供 Fiber,但其实入口是 FiberScope,通过 FiberScope 调度 Runnable 或者 Callable 返回一个 Fiber,使用方持有 Fiber 后可以等待执行结束或者转换成 CompletableFuture 用于添加监听器,FiberScope 的调度方法允许用户指定独立的调度器实现,如果不指定就使用的一个 ForkJoinPool。
  3. Thread 的实现也做了调整,可以返回绑定在当前线程上执行的 FiberScope,Fiber,Continuation 信息。

使用

Fiber API

找个简单的例子来使用一下新的 Fiber API,递归遍历一个用户的 PAI 实验列表,每一次对目录的列表查询都是创建一个新的 Fiber 来实现。

    public static void main(String[] args) throws IOException, InterruptedException {
        Collection<PaiExperiment> experiments = new ConcurrentLinkedQueue<>();
        // 需要启动 fiber 的时候,调用 FiberScope#open 方法,使用 try-with-resource 语法会自动关闭
        try (var scope = FiberScope.open()) {
            scope.schedule(() -> listExperiments(experiments, "135063", 0L));
        }
        // FiberScope 的close方法会等待所有的fiber都执行完才会返回,所以可以获得最终的结果
        System.out.println(JSON.toJSONString(experiments));
    }
 
    private static void listExperiments(Collection<PaiExperiment> result, String userNumber,
                                        Long parentId) {
        List<Long> subDirs = new ArrayList<>();
        String url = "https://pai.alipay.com/api/v1.0/experiment.json?action=queryExperimentTree&userNumber=" +
            userNumber + "&parentId=" + parentId;
        JSONArray experiments = getPaiExperimentInfo(url, "experiments");
        for (int i = 0; i < experiments.size(); i++) {
            JSONObject experiment = experiments.getJSONObject(i);
            if (!experiment.getBooleanValue("isDir")) {
                PaiExperiment paiExperiment = new PaiExperiment();
                paiExperiment.setExperimentId(experiment.getLong("experimentId"));
                paiExperiment.setUserNumber(experiment.getString("userNumber"));
                paiExperiment.setName(experiment.getString("name"));
                paiExperiment.setProjectName(experiment.getString("projectName"));
                result.add(paiExperiment);
            } else {
                subDirs.add(experiment.getLong("id"));
            }
        }
        // 创建了新的 FiberScope,当前的 Fiber 会在这个新建 FiberScope 里的所有 Fiber
        // 执行完成后才会结束,但是因为 FiberScope 的 close 实现是通过 jcu 的 Condition#await 等待
        // 所以也不会占用线程资源
        try (var scope = FiberScope.open()) {
            subDirs.forEach(dir -> scope.schedule(() -> listExperiments(result, userNumber, dir)));
        }
    }

使用起来的成本非常低,再也不用担心因为要执行的任务因为需要访问网络或者读写文件等待担心启动太多的线程了。

生成器

也可以利用 Continuation 的 API 直接实现生成器

    public static void main(String[] args) {
        Supplier<Integer> generator = generator(1);
        for (int i = 0; i < 100; i++) {
            System.out.println(generator.get());
        }
    }
 
    public static Supplier<Integer> generator(int start) {
        final ContinuationScope scope = new ContinuationScope("generator");
        AtomicInteger holder = new AtomicInteger(0);
        final Continuation continuation = new Continuation(scope, () -> {
            int v = start;
            while (true) {
                holder.set(v);
                Continuation.yield(scope);
                v += 1;
            }
        });
        return () -> {
            continuation.run();
            return holder.get();
        };
    }

不知道怎么样优雅地处理 Continuation yield 返回值,所以用一个 holder 中转了一下,但是取值总是来自于局部变量 v。

参考

  1. Project Loom: Fibers and Continuations for the Java Virtual Machine:http://cr.openjdk.java.net/~rpressler/loom/Loom-Proposal.html
code/java/project-loom.txt · 最后更改: 2019/10/07 21:49 由 qiyi