发生了一件事,这件事是面向宇宙公开的,事件的所有信息会通过各种载体以事件源为中心向外传播,在时间和空间的方向上传播。 有兴趣的观察者会根据此事件的信息做出反应,或者把信息进行加工处理继续传播,或者加载处理成消息向某些特定的接收器传播;没有兴趣的观察者则不为所动。
举例:太阳升起来了,超新星爆发了, 一个人出生或者死亡,中华人民共和国成立,光览意外断掉,数据中心中某一台服务器的硬盘故障。
事件转信息:按记叙文六要素:Event(起因,经过,结果,时间,地点,人物)。
##消息
特定的接发送方向特定的接收方发送的待定的信息。
举例:早上打开手机收到很多推送消息;收到邮件,收到微信,发了一条短信。
类型 | 公开范围 | 源地址是否明确 | 目的地址是否存在 | 是否有特定的中转站或者接收方 |
---|---|---|---|---|
事件 | 整个系统 | 明确,事件源 | 不确定 | 无 |
消息 | 待定范围 | 明确,消息发送方 | 明确,消息接收方,或者中转站 | 有,消息队列 |
同步,意味着等待。 你需要别人做一件事,在别人做事儿的时候,你干等着。等别人做完,你拿到结果再做下一件事。
软件系统中,常见的比如阻塞I/O操作。当你要读一个文件,或者发送一段数据到网络上,这段数据没有读完或者发送完,程序会进入睡眠状态,等待操作完成。再例如,你在打电话,别人在说话的时候,你必须等他说完。
异步,意味着没有等待。 你需要别人做一件事儿,在别人做事时,你不用干等着。你可以继续做下一件事儿。你可以在你把所有事儿做完后再去看看,你让别人做的事儿做好了没。或者别人做好时主动通知你一下。
可以看出一个人做事,是没法异步的。在计算机里如果只有一个线程,那也是没法异步的,在多个线程中,一个线程把任务提交给另一个线程执行,而自己干其他的事,这才叫异步执行任务。
在软件系统中,经常需要进行I/O操作,如果在I/O操作时,你的程序进入了睡眠状态,并等I/O操作完成才被唤醒,这就叫阻塞。
非阻塞意味着你不会进入睡眠状态。 比如你在读一个socket,如果有数据就直接读成功,没有数据也不会进入睡眠,而是立即返回错误码,告诉你没有数据,一会儿再来读吧。明显非阻塞和轮询一起用。
有这样一种场景,有一个集合,里边是0到1亿之间的奇数。如果在计算机里保存这个集合,
将会占用很大的空间。但是,我们既然知识是奇数,就可以只保存这个规律.
生成器需要外部触发一次运行,如gen.send(param), 这时,生成器会运行到yield这行并暂停,并返回。
等待下次触发,会拿到param,并继续运行到yield. 直到结束,抛出StopIteration异常。
def odd(n):
for x in range(1,n,2):
y = yield x
print(y)
for x in odd(10):
print(x)
odd就是一个生成器函数,因为有yield, gen = odd(10), gen是个生成器
生成器实现了迭代器的接口,所以可以用for去迭代。
gen=odd(8)
y=gen.send(None) #send None表示开始生成器,返回1
print("begin", y) #1
print(next(gen)) #3
print(next(gen)) #5
print(gen.send(10)) #7这里把10send到yield
print(gen.send(100)) #这里把100send到yield, 但是抛出StopIteration异常
def wrapper_odd(n):
for x in odd(n):
yield x
def wrapper_odd2(n):
yield from odd(n)## python生成器
有这样一种场景,有一个集合,里边是0到1亿之间的奇数。如果在计算机里保存这个集合,
将会占用很大的空间。但是,我们既然知识是奇数,就可以只保存这个规律.
生成器需要外部触发一次运行,如gen.send(param), 这时,生成器会运行到yield这行并暂停,并返回。
等待下次触发,会拿到param,并继续运行到yield. 直到结束,抛出StopIteration异常。
def odd(n):
for x in range(1,n,2):
y = yield x
print(y)
for x in odd(10):
print(x)
odd就是一个生成器函数,因为有yield, gen = odd(10), gen是个生成器
生成器实现了迭代器的接口,所以可以用for去迭代。
gen=odd(8)
y=gen.send(None) #send None表示开始生成器,返回1
print("begin", y) #1
print(next(gen)) #3
print(next(gen)) #5
print(gen.send(10)) #7这里把10send到yield
print(gen.send(100)) #这里把100send到yield, 但是抛出StopIteration异常
### 如果你想在一个生成器里返回另一个生成器
def wrapper_odd(n):
for x in odd(n):
yield x
### python有更简单的语法糖 yield from
def wrapper_odd2(n):
yield from odd(n)
TravelList把list变成一个非list成员,如果list里的成员也是个list就递归下去。
def TravelList(lst): #gen=TravelList([1,2,3]), gen是个生成器,可以用for x in gen迭代
for item in lst:
if isinstance(item, list): #如果item是list,递归调用
yield from TravelList(item)
else:
yield item #如果item不是list
lst = [1,2,3,[4,5,6],7,[8,9,[10,11,[12,13,[14]],15],16]]
for x in TravelList(lst):
print(x)
说了这么多概念,他们是什么关系呢?为什么要有这么多概念呢?
因为需要提高效率。不同应用场景下,使用最合适的模型才是最好的。
同步的优点是 编程时简单,代码易读,出错概率小,自上而下写代码,自然。
同步的缺点是遇到阻塞时,后续代码不能执行,导致效率低,吞吐低。
异步的优点:无需阻塞,把任务交给其他线程完成,自己可以做更多的事儿
异步的缺点:编程时麻烦,经常需要设置回调函数,使得代码不再是自上而下,理解困难。
要么需要不断查看异步任务完成,要么设置回调,要么等待异步任务完成。
下面以要请求两个http服务得到结果,并进行计算来举例看一下,同步和异步的区别。
一个更真实的例子:如一个用户要登录系统,
第一步:要去鉴权服务鉴权
第二步,鉴权成功要去数据库拉取用户信息
第三步,返回用户信息
伪代码
同步:
def login(p1, p2):
param1 = http.get(p1) #这里会阻塞,等鉴权完成
if !param1.ok: #鉴权失败
return
param2 = http.get(p2) #这里会阻塞,等拉取完成
return (param1,param2)
异步回调编程:
看了如下代码,各种回调,本来同步代码里顺序的三步,现在硬生生被分成三个函数。
想到被libevent, epoll支配的痛苦了吧。
def login(p1, p2):
http.get(p1, OnGetP1) #由于是异步,这里不会阻塞,实际上这个请求任务交给另一个线程去做了
#这里可以直接返回了。 也可以处理另一个用户的请求
def OnGetP1(param1, p2): #param1就是http.get异步执行完的结果
if !param1.ok: #鉴权失败
return
http.get(p2, OnGetP2, param1) #由于是异步,这里不会阻塞
def OnGetP2(param1, param2): #param2就是http.get异步执行完的结果
return (param1,param2)
在上面的例子中,同步会被阻塞,一个线程同时只能处理一个用户的请求。
而异步可以同时处理多个请求,就在http.get(p1, OnGetP1)后,可以立即
响应另一个用户的请求,而不等这个http请求结束。
但是,回调模式让人过于痛苦,所以为了解决这些痛苦,就有了以下方式
其中是协程方案,尤其在GO语言原生支持协程的情况下,使得异步与同步编程一模一样。
#!/bin/env python
# -*- coding:utf8 -*-
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = ('127.0.0.1', 9876)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
print("start tcp server at:", address)
def handle_client(cli):
data = cli.recv(1024) #收数据,可能阻塞
print("recv:", data)
cli.send(data) #发数据可能阻塞
cli.close()
while True:
conn, addr = sock.accept()
print("accept client:", addr)
handle_client(conn) #这里阻塞了,没法处理下一个客户端了
异步是一个任务由其他线程执行,主线程可以先做其他事,需要的时候再看异步任务有没有完成
异步任务可以是计算任务,也可以是I/O任务。
计算密集型任务消耗CPU,一般使用多线程,多进程的方式,让一个线程对应一个CPU核心
由于CPU核心数量毕竟有限,所以使用多线程、多进程也不会创建很多线程,进程,开销也小。
而I/O则不同,以常见的tcp服务为例,可能会有上百万个客户端连接过来,每个客户端都要I/O
如果每个客户端给开一个进程或者线程进行I/O,开销巨大。大部分CPU资源将开销在线程调度上
再者,底层网卡一般只有一个,即使这么多客户端需要并行,到网卡也需要排队,线性化处理
所以,我们何不使用一个线程专门做I/O, 另一个线程做计算。异步I/O由此而来。
linux epoll机制,允许一个线程监听n个socket, 并以事件的形式进行通知用户线程是否可以进行I/O
我们可以单独开一个线程,进行epoll, 并进行数据读写,读写好之后再通知主线程。
对于主线程来说这就叫异步I/O了
在编程中,经常需要定时去做一件事儿, 如sleep x秒后。 把定时任务也单独放在一个线程中执行,
执行完成后再通知主线程。 当然sleep这个函数除了睡眠,它什么都不做。
#异步&异步I/O能有很多方式,其中python的asyncio包有各种方式。
import time
import urllib.request
import concurrent.futures #真的异步I/O与计算
URLS=['https://www.baidu.com', 'https://www.taobao.com',
'https://www.python.org', 'https://www.bilibili.com']
#load_url是会阻塞的
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
time.sleep(10)
return conn.read()
#pyhon多线程去执行任务,返回future,有线程池。 这与C++11的promise/future非常类似
#然而python有GIL锁,同时只能进行一个线程。所以想真正在实现并行计算,还是多进程好
#但是,对于I/O密集型应用,多线程开销太多。 Python一个线程对应一个操作系统线程
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
#提交阻塞任务给线程池
future_to_url = {executor.submit(load_url, url, 30):url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result() #获取对象结果,会阻塞 future.set_result由线程池里的线程完成时调用
except Exception as exc:
print("%r generated an exception: %s" % (url, exc))
else:
print("%r page is %d bytes" % (url, len(data)))
至此,高并发I/O的问题还是没有解决。多进程与多线程的问题是不能满足高并发
epoll方式可以满足高并发,但是,基于事件循环,使得完整的代码逻辑在I/O时变得破碎
import asyncio
async def say_after(delay,what):
print("begin say_after:", delay, what)
await asyncio.sleep(delay)
print(what)
async def main(): #这种async函数就叫协程了
print("hello 1")
await asyncio.sleep(1) #这里会阻塞,并在1秒后从这里恢复运行,这不就是线程吗? await就是从语法上支持了协程的切换与调度
print("hello 2")
print("--------------------")
#这才叫并发执行,虽然也在同一个线程,但是这里提交的是任务,主线程会依次把每个任务执行完成,或者执行到挂起
task1 = asyncio.create_task(say_after(1,"hello"))
task2 = asyncio.create_task(say_after(2,"world"))
print(f"started at {time.strftime('%X')}")
await task1
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
python的coroutine在创建后是不会运行的,要等主线程去执行到协程。也就是asyncio.run
另外,python也不会创建多个线程去并发执行coroutine,所以这些coroutine并没有异步执行
一个coroutine,不执行到await他的这行代码,他是不会去异步执行的,如果想异步执行,只能async.create_task(co())了。
这样包装成task后,不需要await就会被调度执行
而javscript则不同,javascript引擎是单线程,但是浏览器,或者Node同时存在多个线程去并发进行I/O,这就是真正的异步了
因此,在js中, 调用main(),立即会执行print.
python与js的await/async的区别:https://zhuanlan.zhihu.com/p/103107567
python如果也想多个coroutine并发执行,则需要asyncio.run_coroutine_threadsafe(coro,loop),把coroutine交给另一个loop执行
https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
官网中明确说明,coroutine执行方式有三种:1. asyncio.run(co()) 2. asyncio.create_task(co()) 3. await co()
python的协程是可以await的,这与golang不同。golang的协程没法await,需要自已用用sync.WaitGroup来同步
python async/await用法大全 https://www.imooc.com/article/263959
const http = require(‘https’);
//我们把依靠回调函数的http.request封装了promise,可以进行await了
async function http_req(url) {
//promise对象,表示结果要等异步执行完成才能确定,如果调用await promise_obj,则会等到promise完成
//完成后再执行下一行代码
//promise对象创建时处于pending状态,如果调用了resolve,则会到fullfilled状态,如果调用reject,则会进入rejected状态。
return new Promise((resolve, reject) => {
const options = {
host: url,
port: 443,
path: '/'
};
// 发出请求。 这里如果也能await就好了, axios就可以await
const req = http.request(options); //这一步可能阻塞,但是实际上发请求拿响应这件事儿,被封装成task提交到另一个线程去了
req.end();
req.on('response', (resp) => { //接收响应还是要等回调
console.log(`获得响应的信息: ${resp.statusCode}`);
//如果使用回调函数,那就要在这里再来一次http.request,这要是嵌套深了,很痛苦
resolve(resp); //一段时间后,这里的resolve调用使得promise从pending状态转移到了fullfilled状态
});
req.on('error', (e) => {
reject(e); //一段时间后,这里reject使得promise从pending状态转移到了rejected状态
})
});
}
//有时候需要等第一个http请求成功再进行第二个请求
//如果使用回调函数,那就要在回调函数里再发起一次http请求
//未避免这种复杂的代码架构,js有async, await, await可以等一个异步操作结束
async function http2req() { //http2req()是async函数,所以返回的是Promise对象,就不用自己再new promise了
h1 = await http_req("www.taobao.com") //await可以确保h1得到值才进行一下步;所以这一步实际上会挂起http2req这个函数,一会执行完成再从这里恢复,这不就是协程吗?
console.log(`await1获得响应的信息: ${h1.statusCode}`);
h2 = await http_req("www.python.org") //这里就可以使用h1的值了,不需要深度嵌套回调了 了
console.log(`await2获得响应的信息: ${h2.statusCode}`);
//throw new Error("bad bad bad"); 抛出异常就是调用reject了
//在async函数里返回值会被封装成promise对象,同时把返回值设置好,并修改Promise对象的状态为fullfilled. 省得调用resolve了
return { "h1": h1, "h2": h2 };
}
console.log("send request")
ans = http2req()
console.log("ans is a promise")
//最终还是得靠回调函数,主线程才能到得结果。因为JS是单线程程序
ans.then((res) => {
console.log("res:", res);
}).catch((e) => {
console.log("error catch:", e)
})
//javascript里提供的http库,file库,这些所有涉及到I/O的库,已经不是操作系统原生接口了
//是js专门hook的接口,所以调用这些I/O接口,并不会阻塞,只是提交了一个任务(Promise),由专门的I/O线程去处理
//这个I/O线程可以理解为使用了epoll的线程,等这I/O完成,再通知主线程去执行回调函数。
//然后,厉害的是async/await.
//async使得一个函数的返回值变成了promise, 在函数体中return就是fullfill 这个promise, 在函数体中throw Error就是reject这个Promise
//await更强大,他能等一个promise完成。怎么等呢?就是挂起当前函数,等到promise变成fullfilled或者rejected后,再恢复运行
//这里很像是一个线程,或者进程切换时的场景,但是,这只是用户态一个小函数的状态保存,所以这种用户态轻量级线程,我们就叫它协程了
var sleep = function (time) {
return new Promise(function (resolve, reject) {
setTimeout(function () {
resolve();
}, time);
});
};
async function exec() {
await sleep(2000);
}
async function go() {
console.log(Date.now())
c1 = exec() //js任务已经开始执行,但是python却不是
c2 = exec()//js任务已经开始与上一个任务并发执行,但是python却不是
//c1,c2已经开始运行了
console.log(c1, c2)
await c1;
await c2;
console.log(Date.now())
}
async function go2() {
console.log(Date.now())
await exec() //c1运行2s
await exec() //c2运行2s
//c1,c2已经开始运行了
console.log(Date.now())
}
//go(); // 2s
go2(); //4s
linux内核支持AIO. 其实是用户线程把I/O请求提交给内核,内核完成I/O后通过信号,回调函数方式通知用户线程。
AIO不需要进行内核态.因此本质上要比使用epoll更加高效。但是听说AIO不成熟,对文件只能O_DIRECT.对socket不是刚好?
下面这段代码引用了:https://blog.csdn.net/summer_zgh/article/details/82416427
AIO也支持回调(还支持在新线程中回调,没有线程池也不好用),wait.
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <aio.h>
#define BUFFER_SIZE 1024
int MAX_LIST = 2;
int main(int argc, char **argv)
{
//aio操作所需结构体
struct aiocb rd;
int fd, ret, couter;
fd = open("test.txt", O_RDONLY);
if (fd < 0)
{
perror("test.txt");
}
//将rd结构体清空
bzero(&rd, sizeof(rd));
//为rd.aio_buf分配空间
rd.aio_buf = malloc(BUFFER_SIZE + 1);
//填充rd结构体
rd.aio_fildes = fd;
rd.aio_nbytes = BUFFER_SIZE;
rd.aio_offset = 0;
//进行异步读操作
ret = aio_read(&rd);
if (ret < 0)
{
perror("aio_read");
exit(1);
}
couter = 0;
// 循环等待异步读操作结束
while (aio_error(&rd) == EINPROGRESS)
{
printf("第%d次\n", ++couter);
}
//获取异步读返回值
ret = aio_return(&rd);
((char *)rd.aio_buf)[ret] = 0;
printf("\n\n返回值为:%d\n %s\n", ret, rd.aio_buf);
return 0;
}
前面提到过多次,I/O操作是主线程中完成(reactor),还是在其他线程中读写好数据再通知主线程查看结果(proactor)
其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。
详解可以看:https://cloud.tencent.com/developer/article/1488120
采用proactor模式,而且需要回调函数。不用看就知道使用起来有多麻烦了吧。
C++20 引用协程,可以更好的使用boost.asio co_await
看看boost的文档:没有co_await之前,各种回调,简单让人头大
https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/example/cpp17/coroutines_ts/echo_server.cpp
#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
class session
{
public:
session(boost::asio::io_service& io_service)
: socket_(io_service)
{
}
tcp::socket& socket()
{
return socket_;
}
void start() //开始session就是要注册一个回调函数,当socket数据读好之后再调用回调, proactor模式,
{
socket_.async_read_some(boost::asio::buffer(data_, max_length),
boost::bind(&session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, //proactor模式,所以这里回调过来。数据已经读好在Buffer里了
size_t bytes_transferred)
{
if (!error)
{
boost::asio::async_write(socket_, //要写数据,当然也是先请求写,把写请求提交给io_service事件循环啦,等写完成再回调handle_write
boost::asio::buffer(data_, bytes_transferred),
boost::bind(&session::handle_write, this,
boost::asio::placeholders::error));
}
else
{
delete this;
}
}
void handle_write(const boost::system::error_code& error) //这里回调这来,已经写好了。而不是像Epoll,通知你可以写了。
{
if (!error)
{
socket_.async_read_some(boost::asio::buffer(data_, max_length),
boost::bind(&session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
{
delete this;
}
}
private:
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server
{
public:
server(boost::asio::io_service& io_service, short port)
: io_service_(io_service), //初始化了io_service
acceptor_(io_service, tcp::endpoint(tcp::v4(), port)) //初始化了acceptor
{
session* new_session = new session(io_service_); //新建立那个session,等着新的客户端连接来
acceptor_.async_accept(new_session->socket(), //异步accept,这里其实不会accept,而是有客户端连接来,才会调用handle_accept
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
void handle_accept(session* new_session, //这个回调执行了,说明有连接来,io_service的epoll发现了,并回调
const boost::system::error_code& error)
{
if (!error)
{
new_session->start(); //会话开始。到此一个新连接开始处理。
new_session = new session(io_service_); //这段代码又重复了上边的逻辑,因为,又需要注册异步回调函数accept了。也就说上面的注册只生效一次
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
else
{
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_echo_server <port>\n";
return 1;
}
//创建io_service,可以理解为事件循环
boost::asio::io_service io_service;
using namespace std; // For atoi.
//创建tcp server
server s(io_service, atoi(argv[1]));
//开始事件循环
io_service.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
c++也支持协程:https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/example/cpp17/coroutines_ts/echo_server.cpp
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <cstdio>
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;
#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
# define use_awaitable \
boost::asio::use_awaitable_t(__FILE__, __LINE__, __PRETTY_FUNCTION__)
#endif
//处理客户端连接,也是个协程
awaitable<void> echo(tcp::socket socket)
{
try
{
char data[1024];
for (;;)
{
//不用去注册回调函数来处理读数据成功了,就在这里。没有回调函数,代码不会被突然分割到其他函数中,看起来省脑子多了
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
//写数据也是
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
}
}
catch (std::exception& e)
{
std::printf("echo Exception: %s\n", e.what());
}
}
//这是awaitable函数
awaitable<void> listener()
{
//运行器
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), 55555});
for (;;)
{
//在这里等待接收一个连接,不用创建回调函数了,一个co_await,
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
//创建一个协程去处理客户端连接
co_spawn(executor, echo(std::move(socket)), detached);
}
}
int main()
{
try
{
//创建事件循环
boost::asio::io_context io_context(1);
//停机信号也是这么简洁
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ io_context.stop(); });
//创建了个协程,listener
co_spawn(io_context, listener(), detached);
//事件循环跑起来
io_context.run();
}
catch (std::exception& e)
{
std::printf("Exception: %s\n", e.what());
}
}
/*
g++ (GCC) 10.2.0
boost_1_75
g++ -std=c++20 -fcoroutines -L/home/wangyonggang.adson/software/lib64/ ./boost_asio_test.cpp -o boost_asio -lboost_system -lpthread -lstdc++ -lrt
*/
还是最初的问题,如果每个tcp客户端创建一个线程,受限于操作系统线程数量和调度性能,不能支持高并发。
于是就有两种解决思路
1.使用单线程,事件循环,用epoll技术监听多个文件描述符。 解决了阻塞和多线程开销过度问题,但是存在回调函数,代码逻辑割裂。
2.优化线程异性问题,使用用户态轻量级线程–协程,并重写所有I/O接口,使得I/O时自动挂起协程。I/O完成时唤醒协程。 解决了阻塞和线程过于重量级的问题,同时代码不再割裂。 但是,缺点是有的第三方库还是会阻塞一个操作系统线程,因此需要把第三方库重写一遍。
GO语言号称现代的C语言,语言级别协程支持。但是不是异步I/O. I/O还是本协程完成的。
处理器核心数:操作系统线程数据:GO协程数 = K:M:N
想了解MPG参考这个:https://www.jianshu.com/p/eae8b66e766d
package main
import (
"fmt"
"io"
"log"
"net"
)
func worker(conn net.Conn) {
defer conn.Close()
b := make([]byte, 512)
for {
size, err := conn.Read(b)#看着像是阻塞读,其实是向Epoll注册了fd,并挂起协程,等有数据来,再唤醒协程来读,然后从此处继续执行
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received %v bytes from %v\n", size, conn.RemoteAddr())
size, err = conn.Write(b[0:size])
if err != nil {
log.Fatal(err)
}
fmt.Printf("Written %v bytes to %v\n", size, conn.RemoteAddr())
}
}
func main() {
listner, err := net.Listen("tcp", "127.0.0.1:8085")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Listering on %v\n", listner.Addr())
for {
conn, err := listner.Accept() #看着像是阻塞,其实是向Epoll注册了fd,并挂起协程,等有连接来,再唤醒协程,继续执行
if err != nil {
log.Fatal(err)
}
fmt.Printf("Accepted connection to %v from %v\n", conn.LocalAddr(), conn.RemoteAddr())
go worker(conn) #go一个,启动一个协程。不用像python/js/c++,非要把函数声明为异步函数。 Go协程轻量级,每个请求给分配一个,百万并发没问题
}
}
到此, js中的那些async/await; C++中的co_await, co_spawn, awaitable; python中的async await, yield, yield from;
以及这前语言中的各种I/O回调机制;各种协程,同步异步I/O都应该非常清楚了本质上是一样的。
当你看到async, yield, promise你立即想到,这段代码不会立即执行,要提交成任务,再执行。想拿到结果需要等等。
怎么等呢,await. 更高级的是使用协程,把async和await隐藏了,在所有I/O操作时,自动挂起协程,I/O完成再唤醒协程。