// Package p2p — peer version discovery via gossipsub.
//
// What this solves
// ────────────────
// A decentralized node fleet has no registry telling each operator what
// version everyone else is running. Without that knowledge:
//
//   - We can't decide when it's safe to activate a new feature-flag tx
//     (§5.2 of UPDATE_STRATEGY.md) — activation must wait until ≥N% of
//     the network has the new binary.
//   - Operators can't see at a glance "am I the one holding back an
//     upgrade?" — because their node's Explorer has no way to ask peers.
//   - Clients can't warn the user "this node is running a pre-channels
//     build" without making N extra HTTP round-trips.
//
// How it works
// ────────────
// A small gossipsub topic — `dchain/version/v1` — carries a JSON blob from
// each node:
//
//	{
//	  "peer_id": "12D3KooW…",
//	  "tag": "v0.5.1",
//	  "commit": "abc1234…",
//	  "protocol_version": 1,
//	  "timestamp": 1715000000
//	}
//
// (The encoder additionally emits a zero-valued "received_at" field; the
// receiver overwrites it with its own local receive time.)
//
// Every node:
//  1. Publishes its own blob every 60 seconds (and once at startup).
//  2. Subscribes to the topic and keeps a bounded in-memory map
//     peer.ID → latest announce.
//  3. Evicts entries older than 15 minutes (peer disconnect / stale). The
//     sweep runs every 5 minutes, so a stale entry can linger for up to
//     roughly 20 minutes before it is removed.
//
// Messages are unsigned and advisory — a peer lying about its version is
// detectable when their blocks/txs use unsupported fields (consensus will
// reject), so we don't add a signature layer here. The map is pure UX.
//
// Memory budget: ~200 bytes per peer × bounded by connected peer count.
// Topic traffic: ~300 bytes every 60s per peer → trivial for a libp2p fleet.
package p2p

import (
	"context"
	"encoding/json"
	"log"
	"sync"
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/peer"

	"go-blockchain/node/version"
)

// TopicVersion is the gossipsub topic for peer-version announces.
const TopicVersion = "dchain/version/v1"

const (
	// versionGossipInterval is how often a node re-publishes its own
	// announce on TopicVersion.
	versionGossipInterval = 60 * time.Second
	// versionGossipTTL is the maximum age (measured from local receipt) an
	// announce may reach before the eviction sweep drops it. The sweep
	// itself runs every versionGossipTTL/3.
	versionGossipTTL = 15 * time.Minute
)

// PeerVersion is one peer's self-reported identity.
type PeerVersion struct { PeerID string `json:"peer_id"` Tag string `json:"tag"` Commit string `json:"commit"` ProtocolVersion int `json:"protocol_version"` Timestamp int64 `json:"timestamp"` ReceivedAt time.Time `json:"received_at,omitempty"` } // versionAnnouncer is wired into Host via StartVersionGossip. Holds the // publish topic + subscription + the latest-seen map under its own mutex, // so read path (PeerVersions) is lock-free against publish. type versionAnnouncer struct { h *Host topic *pubsub.Topic sub *pubsub.Subscription protoVer int mu sync.RWMutex latest map[peer.ID]PeerVersion } // StartVersionGossip joins the version topic, spawns the publisher loop and // the subscriber loop, and returns. Both goroutines run until ctx is done. // // Call exactly once per Host. protocolVersion should be node.ProtocolVersion // (the compile-time wire-protocol const) — threaded through as an int to // avoid an import cycle (p2p → node would be circular; node → p2p already // exists via the host injection). func (n *Host) StartVersionGossip(ctx context.Context, protocolVersion int) error { topic, err := n.ps.Join(TopicVersion) if err != nil { return err } sub, err := topic.Subscribe() if err != nil { return err } va := &versionAnnouncer{ h: n, topic: topic, sub: sub, protoVer: protocolVersion, latest: make(map[peer.ID]PeerVersion), } n.versionAnnouncer = va go va.publishLoop(ctx) go va.subscribeLoop(ctx) go va.evictLoop(ctx) return nil } // PeerVersions returns a snapshot of every peer's last-known version. // Result is a copy — caller can iterate without a lock. 
func (n *Host) PeerVersions() map[string]PeerVersion { if n.versionAnnouncer == nil { return nil } va := n.versionAnnouncer va.mu.RLock() defer va.mu.RUnlock() out := make(map[string]PeerVersion, len(va.latest)) for pid, v := range va.latest { out[pid.String()] = v } return out } func (va *versionAnnouncer) publishLoop(ctx context.Context) { // First publish immediately so peers who just joined learn our version // without a minute of lag. va.publishOnce(ctx) t := time.NewTicker(versionGossipInterval) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: va.publishOnce(ctx) } } } func (va *versionAnnouncer) publishOnce(ctx context.Context) { msg := PeerVersion{ PeerID: va.h.h.ID().String(), Tag: version.Tag, Commit: version.Commit, ProtocolVersion: va.protoVer, Timestamp: time.Now().Unix(), } b, err := json.Marshal(msg) if err != nil { log.Printf("[P2P] version gossip marshal: %v", err) return } pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := va.topic.Publish(pubCtx, b); err != nil { log.Printf("[P2P] version gossip publish: %v", err) } } func (va *versionAnnouncer) subscribeLoop(ctx context.Context) { for { m, err := va.sub.Next(ctx) if err != nil { if ctx.Err() != nil { return } log.Printf("[P2P] version gossip recv: %v", err) continue } // Skip our own broadcasts — gossipsub delivers them back to us by // default. Without this we'd overwrite our own "received" timestamp // every minute and clutter metrics. if m.ReceivedFrom == va.h.h.ID() { continue } var pv PeerVersion if err := json.Unmarshal(m.Data, &pv); err != nil { log.Printf("[P2P] version gossip bad msg from %s: %v", m.ReceivedFrom, err) continue } // Source validation: the peer ID inside the message must match the // peer that sent it. Otherwise a node could spoof "version" rows // for peers it doesn't control, confusing the UX. 
if pv.PeerID != m.ReceivedFrom.String() { continue } pv.ReceivedAt = time.Now() va.mu.Lock() va.latest[m.ReceivedFrom] = pv va.mu.Unlock() } } func (va *versionAnnouncer) evictLoop(ctx context.Context) { t := time.NewTicker(versionGossipTTL / 3) defer t.Stop() for { select { case <-ctx.Done(): return case now := <-t.C: cutoff := now.Add(-versionGossipTTL) va.mu.Lock() for pid, v := range va.latest { if v.ReceivedAt.Before(cutoff) { delete(va.latest, pid) } } va.mu.Unlock() } } }