当前位置: 首页 > 编程笔记 >

使用GO实现Paxos共识算法的方法

傅元章
2023-03-14
本文向大家介绍使用GO实现Paxos共识算法的方法,包括了使用GO实现Paxos共识算法的方法的使用技巧和注意事项,需要的朋友参考一下

什么是Paxos共识算法

最初的服务往往都是通过单体架构对外提供的,即单Server-单Database模式。随着业务的不断扩展,用户和请求数都在不断上升,如何应对大量的请求就成了每个服务都需要解决的问题,这也就是我们常说的高并发。为了解决单台服务器面对高并发的苍白无力,可以通过增加服务器数量来解决,即多Server-单Database(Master-Slave)模式,此时的压力就来到了数据库一方,数据库的IO效率决定了整个服务的效率,继续增加Server数量将无法提升服务性能。这就衍生出了当前火热的微服务架构。当用户请求经由负载均衡分配到某一服务实例上后,如何保证该服务的其他实例最终能够得到相同的数据变化呢?这就要用到Paxos分布式共识协议,Paxos解决的就是共识问题,也就是一段时间后,无论get哪一个服务实例,都能获取到相同的数据。目前国内外的分布式产品很多都使用了Paxos协议,可以说Paxos几乎就是共识协议的标准和代名词。

Paxos有两种协议,我们常常提到的其实是Basic Paxos,另一种叫Multi Paxos,如无特殊说明,本文中提到的Paxos协议均为Basic Paxos。

Paxos协议是由图灵奖获得者Leslie Lamport于1998年在其论文《The Part-Time Parliament》中首次提出的,讲述了一个希腊小岛Paxos是如何通过决议的。但由于该论文晦涩艰深,当时的计算机界大牛们也没几个人能理解。于是Lamport2001年再次发表了《Paxos Made Simple》,摘要部分是这么写的:

The Paxos algorithm, when presented in plain English, is very simple.

翻译过来就是:不会吧,不会吧,这么简单的Paxos算法不会真的有人弄不懂吧?然而事实却是很多人对Paxos都望而却步,理解Paxos其实并不难,但是Paxos的难点在于工程化,如何利用Paxos协议写出一个能过够真正在生产环境中跑起来的服务才是Paxos最难的地方,关于Paxos的工程化可以参考微信后台团队撰写的《微信自研生产级paxos类库PhxPaxos实现原理介绍》

Paxos如何保证一致性的

Paxos协议一共有两个阶段:Prepare和Propose,两种角色:Proposer和Acceptor,每一个服务实例既是Proposer,同时也是Acceptor,Proposer负责提议,Acceptor决定是否接收来自Proposer的提议,一旦提议被多数接受,那么我们就可以宣称对该提议包含的值达成了一致,而且不会再改变。

阶段一:Prepare 准备

  • Proposer生成全局唯一ProposalID(时间戳+ServerID)
  • Proposer向所有Acceptor(包括Proposer自己)发送Prepare(n = ProposalID)请求
  • Acceptor比较n和minProposal, if n > minProposal, minProposal = n,Acceptor返回已接受的提议(acceptedProposal, acceptedValue)
  • 承诺1:不再接受n <= minProposal的Prepare请求
  • 承诺2:不再接受n < minProposal的Propose请求
  • 应答1:返回此前已接受的提议
  • 当Proposer收到大于半数的返回后
  • Prepare请求被拒绝,重新生成ProposalID并发送Prepare请求
  • Prepare请求被接受且有已接受的提议,选择最大的ProposalID对应的值作为提议的值
  • Prepare请求被接受且没有已接受的提议,可选择任意提议值

    阶段二:Propose 提议

  • Proposer向所有Acceptor(包括Proposer自己)发送Accept(n=ProposalID,value=ProposalValue)请求
  • Acceptor比较n和minProposal, if n >= minProposal, minProposal = n, acceptedValue = value,返回已接受的提议(minProposal,acceptedValue)
  • 当Proposer收到大于半数的返回后
  • Propose请求被拒绝,重新生成ProposalID并发送Prepare请求
  • Propose请求被接受,则数据达成一致性

一旦提议被半数以上的服务接受,那么我们就可以宣称整个服务集群在这一提议上达成了一致。

需要注意的是,在一个服务集群中以上两个阶段是很有可能同时发生的。 例如:实例A已完成Prepare阶段,并发送了Propose请求。同时实例B开始了Prepare阶段,并生成了更大的ProposalID发送Prepare请求,可能导致实例A的Propose请求被拒绝。 每个服务实例也是同时在扮演Proposer和Acceptor角色,向其他服务发送请求的同时,可能也在处理别的服务发来的请求。

使用GO语言实现Paxos协议

服务注册与发现

由于每个服务实例都是在执行相同的代码,那我们要如何知晓其他服务实例的入口呢(IP和端口号)?方法之一就是写死在代码中,或者提供一份配置文件。服务启动后可以读取该配置文件。但是这种方法不利于维护,一旦我们需要移除或添加服务则需要在每个机器上重新休息配置文件。

除此之外,我们可以通过一个第三方服务:服务的注册与发现来注册并获知当前集群的总服务实例数,即将本地的配置文件改为线上的配置服务。

服务注册:Register函数,服务实例启动后通过调用这个RPC方法将自己注册在服务管理中

func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 
 server := args.ServerInfo
 for _, server := range s.Servers {
  if server.IPAddress == args.ServerInfo.IPAddress && server.Port == args.ServerInfo.Port {
   reply.Succeed = false
   return nil
  }
 }
 reply.ServerID = len(s.Servers)
 reply.Succeed = true
 s.Servers = append(s.Servers, server)
 
 fmt.Printf("Current registerd servers:\n%v\n", s.Servers)
 
 return nil
}

服务发现:GetServers函数,服务通过调用该RPC方法获取所有服务实例的信息(IP和端口号)

func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error {
 // return all servers
 reply.ServerInfos = s.Servers
 
 return nil
}

Prepare阶段

Proposer,向所有的服务发送Prepare请求,并等待直到半数以上的服务返回结果,这里也可以等待所有服务返回后再处理,但是Paxos协议可以容忍小于半数的服务宕机,因此我们只等待大于N/2个返回即可。当返回的结果有任何一个请求被拒绝,那Proposer即认为这次的请求被拒绝,返回重新生成ProposalID并发送新一轮的Prepare请求。

func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply {
 returnedReplies := make([]PrepareReply, 0)
 for _, otherS := range allServers {
  // use a go routine to call every server
  go func(otherS ServerInfo) {
   delay := rand.Intn(10)
   time.Sleep(time.Second * time.Duration(delay))
   args := PrepareArgs{s.Info, proposal.ID}
   reply := PrepareReply{}
   fmt.Printf("【Prepare】Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID)
   if Call(otherS, "Server.Prepare", &args, &reply) {
    if reply.HasAcceptedProposal {
     fmt.Printf("【Prepare】%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal)
    } else {
     fmt.Printf("【Prepare】%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port)
    }
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   // three possible response
   // 1. deny the prepare, and return an empty/accepted proposal
   // as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID)
   // 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet
   // 3. accept the prepare, and return an accepted proposal
   // check responses from majority
   // find the response with max proposal id
   acceptedProposal := NewProposal()
   for _, r := range checkReplies {
    // if any response refused the prepare, this server should resend prepare
    if !r.PrepareAccepted {
     return r
    }
    if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID {
     acceptedProposal = r.AcceptedProposal
    }
   }
   // if some other server has accepted proposal, return that proposal with max proposal id
   // if no other server has accepted proposal, return an empty proposal
   return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true}
  }
  //fmt.Printf("Waiting for response from majority...\n")
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通过比较ProposalID和minProposal,如果ProposalID小于等于minProposal,则拒绝该Prepare请求,否则更新minProposal为ProposalID。最后返回已接受的提议

func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 // 2 promises and 1 response
 // Promise 1
 // do not accept prepare request which ProposalID <= minProposalID
 // Promise 2
 // do not accept propose request which ProposalID < minProposalID
 // Response 1
 // respond with accepted proposal if any
 if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted {
  // ready to accept the proposal with Id s.minProposalID
  s.minProposalID = args.ProposalID
 }
 reply.HasAcceptedProposal = s.readAcceptedProposal()
 reply.AcceptedProposal = s.Proposal
 return nil
}

Propose阶段

Proposer,同样首先向所有的服务发送Propose请求,并等待知道半数以上的服务返回结果。如果返回的结果有任何一个请求被拒绝,则Proposer认为这次的请求被拒绝,返回重新生成ProposalID并发送新一轮的Prepare请求

func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply {
 returnedReplies := make([]ProposeReply, 0)
 for _, otherS := range allServers {
  go func(otherS ServerInfo) {
   delay := rand.Intn(5000)
   time.Sleep(time.Millisecond * time.Duration(delay))
   args := ProposeArgs{otherS, proposal}
   reply := ProposeReply{}
   fmt.Printf("【Propose】Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal)
   if Call(otherS, "Server.Propose", &args, &reply) {
    fmt.Printf("【Propose】%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply)
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   for _, r := range checkReplies {
    if !r.ProposeAccepted {
     return r
    }
   }
   return checkReplies[0]
  }
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通过比较ProposalID和minProposal,如果ProposalID小于minProposal,则拒绝该Propose请求,否则更新minProposal为ProposalID,并将提议持久化到本地磁盘中。

func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error {
 if s.minProposalID <= args.Proposal.ID {
  s.mu.Lock()
  s.minProposalID = args.Proposal.ID
  s.Proposal = args.Proposal
  s.SaveAcceptedProposal()
  s.mu.Unlock()
 
  reply.ProposeAccepted = true
 }
 
 reply.ProposalID = s.minProposalID
 
 return nil
}

运行

运行结果:

这里我一共开启了3个服务实例,并在每次请求之前加入了随机的延迟,模拟网络通信中的延迟,因此每个服务的每个请求并不是同时发出的

动图一张:

静态结果一张:

可以看到3个服务尽管一开始会尝试以他们自己的端口号(5001,5002,5003)作为提议值,在Prepare/Propose失败后,都会重新生成更大的ProposalID并开启新一轮的提议过程(Prepare,Propose),且最后都以5003达成一致。

小结

至此,我们就用GO实现了Paxos协议的核心逻辑。但显而易见的是,这段代码仍然存在很多问题,完全无法满足生产环境的需求

  • 通过channel而不是mutex锁来共享数据
  • 如何处理服务实例的移除和增加
  • 如何避免陷入活锁

到此这篇关于使用GO实现Paxos共识算法的文章就介绍到这了,更多相关GO实现Paxos共识算法内容请搜索小牛知识库以前的文章或继续浏览下面的相关文章希望大家以后多多支持小牛知识库!

 类似资料:
  • 共识算法 实际上,要保障系统满足不同程度的一致性,往往需要通过共识算法来达成。 共识算法解决的是对某个提案(Proposal),大家达成一致意见的过程。提案的含义在分布式系统中十分宽泛,如多个事件发生的顺序、某个键对应的值、谁是领导……等等,可以认为任何需要达成一致的信息都是一个提案。 注:实践中,一致性的结果往往还需要客户端的特殊支持,典型地通过访问足够多个服务节点来验证确保获取共识后结果。 问

  • POW+DPOS: 混合共识,POW挖矿,DPOS监督。 POW:通过算力生成区块。抵押少量的币,拥有挖矿的权利(避免矿工恶意生成非法区块,恶意矿工将被扣除押金)。 DPOS:通过选票推选出监督节点。监督节点可以微调系统参数(区块大小、区块生成速度),可以举报恶意区块。监督节点有区块奖励(70%返利给投票者)。 所有拥有虚拟币的人,都可以投票,投票后,将可以获得返利。 区块的生成时间是固定的,默认

  • 主要内容:1.Paxos算法简介,2.Paxos算法流程,3.Multi-Paxos算法,1.Paxos算法简介 Paxos算法是一种基于消息传递且具有高容错性的一致性算法 Paxos解决的问题是如何正确快速在一个分布式系统 中对某个数据达成一致。 2.Paxos算法流程 在一个Paxos算法系统中, 所有节点分为3类: Propersor提议者, Accepter接受者, Learner学习者 Proposer: 提出提案 (Proposal)。Proposal信息包括提案编号 (P

  • 本文向大家介绍ZAB 协议和Paxos 算法相关面试题,主要包含被问及ZAB 协议和Paxos 算法时的应答技巧和注意事项,需要的朋友参考一下 Paxos 算法应该可以说是 ZooKeeper 的灵魂了。但是,ZooKeeper 并没有完全采用 Paxos算法 ,而是使用 ZAB 协议作为其保证数据一致性的核心算法。另外,在ZooKeeper的官方文档中也指出,ZAB协议并不像 Paxos 算法那

  • 本文向大家介绍什么是共识算法?相关面试题,主要包含被问及什么是共识算法?时的应答技巧和注意事项,需要的朋友参考一下 回答:共识算法是一种方法,通过该方法,区块链网络的所有对等方都可以达成分布式账本当前状态的标准协议。它可实现高可靠性,并在分布式计算环境中的未知对等方之间建立信任。

  • 算法(Algorithm)是指解题方案的准确而完整的描述,是一系列解决问题的清晰指令,算法代表着用系统的方法描述解决问题的策略机制。也就是说,能够对一定规范的输入,在有限时间内获得所要求的输出。