众所周知,go中的异步操作都已经封装在了运行时的过程中,有关socket的网络的异步操作都封装到了go的netpoll中,从而简化了编程形式。本文也就根据evio库总结而来。
如何编写golang的跨平台库,现在主流的方式如下:
evserver
│ go.mod
│ main.go
│
└───show
│ │ show.go
│ │ show_darwin.go
│ │ show_linux.go
通过创建一个package,然后通过命名文件后缀为_{平台后缀}.go的形式,在不同平台运行的时候会自动编译对应的代码。
main.go代码
package main
import (
"evserver/show"
)
func main() {
show := show.GetDefault()
show.ShowHello()
}
show.go
package show
type Show interface {
ShowHello()
}
show_linux.go
package show
import "fmt"
type osDefaulter struct {
}
func GetDefault() osDefaulter{
return osDefaulter{}
}
func(s osDefaulter) ShowHello(){
fmt.Println("linux show")
}
show_darwin.go
package show
import "fmt"
type osDefaulter struct {
}
func GetDefault() osDefaulter{
return osDefaulter{}
}
func(s osDefaulter) ShowHello(){
fmt.Println("darwin show")
}
常见的设计的方式就是通过在package中定义一个接口,然后通过不同平台的后缀文件去调用不同的方式去实现,从而完成package统一的对外提供服务的方式,当然跨平台的库也可以有另外一种方式,即如下:
func ShowHello() string {
if runtime.GOOS == "windows" {
return "windows hello"
} else {
return "other paltform hello"
}
}
但是这种方式针对简单的跨平台性能还可以,针对复杂的跨平台的功能就对代码侵入比较严重。
https://techblog.steelseries.com/2014/04/08/multi-platform-development-go.html
https://blog.gopheracademy.com/advent-2013/day-23-multi-platform-applications/
总体的代码目录如下:
evserver
│ go.mod
│ main.go
│
└───poll
│ │ poller.go
│ │ poller_darwin.go
│ │ poller_linux.go
package main
import (
"evserver/poll"
"fmt"
)
func main() {
IP := "127.0.0.1"
Port := 6667
s := &poll.Server{
Ip:IP,
Port: Port,
}
s.Init()
s.Data = func(c *poll.Conn, in []byte) (out []byte) {
out = in
out = append(out, []byte("back")...)
return
}
fmt.Printf(" running in %s:%d\n", IP, Port)
poll.LoopRun(s)
}
package poll
import (
"log"
"net"
"syscall"
)
const READ_FLAG = 1
const WRITE_FLAG = 2
type Conn struct {
fd int // file descriptor
lnidx int // listener index in the server lns list
out []byte // write buffer
sa syscall.Sockaddr // remote socket address
reuse bool // should reuse input buffer
opened bool // Connection opened event fired
ctx interface{} // user-defined context
loop *loop // Connected loop
}
func (c *Conn) Context() interface{} { return c.ctx }
func (c *Conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *Conn) Wake() {
//if c.loop != nil {
// c.loop.poll.Trigger(c)
//}
}
type loop struct {
idx int // loop index in the server loops list
poll *Poll // epoll or kqueue
packet []byte // read packet buffer
fdConns map[int]*Conn // loop Connections fd -> Conn
}
type Server struct {
Ip string
Port int
fd int
Data func(c *Conn, in []byte)(out []byte)
}
func (s *Server)Init() {
fd ,err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
if err != nil {
panic(err)
}
var serverAddr [4]byte
ip := s.Ip
IP := net.ParseIP(ip)
if IP == nil {
log.Fatal("Unable to process IP: ", ip)
}
copy(serverAddr[:], IP[12:16])
if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {
syscall.Close(fd)
panic(err)
}
err = syscall.Bind(fd, &syscall.SockaddrInet4{Port: s.Port, Addr:serverAddr})
if err != nil {
panic(err)
}
err = syscall.Listen(fd, 1024)
if err != nil {
panic(err)
}
s.fd = fd
}
func LoopRun(s *Server){
l := &loop{
poll:OpenPoll(),
packet: make([]byte, 0xFFFF),
fdConns: make(map[int]*Conn),
}
l.poll.AddRead(s.fd)
err := l.poll.Wait(func(fd int, note interface{}) error {
if fd == s.fd {
err := LoopAccept(s, l, fd)
if err != nil {
panic(err)
}
} else {
c := l.fdConns[fd]
flag := note.(int)
if flag == READ_FLAG {
LoopRead(s, l, c)
} else {
LoopWrite(s, l, c)
}
}
return nil
})
if err != nil {
panic(err)
}
}
func LoopAccept(s *Server, l *loop, fd int) error{
nfd, sa, err := syscall.Accept(fd)
if err != nil {
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return err
}
}
if err := syscall.SetNonblock(nfd, true); err != nil {
return err
}
c := &Conn{fd: nfd, sa: sa, loop: l}
c.out = []byte{}
l.fdConns[c.fd] = c
l.poll.AddReadWrite(c.fd)
return nil
}
func LoopRead(s *Server, l *loop, c *Conn)error {
var in []byte
n, err := syscall.Read(c.fd, l.packet)
if n == 0 || err != nil {
if err == syscall.EAGAIN {
return nil
}
return LoopClose(s, l, c)
}
in = l.packet[:n]
if !c.reuse {
in = append([]byte{}, in...)
}
if s.Data != nil {
out := s.Data(c, in)
if len(out) > 0 {
c.out = append(c.out[:0], out...)
}
}
if len(c.out) != 0 {
l.poll.ModReadWrite(c.fd)
}
return nil
}
func LoopWrite(s *Server, l *loop, c *Conn)error {
if c == nil {
return nil
}
if c.out == nil || len(c.out) == 0 {
return nil
}
n, err := syscall.Write(c.fd, c.out)
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return LoopClose(s, l, c)
}
if n == len(c.out) {
// release the connection output page if it goes over page size,
// otherwise keep reusing existing page.
if cap(c.out) > 4096 {
c.out = nil
} else {
c.out = c.out[:0]
}
} else {
c.out = c.out[n:]
}
if len(c.out) == 0{
l.poll.ModRead(c.fd)
}
return nil
}
func LoopClose(s *Server, l *loop, c *Conn)error{
delete(l.fdConns, c.fd)
err := syscall.Close(c.fd)
return err
}
package poll
import (
"syscall"
)
// Poll ...
type Poll struct {
fd int
changes []syscall.Kevent_t
}
// OpenPoll ...
func OpenPoll() *Poll {
l := new(Poll)
p, err := syscall.Kqueue()
if err != nil {
panic(err)
}
l.fd = p
_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{
Ident: 0,
Filter: syscall.EVFILT_USER,
Flags: syscall.EV_ADD | syscall.EV_CLEAR,
}}, nil, nil)
if err != nil {
panic(err)
}
return l
}
// Close ...
func (p *Poll) Close() error {
return syscall.Close(p.fd)
}
// Wait ...
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
events := make([]syscall.Kevent_t, 128)
for {
n, err := syscall.Kevent(p.fd, p.changes, events, nil)
if err != nil && err != syscall.EINTR {
return err
}
p.changes = p.changes[:0]
for i := 0; i < n; i++ {
if fd := int(events[i].Ident); fd != 0 {
var flag int
if events[i].Filter == syscall.EVFILT_READ {
flag = READ_FLAG
} else {
flag = WRITE_FLAG
}
if err := iter(fd, flag); err != nil {
return err
}
}
}
}
}
// AddRead ...
func (p *Poll) AddRead(fd int) {
p.changes = append(p.changes,
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,
},
)
}
// AddReadWrite ...
func (p *Poll) AddReadWrite(fd int) {
p.changes = append(p.changes,
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,
},
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE,
},
)
}
// ModRead ...
func (p *Poll) ModRead(fd int) {
p.changes = append(p.changes, syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE,
})
}
// ModReadWrite ...
func (p *Poll) ModReadWrite(fd int) {
p.changes = append(p.changes, syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE,
})
}
// ModDetach ...
func (p *Poll) ModDetach(fd int) {
p.changes = append(p.changes,
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ,
},
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE,
},
)
}
import socket
sock = socket.socket()
sock.connect(("127.0.0.1", 6667))
print(sock.send(b"data123121231232"))
print(sock.recv(4096))
此时运行程序go run main.go和测试客户端程序。
此时就可以看出简单的事件驱动的过程。
将main.go进行修改,修改为http的处理方式。
package main
import (
"evserver/poll"
"fmt"
"strconv"
"strings"
"time"
)
func main() {
IP := "127.0.0.1"
Port := 6667
s := &poll.Server{
Ip:IP,
Port: Port,
}
s.Init()
s.Data = func(c *poll.Conn, in []byte) (out []byte) {
if in == nil {
return
}
data := in
//if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {
// // for testing minimal single packet request -> response.
// out = appendresp(nil, "200 OK", "", res)
// return
//}
// process the pipeline
var req request
for {
leftover, err := parsereq(data, &req)
if err != nil {
// bad thing happened
out = appendresp(out, "500 Error", "", err.Error()+"\n")
break
} else if len(leftover) == len(data) {
// request not ready, yet
break
}
// handle the request
out = appendhandle(out, &req)
data = leftover
}
return
}
res = "Hello World!\r\n"
fmt.Printf(" running in %s:%d\n", IP, Port)
poll.LoopRun(s)
}
var res string
type request struct {
proto, method string
path, query string
head, body string
remoteAddr string
}
// appendhandle handles the incoming request and appends the response to
// the provided bytes, which is then returned to the caller.
func appendhandle(b []byte, req *request) []byte {
return appendresp(b, "200 OK", "", res)
}
// appendresp will append a valid http response to the provide bytes.
// The status param should be the code plus text such as "200 OK".
// The head parameter should be a series of lines ending with "\r\n" or empty.
func appendresp(b []byte, status, head, body string) []byte {
b = append(b, "HTTP/1.1"...)
b = append(b, ' ')
b = append(b, status...)
b = append(b, '\r', '\n')
b = append(b, "Server: error\r\n"...)
b = append(b, "Date: "...)
b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
b = append(b, '\r', '\n')
if len(body) > 0 {
b = append(b, "Content-Length: "...)
b = strconv.AppendInt(b, int64(len(body)), 10)
b = append(b, '\r', '\n')
}
b = append(b, head...)
b = append(b, '\r', '\n')
if len(body) > 0 {
b = append(b, body...)
}
return b
}
// parsereq is a very simple http request parser. This operation
// waits for the entire payload to be buffered before returning a
// valid request.
func parsereq(data []byte, req *request) (leftover []byte, err error) {
sdata := string(data)
var i, s int
var top string
var clen int
var q = -1
// method, path, proto line
for ; i < len(sdata); i++ {
if sdata[i] == ' ' {
req.method = sdata[s:i]
for i, s = i+1, i+1; i < len(sdata); i++ {
if sdata[i] == '?' && q == -1 {
q = i - s
} else if sdata[i] == ' ' {
if q != -1 {
req.path = sdata[s:q]
req.query = req.path[q+1 : i]
} else {
req.path = sdata[s:i]
}
for i, s = i+1, i+1; i < len(sdata); i++ {
if sdata[i] == '\n' && sdata[i-1] == '\r' {
req.proto = sdata[s:i]
i, s = i+1, i+1
break
}
}
break
}
}
break
}
}
if req.proto == "" {
return data, fmt.Errorf("malformed request")
}
top = sdata[:s]
for ; i < len(sdata); i++ {
if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
line := sdata[s : i-1]
s = i + 1
if line == "" {
req.head = sdata[len(top)+2 : i+1]
i++
if clen > 0 {
if len(sdata[i:]) < clen {
break
}
req.body = sdata[i : i+clen]
i += clen
}
return data[i:], nil
}
if strings.HasPrefix(line, "Content-Length:") {
n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
if err == nil {
clen = int(n)
}
}
}
}
// not enough data
return data, nil
}
此时运行go run main.go,并将前文编写的原生的http和evio的http进行对比。同样进行wrk进行测试。
原生http
wrk -t8 -c200 -d60s --latency http://127.0.0.1:8000
Running 1m test @ http://127.0.0.1:8000
8 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.60ms 6.67ms 229.63ms 93.36%
Req/Sec 7.41k 1.73k 14.17k 72.51%
Latency Distribution
50% 2.86ms
75% 4.06ms
90% 7.82ms
99% 35.12ms
3537128 requests in 1.00m, 431.78MB read
Socket errors: connect 0, read 27, write 0, timeout 0
Requests/sec: 58849.98
Transfer/sec: 7.18MB
evio的http(开启3个loop)
wrk -t8 -c200 -d60s --latency http://127.0.0.1:7979
Running 1m test @ http://127.0.0.1:7979
8 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 2.19ms 730.09us 36.98ms 93.56%
Req/Sec 11.39k 1.37k 17.97k 68.33%
Latency Distribution
50% 2.02ms
75% 2.37ms
90% 2.68ms
99% 4.24ms
5444990 requests in 1.00m, 540.05MB read
Requests/sec: 90669.19
Transfer/sec: 8.99MB
自己实现的http
wrk -t8 -c200 -d60s --latency http://127.0.0.1:6667
Running 1m test @ http://127.0.0.1:6667
8 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 2.32ms 1.38ms 47.39ms 95.27%
Req/Sec 11.06k 1.98k 31.58k 74.97%
Latency Distribution
50% 2.04ms
75% 2.41ms
90% 2.90ms
99% 6.85ms
5286393 requests in 1.00m, 529.36MB read
Requests/sec: 87953.74
Transfer/sec: 8.81MB
原生http | evio处理http | 手工实现http | |
---|---|---|---|
Qps | 58849.98 | 90669.19 | 87953.74 |
本文主要就是梳理了一下go的跨平台的主流方式,并简单实现了有关mac的kqueue的过程,后续大家有兴趣可自行开发事件驱动的框架,当前比较火热的是evio,gev和gnet。由于本人才疏学浅,如有错误请批评指正。