// Package p2p — chain sync protocol. // // Sync protocol "/dchain/sync/1.0.0": // // Request → {"from": N, "to": M} // Response → newline-delimited JSON blocks (index N … M), then EOF // // Height protocol "/dchain/height/1.0.0": // // Request → (empty) // Response → {"height": N} package p2p import ( "bufio" "context" "encoding/json" "fmt" "io" "log" "time" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "go-blockchain/blockchain" ) const ( SyncProtocol = "/dchain/sync/1.0.0" HeightProtocol = "/dchain/height/1.0.0" syncTimeout = 30 * time.Second ) type syncRequest struct { From uint64 `json:"from"` To uint64 `json:"to"` } type heightResponse struct { Height uint64 `json:"height"` } // SetSyncHandler registers the block-sync stream handler. // getBlock must be safe to call concurrently. func (n *Host) SetSyncHandler( getBlock func(index uint64) (*blockchain.Block, error), getHeight func() uint64, ) { n.h.SetStreamHandler(SyncProtocol, func(s network.Stream) { defer s.Close() if err := s.SetDeadline(time.Now().Add(syncTimeout)); err != nil { log.Printf("[SYNC] set deadline error: %v", err) return } var req syncRequest if err := json.NewDecoder(s).Decode(&req); err != nil { return } log.Printf("[SYNC] serving blocks %d–%d to %s", req.From, req.To, s.Conn().RemotePeer()) enc := json.NewEncoder(s) for i := req.From; i <= req.To; i++ { b, err := getBlock(i) if err != nil { break // peer asks for a block we don't have yet } if err := enc.Encode(b); err != nil { return } } }) n.h.SetStreamHandler(HeightProtocol, func(s network.Stream) { defer s.Close() if err := s.SetDeadline(time.Now().Add(5 * time.Second)); err != nil { log.Printf("[SYNC] set height deadline error: %v", err) return } resp := heightResponse{Height: getHeight()} if err := json.NewEncoder(s).Encode(resp); err != nil { log.Printf("[SYNC] encode height response error: %v", err) } }) } // QueryPeerHeight returns the chain height of a connected peer. func (n *Host) QueryPeerHeight(ctx context.Context, peerID peer.ID) (uint64, error) { s, err := n.h.NewStream(ctx, peerID, HeightProtocol) if err != nil { return 0, fmt.Errorf("open height stream to %s: %w", peerID, err) } defer s.Close() if err := s.SetDeadline(time.Now().Add(5 * time.Second)); err != nil { return 0, fmt.Errorf("set height deadline: %w", err) } var resp heightResponse if err := json.NewDecoder(s).Decode(&resp); err != nil { return 0, fmt.Errorf("decode height: %w", err) } return resp.Height, nil } // SyncBlocks fetches blocks [from, to] from a peer and returns them in order. func (n *Host) SyncBlocks(ctx context.Context, peerID peer.ID, from, to uint64) ([]*blockchain.Block, error) { s, err := n.h.NewStream(ctx, peerID, SyncProtocol) if err != nil { return nil, fmt.Errorf("open sync stream to %s: %w", peerID, err) } defer s.Close() if err := s.SetDeadline(time.Now().Add(syncTimeout)); err != nil { return nil, fmt.Errorf("set sync deadline: %w", err) } req := syncRequest{From: from, To: to} if err := json.NewEncoder(s).Encode(req); err != nil { return nil, fmt.Errorf("send sync req: %w", err) } if err := s.CloseWrite(); err != nil { return nil, fmt.Errorf("close sync write: %w", err) } var blocks []*blockchain.Block scanner := bufio.NewScanner(io.LimitReader(s, 100<<20)) // 100 MiB max scanner.Buffer(make([]byte, 1<<20), 1<<20) for scanner.Scan() { var b blockchain.Block if err := json.Unmarshal(scanner.Bytes(), &b); err != nil { return nil, fmt.Errorf("decode block: %w", err) } blocks = append(blocks, &b) } return blocks, scanner.Err() } // SyncFromPeerFull syncs all blocks that the peer has but we don't. // localCount = number of blocks we already have (0 if empty, N if we have blocks 0..N-1). // The peer reports its own block count; we fetch [localCount .. peerCount-1]. // Each block is passed to applyFn in ascending index order. // Returns the number of blocks synced. func (n *Host) SyncFromPeerFull(ctx context.Context, peerID peer.ID, localCount uint64, applyFn func(*blockchain.Block) error) (int, error) { peerCount, err := n.QueryPeerHeight(ctx, peerID) if err != nil { return 0, fmt.Errorf("query height: %w", err) } if peerCount <= localCount { return 0, nil // already up to date } from := localCount // first missing block index to := peerCount - 1 // last block index peer has log.Printf("[SYNC] syncing blocks %d–%d from peer %s", from, to, peerID) blocks, err := n.SyncBlocks(ctx, peerID, from, to) if err != nil { return 0, err } for _, b := range blocks { if err := applyFn(b); err != nil { return 0, fmt.Errorf("apply block #%d: %w", b.Index, err) } } return len(blocks), nil }