// Package node provides runtime statistics tracking for a validator node. // Stats are accumulated with atomic counters (lock-free hot path) and // exposed as a JSON HTTP endpoint on /stats and per-peer on /stats/peers. package node import ( "encoding/json" "fmt" "net/http" "sync" "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/peer" "go-blockchain/blockchain" "go-blockchain/economy" "go-blockchain/wallet" ) // Tracker accumulates statistics for the running node. // All counter fields are accessed atomically — safe from any goroutine. type Tracker struct { startTime time.Time // Consensus counters BlocksProposed atomic.Int64 BlocksCommitted atomic.Int64 ViewChanges atomic.Int64 VotesCast atomic.Int64 // PREPARE + COMMIT votes sent // Network counters ConsensusMsgsSent atomic.Int64 ConsensusMsgsRecv atomic.Int64 BlocksGossipSent atomic.Int64 BlocksGossipRecv atomic.Int64 TxsGossipSent atomic.Int64 TxsGossipRecv atomic.Int64 BlocksSynced atomic.Int64 // downloaded via sync protocol // Per-peer routing stats peersMu sync.RWMutex peers map[peer.ID]*PeerStats } // PeerStats tracks per-peer message counters. type PeerStats struct { PeerID peer.ID `json:"peer_id"` ConnectedAt time.Time `json:"connected_at"` MsgsSent atomic.Int64 MsgsRecv atomic.Int64 BlocksSent atomic.Int64 BlocksRecv atomic.Int64 SyncRequests atomic.Int64 // times we synced blocks from this peer } // NewTracker creates a new Tracker with start time set to now. func NewTracker() *Tracker { return &Tracker{ startTime: time.Now(), peers: make(map[peer.ID]*PeerStats), } } // PeerConnected registers a new peer connection. func (t *Tracker) PeerConnected(id peer.ID) { t.peersMu.Lock() defer t.peersMu.Unlock() if _, ok := t.peers[id]; !ok { t.peers[id] = &PeerStats{PeerID: id, ConnectedAt: time.Now()} } } // PeerDisconnected removes a peer (keeps the slot so history is visible). func (t *Tracker) PeerDisconnected(id peer.ID) { // intentionally kept — routing history is useful even after disconnect } // peer returns (or lazily creates) the PeerStats for id. func (t *Tracker) peer(id peer.ID) *PeerStats { t.peersMu.RLock() ps, ok := t.peers[id] t.peersMu.RUnlock() if ok { return ps } t.peersMu.Lock() defer t.peersMu.Unlock() if ps, ok = t.peers[id]; ok { return ps } ps = &PeerStats{PeerID: id, ConnectedAt: time.Now()} t.peers[id] = ps return ps } // RecordConsensusSent increments consensus message sent counters for a peer. func (t *Tracker) RecordConsensusSent(to peer.ID) { t.ConsensusMsgsSent.Add(1) t.peer(to).MsgsSent.Add(1) } // RecordConsensusRecv increments consensus message received counters. func (t *Tracker) RecordConsensusRecv(from peer.ID) { t.ConsensusMsgsRecv.Add(1) t.peer(from).MsgsRecv.Add(1) } // RecordBlockGossipSent records a block broadcast to a peer. func (t *Tracker) RecordBlockGossipSent() { t.BlocksGossipSent.Add(1) } // RecordBlockGossipRecv records receiving a block via gossip. func (t *Tracker) RecordBlockGossipRecv(from peer.ID) { t.BlocksGossipRecv.Add(1) t.peer(from).BlocksRecv.Add(1) } // RecordSyncFrom records blocks downloaded from a peer via the sync protocol. func (t *Tracker) RecordSyncFrom(from peer.ID, count int) { t.BlocksSynced.Add(int64(count)) t.peer(from).SyncRequests.Add(1) } // UptimeSeconds returns seconds since the tracker was created. func (t *Tracker) UptimeSeconds() int64 { return int64(time.Since(t.startTime).Seconds()) } // --- HTTP API --- // StatsResponse is the full JSON payload returned by /stats. type StatsResponse struct { Node NodeInfo `json:"node"` Chain ChainInfo `json:"chain"` Consensus ConsensusInfo `json:"consensus"` Network NetworkInfo `json:"network"` Economy EconomyInfo `json:"economy"` Reputation RepInfo `json:"reputation"` Peers []PeerInfo `json:"peers"` } type NodeInfo struct { PubKey string `json:"pub_key"` Address string `json:"address"` PeerID string `json:"peer_id"` UptimeSecs int64 `json:"uptime_secs"` WalletBinding string `json:"wallet_binding,omitempty"` // DC address if bound } type ChainInfo struct { Height uint64 `json:"height"` TipHash string `json:"tip_hash"` TipTime string `json:"tip_time,omitempty"` } type ConsensusInfo struct { BlocksProposed int64 `json:"blocks_proposed"` BlocksCommitted int64 `json:"blocks_committed"` ViewChanges int64 `json:"view_changes"` VotesCast int64 `json:"votes_cast"` } type NetworkInfo struct { PeersConnected int `json:"peers_connected"` ConsensusMsgsSent int64 `json:"consensus_msgs_sent"` ConsensusMsgsRecv int64 `json:"consensus_msgs_recv"` BlocksGossipSent int64 `json:"blocks_gossip_sent"` BlocksGossipRecv int64 `json:"blocks_gossip_recv"` TxsGossipSent int64 `json:"txs_gossip_sent"` TxsGossipRecv int64 `json:"txs_gossip_recv"` BlocksSynced int64 `json:"blocks_synced"` } type EconomyInfo struct { BalanceMicroT uint64 `json:"balance_ut"` BalanceDisplay string `json:"balance"` WalletBalanceMicroT uint64 `json:"wallet_balance_ut,omitempty"` WalletBalance string `json:"wallet_balance,omitempty"` } type RepInfo struct { Score int64 `json:"score"` BlocksProduced uint64 `json:"blocks_produced"` RelayProofs uint64 `json:"relay_proofs"` SlashCount uint64 `json:"slash_count"` Heartbeats uint64 `json:"heartbeats"` Rank string `json:"rank"` } type PeerInfo struct { PeerID string `json:"peer_id"` ConnectedAt string `json:"connected_at"` MsgsSent int64 `json:"msgs_sent"` MsgsRecv int64 `json:"msgs_recv"` BlocksSent int64 `json:"blocks_sent"` BlocksRecv int64 `json:"blocks_recv"` SyncRequests int64 `json:"sync_requests"` } // QueryFunc is called by the HTTP handler to fetch live chain/wallet state. type QueryFunc struct { PubKey func() string PeerID func() string PeersCount func() int ChainTip func() *blockchain.Block Balance func(pubKey string) uint64 WalletBinding func(pubKey string) string // returns wallet DC address or "" Reputation func(pubKey string) blockchain.RepStats } // ServeHTTP returns a mux with /stats and /health, plus any extra routes registered via fns. func (t *Tracker) ServeHTTP(q QueryFunc, fns ...func(*http.ServeMux)) http.Handler { mux := http.NewServeMux() mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) { pubKey := q.PubKey() tip := q.ChainTip() var height uint64 var tipHash, tipTime string if tip != nil { height = tip.Index tipHash = tip.HashHex() tipTime = tip.Timestamp.Format(time.RFC3339) } bal := q.Balance(pubKey) rep := q.Reputation(pubKey) walletBinding := q.WalletBinding(pubKey) t.peersMu.RLock() var peerInfos []PeerInfo for _, ps := range t.peers { peerInfos = append(peerInfos, PeerInfo{ PeerID: ps.PeerID.String(), ConnectedAt: ps.ConnectedAt.Format(time.RFC3339), MsgsSent: ps.MsgsSent.Load(), MsgsRecv: ps.MsgsRecv.Load(), BlocksSent: ps.BlocksSent.Load(), BlocksRecv: ps.BlocksRecv.Load(), SyncRequests: ps.SyncRequests.Load(), }) } t.peersMu.RUnlock() resp := StatsResponse{ Node: NodeInfo{ PubKey: pubKey, Address: wallet.PubKeyToAddress(pubKey), PeerID: q.PeerID(), UptimeSecs: t.UptimeSeconds(), WalletBinding: walletBinding, }, Chain: ChainInfo{ Height: height, TipHash: tipHash, TipTime: tipTime, }, Consensus: ConsensusInfo{ BlocksProposed: t.BlocksProposed.Load(), BlocksCommitted: t.BlocksCommitted.Load(), ViewChanges: t.ViewChanges.Load(), VotesCast: t.VotesCast.Load(), }, Network: NetworkInfo{ PeersConnected: q.PeersCount(), ConsensusMsgsSent: t.ConsensusMsgsSent.Load(), ConsensusMsgsRecv: t.ConsensusMsgsRecv.Load(), BlocksGossipSent: t.BlocksGossipSent.Load(), BlocksGossipRecv: t.BlocksGossipRecv.Load(), TxsGossipSent: t.TxsGossipSent.Load(), TxsGossipRecv: t.TxsGossipRecv.Load(), BlocksSynced: t.BlocksSynced.Load(), }, Economy: EconomyInfo{ BalanceMicroT: bal, BalanceDisplay: economy.FormatTokens(bal), }, Reputation: RepInfo{ Score: rep.Score, BlocksProduced: rep.BlocksProduced, RelayProofs: rep.RelayProofs, SlashCount: rep.SlashCount, Heartbeats: rep.Heartbeats, Rank: rep.Rank(), }, Peers: peerInfos, } w.Header().Set("Content-Type", "application/json") enc := json.NewEncoder(w) enc.SetIndent("", " ") if err := enc.Encode(resp); err != nil { http.Error(w, `{"error":"failed to encode response"}`, http.StatusInternalServerError) } }) mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, `{"status":"ok","uptime_secs":%d}`, t.UptimeSeconds()) }) for _, fn := range fns { fn(mux) } return mux } // ListenAndServe starts the HTTP stats server on addr (e.g. ":8080"). func (t *Tracker) ListenAndServe(addr string, q QueryFunc, fns ...func(*http.ServeMux)) error { handler := t.ServeHTTP(q, fns...) return http.ListenAndServe(addr, handler) }