当前位置: 首页 > 工具软件 > Quasar-Java > 使用案例 >

Java的协程Quasar

广宏远
2023-12-01

协程是对函数和线程进一步优化的产物, 是一种函数的编排方式, 将传统意义上的函数拆成更小粒度的过程. 简单说, 就是比函数粒度还要小的可手动控制的过程. 

协程可以通过yield 来调用其它协程,接下来的每次协程被调用时,从协程上次yield返回的位置接着执行,通过yield方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。

协程vs函数

函数可以调用其他函数,调用者等待被调用者结束后继续执行,因此函数的生命期遵循后进先出,即最后一个被调用的函数最先结束返回。协程的生命期完全由对它们的使用需要来决定。
函数的起始处是惟一的入口点,每当函数被调用时,执行都从被调用函数的起始处开始。协程可以有多个入口点,协程的起始处是第一个入口点,每个yield返回出口点都是再次被调用执行时的入口点。
函数只在结束时一次性的返回全部结果值。协程可以在yield时不调用其他协程,而是每次返回一部分的结果值,这种协程常称为生成器或迭代器。
现代的指令集架构通常提供对调用栈的指令支持,便于实现可递归调用的函数。在以Scheme为代表的提供续体的语言环境下,可用此控制状态抽象表示来实现协程。

函数可以看作是特定状况的协程,任何函数都可转写为不调用yield的协程

协程vs线程

协程占用内存小, 在Quasar库实现中, 一个空闲的Fiber只占400个字节左右的内存, 而一个Java线程需要至少1MB, 是协程的千倍. Fibers provide functionality similar to threads, and a similar API, but they’re not managed by the OS. They are lightweight in terms of RAM (an idle fiber occupies ~400 bytes of RAM) and put a far lesser burden on the CPU when task-switching. You can have millions of fibers in an application

另外的好处是对比使用多线程来解决IO阻塞任务,使用协程不用加锁,访问共享的数据不用进行同步操作。使用协程之所以不需要加锁不是因为所有的协程只在一个线程中运行,而是因为协程的非抢占式的特点。也就是说,协程在没主动交出CPU之前都是不会被突然切换到其它协程上。而线程是抢占式的,使用多线程你不能确定线程什么时候被操作系统调度,什么时候被切换,因此需要用锁到实现一种“原子操作”的语义。

协程vs异步回调

常见的做法是使用非阻塞的IO(比如是异步IO,又或者是在syscall上自己实现的一套异步IO,如asio)并且将处理操作写在回调函数中。这样的做法一般没什么问题,但当回调函数变多,一段连贯的业务代码就会被拆分到多个回调函数之中,增加维护的成本。因此使用协程可以用同步的写法写出效果相当于是异步的代码。

在Java中通过Quasar库实现协程

对应JDK8的最高版本为0.7.9, 需要在maven中引入依赖

<dependency>
	<groupId>co.paralleluniverse</groupId>
	<artifactId>quasar-core</artifactId>
	<version>0.7.9</version>
	<classifier>jdk8</classifier>
</dependency>

通过一个channel, 将生成的数据推送给处理者, 这个流程是可以多级串联的, 达到生成和处理交叉进行的效果.

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;

import java.util.concurrent.ExecutionException;

public class FiberExample {
    private static void printer(Channel<Integer> in) throws SuspendExecution,  InterruptedException {
        Integer v;
        while ((v = in.receive()) != null) {
            System.out.println("<< " + v);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
        //定义两个Channel
        Channel<Integer> naturals = Channels.newChannel(1024, Channels.OverflowPolicy.BLOCK, true, true);
        Channel<Integer> squares = Channels.newChannel(1024, Channels.OverflowPolicy.BLOCK, true, true);

        //运行两个Fiber实现.
        new Fiber<>(() -> {
            for (int i = 0; i < 1000; i++) {
                System.out.println(">> " + i);
                naturals.send(i);
            }
            naturals.close();
        }).start();

        new Fiber<>(() -> {
            while (!naturals.isClosed()) {
                Integer v = naturals.receive();
                System.out.println("< " + v);
                squares.send(v * v);
            }
            System.out.println("Stopped receiving messages");
            squares.close();
        }).start();

        System.out.println("Reached printer");
        printer(squares);
    }
}

 在Fiber的处理方法中做Thread相关的操作会引起fiber失效.

 类似资料: