当前位置: 首页 > 工具软件 > Go2o > 使用案例 >

同步、异步、非阻塞I/O, 进程,线程,协程 async await go这一篇就够了

长孙逸仙
2023-12-01

事件

发生了一件事,这件事是面向宇宙公开的,事件的所有信息会通过各种载体以事件源为中心向外传播,在时间和空间的方向上传播。 有兴趣的观察者会根据此事件的信息做出反应,或者把信息进行加工处理继续传播,或者加载处理成消息向某些特定的接收器传播;没有兴趣的观察者则不为所动。
举例:太阳升起来了,超新星爆发了, 一个人出生或者死亡,中华人民共和国成立,光览意外断掉,数据中心中某一台服务器的硬盘故障。
事件转信息:按记叙文六要素:Event(起因,经过,结果,时间,地点,人物)。

##消息
特定的接发送方向特定的接收方发送的待定的信息。
举例:早上打开手机收到很多推送消息;收到邮件,收到微信,发了一条短信。

事件与消息的区别

类型公开范围源地址是否明确目的地址是否存在是否有特定的中转站或者接收方
事件整个系统明确,事件源不确定
消息待定范围明确,消息发送方明确,消息接收方,或者中转站有,消息队列

编程模型

  • 在软件系统中,有一种编程模型叫事件驱动,
    当某一事件发生时会调用注册在该事件上的回调函数。例如:libevent库。
  • 在软件系统中,还有一种编程模型叫消息驱动
    当某一事件发生时,会有专门的程序负责决定把消息发到某个具体的消息队列里。如windows就是消息驱动机制,每个窗口有一个消息队列,windows内核程序负责把用户的输入,转化为消息,投递到一个或者多个需要消费此消息的队列里。
  • 在软件系统中,还还有一种编程模型叫数据驱动
    当数据来的时间才做计算,在flink里,event_time开窗口时很明显。
  • 在软件系统中,
    最最常用简单的模型叫轮询模式,CPU不断地查看一件事有没有发生,如有没有数据可以读,读取有没有完成。没完成则sleep一下继续查看。 显然不需要回调函数这么复杂。

同步

同步,意味着等待。 你需要别人做一件事,在别人做事儿的时候,你干等着。等别人做完,你拿到结果再做下一件事。
软件系统中,常见的比如阻塞I/O操作。当你要读一个文件,或者发送一段数据到网络上,这段数据没有读完或者发送完,程序会进入睡眠状态,等待操作完成。再例如,你在打电话,别人在说话的时候,你必须等他说完。

异步

异步,意味着没有等待。 你需要别人做一件事儿,在别人做事时,你不用干等着。你可以继续做下一件事儿。你可以在你把所有事儿做完后再去看看,你让别人做的事儿做好了没。或者别人做好时主动通知你一下。
可以看出一个人做事,是没法异步的。在计算机里如果只有一个线程,那也是没法异步的,在多个线程中,一个线程把任务提交给另一个线程执行,而自己干其他的事,这才叫异步执行任务。

阻塞 & 非阻塞

在软件系统中,经常需要进行I/O操作,如果在I/O操作时,你的程序进入了睡眠状态,并等I/O操作完成才被唤醒,这就叫阻塞。
非阻塞意味着你不会进入睡眠状态。 比如你在读一个socket,如果有数据就直接读成功,没有数据也不会进入睡眠,而是立即返回错误码,告诉你没有数据,一会儿再来读吧。明显非阻塞和轮询一起用。

python生成器

有这样一种场景,有一个集合,里边是0到1亿之间的奇数。如果在计算机里保存这个集合,
将会占用很大的空间。但是,我们既然知识是奇数,就可以只保存这个规律.
生成器需要外部触发一次运行,如gen.send(param), 这时,生成器会运行到yield这行并暂停,并返回。
等待下次触发,会拿到param,并继续运行到yield. 直到结束,抛出StopIteration异常。

def odd(n):
	for x in range(1,n,2):
		y = yield x
		print(y)

for x in odd(10):
	print(x)

odd就是一个生成器函数,因为有yield, gen = odd(10), gen是个生成器
生成器实现了迭代器的接口,所以可以用for去迭代。

gen=odd(8)
y=gen.send(None) #send None表示开始生成器,返回1
print("begin", y) #1
print(next(gen))  #3
print(next(gen))   #5
print(gen.send(10)) #7这里把10send到yield
print(gen.send(100)) #这里把100send到yield, 但是抛出StopIteration异常

python yield from

如果你想在一个生成器里返回另一个生成器

def wrapper_odd(n):
for x in odd(n):
yield x

python有更简单的语法糖 yield from

def wrapper_odd2(n):
yield from odd(n)## python生成器
有这样一种场景,有一个集合,里边是0到1亿之间的奇数。如果在计算机里保存这个集合,
将会占用很大的空间。但是,我们既然知识是奇数,就可以只保存这个规律.
生成器需要外部触发一次运行,如gen.send(param), 这时,生成器会运行到yield这行并暂停,并返回。
等待下次触发,会拿到param,并继续运行到yield. 直到结束,抛出StopIteration异常。

def odd(n):
	for x in range(1,n,2):
		y = yield x
		print(y)

for x in odd(10):
	print(x)

odd就是一个生成器函数,因为有yield, gen = odd(10), gen是个生成器
生成器实现了迭代器的接口,所以可以用for去迭代。

gen=odd(8)
y=gen.send(None) #send None表示开始生成器,返回1
print("begin", y) #1
print(next(gen))  #3
print(next(gen))   #5
print(gen.send(10)) #7这里把10send到yield
print(gen.send(100)) #这里把100send到yield, 但是抛出StopIteration异常

python yield from

### 如果你想在一个生成器里返回另一个生成器
def wrapper_odd(n):
	for x in odd(n):
		yield x
### python有更简单的语法糖 yield from
def wrapper_odd2(n):
	yield from odd(n)

递归生成器

TravelList把list变成一个非list成员,如果list里的成员也是个list就递归下去。

def TravelList(lst): #gen=TravelList([1,2,3]), gen是个生成器,可以用for x in gen迭代
	for item in lst:
		if isinstance(item, list):  #如果item是list,递归调用
			yield from TravelList(item) 
		else:
			yield item #如果item不是list
lst = [1,2,3,[4,5,6],7,[8,9,[10,11,[12,13,[14]],15],16]]
for x in TravelList(lst):
	print(x)

对比与使用场景

说了这么多概念,他们是什么关系呢?为什么要有这么多概念呢?
因为需要提高效率。不同应用场景下,使用最合适的模型才是最好的。

同步与异步对比

同步的优点是 编程时简单,代码易读,出错概率小,自上而下写代码,自然。
同步的缺点是遇到阻塞时,后续代码不能执行,导致效率低,吞吐低。
异步的优点:无需阻塞,把任务交给其他线程完成,自己可以做更多的事儿
异步的缺点:编程时麻烦,经常需要设置回调函数,使得代码不再是自上而下,理解困难。
要么需要不断查看异步任务完成,要么设置回调,要么等待异步任务完成。

下面以要请求两个http服务得到结果,并进行计算来举例看一下,同步和异步的区别。
一个更真实的例子:如一个用户要登录系统,
第一步:要去鉴权服务鉴权
第二步,鉴权成功要去数据库拉取用户信息
第三步,返回用户信息
伪代码
同步:

def login(p1, p2):
  param1 = http.get(p1)  #这里会阻塞,等鉴权完成
  if !param1.ok: #鉴权失败
    return
  param2 = http.get(p2)  #这里会阻塞,等拉取完成
  return (param1,param2)

异步回调编程:
看了如下代码,各种回调,本来同步代码里顺序的三步,现在硬生生被分成三个函数。
想到被libevent, epoll支配的痛苦了吧。

def login(p1, p2):
   http.get(p1, OnGetP1) #由于是异步,这里不会阻塞,实际上这个请求任务交给另一个线程去做了
   #这里可以直接返回了。 也可以处理另一个用户的请求
def OnGetP1(param1, p2): #param1就是http.get异步执行完的结果
    if !param1.ok: #鉴权失败
      return
   http.get(p2, OnGetP2, param1) #由于是异步,这里不会阻塞
def OnGetP2(param1, param2): #param2就是http.get异步执行完的结果
   return (param1,param2)

在上面的例子中,同步会被阻塞,一个线程同时只能处理一个用户的请求。
而异步可以同时处理多个请求,就在http.get(p1, OnGetP1)后,可以立即
响应另一个用户的请求,而不等这个http请求结束。

但是,回调模式让人过于痛苦,所以为了解决这些痛苦,就有了以下方式

解决异步回调的痛点

  • 多进程
  • 多线程
  • 同步非阻塞轮询
  • promise/future
  • async await
  • 协程

其中是协程方案,尤其在GO语言原生支持协程的情况下,使得异步与同步编程一模一样。

普通阻同步塞式TCP服务器

#!/bin/env python
# -*- coding:utf8 -*-
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = ('127.0.0.1', 9876)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
print("start tcp server at:", address)


def handle_client(cli):
    data = cli.recv(1024) #收数据,可能阻塞
    print("recv:", data)
    cli.send(data) #发数据可能阻塞
    cli.close()


while True:
    conn, addr = sock.accept()
    print("accept client:", addr)
    handle_client(conn) #这里阻塞了,没法处理下一个客户端了

异步&异步I/O

异步是一个任务由其他线程执行,主线程可以先做其他事,需要的时候再看异步任务有没有完成
异步任务可以是计算任务,也可以是I/O任务。
计算密集型任务消耗CPU,一般使用多线程,多进程的方式,让一个线程对应一个CPU核心
由于CPU核心数量毕竟有限,所以使用多线程、多进程也不会创建很多线程,进程,开销也小。
而I/O则不同,以常见的tcp服务为例,可能会有上百万个客户端连接过来,每个客户端都要I/O
如果每个客户端给开一个进程或者线程进行I/O,开销巨大。大部分CPU资源将开销在线程调度上
再者,底层网卡一般只有一个,即使这么多客户端需要并行,到网卡也需要排队,线性化处理
所以,我们何不使用一个线程专门做I/O, 另一个线程做计算。异步I/O由此而来。
linux epoll机制,允许一个线程监听n个socket, 并以事件的形式进行通知用户线程是否可以进行I/O
我们可以单独开一个线程,进行epoll, 并进行数据读写,读写好之后再通知主线程。
对于主线程来说这就叫异步I/O了

在编程中,经常需要定时去做一件事儿, 如sleep x秒后。 把定时任务也单独放在一个线程中执行,
执行完成后再通知主线程。 当然sleep这个函数除了睡眠,它什么都不做。

#异步&异步I/O能有很多方式,其中python的asyncio包有各种方式。

总的来说分为三类:

  • 异步计算:本质需要多线程支持,python有GIL锁,所以不可能进行异步并行计算。但是可以异步并发计算。
    提交异步task到另一个线程,或者使用协程进行异步
  • 异步I/O: 本质低层还是要依赖epoll. 但是如果直接封装epoll,操作接口还是比较复杂,如果在语法上做
    一些封装,如hook所有的I/O调用,如read, write. 在read,write时并不真正的进行I/O,而是注册到epoll上。
  • 异步定时器:定时任务,需要定时器,不能通过sleep来等。 asyncio包同样有hook sleep这个调用。因此
    调用asyncio.sleep并不会阻塞当前线程。
import time
import urllib.request
import concurrent.futures #真的异步I/O与计算
URLS=['https://www.baidu.com', 'https://www.taobao.com',
 'https://www.python.org', 'https://www.bilibili.com']

#load_url是会阻塞的
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        time.sleep(10)
        return conn.read()
#pyhon多线程去执行任务,返回future,有线程池。 这与C++11的promise/future非常类似
#然而python有GIL锁,同时只能进行一个线程。所以想真正在实现并行计算,还是多进程好
#但是,对于I/O密集型应用,多线程开销太多。 Python一个线程对应一个操作系统线程
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    #提交阻塞任务给线程池
    future_to_url = {executor.submit(load_url, url, 30):url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result() #获取对象结果,会阻塞  future.set_result由线程池里的线程完成时调用
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))

至此,高并发I/O的问题还是没有解决。多进程与多线程的问题是不能满足高并发
epoll方式可以满足高并发,但是,基于事件循环,使得完整的代码逻辑在I/O时变得破碎

import asyncio
async def say_after(delay,what):
    print("begin say_after:", delay, what)
    await asyncio.sleep(delay)
    print(what)
async def main(): #这种async函数就叫协程了
    print("hello 1")
    await asyncio.sleep(1) #这里会阻塞,并在1秒后从这里恢复运行,这不就是线程吗? await就是从语法上支持了协程的切换与调度
    print("hello 2")
    print("--------------------")
    #这才叫并发执行,虽然也在同一个线程,但是这里提交的是任务,主线程会依次把每个任务执行完成,或者执行到挂起
    task1 = asyncio.create_task(say_after(1,"hello"))
    task2 = asyncio.create_task(say_after(2,"world"))
    print(f"started at {time.strftime('%X')}")
    await task1  
    await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

python的coroutine在创建后是不会运行的,要等主线程去执行到协程。也就是asyncio.run
另外,python也不会创建多个线程去并发执行coroutine,所以这些coroutine并没有异步执行
一个coroutine,不执行到await他的这行代码,他是不会去异步执行的,如果想异步执行,只能async.create_task(co())了。
这样包装成task后,不需要await就会被调度执行
而javscript则不同,javascript引擎是单线程,但是浏览器,或者Node同时存在多个线程去并发进行I/O,这就是真正的异步了
因此,在js中, 调用main(),立即会执行print.
python与js的await/async的区别:https://zhuanlan.zhihu.com/p/103107567
python如果也想多个coroutine并发执行,则需要asyncio.run_coroutine_threadsafe(coro,loop),把coroutine交给另一个loop执行
https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
官网中明确说明,coroutine执行方式有三种:1. asyncio.run(co()) 2. asyncio.create_task(co()) 3. await co()
python的协程是可以await的,这与golang不同。golang的协程没法await,需要自已用用sync.WaitGroup来同步
python async/await用法大全 https://www.imooc.com/article/263959

javascript中的异步

const http = require(‘https’);

//我们把依靠回调函数的http.request封装了promise,可以进行await了
async function http_req(url) {
    //promise对象,表示结果要等异步执行完成才能确定,如果调用await promise_obj,则会等到promise完成
    //完成后再执行下一行代码
    //promise对象创建时处于pending状态,如果调用了resolve,则会到fullfilled状态,如果调用reject,则会进入rejected状态。
    return new Promise((resolve, reject) => {
        const options = {
            host: url,
            port: 443,
            path: '/'
        };
        // 发出请求。 这里如果也能await就好了, axios就可以await
        const req = http.request(options); //这一步可能阻塞,但是实际上发请求拿响应这件事儿,被封装成task提交到另一个线程去了
        req.end();

        req.on('response', (resp) => {  //接收响应还是要等回调
            console.log(`获得响应的信息: ${resp.statusCode}`);
            //如果使用回调函数,那就要在这里再来一次http.request,这要是嵌套深了,很痛苦
            resolve(resp); //一段时间后,这里的resolve调用使得promise从pending状态转移到了fullfilled状态
        });
        req.on('error', (e) => {
            reject(e); //一段时间后,这里reject使得promise从pending状态转移到了rejected状态
        })
    });
}

//有时候需要等第一个http请求成功再进行第二个请求
//如果使用回调函数,那就要在回调函数里再发起一次http请求
//未避免这种复杂的代码架构,js有async, await, await可以等一个异步操作结束
async function http2req() { //http2req()是async函数,所以返回的是Promise对象,就不用自己再new promise了
    h1 = await http_req("www.taobao.com") //await可以确保h1得到值才进行一下步;所以这一步实际上会挂起http2req这个函数,一会执行完成再从这里恢复,这不就是协程吗?
    console.log(`await1获得响应的信息: ${h1.statusCode}`);
    h2 = await http_req("www.python.org") //这里就可以使用h1的值了,不需要深度嵌套回调了 了
    console.log(`await2获得响应的信息: ${h2.statusCode}`);
    //throw new Error("bad bad bad"); 抛出异常就是调用reject了
    //在async函数里返回值会被封装成promise对象,同时把返回值设置好,并修改Promise对象的状态为fullfilled. 省得调用resolve了
    return { "h1": h1, "h2": h2 };
}
console.log("send request")

ans = http2req()
console.log("ans is a promise")
//最终还是得靠回调函数,主线程才能到得结果。因为JS是单线程程序
ans.then((res) => {
    console.log("res:", res);
}).catch((e) => {
    console.log("error catch:", e)
})

//javascript里提供的http库,file库,这些所有涉及到I/O的库,已经不是操作系统原生接口了
//是js专门hook的接口,所以调用这些I/O接口,并不会阻塞,只是提交了一个任务(Promise),由专门的I/O线程去处理
//这个I/O线程可以理解为使用了epoll的线程,等这I/O完成,再通知主线程去执行回调函数。

//然后,厉害的是async/await.  
//async使得一个函数的返回值变成了promise, 在函数体中return就是fullfill 这个promise, 在函数体中throw Error就是reject这个Promise
//await更强大,他能等一个promise完成。怎么等呢?就是挂起当前函数,等到promise变成fullfilled或者rejected后,再恢复运行
//这里很像是一个线程,或者进程切换时的场景,但是,这只是用户态一个小函数的状态保存,所以这种用户态轻量级线程,我们就叫它协程了


var sleep = function (time) {
    return new Promise(function (resolve, reject) {
        setTimeout(function () {
            resolve();
        }, time);
    });
};

async function exec() {
    await sleep(2000);
}

async function go() {
    console.log(Date.now())
    c1 = exec() //js任务已经开始执行,但是python却不是
    c2 = exec()//js任务已经开始与上一个任务并发执行,但是python却不是
    //c1,c2已经开始运行了
    console.log(c1, c2)
    await c1;
    await c2;
    console.log(Date.now())
}
async function go2() {
    console.log(Date.now())
    await exec() //c1运行2s
    await exec() //c2运行2s
    //c1,c2已经开始运行了
    console.log(Date.now())
}
//go(); // 2s
go2(); //4s

Linux AIO

linux内核支持AIO. 其实是用户线程把I/O请求提交给内核,内核完成I/O后通过信号,回调函数方式通知用户线程。
AIO不需要进行内核态.因此本质上要比使用epoll更加高效。但是听说AIO不成熟,对文件只能O_DIRECT.对socket不是刚好?
下面这段代码引用了:https://blog.csdn.net/summer_zgh/article/details/82416427
AIO也支持回调(还支持在新线程中回调,没有线程池也不好用),wait.

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <aio.h>

#define BUFFER_SIZE 1024

int MAX_LIST = 2;

int main(int argc, char **argv)
{
    //aio操作所需结构体
    struct aiocb rd;

    int fd, ret, couter;

    fd = open("test.txt", O_RDONLY);
    if (fd < 0)
    {
        perror("test.txt");
    }

    //将rd结构体清空
    bzero(&rd, sizeof(rd));

    //为rd.aio_buf分配空间
    rd.aio_buf = malloc(BUFFER_SIZE + 1);

    //填充rd结构体
    rd.aio_fildes = fd;
    rd.aio_nbytes = BUFFER_SIZE;
    rd.aio_offset = 0;

    //进行异步读操作
    ret = aio_read(&rd);
    if (ret < 0)
    {
        perror("aio_read");
        exit(1);
    }

    couter = 0;
    //  循环等待异步读操作结束
    while (aio_error(&rd) == EINPROGRESS)
    {
        printf("第%d次\n", ++couter);
    }
    //获取异步读返回值
    ret = aio_return(&rd);
    ((char *)rd.aio_buf)[ret] = 0;
    printf("\n\n返回值为:%d\n %s\n", ret, rd.aio_buf);

    return 0;
}

proactor & reactor

前面提到过多次,I/O操作是主线程中完成(reactor),还是在其他线程中读写好数据再通知主线程查看结果(proactor)
其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。
详解可以看:https://cloud.tencent.com/developer/article/1488120

c++ boost.asio

采用proactor模式,而且需要回调函数。不用看就知道使用起来有多麻烦了吧。
C++20 引用协程,可以更好的使用boost.asio co_await
看看boost的文档:没有co_await之前,各种回调,简单让人头大

c++ boost.asio + 回调

https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/example/cpp17/coroutines_ts/echo_server.cpp

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

class session
{
public:
  session(boost::asio::io_service& io_service)
    : socket_(io_service)
  {
  }

  tcp::socket& socket()
  {
    return socket_;
  }

  void start()   //开始session就是要注册一个回调函数,当socket数据读好之后再调用回调, proactor模式,
  {
    socket_.async_read_some(boost::asio::buffer(data_, max_length),
        boost::bind(&session::handle_read, this,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
  }

  void handle_read(const boost::system::error_code& error,    //proactor模式,所以这里回调过来。数据已经读好在Buffer里了
      size_t bytes_transferred)
  {
    if (!error)
    {
      boost::asio::async_write(socket_,                      //要写数据,当然也是先请求写,把写请求提交给io_service事件循环啦,等写完成再回调handle_write
          boost::asio::buffer(data_, bytes_transferred),
          boost::bind(&session::handle_write, this,
            boost::asio::placeholders::error));
    }
    else
    {
      delete this;
    }
  }

  void handle_write(const boost::system::error_code& error)  //这里回调这来,已经写好了。而不是像Epoll,通知你可以写了。
  {
    if (!error)
    {
      socket_.async_read_some(boost::asio::buffer(data_, max_length),
          boost::bind(&session::handle_read, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    }
    else
    {
      delete this;
    }
  }

private:
  tcp::socket socket_;
  enum { max_length = 1024 };
  char data_[max_length];
};

class server
{
public:
  server(boost::asio::io_service& io_service, short port)
    : io_service_(io_service), //初始化了io_service
      acceptor_(io_service, tcp::endpoint(tcp::v4(), port)) //初始化了acceptor
  {
    session* new_session = new session(io_service_);  //新建立那个session,等着新的客户端连接来
    acceptor_.async_accept(new_session->socket(),     //异步accept,这里其实不会accept,而是有客户端连接来,才会调用handle_accept
        boost::bind(&server::handle_accept, this, new_session,
          boost::asio::placeholders::error));
  }

  void handle_accept(session* new_session,           //这个回调执行了,说明有连接来,io_service的epoll发现了,并回调
      const boost::system::error_code& error)
  {
    if (!error)
    {
      new_session->start();                         //会话开始。到此一个新连接开始处理。
      new_session = new session(io_service_);       //这段代码又重复了上边的逻辑,因为,又需要注册异步回调函数accept了。也就说上面的注册只生效一次
      acceptor_.async_accept(new_session->socket(),
          boost::bind(&server::handle_accept, this, new_session,
            boost::asio::placeholders::error));
    }
    else
    {
      delete new_session;
    }
  }

private:
  boost::asio::io_service& io_service_;
  tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: async_tcp_echo_server <port>\n";
      return 1;
    }
    //创建io_service,可以理解为事件循环
    boost::asio::io_service io_service;

    using namespace std; // For atoi.
    //创建tcp server
    server s(io_service, atoi(argv[1]));

    //开始事件循环
    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}

c++ boost.asio + 协程

c++也支持协程:https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/example/cpp17/coroutines_ts/echo_server.cpp

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <cstdio>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;

#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
# define use_awaitable \
  boost::asio::use_awaitable_t(__FILE__, __LINE__, __PRETTY_FUNCTION__)
#endif

//处理客户端连接,也是个协程
awaitable<void> echo(tcp::socket socket)
{
  try
  {
    char data[1024];
    for (;;)
    {
      //不用去注册回调函数来处理读数据成功了,就在这里。没有回调函数,代码不会被突然分割到其他函数中,看起来省脑子多了
      std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
      //写数据也是
      co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo Exception: %s\n", e.what());
  }
}

//这是awaitable函数
awaitable<void> listener()
{
  //运行器
  auto executor = co_await this_coro::executor;
  tcp::acceptor acceptor(executor, {tcp::v4(), 55555});
  for (;;)
  {
    //在这里等待接收一个连接,不用创建回调函数了,一个co_await,
    tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
    //创建一个协程去处理客户端连接
    co_spawn(executor, echo(std::move(socket)), detached);
  }
}

int main()
{
  try
  {
   //创建事件循环
    boost::asio::io_context io_context(1);

    //停机信号也是这么简洁
    boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
    signals.async_wait([&](auto, auto){ io_context.stop(); });

    //创建了个协程,listener
    co_spawn(io_context, listener(), detached);

    //事件循环跑起来
    io_context.run();
  }
  catch (std::exception& e)
  {
    std::printf("Exception: %s\n", e.what());
  }
}
/*
g++ (GCC) 10.2.0
boost_1_75
g++  -std=c++20  -fcoroutines  -L/home/wangyonggang.adson/software/lib64/ ./boost_asio_test.cpp  -o boost_asio -lboost_system -lpthread -lstdc++ -lrt
*/

从I/O到协程

还是最初的问题,如果每个tcp客户端创建一个线程,受限于操作系统线程数量和调度性能,不能支持高并发。
于是就有两种解决思路
1.使用单线程,事件循环,用epoll技术监听多个文件描述符。 解决了阻塞和多线程开销过度问题,但是存在回调函数,代码逻辑割裂。
2.优化线程异性问题,使用用户态轻量级线程–协程,并重写所有I/O接口,使得I/O时自动挂起协程。I/O完成时唤醒协程。 解决了阻塞和线程过于重量级的问题,同时代码不再割裂。 但是,缺点是有的第三方库还是会阻塞一个操作系统线程,因此需要把第三方库重写一遍。

GO语言,协程解决所有问题

GO语言号称现代的C语言,语言级别协程支持。但是不是异步I/O. I/O还是本协程完成的。
处理器核心数:操作系统线程数据:GO协程数 = K:M:N
想了解MPG参考这个:https://www.jianshu.com/p/eae8b66e766d

package main

import (
        "fmt"
        "io"
        "log"
        "net"
)

func worker(conn net.Conn) {
        defer conn.Close()
        b := make([]byte, 512)
        for {
                size, err := conn.Read(b)#看着像是阻塞读,其实是向Epoll注册了fd,并挂起协程,等有数据来,再唤醒协程来读,然后从此处继续执行
                if err == io.EOF {
                        break
                }
                if err != nil {
                        log.Fatal(err)
                }

                fmt.Printf("Received %v bytes from %v\n", size, conn.RemoteAddr())
                size, err = conn.Write(b[0:size])
                if err != nil {
                        log.Fatal(err)
                }
                fmt.Printf("Written %v bytes to %v\n", size, conn.RemoteAddr())
        }
}

func main() {
        listner, err := net.Listen("tcp", "127.0.0.1:8085")
        if err != nil {
                log.Fatal(err)
        }
        fmt.Printf("Listering on %v\n", listner.Addr())
        for {
                conn, err := listner.Accept() #看着像是阻塞,其实是向Epoll注册了fd,并挂起协程,等有连接来,再唤醒协程,继续执行
                if err != nil {
                        log.Fatal(err)
                }
                fmt.Printf("Accepted connection to %v from %v\n", conn.LocalAddr(), conn.RemoteAddr())
                go worker(conn) #go一个,启动一个协程。不用像python/js/c++,非要把函数声明为异步函数。 Go协程轻量级,每个请求给分配一个,百万并发没问题
        }
}

总结

到此, js中的那些async/await; C++中的co_await, co_spawn, awaitable; python中的async await, yield, yield from;
以及这前语言中的各种I/O回调机制;各种协程,同步异步I/O都应该非常清楚了本质上是一样的。

当你看到async, yield, promise你立即想到,这段代码不会立即执行,要提交成任务,再执行。想拿到结果需要等等。
怎么等呢,await. 更高级的是使用协程,把async和await隐藏了,在所有I/O操作时,自动挂起协程,I/O完成再唤醒协程。

 类似资料: