以太坊源码编译需要gov1.7以上,及C编译器,执行make geth 即可编译项目,编译后可执行的geth文件。
Makefile文件:
geth:
build/env.sh go run build/ci.go install ./cmd/geth
@echo "Done building."
@echo "Run \"$(GOBIN)/geth\" to launch geth."
设置环境变量,执行build/cli.go文件,此文件会执行go install进行编译。
编译完成生成geth文件,直接执行即采用默认配置连接main网络,开始同步区块。
geth入口文件src/github.com/ethereum/go-ethereum/cmd/geth/main.go
func main() {
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
这里使用了一个第三方库,运行run会自动调用之前注册的before action after方法,地址:https://github.com/urfave/cli/tree/v2,很好用的一个库。
func init() {
// Initialize the CLI app and start Geth
app.Action = geth
app.HideVersion = true // we have a command to print the version
app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"
app.Commands = []cli.Command{
// See chaincmd.go:
initCommand,
importCommand,
exportCommand,
importPreimagesCommand,
exportPreimagesCommand,
copydbCommand,
removedbCommand,
dumpCommand,
// See monitorcmd.go:
monitorCommand,
// See accountcmd.go:
accountCommand,
walletCommand,
// See consolecmd.go:
consoleCommand,
attachCommand,
javascriptCommand,
// See misccmd.go:
makecacheCommand,
makedagCommand,
versionCommand,
bugCommand,
licenseCommand,
// See config.go
dumpConfigCommand,
}
sort.Sort(cli.CommandsByName(app.Commands))
app.Flags = append(app.Flags, nodeFlags...)
app.Flags = append(app.Flags, rpcFlags...)
app.Flags = append(app.Flags, consoleFlags...)
app.Flags = append(app.Flags, debug.Flags...)
app.Flags = append(app.Flags, whisperFlags...)
app.Flags = append(app.Flags, metricsFlags...)
app.Before = func(ctx *cli.Context) error {
logdir := ""
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
logdir = (&node.Config{DataDir: utils.MakeDataDir(ctx)}).ResolvePath("logs")
}
if err := debug.Setup(ctx, logdir); err != nil {
return err
}
// Cap the cache allowance and tune the garbage collector
var mem gosigar.Mem
if err := mem.Get(); err == nil {
allowance := int(mem.Total / 1024 / 1024 / 3)
if cache := ctx.GlobalInt(utils.CacheFlag.Name); cache > allowance {
log.Warn("Sanitizing cache to Go's GC limits", "provided", cache, "updated", allowance)
ctx.GlobalSet(utils.CacheFlag.Name, strconv.Itoa(allowance))
}
}
// Ensure Go's GC ignores the database cache for trigger percentage
cache := ctx.GlobalInt(utils.CacheFlag.Name)
gogc := math.Max(20, math.Min(100, 100/(float64(cache)/1024)))
log.Debug("Sanitizing Go's GC trigger", "percent", int(gogc))
godebug.SetGCPercent(int(gogc))
// Start metrics export if enabled
utils.SetupMetrics(ctx)
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)
return nil
}
app.After = func(ctx *cli.Context) error {
debug.Exit()
console.Stdin.Close() // Resets terminal mode.
return nil
}
}
以上代码即提前注册了action方法,可以执行的command,命令接受的参数,及before方法,after方法。
直接执行geth,就相当于直接调用如下方法:
func geth(ctx *cli.Context) error {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
node := makeFullNode(ctx)
startNode(ctx, node)
node.Wait()
return nil
}
makeFullNode使用默认配置创建一个全节点的node,然后startNode启动node,最后等待结束指令。
下面先深入分析makeFullNode,主要是从配置文件或者命令行参数初始一个Node对象,看代码注释:
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx) //返回初始配置及node实例
utils.RegisterEthService(stack, &cfg.Eth) //注册eth服务,但还未启动ethereum协议处理服务
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit) //注册控制面板服务
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
if shhEnabled || shhAutoEnabled {
if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
if ctx.GlobalIsSet(utils.WhisperRestrictConnectionBetweenLightClientsFlag.Name) {
cfg.Shh.RestrictConnectionBetweenLightClients = true
}
utils.RegisterShhService(stack, &cfg.Shh) //注册whisper服务
}
// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL) //注册统计监控服务
}
return stack
}
makeConfigNode代码如下
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
// Load defaults.
cfg := gethConfig{
Eth: eth.DefaultConfig,
Shh: whisper.DefaultConfig,
Node: defaultNodeConfig(),
Dashboard: dashboard.DefaultConfig,
}
// Load config file.
if file := ctx.GlobalString(configFileFlag.Name); file != "" {
if err := loadConfig(file, &cfg); err != nil {
utils.Fatalf("%v", err)
}
}
// Apply flags.
utils.SetNodeConfig(ctx, &cfg.Node)
stack, err := node.New(&cfg.Node)
if err != nil {
utils.Fatalf("Failed to create the protocol stack: %v", err)
}
utils.SetEthConfig(ctx, stack, &cfg.Eth)
if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) {
cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
}
utils.SetShhConfig(ctx, stack, &cfg.Shh)
utils.SetDashboardConfig(ctx, &cfg.Dashboard)
return stack, cfg
}
eth.DefaultConfig是geth的默认配置,代码:
// DefaultConfig contains default settings for use on the Ethereum main net.
var DefaultConfig = Config{
SyncMode: downloader.FastSync,
Ethash: ethash.Config{
CacheDir: "ethash",
CachesInMem: 2,
CachesOnDisk: 3,
DatasetsInMem: 1,
DatasetsOnDisk: 2,
},
NetworkId: 1,
LightPeers: 100,
DatabaseCache: 768,
TrieCache: 256,
TrieTimeout: 60 * time.Minute,
MinerGasFloor: 8000000,
MinerGasCeil: 8000000,
MinerGasPrice: big.NewInt(params.GWei),
MinerRecommit: 3 * time.Second,
TxPool: core.DefaultTxPoolConfig,
GPO: gasprice.Config{
Blocks: 20,
Percentile: 60,
},
}
主要设置同步模式为fast模式,网络id为1,缓存的值,gas的值,及交易池的默认配置
makeConfigNode方法中还有一个从配置文件加载配置的逻辑
// Load config file.
if file := ctx.GlobalString(configFileFlag.Name); file != "" {
if err := loadConfig(file, &cfg); err != nil {
utils.Fatalf("%v", err)
}
}
这个配置文件是一个toml的格式的文件,使用了第三方库github.com/naoina/toml,解析toml的库有很多,除了这个,还有另外一个库也挺好用,推荐给大家https://github.com/BurntSushi/toml
配置加载后,就调用stack, err := node.New(&cfg.Node) 初始一个node实例,即进入了node目录下的node.go文件,把New方法放这种规范的文件里,也是不错的规范。
返回node实例即配置信息后,即执行utils.RegisterEthService(stack, &cfg.Eth) 注册ethereum服务,注册逻辑就是将下面的方法注册到serviceFuns数组里
func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
}
这个方法有个很重要的一行代码fullNode, err := eth.New(ctx, cfg),即创建一个ethereum实例,下面主要代码分析。
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
// ...对一些配置进行验证,代码略
// Assemble the Ethereum object
chainDb, err := CreateDB(ctx, config, "chaindata") //获取chaindata目录关联的db对象
if err != nil {
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) //设置genesis块,加载默认genesis及写到goleveldb数据库,下面有代码详细分析
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, config.MinerNoverify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
gasPrice: config.MinerGasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
} //实例化ethereum
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
if bcVersion != core.BlockChainVersion && bcVersion != 0 { //对链的版本检查,当前是3
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d).\n", bcVersion, core.BlockChainVersion)
}
rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
}
//...初始vm及cach的配置
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig, eth.shouldPreserve)//创建一个blockchain实例
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain) //启动bloom索引,一个多次hash按位比较的索引过滤器
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)// 创建交易池
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, config.MinerRecommit, config.MinerGasFloor, config.MinerGasCeil, eth.isLocalBlock) //创建矿工实例
eth.miner.SetExtra(makeExtraData(config.MinerExtraData))
eth.APIBackend = &EthAPIBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.MinerGasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) //创建一个gasprice的语言机,提供推荐的gasprice
return eth, nil
}
上面的ethereum服务会在node.start()时同时执行ethereum.start方法,代码如下:
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
// Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
if s.config.LightServ > 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
主要的一行代码是s.protocolManager.Start(maxPeers),启动协议管理实例,代码:
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop() //协程开启交易广播
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop() //协程开启挖矿
// start sync handlers
go pm.syncer() //协程开启同步
go pm.txsyncLoop()//协程开启交易同步
}
上面的广播及同步操作,都采用CSP思想设计,把需要处理的消息放到channel中。然后由p.broadcast()方法处理
现在回到makeFullNode,刚才说过了注册ethereum服务,除此之外还注册了whisper服务,统计服务,最后返回这个node实例。
接下来就进行startNode(ctx, node),主要代码如下:
func startNode(ctx *cli.Context, stack *node.Node) {
// Start up the node itself
utils.StartNode(stack) //启动本节点的注册的服务及rpc等,代码下面会分析
// Unlock any account specifically requested
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
//......解锁钱包代码略
// Start auxiliary services if enabled
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
//...代码略,获取正在运行的ethereum服务,并设置交易池的gasprice
ethereum.TxPool().SetGasPrice(gasprice)
//设置挖矿线程数
if err := ethereum.StartMining(threads); err != nil { //开启挖矿
utils.Fatalf("Failed to start mining: %v", err)
}
}
}
StartNode代码如下:
func StartNode(stack *node.Node) {
if err := stack.Start(); err != nil {
Fatalf("Error starting protocol stack: %v", err)
}
go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigc)
<-sigc
log.Info("Got interrupt, shutting down...")
go stack.Stop()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
debug.Exit() // ensure trace and CPU profile data is flushed.
debug.LoudPanic("boom")
}()
}
stack.Start()启动node,然后监听终止信号。
这个node.start方法的代码挺多,但是逻辑主要有两个,如下:
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
//设置serverconfig
running := &p2p.Server{Config: n.serverConfig} //实例化一个p2p的server端
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
//找出本节点支持的service,同时初始化running.Protocols
if err := running.Start(); err != nil { //启动p2p服务端
return convertFileLockError(err)
}
// Start each of the services 然后循环所有支持的服务进行启动service.Start(running)
started := []reflect.Type{}
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil { //启动 ethereum服务就在这启动的
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
if err := n.startRPC(services); err != nil { //启动rpc服务,包括ipc/http/websocket
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
// Finish initializing the startup
n.services = services
n.server = running
n.stop = make(chan struct{})
return nil
}
下面看p2p服务端的启动代码
func (srv *Server) Start() (err error) {
// src参数初始化代码略
if err := srv.setupLocalNode(); err != nil {
return err
}
if srv.ListenAddr != "" {
if err := srv.setupListening(); err != nil { //启动端口监听
return err
}
}
if err := srv.setupDiscovery(); err != nil { //启动节点发现
return err
}
dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) //获得初始连接的节点
srv.loopWG.Add(1)
go srv.run(dialer) //启动server
return nil
}
go srv.run(dialer)这行代码是关键,负责与初始的节点进行握手并启动节点的连接,代码如下:
func (srv *Server) run(dialstate dialer) {
srv.log.Info("Started P2P networking", "self", srv.localnode.Node())
defer srv.loopWG.Done()
defer srv.nodedb.Close()
var (
peers = make(map[enode.ID]*Peer)
inboundCount = 0
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
taskdone = make(chan task, maxActiveDialTasks)
runningTasks []task
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
for _, n := range srv.TrustedNodes {
trusted[n.ID()] = true
}
// removes t from runningTasks
delTask := func(t task) {
for i := range runningTasks {
if runningTasks[i] == t {
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
break
}
}
}
// starts until max number of active tasks is satisfied
startTasks := func(ts []task) (rest []task) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
return ts[i:]
}
scheduleTasks := func() {
// Start from queue first.
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
// Query dialer for new tasks and start as many as possible now.
if len(runningTasks) < maxActiveDialTasks {
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
queuedTasks = append(queuedTasks, startTasks(nt)...)
}
}
running:
for {
scheduleTasks()
select {
case <-srv.quit:
// The server was stopped. Run the cleanup logic.
break running
case n := <-srv.addstatic:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
srv.log.Trace("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID()]; ok {
p.Disconnect(DiscRequested)
}
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID()] = true
// Mark any already-connected peer as trusted
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID()]; ok {
delete(trusted, n.ID())
}
// Unmark any already-connected peer as trusted
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
srv.peerOpDone <- struct{}{}
case t := <-taskdone:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
// A connection has passed the encryption handshake so
// the remote identity is known (but hasn't been verified yet).
if trusted[c.node.ID()] {
// Ensure that the trusted flag is set before checking against MaxPeers.
c.flags |= trustedConn
}
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
select {
case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
case <-srv.quit:
break running
}
case c := <-srv.addpeer:
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
err := srv.protoHandshakeChecks(peers, inboundCount, c)
if err == nil {
// The handshakes are done and it passed all checks.
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p)
peers[c.node.ID()] = p
if p.Inbound() {
inboundCount++
}
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
select {
case c.cont <- err:
case <-srv.quit:
break running
}
case pd := <-srv.delpeer:
// A peer disconnected.
d := common.PrettyDuration(mclock.Now() - pd.created)
pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
delete(peers, pd.ID())
if pd.Inbound() {
inboundCount--
}
}
}
srv.log.Trace("P2P networking is spinning down")
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
srv.ntab.Close()
}
if srv.DiscV5 != nil {
srv.DiscV5.Close()
}
// Disconnect all peers.
for _, p := range peers {
p.Disconnect(DiscQuitting)
}
// Wait for peers to shut down. Pending connections and tasks are
// not handled here and will terminate soon-ish because srv.quit
// is closed.
for len(peers) > 0 {
p := <-srv.delpeer
p.log.Trace("<-delpeer (spindown)", "remainingTasks", len(runningTasks))
delete(peers, p.ID())
}
}
其中case c := <-srv.addpeer 里有go srv.runPeer(p)即与p节点建立连接,主要代码如下:
func (p *Peer) run() (remoteRequested bool, err error) {
var (
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr) //负责读取消息并处理消息
go p.pingLoop() //定时发送Ping
// Start all protocol handlers.
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr) //启动服务支持的协议
// Wait for an error or disconnect.
loop:
for {
//代码略
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
readLoop是读取连接发送来的数据的,代码
func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done()
for {
msg, err := p.rw.ReadMsg()
if err != nil {
errc <- err
return
}
msg.ReceivedAt = time.Now()
if err = p.handle(msg); err != nil {
errc <- err
return
}
}
}
读取消息后,进行处理,代码:
func (p *Peer) handle(msg Msg) error {
switch {
case msg.Code == pingMsg:
msg.Discard()
go SendItems(p.rw, pongMsg)
case msg.Code == discMsg:
var reason [1]DiscReason
// This is the last message. We don't need to discard or
// check errors because, the connection will be closed after it.
rlp.Decode(msg.Payload, &reason)
return reason[0]
case msg.Code < baseProtocolLength:
// ignore other base protocol messages
return msg.Discard()
default:
// it's a subprotocol message
proto, err := p.getProto(msg.Code)
if err != nil {
return fmt.Errorf("msg code out of range: %v", msg.Code)
}
select {
case proto.in <- msg:
return nil
case <-p.closed:
return io.EOF
}
}
return nil
}
至此,p2p连接已经建立!节点之间同步区块头、区块体,及交易池的交易等等。
接下来开始挖矿ethereum.StartMining(threads),这个方法主要是设置gasprice,设置etherbase(coinbase)账号,然后执行
go s.miner.Start(eb),代码如下:
func (self *Miner) Start(coinbase common.Address) {
atomic.StoreInt32(&self.shouldStart, 1)
self.SetEtherbase(coinbase)
if atomic.LoadInt32(&self.canStart) == 0 {
log.Info("Network syncing, will start miner afterwards")
return
}
self.worker.start()
}
self.worker.start()代码:
func (w *worker) start() {
atomic.StoreInt32(&w.running, 1)
w.startCh <- struct{}{}
}
设置了w.startCh,一个work的挖矿开关。
这个work是在ethereum.new服务创建时,创建mine时一块创建的,代码:
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
worker: newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock),
canStart: 1,
}
go miner.update()
return miner
}
有一个newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock),主要代码:
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock
//代码略...
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
// Submit first work to initialize pending state.
worker.startCh <- struct{}{}
return worker
}
OK吧,挖矿的逻辑还是挺多的,具体怎么挖矿改日再续~
看源码,以太坊的代码量挺多的,模块也划分的挺清晰,只是某些方法注释没有写,项目模块化还可以做的更加松散些,依赖的一些第三方依赖可以专门处理下~