// Package p2p wraps go-libp2p with gossipsub and Kademlia DHT. // The host uses the node's Ed25519 identity so the peer ID is deterministic // across restarts. package p2p import ( "bufio" "context" "crypto/ed25519" "encoding/json" "fmt" "log" "time" libp2p "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/discovery/mdns" "github.com/libp2p/go-libp2p/p2p/discovery/routing" discutil "github.com/libp2p/go-libp2p/p2p/discovery/util" "github.com/multiformats/go-multiaddr" "go-blockchain/blockchain" "go-blockchain/identity" ) const ( // Gossipsub topics (for non-consensus broadcast) TopicTx = "dchain/tx/v1" TopicBlocks = "dchain/blocks/v1" // committed block broadcast // Direct stream protocols (for reliable small-N validator consensus) ConsensusStreamProto = "/dchain/consensus/1.0.0" DiscoveryNS = "dchain-v1" mDNSServiceTag = "dchain-mdns" ) // Host is a libp2p host with gossipsub topics and peer discovery. // Consensus messages use per-peer persistent streams — this guarantees // in-order delivery, which is critical for PBFT (PRE-PREPARE must arrive // before PREPARE from the same sender). type Host struct { h host.Host dhtNode *dht.IpfsDHT ps *pubsub.PubSub // exposed for relay and other topic consumers // Gossipsub for block and tx propagation txTopic *pubsub.Topic blocksTopic *pubsub.Topic txSub *pubsub.Subscription blocksSub *pubsub.Subscription // connHandlers is called when a new peer connects. connHandlers []func(peer.ID) // versionAnnouncer is the peer-version gossip subsystem, set by // StartVersionGossip. nil until that's called (e.g. during tests). versionAnnouncer *versionAnnouncer } // NewHost creates a libp2p host. // The Ed25519 identity key is used so the peer ID is stable across restarts. // // announceAddrs, if non-nil, replaces the addresses advertised to peers. // Use this when the node runs on a server with a public IP that differs from // the listen interface (VPS, Docker, NAT), e.g.: // // []multiaddr.Multiaddr{multiaddr.StringCast("/ip4/1.2.3.4/tcp/4001")} // // Without announceAddrs the host tries UPnP/NAT-PMP (libp2p.NATPortMap). // On a direct-IP VPS or in Docker with a fixed backbone IP, pass the address // explicitly — otherwise peers will receive unreachable internal addresses. func NewHost(ctx context.Context, id *identity.Identity, listenAddr string, announceAddrs []multiaddr.Multiaddr) (*Host, error) { ma, err := multiaddr.NewMultiaddr(listenAddr) if err != nil { return nil, fmt.Errorf("bad listen addr: %w", err) } // Convert stdlib Ed25519 key → libp2p crypto.PrivKey privStd := ed25519.PrivateKey(id.PrivKey) lk, _, err := libp2pcrypto.KeyPairFromStdKey(&privStd) if err != nil { return nil, fmt.Errorf("convert identity key: %w", err) } opts := []libp2p.Option{ libp2p.ListenAddrs(ma), libp2p.Identity(lk), libp2p.NATPortMap(), } // Override advertised addresses when explicit announce addrs are provided. // Required for internet deployment: without this libp2p advertises the // bind interface (0.0.0.0 → internal/loopback) which remote peers cannot reach. if len(announceAddrs) > 0 { announce := announceAddrs opts = append(opts, libp2p.AddrsFactory(func(_ []multiaddr.Multiaddr) []multiaddr.Multiaddr { return announce })) } h, err := libp2p.New(opts...) if err != nil { return nil, fmt.Errorf("create libp2p host: %w", err) } // Kademlia DHT for peer discovery. // dht.BootstrapPeers() with no args disables the default public IPFS nodes: // this is a private chain, we don't want to gossip with the global IPFS // network. Peer discovery happens via our own --peers bootstrap nodes. kadDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeAutoServer), dht.BootstrapPeers(), // empty — private network only ) if err != nil { h.Close() return nil, fmt.Errorf("create dht: %w", err) } if err := kadDHT.Bootstrap(ctx); err != nil { h.Close() return nil, fmt.Errorf("dht bootstrap: %w", err) } // GossipSub — only for blocks and transactions (not consensus) ps, err := pubsub.NewGossipSub(ctx, h) if err != nil { h.Close() return nil, fmt.Errorf("create gossipsub: %w", err) } txTopic, err := ps.Join(TopicTx) if err != nil { return nil, err } blocksTopic, err := ps.Join(TopicBlocks) if err != nil { return nil, err } txSub, err := txTopic.Subscribe() if err != nil { return nil, err } blocksSub, err := blocksTopic.Subscribe() if err != nil { return nil, err } node := &Host{ h: h, dhtNode: kadDHT, ps: ps, txTopic: txTopic, blocksTopic: blocksTopic, txSub: txSub, blocksSub: blocksSub, } // mDNS — automatic discovery on the same LAN / Docker bridge network mdnsSvc := mdns.NewMdnsService(h, mDNSServiceTag, &mdnsNotifee{node: node}) if err := mdnsSvc.Start(); err != nil { log.Printf("[P2P] mDNS start error (non-fatal): %v", err) } // Notify connHandlers when a new peer connects h.Network().Notify(&network.NotifyBundle{ ConnectedF: func(_ network.Network, c network.Conn) { go func() { for _, fn := range node.connHandlers { fn(c.RemotePeer()) } }() }, }) log.Printf("[P2P] node started id=%s", h.ID()) for _, addr := range h.Addrs() { log.Printf("[P2P] %s/p2p/%s", addr, h.ID()) } return node, nil } // PeerID returns this node's libp2p peer ID string. func (n *Host) PeerID() string { return n.h.ID().String() } // OnPeerConnected registers a callback called when a new peer connects. func (n *Host) OnPeerConnected(fn func(peer.ID)) { n.connHandlers = append(n.connHandlers, fn) } // Advertise announces this node under DiscoveryNS in the DHT. func (n *Host) Advertise(ctx context.Context) { rd := routing.NewRoutingDiscovery(n.dhtNode) discutil.Advertise(ctx, rd, DiscoveryNS) } // DiscoverPeers continuously searches the DHT for new peers. // Runs a persistent loop: after each FindPeers round it waits 60 s and // tries again, so the node reconnects after network partitions or restarts. func (n *Host) DiscoverPeers(ctx context.Context) { rd := routing.NewRoutingDiscovery(n.dhtNode) go func() { for { select { case <-ctx.Done(): return default: } ch, err := rd.FindPeers(ctx, DiscoveryNS) if err != nil { select { case <-ctx.Done(): return case <-time.After(30 * time.Second): } continue } for p := range ch { if p.ID == n.h.ID() { continue } if n.h.Network().Connectedness(p.ID) == 0 { if err := n.h.Connect(ctx, p); err == nil { log.Printf("[P2P] DHT discovered %s", p.ID) } } } // Wait before the next discovery round. select { case <-ctx.Done(): return case <-time.After(60 * time.Second): } } }() } // Connect dials a peer by full multiaddr (must include /p2p/). func (n *Host) Connect(ctx context.Context, addrStr string) error { ma, err := multiaddr.NewMultiaddr(addrStr) if err != nil { return err } pi, err := peer.AddrInfoFromP2pAddr(ma) if err != nil { return err } return n.h.Connect(ctx, *pi) } // SetConsensusMsgHandler registers the direct-stream handler for consensus messages. // Messages from each connected peer are decoded and passed to handler. func (n *Host) SetConsensusMsgHandler(handler func(*blockchain.ConsensusMsg)) { n.h.SetStreamHandler(ConsensusStreamProto, func(s network.Stream) { defer s.Close() if err := s.SetDeadline(time.Now().Add(10 * time.Second)); err != nil { log.Printf("[P2P] consensus stream deadline error: %v", err) } scanner := bufio.NewScanner(s) scanner.Buffer(make([]byte, 1<<20), 1<<20) for scanner.Scan() { var msg blockchain.ConsensusMsg if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil { log.Printf("[P2P] bad consensus msg: %v", err) continue } handler(&msg) } }) } // BroadcastConsensus sends a ConsensusMsg directly to all connected peers. // Uses dedicated streams — reliable for small validator sets. func (n *Host) BroadcastConsensus(msg *blockchain.ConsensusMsg) error { data, err := json.Marshal(msg) if err != nil { return err } data = append(data, '\n') peers := n.h.Network().Peers() for _, pid := range peers { pid := pid go func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() s, err := n.h.NewStream(ctx, pid, ConsensusStreamProto) if err != nil { return // peer may not support this protocol yet } defer s.Close() if err := s.SetDeadline(time.Now().Add(5 * time.Second)); err != nil { log.Printf("[P2P] consensus write deadline to %s: %v", pid, err) return } if _, err := s.Write(data); err != nil { log.Printf("[P2P] consensus write to %s: %v", pid, err) } }() } return nil } // PublishTx broadcasts a Transaction. func (n *Host) PublishTx(tx *blockchain.Transaction) error { data, err := json.Marshal(tx) if err != nil { return err } return n.txTopic.Publish(context.Background(), data) } // PublishBlock broadcasts a committed block so peers can sync. func (n *Host) PublishBlock(b *blockchain.Block) error { data, err := json.Marshal(b) if err != nil { return err } return n.blocksTopic.Publish(context.Background(), data) } // TxMsgs returns a channel of incoming Transactions from peers. func (n *Host) TxMsgs(ctx context.Context) <-chan *blockchain.Transaction { ch := make(chan *blockchain.Transaction, 64) go func() { defer close(ch) for { m, err := n.txSub.Next(ctx) if err != nil { return } if m.ReceivedFrom == n.h.ID() { continue } var tx blockchain.Transaction if err := json.Unmarshal(m.Data, &tx); err != nil { continue } select { case ch <- &tx: case <-ctx.Done(): return } } }() return ch } // BlockMsg is a gossip-received block along with the peer that forwarded it // to us. Used by the main node loop so gap-fill can ask the gossiper for the // missing blocks between tip and the received one. type BlockMsg struct { Block *blockchain.Block From peer.ID } // BlockMsgs returns a channel of committed blocks broadcast by peers. // The channel item includes the forwarding peer ID so callers can drive // gap-fill sync from whichever peer just proved it has the new tip. func (n *Host) BlockMsgs(ctx context.Context) <-chan BlockMsg { ch := make(chan BlockMsg, 64) go func() { defer close(ch) for { m, err := n.blocksSub.Next(ctx) if err != nil { return } if m.ReceivedFrom == n.h.ID() { continue } var b blockchain.Block if err := json.Unmarshal(m.Data, &b); err != nil { continue } select { case ch <- BlockMsg{Block: &b, From: m.ReceivedFrom}: case <-ctx.Done(): return } } }() return ch } // PeerCount returns number of connected peers. func (n *Host) PeerCount() int { return len(n.h.Network().Peers()) } // Peers returns all connected peer IDs. func (n *Host) Peers() []peer.ID { return n.h.Network().Peers() } // LibP2PHost exposes the underlying host for the sync protocol. func (n *Host) LibP2PHost() host.Host { return n.h } // GossipSub returns the underlying PubSub instance so callers can join // additional topics (e.g. the relay envelope topic). func (n *Host) GossipSub() *pubsub.PubSub { return n.ps } // AddrStrings returns all full multiaddrs for this host. func (n *Host) AddrStrings() []string { var out []string for _, a := range n.h.Addrs() { out = append(out, fmt.Sprintf("%s/p2p/%s", a, n.h.ID())) } return out } // ConnectedPeerInfo describes one currently-connected remote peer. // Used by the /api/peers endpoint so new joiners can download a live seed // list from any existing node and bootstrap their libp2p connectivity. type ConnectedPeerInfo struct { ID string `json:"id"` Addrs []string `json:"addrs"` } // ConnectedPeers returns every peer in the network's current view with their // full libp2p multiaddrs (suffixed with /p2p/). Addresses come from the // peerstore, which includes both dialed and received connections. // // Safe to call concurrently while the host is running; does not hold any // lock beyond libp2p's internal peerstore lock. func (n *Host) ConnectedPeers() []ConnectedPeerInfo { peers := n.h.Network().Peers() out := make([]ConnectedPeerInfo, 0, len(peers)) for _, pid := range peers { addrs := n.h.Peerstore().Addrs(pid) addrStrs := make([]string, 0, len(addrs)) for _, a := range addrs { addrStrs = append(addrStrs, fmt.Sprintf("%s/p2p/%s", a, pid)) } out = append(out, ConnectedPeerInfo{ ID: pid.String(), Addrs: addrStrs, }) } return out } // Close shuts down the host. func (n *Host) Close() error { return n.h.Close() } // --- mDNS notifee --- type mdnsNotifee struct{ node *Host } func (m *mdnsNotifee) HandlePeerFound(pi peer.AddrInfo) { if pi.ID == m.node.h.ID() { return } log.Printf("[P2P] mDNS found peer %s — connecting", pi.ID) if err := m.node.h.Connect(context.Background(), pi); err != nil { log.Printf("[P2P] mDNS connect to %s failed: %v", pi.ID, err) } }