Node is the place where everthing starts.
// Node is a container on which services can be registered.
type Node struct {
eventmux *event.TypeMux // Event multiplexer used between the services of a stack
config *Config
accman *accounts.Manager
ephemeralKeystore string // if non-empty, the key directory that will be removed by Stop
instanceDirLock fileutil.Releaser // prevents concurrent use of instance directory
serverConfig p2p.Config
server *p2p.Server // Currently running P2P networking layer
serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
services map[reflect.Type]Service // Currently running services
rpcAPIs []rpc.API // List of APIs currently provided by the node
inprocHandler *rpc.Server // In-process RPC request handler to process the API requests
ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled)
ipcListener net.Listener // IPC RPC listener socket to serve API requests
ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
httpWhitelist []string // HTTP RPC modules to allow through this endpoint
httpListener net.Listener // HTTP RPC listener socket to server API requests
httpHandler *rpc.Server // HTTP RPC request handler to process the API requests
wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled)
wsListener net.Listener // Websocket RPC listener socket to server API requests
wsHandler *rpc.Server // Websocket RPC request handler to process the API requests
stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex
log log.Logger
}
// Manager is an overarching account manager that can communicate with various
// backends for signing transactions.
type Manager struct {
config *Config // Global account manager configurations
backends map[reflect.Type][]Backend // Index of backends currently registered
updaters []event.Subscription // Wallet update subscriptions for all backends
updates chan WalletEvent // Subscription sink for backend wallet changes
wallets []Wallet // Cache of all wallets from all registered backends
feed event.Feed // Wallet feed notifying of arrivals/departures
quit chan chan error
lock sync.RWMutex
}
// NewManager creates a generic account manager to sign transaction via various
// supported backends.
func NewManager(config *Config, backends ...Backend) *Manager {
// Retrieve the initial list of wallets from the backends and sort by URL
var wallets []Wallet
for _, backend := range backends {
wallets = merge(wallets, backend.Wallets()...)
}
// Subscribe to wallet notifications from all backends
updates := make(chan WalletEvent, 4*len(backends))
subs := make([]event.Subscription, len(backends))
for i, backend := range backends {
subs[i] = backend.Subscribe(updates)
}
// Assemble the account manager and return
am := &Manager{
config: config,
backends: make(map[reflect.Type][]Backend),
updaters: subs,
updates: updates,
wallets: wallets,
quit: make(chan chan error),
}
for _, backend := range backends {
kind := reflect.TypeOf(backend)
am.backends[kind] = append(am.backends[kind], backend)
}
go am.update()
return am
}
// update is the wallet event loop listening for notifications from the backends
// and updating the cache of wallets.
func (am *Manager) update() {
// Close all subscriptions when the manager terminates
defer func() {
am.lock.Lock()
for _, sub := range am.updaters {
sub.Unsubscribe()
}
am.updaters = nil
am.lock.Unlock()
}()
// Loop until termination
for {
select {
case event := <-am.updates:
// Wallet event arrived, update local cache
am.lock.Lock()
switch event.Kind {
case WalletArrived:
am.wallets = merge(am.wallets, event.Wallet)
case WalletDropped:
am.wallets = drop(am.wallets, event.Wallet)
}
am.lock.Unlock()
// Notify any listeners of the event
am.feed.Send(event)
case errc := <-am.quit:
// Manager terminating, return
errc <- nil
return
}
}
}
// events
const (
// WalletArrived is fired when a new wallet is detected either via USB or via
// a filesystem event in the keystore.
WalletArrived WalletEventType = iota
// WalletOpened is fired when a wallet is successfully opened with the purpose
// of starting any background processes such as automatic key derivation.
WalletOpened
// WalletDropped
WalletDropped
)
Account Mgr starts a goroutine ‘am.update()’ to receive update notification from any wallets. Each backend will subscribe to the update channel:
subs[i] = backend.Subscribe(updates)
Then the event will be fed to all listeners.
am.feed.Send(event)
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx)
if ctx.GlobalIsSet(utils.OverrideIstanbulFlag.Name) {
cfg.Eth.OverrideIstanbul = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideIstanbulFlag.Name))
}
if ctx.GlobalIsSet(utils.OverrideMuirGlacierFlag.Name) {
cfg.Eth.OverrideMuirGlacier = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideMuirGlacierFlag.Name))
}
utils.RegisterEthService(stack, &cfg.Eth)
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
if shhEnabled || shhAutoEnabled {
if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
if ctx.GlobalIsSet(utils.WhisperRestrictConnectionBetweenLightClientsFlag.Name) {
cfg.Shh.RestrictConnectionBetweenLightClients = true
}
utils.RegisterShhService(stack, &cfg.Shh)
}
// Configure GraphQL if requested
if ctx.GlobalIsSet(utils.GraphQLEnabledFlag.Name) {
utils.RegisterGraphQLService(stack, cfg.Node.GraphQLEndpoint(), cfg.Node.GraphQLCors, cfg.Node.GraphQLVirtualHosts, cfg.Node.HTTPTimeouts)
}
// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}
return stack
}
The call stack of Ethereum object initialization:
github.com/ethereum/go-ethereum/eth.New at backend.go:119
github.com/ethereum/go-ethereum/cmd/utils.RegisterEthService.func2 at flags.go:1564
github.com/ethereum/go-ethereum/node.(*Node).Start at node.go:206
github.com/ethereum/go-ethereum/cmd/utils.StartNode at cmd.go:67
main.startNode at main.go:327
main.geth at main.go:308
gopkg.in/urfave/cli%2ev1.HandleAction at app.go:490
gopkg.in/urfave/cli%2ev1.(*App).Run at app.go:264
main.main at main.go:248
runtime.main at proc.go:203
runtime.goexit at asm_amd64.s:1357
- Async stack trace
runtime.rt0_go at asm_amd64.s:220
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
// Ensure configuration values are compatible and sane
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
}
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 {
log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", DefaultConfig.Miner.GasPrice)
config.Miner.GasPrice = new(big.Int).Set(DefaultConfig.Miner.GasPrice)
}
if config.NoPruning && config.TrieDirtyCache > 0 {
config.TrieCleanCache += config.TrieDirtyCache
config.TrieDirtyCache = 0
}
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
// Assemble the Ethereum object
chainDb, err := ctx.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "eth/db/chaindata/")
if err != nil {
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.OverrideIstanbul, config.OverrideMuirGlacier)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb),
shutdownChan: make(chan bool),
networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice,
etherbase: config.Miner.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
var dbVer = "<nil>"
if bcVersion != nil {
dbVer = fmt.Sprintf("%d", *bcVersion)
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId, "dbversion", dbVer)
if !config.SkipBcVersionCheck {
if bcVersion != nil && *bcVersion > core.BlockChainVersion {
return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, params.VersionWithMeta, core.BlockChainVersion)
} else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
}
}
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
EWASMInterpreter: config.EWASMInterpreter,
EVMInterpreter: config.EVMInterpreter,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit
checkpoint := config.Checkpoint
if checkpoint == nil {
checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
return nil, err
}
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.APIBackend = &EthAPIBackend{ctx.ExtRPCEnabled(), eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.Miner.GasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
eth.dialCandiates, err = eth.setupDiscovery(&ctx.Config.P2P)
if err != nil {
return nil, err
}
return eth, nil
}
Start Service will start Ethereum.
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
s.startEthEntryUpdate(srvr.LocalNode())
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
// Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
if s.config.LightServ > 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}
Ethereum sub protocol manager. can manage peers capable with the Ethereum network.
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}
Start a goroutine looping for accepting inbound connections. Start a background gorouting for updating NAT map.
This gorouting uses go chan to implement a limiter of max allowed connections. defaultMaxPendingPeers = 50 or --maxpendpeers value
Any inbound connection will be handled by:
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
c.close(err)
}
return err
}
As a result, any valid connection will be accepted as a peer, by sending a message to srv.checkpointAddPeer.
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
closeCtx, cancel := context.WithCancel(context.Background())
t := &UDPv4{
conn: c,
priv: cfg.PrivateKey,
netrestrict: cfg.NetRestrict,
localNode: ln,
db: ln.Database(),
gotreply: make(chan reply),
addReplyMatcher: make(chan *replyMatcher),
closeCtx: closeCtx,
cancelCloseCtx: cancel,
log: cfg.Log,
}
if t.log == nil {
t.log = log.Root()
}
tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
if err != nil {
return nil, err
}
t.tab = tab
go tab.loop()
t.wg.Add(2)
go t.loop()
go t.readLoop(cfg.Unhandled)
return t, nil
}
It is loosely inspired by the Kademlia DHT, but unlike most DHTs no arbitrary keys and values are stored. Instead, the DHT stores and relays ‘node records’, which are signed documents providing information about nodes in the network. Node Discovery acts as a database of all live nodes in the network and performs three basic functions: