Files
dchain/cmd/node/main.go
vsecoder 7e7393e4f8 chore: initial commit for v0.0.1
DChain single-node blockchain + React Native messenger client.

Core:
- PBFT consensus with multi-sig validator admission + equivocation slashing
- BadgerDB + schema migration scaffold (CurrentSchemaVersion=0)
- libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1)
- Native Go contracts (username_registry) alongside WASM (wazero)
- WebSocket gateway with topic-based fanout + Ed25519-nonce auth
- Relay mailbox with NaCl envelope encryption (X25519 + Ed25519)
- Prometheus /metrics, per-IP rate limit, body-size cap

Deployment:
- Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus
- 3-node dev compose (docker-compose.yml) with mocked internet topology
- 3-validator prod compose (deploy/prod/) for federation
- Auto-update from Gitea via /api/update-check + systemd timer
- Build-time version injection (ldflags → node --version)
- UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER)

Client (client-app/):
- Expo / React Native / NativeWind
- E2E NaCl encryption, typing indicator, contact requests
- Auto-discovery of canonical contracts, chain_id-aware, WS reconnect on node switch

Documentation:
- README.md, CHANGELOG.md, CONTEXT.md
- deploy/single/README.md with 6 operator scenarios
- deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design
- docs/contracts/*.md per contract
2026-04-17 14:16:44 +03:00

1579 lines
55 KiB
Go

// cmd/node — full validator node with stats HTTP API.
//
// Flags:
//
// --db BadgerDB directory (default: ./chaindata)
// --listen libp2p listen multiaddr (default: /ip4/0.0.0.0/tcp/4001)
// --announce comma-separated multiaddrs to advertise to peers
// Required for internet deployment (VPS, Docker with fixed IP).
// Example: /ip4/1.2.3.4/tcp/4001
// Without this flag libp2p tries UPnP/NAT-PMP auto-detection.
// --peers comma-separated bootstrap peer multiaddrs
// --validators comma-separated validator pub keys
// --genesis create genesis block on first start
// --key path to identity JSON file (default: ./node.json)
// --stats-addr HTTP stats server address (default: :8080)
// --wallet path to payout wallet JSON (optional)
// --wallet-pass passphrase for wallet file (optional)
// --heartbeat enable periodic heartbeats (default: true)
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
libp2ppeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"go-blockchain/blockchain"
"go-blockchain/consensus"
"go-blockchain/economy"
"go-blockchain/identity"
"go-blockchain/node"
"go-blockchain/node/version"
"go-blockchain/p2p"
"go-blockchain/relay"
"go-blockchain/vm"
"go-blockchain/wallet"
)
// heartbeatFeeUT is the on-chain fee attached to each periodic heartbeat
// transaction. Units are presumably µT (micro-tokens), matching the UT
// suffix and the --relay-fee flag's µT units: 50_000 µT = 0.05 T, which
// implies 1 T = 1_000_000 µT — TODO confirm against the economy package.
const heartbeatFeeUT = 50_000 // 0.05 T

// expectedGenesisHash is populated from the --join seed's /api/network-info
// response. After sync completes we verify our local block 0 hashes to the
// same value; mismatch aborts the node (unless --allow-genesis-mismatch).
// Empty string means no --join: skip verification.
var expectedGenesisHash string
func main() {
// Every flag below also reads a DCHAIN_* env var fallback (see envOr /
// envBoolOr / envUint64Or). Flag on CLI wins; env on container deploys;
// hard-coded default as last resort. Lets docker-compose drive the node
// from an env_file without bake-in CLI strings.
dbPath := flag.String("db", envOr("DCHAIN_DB", "./chaindata"), "BadgerDB directory (env: DCHAIN_DB)")
listenAddr := flag.String("listen", envOr("DCHAIN_LISTEN", "/ip4/0.0.0.0/tcp/4001"), "libp2p listen multiaddr (env: DCHAIN_LISTEN)")
announceFlag := flag.String("announce", envOr("DCHAIN_ANNOUNCE", ""), "comma-separated multiaddrs to advertise to peers (env: DCHAIN_ANNOUNCE)")
peersFlag := flag.String("peers", envOr("DCHAIN_PEERS", ""), "comma-separated bootstrap peer multiaddrs (env: DCHAIN_PEERS)")
validators := flag.String("validators", envOr("DCHAIN_VALIDATORS", ""), "comma-separated validator pub keys (env: DCHAIN_VALIDATORS)")
genesisFlag := flag.Bool("genesis", envBoolOr("DCHAIN_GENESIS", false), "create genesis block (env: DCHAIN_GENESIS)")
keyFile := flag.String("key", envOr("DCHAIN_KEY", "./node.json"), "path to identity JSON file (env: DCHAIN_KEY)")
relayKeyFile := flag.String("relay-key", envOr("DCHAIN_RELAY_KEY", "./relay.json"), "path to relay X25519 keypair JSON (env: DCHAIN_RELAY_KEY)")
statsAddr := flag.String("stats-addr", envOr("DCHAIN_STATS_ADDR", ":8080"), "HTTP stats server address (env: DCHAIN_STATS_ADDR)")
walletFile := flag.String("wallet", envOr("DCHAIN_WALLET", ""), "payout wallet JSON (env: DCHAIN_WALLET)")
walletPass := flag.String("wallet-pass", envOr("DCHAIN_WALLET_PASS", ""), "passphrase for wallet file (env: DCHAIN_WALLET_PASS)")
heartbeatEnabled := flag.Bool("heartbeat", envBoolOr("DCHAIN_HEARTBEAT", true), "enable periodic heartbeat transactions (env: DCHAIN_HEARTBEAT)")
registerRelay := flag.Bool("register-relay", envBoolOr("DCHAIN_REGISTER_RELAY", false), "submit REGISTER_RELAY tx on startup (env: DCHAIN_REGISTER_RELAY)")
relayFee := flag.Uint64("relay-fee", envUint64Or("DCHAIN_RELAY_FEE", 1_000), "relay fee per message in µT (env: DCHAIN_RELAY_FEE)")
mailboxDB := flag.String("mailbox-db", envOr("DCHAIN_MAILBOX_DB", "./mailboxdata"), "BadgerDB directory for relay mailbox (env: DCHAIN_MAILBOX_DB)")
govContractID := flag.String("governance-contract", envOr("DCHAIN_GOVERNANCE_CONTRACT", ""), "governance contract ID for dynamic chain parameters (env: DCHAIN_GOVERNANCE_CONTRACT)")
joinSeedURL := flag.String("join", envOr("DCHAIN_JOIN", ""), "bootstrap from a running node: comma-separated HTTP URLs (env: DCHAIN_JOIN)")
// Observer mode: the node participates in the P2P network, applies
// gossiped blocks, serves HTTP/WS, and forwards submitted txs — but
// never proposes blocks or votes in PBFT. Use this for read-heavy
// deployments: put N observers behind a load balancer so clients can
// hammer them without hitting validators directly. Observers also
// never need to be in the validator set.
observerMode := flag.Bool("observer", envBoolOr("DCHAIN_OBSERVER", false), "observer-only mode: apply blocks + serve HTTP but don't propose or vote (env: DCHAIN_OBSERVER)")
// Log format: `text` (default, human-readable) or `json` (production,
// machine-parsable for Loki/ELK). Applies to both the global slog
// handler and Go's std log package — existing log.Printf calls are
// rerouted through slog so old and new log sites share a format.
logFormat := flag.String("log-format", envOr("DCHAIN_LOG_FORMAT", "text"), "log format: `text` or `json` (env: DCHAIN_LOG_FORMAT)")
// Access control for the HTTP/WS API. Empty token = fully public node
// (default). Non-empty token gates /api/tx (and WS submit_tx) behind
// Authorization: Bearer <token>. Add --api-private to also gate every
// read endpoint — useful for personal nodes where the operator
// considers chat metadata private. See node/api_guards.go.
apiToken := flag.String("api-token", envOr("DCHAIN_API_TOKEN", ""), "Bearer token required to submit transactions (env: DCHAIN_API_TOKEN). Empty = public node")
apiPrivate := flag.Bool("api-private", envBoolOr("DCHAIN_API_PRIVATE", false), "also require the access token on READ endpoints (env: DCHAIN_API_PRIVATE)")
updateSrcURL := flag.String("update-source-url", envOr("DCHAIN_UPDATE_SOURCE_URL", ""), "Gitea /api/v1/repos/{owner}/{repo}/releases/latest URL for /api/update-check (env: DCHAIN_UPDATE_SOURCE_URL)")
updateSrcToken := flag.String("update-source-token", envOr("DCHAIN_UPDATE_SOURCE_TOKEN", ""), "optional Gitea PAT used when polling a private repo (env: DCHAIN_UPDATE_SOURCE_TOKEN)")
disableUI := flag.Bool("disable-ui", envBoolOr("DCHAIN_DISABLE_UI", false), "do not register HTML block-explorer pages — JSON API and Swagger still work (env: DCHAIN_DISABLE_UI)")
disableSwagger := flag.Bool("disable-swagger", envBoolOr("DCHAIN_DISABLE_SWAGGER", false), "do not register /swagger* endpoints (env: DCHAIN_DISABLE_SWAGGER)")
// When --join is set, the seed's genesis_hash becomes the expected value
// for our own block 0. After sync completes we compare; on mismatch we
// refuse to start (fail loud) — otherwise a malicious or misconfigured
// seed could silently trick us onto a parallel chain. Use --allow-genesis-mismatch
// only for intentional migrations (e.g. importing data from another chain
// into this network) — very dangerous.
allowGenesisMismatch := flag.Bool("allow-genesis-mismatch", false, "skip the safety check that aborts when the local genesis hash differs from the seed's. Use only for explicit chain migration.")
showVersion := flag.Bool("version", false, "print version info and exit")
flag.Parse()
if *showVersion {
// Print one-line version summary then exit. Used by update.sh smoke
// test and operators running `node --version`.
fmt.Println(version.String())
return
}
// Configure structured logging. Must run before any log.Printf call
// so subsequent logs inherit the format.
setupLogging(*logFormat)
// Wire API access-control. A non-empty token gates writes; adding
// --api-private also gates reads. Logged up-front so the operator
// sees what mode they're in.
node.SetAPIAccess(*apiToken, *apiPrivate)
node.SetUpdateSource(*updateSrcURL, *updateSrcToken)
switch {
case *apiToken == "":
log.Printf("[NODE] API access: public (no token set)")
case *apiPrivate:
log.Printf("[NODE] API access: fully private (token required on all endpoints)")
default:
log.Printf("[NODE] API access: public reads, token-gated writes")
}
// --- Zero-config onboarding: --join fetches a seed node's network info
// and fills in --peers / --validators from it, so operators adding a new
// node to an existing network don't have to hunt down multiaddrs or
// validator pubkeys by hand.
//
// Multi-seed: --join accepts a comma-separated list. We try each URL in
// order until one succeeds. This way a dead seed doesn't orphan a new
// node — just list a couple backups.
//
// Persistence: on first successful --join we write <db>/seeds.json
// with the tried URL list + live peer multiaddrs. On subsequent
// restarts (no --join passed) we read this file and retry seeds
// automatically, so the operator doesn't have to repeat the CLI flag
// on every container restart.
seedFile := filepath.Join(*dbPath, "seeds.json")
seedURLs := parseSeedList(*joinSeedURL)
if len(seedURLs) == 0 {
// Fall back to persisted list from last run, if any.
if persisted, err := loadSeedsFile(seedFile); err == nil && len(persisted.URLs) > 0 {
seedURLs = persisted.URLs
log.Printf("[NODE] loaded %d seed URL(s) from %s", len(seedURLs), seedFile)
}
}
if len(seedURLs) > 0 {
info, usedURL := tryJoinSeeds(seedURLs)
if info == nil {
log.Fatalf("[NODE] --join: all %d seeds failed. Check URLs and network.", len(seedURLs))
}
log.Printf("[NODE] --join: bootstrapped from %s", usedURL)
if *peersFlag == "" && len(info.Peers) > 0 {
var addrs []string
for _, p := range info.Peers {
for _, a := range p.Addrs {
if a != "" {
addrs = append(addrs, a)
break
}
}
}
*peersFlag = strings.Join(addrs, ",")
log.Printf("[NODE] --join: learned %d peer multiaddrs from seed", len(addrs))
}
if *validators == "" && len(info.Validators) > 0 {
*validators = strings.Join(info.Validators, ",")
log.Printf("[NODE] --join: learned %d validators from seed", len(info.Validators))
}
if *govContractID == "" {
if gov, ok := info.Contracts["governance"]; ok && gov.ContractID != "" {
*govContractID = gov.ContractID
log.Printf("[NODE] --join: governance contract = %s", gov.ContractID)
}
}
if info.ChainID != "" {
log.Printf("[NODE] --join: connecting to chain %s (tip=%d)", info.ChainID, info.TipHeight)
}
if info.GenesisHash != "" {
expectedGenesisHash = info.GenesisHash
log.Printf("[NODE] --join: expected genesis hash = %s", info.GenesisHash)
}
// Persist the successful URL first, then others, so retries pick
// the known-good one first. Include the current peer list so the
// next cold boot can bypass the HTTP layer entirely if any peer
// is still up.
var orderedURLs []string
orderedURLs = append(orderedURLs, usedURL)
for _, u := range seedURLs {
if u != usedURL {
orderedURLs = append(orderedURLs, u)
}
}
if err := saveSeedsFile(seedFile, &persistedSeeds{
ChainID: info.ChainID,
GenesisHash: info.GenesisHash,
URLs: orderedURLs,
Peers: *peersFlag,
SavedAt: time.Now().UTC().Format(time.RFC3339),
}); err != nil {
log.Printf("[NODE] warn: could not persist seeds to %s: %v", seedFile, err)
}
}
// --- Identity ---
id := loadOrCreateIdentity(*keyFile)
log.Printf("[NODE] pub_key: %s", id.PubKeyHex())
log.Printf("[NODE] address: %s", wallet.PubKeyToAddress(id.PubKeyHex()))
// --- Optional payout wallet ---
var payoutWallet *wallet.Wallet
if *walletFile != "" {
w, err := wallet.Load(*walletFile, *walletPass)
if err != nil {
log.Fatalf("[NODE] load wallet: %v", err)
}
payoutWallet = w
log.Printf("[NODE] payout wallet: %s", w.Short())
}
// --- Validator set ---
var valSet []string
if *validators != "" {
for _, v := range strings.Split(*validators, ",") {
if v = strings.TrimSpace(v); v != "" {
valSet = append(valSet, v)
}
}
}
if len(valSet) == 0 {
valSet = []string{id.PubKeyHex()}
}
log.Printf("[NODE] validator set (%d): %v", len(valSet), shortKeys(valSet))
// --- Chain ---
chain, err := blockchain.NewChain(*dbPath)
if err != nil {
log.Fatalf("[NODE] open chain: %v", err)
}
defer chain.Close()
log.Printf("[NODE] chain height: %d tip: %s", tipIndex(chain), tipHash(chain))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// One-time catch-up GC: reclaims any garbage that accumulated before
// this version (which introduced the background GC loop). Nodes with
// clean DBs finish in milliseconds; nodes upgraded from pre-GC builds
// may take a minute or two to shrink their value log from multi-GB
// back to actual live size. Safe to skip — the background loop will
// eventually reclaim the same space — but doing it once up front
// makes the upgrade visibly free disk.
go func() {
log.Printf("[NODE] running startup value-log compaction…")
chain.CompactNow()
log.Printf("[NODE] startup compaction done")
}()
// Background value-log garbage collector for the chain DB.
// Without this, every overwrite of a hot key (`height`, `netstats`) leaves
// the previous bytes pinned in BadgerDB's value log forever — the DB ends
// up multiple gigabytes on disk even when live state is megabytes. Runs
// every 5 minutes; stops automatically when ctx is cancelled.
chain.StartValueLogGC(ctx)
// Genesis-hash sanity check for --join'd nodes. Run in background so
// startup isn't blocked while waiting for sync to complete on first
// launch, but fatal-error the process if the local genesis eventually
// diverges from what the seed reported.
if expectedGenesisHash != "" {
go startGenesisVerifier(ctx, chain, expectedGenesisHash, *allowGenesisMismatch)
}
// --- WASM VM ---
contractVM := vm.NewVM(ctx)
defer contractVM.Close(ctx)
chain.SetVM(contractVM)
// --- Native system contracts ---
// Register built-in Go contracts so they're callable via CALL_CONTRACT
// with the same on-chain semantics as WASM contracts, but without the
// WASM VM in the hot path. The username registry lives here because
// it's the most latency-sensitive service (every chat/contact-add
// does a resolve/reverseResolve).
chain.RegisterNative(blockchain.NewUsernameRegistry())
log.Printf("[NODE] registered native contract: %s", blockchain.UsernameRegistryID)
// --- Governance contract (optional) ---
if *govContractID != "" {
chain.SetGovernanceContract(*govContractID)
}
// --- SSE event hub ---
sseHub := node.NewSSEHub()
// --- WebSocket gateway ---
// Same event sources as SSE but pushed over a persistent ws:// connection
// so mobile clients don't have to poll. Hello frame carries chain_id +
// tip_height so the client can verify it's on the right chain.
wsHub := node.NewWSHub(
func() string {
if g, err := chain.GetBlock(0); err == nil && g != nil {
return "dchain-" + g.HashHex()[:12]
}
return "dchain-unknown"
},
chain.TipIndex,
)
// Event bus: one emit site per event; consumers (SSE, WS, future
// indexers) are registered once below. Eliminates the duplicated
// `go sseHub.EmitX / go wsHub.EmitX` pattern at every commit callsite.
eventBus := node.NewEventBus()
eventBus.Register(node.WrapSSE(sseHub))
eventBus.Register(node.WrapWS(wsHub))
// submit_tx handler is wired after `engine` and `h` are constructed —
// see further down where `wsHub.SetSubmitTxHandler(...)` is called.
// Auth binding: Ed25519 pubkey → X25519 pubkey from the identity registry.
// Lets the hub enforce `inbox:<x25519>` subscriptions so a client can
// only listen to their OWN inbox, not anyone else's.
wsHub.SetX25519ForPub(func(edHex string) (string, error) {
info, err := chain.IdentityInfo(edHex)
if err != nil || info == nil {
return "", err
}
return info.X25519Pub, nil
})
// Live peer-count gauge is registered below (after `h` is constructed).
// --- Stats tracker ---
stats := node.NewTracker()
// --- Announce addresses (internet deployment) ---
// Parse --announce flag into []multiaddr.Multiaddr.
// When set, only these addresses are advertised to peers — libp2p's
// auto-detected addresses (internal interfaces, loopback) are suppressed.
// This is essential when running on a VPS or inside Docker with a fixed IP:
// --announce /ip4/203.0.113.10/tcp/4001
var announceAddrs []multiaddr.Multiaddr
if *announceFlag != "" {
for _, s := range strings.Split(*announceFlag, ",") {
s = strings.TrimSpace(s)
if s == "" {
continue
}
ma, err := multiaddr.NewMultiaddr(s)
if err != nil {
log.Fatalf("[NODE] bad --announce addr %q: %v", s, err)
}
announceAddrs = append(announceAddrs, ma)
log.Printf("[NODE] announce addr: %s", s)
}
}
// --- P2P ---
h, err := p2p.NewHost(ctx, id, *listenAddr, announceAddrs)
if err != nil {
log.Fatalf("[NODE] p2p: %v", err)
}
defer h.Close()
// Start peer-version gossip. Best-effort: errors here are logged but do
// not block node startup — the version map is purely advisory.
if err := h.StartVersionGossip(ctx, node.ProtocolVersion); err != nil {
log.Printf("[NODE] peer-version gossip unavailable: %v", err)
}
// Wire peer connect/disconnect into stats
// Live peer-count gauge for Prometheus — queried at scrape time.
// Registered here (not earlier) because we need `h` to exist.
node.NewGaugeFunc("dchain_peer_count_live",
"Live libp2p peer count (queried on scrape)",
func() int64 { return int64(h.PeerCount()) })
// engine captured below after creation; use a pointer-to-pointer pattern
var engineRef *consensus.Engine
h.OnPeerConnected(func(pid libp2ppeer.ID) {
stats.PeerConnected(pid)
eng := engineRef // capture current value
go syncOnConnect(ctx, h, chain, stats, pid, func(next uint64) {
if eng != nil {
eng.SyncSeqNum(next)
}
})
})
// --- Genesis ---
if *genesisFlag && chain.Tip() == nil {
genesis := blockchain.GenesisBlock(id.PubKeyHex(), id.PrivKey)
if err := chain.AddBlock(genesis); err != nil {
log.Fatalf("[NODE] add genesis: %v", err)
}
log.Printf("[NODE] genesis block: %s", genesis.HashHex())
}
// Seed the on-chain validator set from CLI flags (idempotent).
if err := chain.InitValidators(valSet); err != nil {
log.Fatalf("[NODE] init validators: %v", err)
}
// Prefer the on-chain set if it has been extended by ADD_VALIDATOR txs.
if onChain, err := chain.ValidatorSet(); err == nil && len(onChain) > 0 {
valSet = onChain
log.Printf("[NODE] loaded %d validators from chain state", len(valSet))
}
// --- Consensus engine ---
var seqNum uint64
if tip := chain.Tip(); tip != nil {
seqNum = tip.Index + 1
}
engine := consensus.NewEngine(
id, valSet, seqNum,
func(b *blockchain.Block) {
// Time the AddBlock call end-to-end so we can see slow commits
// (typically contract calls) in the Prometheus histogram.
commitStart := time.Now()
if err := chain.AddBlock(b); err != nil {
log.Printf("[NODE] add block #%d: %v", b.Index, err)
return
}
node.MetricBlockCommitSeconds.Observe(time.Since(commitStart).Seconds())
node.MetricBlocksTotal.Inc()
if len(b.Transactions) > 0 {
node.MetricTxsTotal.Add(uint64(len(b.Transactions)))
}
// Remove committed transactions from our mempool so we don't
// re-propose them in the next round (non-proposing validators
// keep txs in their pending list until explicitly pruned).
if engineRef != nil {
engineRef.PruneTxs(b.Transactions)
}
stats.BlocksCommitted.Add(1)
// Push live events to every registered bus consumer (SSE,
// WS, future indexers). Single emit per event type.
go eventBus.EmitBlockWithTxs(b)
go emitContractLogsViaBus(eventBus, chain, b)
r := economy.ComputeBlockReward(b)
log.Printf("[NODE] %s", r.Summary())
// Show balance of whoever receives the reward
rewardKey := b.Validator
if binding, _ := chain.WalletBinding(b.Validator); binding != "" {
rewardKey = binding
}
bal, _ := chain.Balance(rewardKey)
log.Printf("[NODE] reward target %s balance: %s",
wallet.PubKeyToAddress(rewardKey), economy.FormatTokens(bal))
// Hot-reload validator set if this block changed it.
for _, tx := range b.Transactions {
if tx.Type == blockchain.EventAddValidator || tx.Type == blockchain.EventRemoveValidator {
if newSet, err := chain.ValidatorSet(); err == nil {
if eng := engineRef; eng != nil {
eng.UpdateValidators(newSet)
}
}
break
}
}
// Gossip committed block to peers
if err := h.PublishBlock(b); err != nil {
log.Printf("[NODE] publish block: %v", err)
}
stats.BlocksGossipSent.Add(1)
},
func(msg *blockchain.ConsensusMsg) {
if err := h.BroadcastConsensus(msg); err != nil {
log.Printf("[NODE] broadcast consensus: %v", err)
}
stats.ConsensusMsgsSent.Add(1)
},
)
// Assign engine to the ref captured by peer-connect handler
engineRef = engine
// Wire the WS submit_tx handler now that `engine` and `h` exist.
// Mirrors the HTTP POST /api/tx path so clients get the same
// validation regardless of transport.
wsHub.SetSubmitTxHandler(func(txJSON []byte) (string, error) {
var tx blockchain.Transaction
if err := json.Unmarshal(txJSON, &tx); err != nil {
return "", fmt.Errorf("invalid tx JSON: %w", err)
}
if err := node.ValidateTxTimestamp(&tx); err != nil {
return "", fmt.Errorf("bad timestamp: %w", err)
}
if err := identity.VerifyTx(&tx); err != nil {
return "", fmt.Errorf("invalid signature: %w", err)
}
if err := engine.AddTransaction(&tx); err != nil {
return "", err
}
if err := h.PublishTx(&tx); err != nil {
// Mempool accepted — gossip failure is non-fatal for the caller.
log.Printf("[NODE] ws submit_tx: gossip publish failed (mempool has it): %v", err)
}
return tx.ID, nil
})
// Wrap consensus stats
engine.OnPropose(func() { stats.BlocksProposed.Add(1) })
engine.OnVote(func() { stats.VotesCast.Add(1) })
engine.OnViewChange(func() { stats.ViewChanges.Add(1) })
// Register direct-stream consensus handler
h.SetConsensusMsgHandler(func(msg *blockchain.ConsensusMsg) {
stats.ConsensusMsgsRecv.Add(1)
engine.HandleMessage(msg)
})
// --- Sync protocol handler ---
h.SetSyncHandler(
func(index uint64) (*blockchain.Block, error) { return chain.GetBlock(index) },
func() uint64 {
if tip := chain.Tip(); tip != nil {
return tip.Index + 1
}
return 0
},
)
// --- Bootstrap peers ---
if *peersFlag != "" {
for _, addr := range strings.Split(*peersFlag, ",") {
if addr = strings.TrimSpace(addr); addr != "" {
go connectWithRetry(ctx, h, addr)
}
}
}
h.Advertise(ctx)
h.DiscoverPeers(ctx)
// --- Incoming gossip loops ---
txMsgs := h.TxMsgs(ctx)
blockMsgs := h.BlockMsgs(ctx)
go func() {
for tx := range txMsgs {
stats.TxsGossipRecv.Add(1)
if err := engine.AddTransaction(tx); err != nil {
log.Printf("[NODE] rejected gossip tx %s: %v", tx.ID, err)
continue
}
// Trigger an immediate block proposal so new transactions are
// committed quickly instead of waiting for the next tick.
if engine.IsLeader() {
go engine.Propose(chain.Tip())
}
}
}()
// gapFillRecent tracks the last time we asked a given peer to fill in
// missing blocks, so a burst of out-of-order gossips can't cause a
// sync storm. One sync request per peer per minute is plenty —
// SyncFromPeerFull drains whatever is needed in a single call.
gapFillRecent := make(map[libp2ppeer.ID]time.Time)
var gapFillMu sync.Mutex
go func() {
for m := range blockMsgs {
b := m.Block
stats.BlocksGossipRecv.Add(1)
tip := chain.Tip()
if tip != nil && b.Index <= tip.Index {
continue
}
// Gap detected — we received block N but our tip is N-k for k>1.
// Gossipsub won't replay the missing blocks, so we have to ask
// the gossiper (or any peer) to stream them to us explicitly.
// Without this the node silently falls behind forever and never
// catches up, even though the rest of the network is live.
if tip != nil && b.Index > tip.Index+1 {
gapFillMu.Lock()
last := gapFillRecent[m.From]
if time.Since(last) > 60*time.Second {
gapFillRecent[m.From] = time.Now()
gapFillMu.Unlock()
log.Printf("[NODE] gap detected: gossiped #%d, tip=#%d — requesting sync from %s",
b.Index, tip.Index, m.From)
eng := engineRef
go syncOnConnect(ctx, h, chain, stats, m.From, func(next uint64) {
if eng != nil {
eng.SyncSeqNum(next)
}
})
} else {
gapFillMu.Unlock()
log.Printf("[NODE] gap detected but sync from %s throttled (cooldown); dropping #%d",
m.From, b.Index)
}
continue
}
if err := chain.AddBlock(b); err != nil {
log.Printf("[NODE] add gossip block #%d: %v", b.Index, err)
continue
}
engine.SyncSeqNum(b.Index + 1)
log.Printf("[NODE] applied gossip block #%d hash=%s", b.Index, b.HashHex()[:8])
go eventBus.EmitBlockWithTxs(b)
}
}()
// --- Relay mailbox ---
mailbox, err := relay.OpenMailbox(*mailboxDB)
if err != nil {
log.Fatalf("[NODE] relay mailbox: %v", err)
}
defer mailbox.Close()
go mailbox.RunGC()
log.Printf("[NODE] relay mailbox: %s", *mailboxDB)
// Push-notify bus consumers whenever a fresh envelope lands in the
// mailbox. Clients subscribed to `inbox:<my_x25519>` (via WS) get the
// event immediately so they no longer need to poll /relay/inbox.
//
// We send a minimal summary (no ciphertext) — the client refetches from
// /relay/inbox if it needs the full envelope. Keeps WS frames small and
// avoids a fat push for every message.
mailbox.SetOnStore(func(env *relay.Envelope) {
sum, _ := json.Marshal(map[string]any{
"id": env.ID,
"recipient_pub": env.RecipientPub,
"sender_pub": env.SenderPub,
"sent_at": env.SentAt,
})
eventBus.EmitInbox(env.RecipientPub, sum)
})
// --- Relay router ---
relayKP, err := relay.LoadOrCreateKeyPair(*relayKeyFile)
if err != nil {
log.Fatalf("[NODE] relay keypair: %v", err)
}
relayRouter, err := relay.NewRouter(
h.LibP2PHost(),
h.GossipSub(),
relayKP,
id,
mailbox,
func(envID, senderPub string, msg []byte) {
senderShort := senderPub
if len(senderShort) > 8 {
senderShort = senderShort[:8]
}
log.Printf("[RELAY] delivered envelope %s from %s (%d bytes)", envID, senderShort, len(msg))
},
func(tx *blockchain.Transaction) error {
if err := engine.AddTransaction(tx); err != nil {
return err
}
return h.PublishTx(tx)
},
)
if err != nil {
log.Fatalf("[NODE] relay router: %v", err)
}
go relayRouter.Run(ctx)
log.Printf("[NODE] relay pub key: %s", relayRouter.RelayPubHex())
// --- Register relay service on-chain (optional) ---
// Observers can still act as relays (they forward encrypted envelopes
// and serve /relay/inbox the same way validators do) — but only if
// the operator explicitly asks. Default-off for observers avoids
// surprise on-chain registration txs.
if *registerRelay && !*observerMode {
go autoRegisterRelay(ctx, id, relayKP, *relayFee, h.AddrStrings(), chain, engine, h)
} else if *registerRelay && *observerMode {
log.Printf("[NODE] --register-relay ignored in observer mode; run with both flags off an observer to register as relay")
}
// --- Validator liveness metric updater ---
// Engine tracks per-validator last-seen seqNum; we scan periodically
// and publish the worst offender into Prometheus so alerts can fire.
go func() {
t := time.NewTicker(15 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
report := engine.LivenessReport()
var worst uint64
for _, missed := range report {
if missed > worst {
worst = missed
}
}
node.MetricMaxMissedBlocks.Set(int64(worst))
}
}
}()
// --- Periodic heartbeat transaction ---
// Observers don't heartbeat — they're not validators, their uptime
// doesn't affect consensus, and heartbeat is an on-chain fee-paying
// tx that would waste tokens.
if *heartbeatEnabled && !*observerMode {
go heartbeatLoop(ctx, id, chain, h, stats)
} else if *observerMode {
log.Printf("[NODE] heartbeat disabled (observer mode)")
} else {
log.Printf("[NODE] heartbeat disabled by config")
}
// --- Bind wallet at startup if provided and not yet bound ---
if payoutWallet != nil {
go autoBindWallet(ctx, id, payoutWallet, chain, engine)
}
// --- Block production loop (leader only) ---
//
// Two tiers:
// fastTicker (500 ms) — proposes only when the mempool is non-empty.
// Keeps tx latency low without flooding empty blocks.
// idleTicker (5 s) — heartbeat: proposes even if the mempool is empty
// so the genesis block is produced quickly on startup
// and the chain height advances so peers can sync.
// Observer mode skips the producer entirely — they never propose or
// vote. They still receive blocks via gossipsub and forward txs, so
// this keeps them light-weight + certain not to participate in
// consensus even if erroneously listed as a validator.
if *observerMode {
log.Printf("[NODE] observer mode: block producer disabled")
}
go func() {
if *observerMode {
return
}
fastTicker := time.NewTicker(500 * time.Millisecond)
idleTicker := time.NewTicker(5 * time.Second)
// heartbeatTicker: independent of tip-reads and mempool. Its sole job
// is to prove this producer goroutine is actually alive so we can
// tell "chain frozen because consensus halted" apart from "chain
// frozen because producer deadlocked on c.mu". Logs at most once per
// minute; should appear continuously on any live node.
lifeTicker := time.NewTicker(60 * time.Second)
defer fastTicker.Stop()
defer idleTicker.Stop()
defer lifeTicker.Stop()
lastProposedHeight := uint64(0)
for {
select {
case <-ctx.Done():
return
case <-fastTicker.C:
if engine.IsLeader() && engine.HasPendingTxs() {
tip := chain.Tip()
engine.Propose(tip)
if tip != nil {
lastProposedHeight = tip.Index
}
}
case <-idleTicker.C:
if engine.IsLeader() {
tip := chain.Tip()
engine.Propose(tip) // heartbeat block, may be empty
if tip != nil {
lastProposedHeight = tip.Index
}
}
case <-lifeTicker.C:
// Proof of life + last proposal height so we can see from
// logs whether the loop is running AND reaching the chain.
log.Printf("[NODE] producer alive (leader=%v lastProposed=%d tip=%d)",
engine.IsLeader(), lastProposedHeight, chain.TipIndex())
}
}
}()
// --- Stats HTTP server ---
statsQuery := node.QueryFunc{
PubKey: func() string { return id.PubKeyHex() },
PeerID: func() string { return h.PeerID() },
PeersCount: func() int { return h.PeerCount() },
ChainTip: func() *blockchain.Block { return chain.Tip() },
Balance: func(pk string) uint64 {
b, _ := chain.Balance(pk)
return b
},
WalletBinding: func(pk string) string {
binding, _ := chain.WalletBinding(pk)
if binding == "" {
return ""
}
return wallet.PubKeyToAddress(binding)
},
Reputation: func(pk string) blockchain.RepStats {
r, _ := chain.Reputation(pk)
return r
},
}
explorerQuery := node.ExplorerQuery{
GetBlock: chain.GetBlock,
GetTx: chain.TxByID,
AddressToPubKey: chain.AddressToPubKey,
Balance: func(pk string) (uint64, error) { return chain.Balance(pk) },
Reputation: func(pk string) (blockchain.RepStats, error) {
return chain.Reputation(pk)
},
WalletBinding: func(pk string) (string, error) {
return chain.WalletBinding(pk)
},
TxsByAddress: func(pk string, limit, offset int) ([]*blockchain.TxRecord, error) {
return chain.TxsByAddress(pk, limit, offset)
},
RecentBlocks: chain.RecentBlocks,
RecentTxs: chain.RecentTxs,
NetStats: chain.NetworkStats,
RegisteredRelays: chain.RegisteredRelays,
IdentityInfo: func(pubKeyOrAddr string) (*blockchain.IdentityInfo, error) {
return chain.IdentityInfo(pubKeyOrAddr)
},
ValidatorSet: chain.ValidatorSet,
SubmitTx: func(tx *blockchain.Transaction) error {
if err := engine.AddTransaction(tx); err != nil {
return err
}
return h.PublishTx(tx)
},
GetContract: chain.GetContract,
GetContracts: chain.Contracts,
GetContractState: chain.GetContractState,
GetContractLogs: chain.ContractLogs,
Stake: chain.Stake,
GetToken: chain.Token,
GetTokens: chain.Tokens,
TokenBalance: chain.TokenBalance,
GetNFT: chain.NFT,
GetNFTs: chain.NFTs,
NFTsByOwner: chain.NFTsByOwner,
GetChannel: chain.Channel,
GetChannelMembers: chain.ChannelMembers,
Events: sseHub,
WS: wsHub,
// Onboarding: expose libp2p peers + chain_id so new nodes/clients can
// fetch /api/network-info to bootstrap without static configuration.
ConnectedPeers: func() []node.ConnectedPeerRef {
if h == nil {
return nil
}
src := h.ConnectedPeers()
// Pull peer-version map once per call so the N peers share a
// consistent snapshot rather than a racing one.
versions := h.PeerVersions()
out := make([]node.ConnectedPeerRef, len(src))
for i, p := range src {
ref := node.ConnectedPeerRef{ID: p.ID, Addrs: p.Addrs}
if pv, ok := versions[p.ID]; ok {
ref.Version = &node.PeerVersionRef{
Tag: pv.Tag,
Commit: pv.Commit,
ProtocolVersion: pv.ProtocolVersion,
Timestamp: pv.Timestamp,
ReceivedAt: pv.ReceivedAt.Format(time.RFC3339),
}
}
out[i] = ref
}
return out
},
ChainID: func() string {
// Derive from genesis block so every node with the same genesis
// reports the same chain_id. Fall back to a static string if the
// chain has no genesis yet (fresh DB, pre-sync).
if g, err := chain.GetBlock(0); err == nil && g != nil {
return "dchain-" + g.HashHex()[:12]
}
return "dchain-unknown"
},
// Native contracts registered with the chain are exposed through the
// well-known endpoint so clients don't have to know which ones are
// native vs WASM. They use the contract_id transparently.
NativeContracts: func() []node.NativeContractInfo {
all := chain.NativeContracts()
out := make([]node.NativeContractInfo, len(all))
for i, nc := range all {
out[i] = node.NativeContractInfo{
ContractID: nc.ID(),
ABIJson: nc.ABI(),
}
}
return out
},
}
relayConfig := node.RelayConfig{
Mailbox: mailbox,
Send: func(recipientPubHex string, msg []byte) (string, error) {
return relayRouter.Send(recipientPubHex, msg, 0)
},
Broadcast: func(env *relay.Envelope) error {
return relayRouter.Broadcast(env)
},
ContactRequests: func(pubKey string) ([]blockchain.ContactInfo, error) {
return chain.ContactRequests(pubKey)
},
}
go func() {
log.Printf("[NODE] stats API: http://0.0.0.0%s/stats", *statsAddr)
if *disableUI {
log.Printf("[NODE] explorer UI: disabled (--disable-ui)")
} else {
log.Printf("[NODE] explorer UI: http://0.0.0.0%s/", *statsAddr)
}
if *disableSwagger {
log.Printf("[NODE] swagger: disabled (--disable-swagger)")
} else {
log.Printf("[NODE] swagger: http://0.0.0.0%s/swagger", *statsAddr)
}
log.Printf("[NODE] relay inbox: http://0.0.0.0%s/relay/inbox?pub=<x25519hex>", *statsAddr)
routeFlags := node.ExplorerRouteFlags{
DisableUI: *disableUI,
DisableSwagger: *disableSwagger,
}
if err := stats.ListenAndServe(*statsAddr, statsQuery, func(mux *http.ServeMux) {
node.RegisterExplorerRoutes(mux, explorerQuery, routeFlags)
node.RegisterRelayRoutes(mux, relayConfig)
// POST /api/governance/link — link deployed contracts at runtime.
// Body: {"governance": "<id>"}
// Allows deploy script to wire governance without node restart.
mux.HandleFunc("/api/governance/link", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
return
}
var body struct {
Governance string `json:"governance"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, `{"error":"bad JSON"}`, http.StatusBadRequest)
return
}
if body.Governance != "" {
chain.SetGovernanceContract(body.Governance)
}
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{"status":"ok","governance":%q}`, body.Governance)
})
}); err != nil {
log.Printf("[NODE] stats server error: %v", err)
}
}()
log.Printf("[NODE] running — peer ID: %s", h.PeerID())
log.Printf("[NODE] addrs: %v", h.AddrStrings())
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
log.Println("[NODE] shutting down...")
}
// emitContractLogsViaBus scans b's CALL_CONTRACT transactions and pushes a
// contract_log event for every log entry that belongs to this block onto
// the event bus. The bus fans each event out to every registered consumer
// (SSE, WS, future indexers), so no hub-by-hub emit calls are needed here.
func emitContractLogsViaBus(bus *node.EventBus, chain *blockchain.Chain, b *blockchain.Block) {
	for _, tx := range b.Transactions {
		if tx.Type != blockchain.EventCallContract {
			continue
		}
		var call blockchain.CallContractPayload
		if json.Unmarshal(tx.Payload, &call) != nil || call.ContractID == "" {
			continue
		}
		// Pull only recent logs (limit=50) rather than the full history.
		entries, err := chain.ContractLogs(call.ContractID, 50)
		if err != nil {
			continue
		}
		for _, e := range entries {
			// Emit only entries produced by this exact block+tx pair.
			if e.BlockHeight != b.Index || e.TxID != tx.ID {
				continue
			}
			bus.EmitContractLog(e)
		}
	}
}
// heartbeatLoop publishes a HEARTBEAT transaction once per hour.
//
// Each tick it checks the node balance, skips the beat when the balance
// cannot cover heartbeatFeeUT, and otherwise signs and gossips a heartbeat
// carrying the current chain height and peer count. Runs until ctx is done.
func heartbeatLoop(ctx context.Context, id *identity.Identity, chain *blockchain.Chain, h *p2p.Host, stats *node.Tracker) {
	ticker := time.NewTicker(60 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		bal, err := chain.Balance(id.PubKeyHex())
		if err != nil {
			log.Printf("[NODE] heartbeat balance check failed: %v", err)
			continue
		}
		if bal < heartbeatFeeUT {
			log.Printf("[NODE] heartbeat skipped: balance %s below required fee %s",
				economy.FormatTokens(bal), economy.FormatTokens(heartbeatFeeUT))
			continue
		}
		height := uint64(0)
		if tip := chain.Tip(); tip != nil {
			height = tip.Index
		}
		pb, _ := json.Marshal(blockchain.HeartbeatPayload{
			PubKey:      id.PubKeyHex(),
			ChainHeight: height,
			PeerCount:   h.PeerCount(),
			Version:     "0.1.0",
		})
		tx := &blockchain.Transaction{
			ID:        fmt.Sprintf("hb-%s-%d", id.PubKeyHex()[:8], time.Now().Unix()),
			Type:      blockchain.EventHeartbeat,
			From:      id.PubKeyHex(),
			Amount:    0,
			Payload:   pb,
			Fee:       heartbeatFeeUT,
			Memo:      "Heartbeat: liveness proof",
			Timestamp: time.Now().UTC(),
		}
		tx.Signature = id.Sign(identity.TxSignBytes(tx))
		if err := h.PublishTx(tx); err != nil {
			log.Printf("[NODE] heartbeat publish: %v", err)
			continue
		}
		stats.TxsGossipSent.Add(1)
		log.Printf("[NODE] heartbeat sent (height=%d peers=%d)", height, h.PeerCount())
	}
}
// autoBindWallet submits a BIND_WALLET tx if the node doesn't have one yet.
func autoBindWallet(ctx context.Context, id *identity.Identity, w *wallet.Wallet, chain *blockchain.Chain, engine *consensus.Engine) {
	// Give the chain a moment to sync before consulting on-chain state.
	time.Sleep(5 * time.Second)
	binding, err := chain.WalletBinding(id.PubKeyHex())
	if err != nil || binding == w.ID.PubKeyHex() {
		// Lookup failed, or this wallet is already bound — nothing to do.
		return
	}
	pb, _ := json.Marshal(blockchain.BindWalletPayload{
		WalletPubKey: w.ID.PubKeyHex(),
		WalletAddr:   w.Address,
	})
	tx := &blockchain.Transaction{
		ID:        fmt.Sprintf("bind-%d", time.Now().UnixNano()),
		Type:      blockchain.EventBindWallet,
		From:      id.PubKeyHex(),
		To:        w.ID.PubKeyHex(),
		Payload:   pb,
		Fee:       blockchain.MinFee,
		Memo:      "Bind payout wallet",
		Timestamp: time.Now().UTC(),
	}
	tx.Signature = id.Sign(identity.TxSignBytes(tx))
	if err := engine.AddTransaction(tx); err != nil {
		log.Printf("[NODE] BIND_WALLET tx rejected: %v", err)
		return
	}
	log.Printf("[NODE] queued BIND_WALLET → %s", w.Address)
}
// startGenesisVerifier watches the local chain and aborts the node if, once
// block 0 is present, its hash differs from what the --join seed advertised.
//
// Lifecycle:
//   - Polls every 500 ms for up to 2 minutes waiting for sync to produce
//     block 0. On a fresh-db joiner this may take tens of seconds while
//     SyncFromPeerFull streams blocks.
//   - Once the block exists, compares HashHex() with `expected`:
//     match → log success, exit goroutine;
//     mismatch → log.Fatal, killing the process (unless `allowMismatch`
//     is set, in which case we just warn and return).
//   - If the window expires without a genesis, we assume something went
//     wrong with peer connectivity and abort — otherwise PBFT would
//     helpfully produce its OWN genesis, permanently forking us from the
//     target network.
func startGenesisVerifier(ctx context.Context, chain *blockchain.Chain, expected string, allowMismatch bool) {
	deadline := time.NewTimer(2 * time.Minute)
	defer deadline.Stop()
	poll := time.NewTicker(500 * time.Millisecond)
	defer poll.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-deadline.C:
			if allowMismatch {
				log.Printf("[NODE] genesis verifier: no genesis after 2m — giving up (allow-genesis-mismatch set)")
				return
			}
			log.Fatalf("[NODE] genesis verifier: could not load block 0 within 2 minutes; seed expected hash %s. Check network connectivity to peers.", expected)
		case <-poll.C:
			genesis, err := chain.GetBlock(0)
			if err != nil || genesis == nil {
				continue // still syncing
			}
			got := genesis.HashHex()
			switch {
			case got == expected:
				log.Printf("[NODE] genesis verified: hash %s matches seed", got)
				return
			case allowMismatch:
				log.Printf("[NODE] WARNING: genesis hash mismatch (local=%s seed=%s) — continuing because --allow-genesis-mismatch is set", got, expected)
				return
			default:
				log.Fatalf("[NODE] FATAL: genesis hash mismatch — local=%s seed=%s. Either the seed at --join is on a different chain, or your local DB was created against a different genesis. Wipe --db and retry, or pass --allow-genesis-mismatch if you know what you're doing.",
					got, expected)
			}
		}
	}
}
// syncOnConnect syncs chain from a newly connected peer.
// engineSyncFn is called with next seqNum after a successful sync so the
// consensus engine knows which block to propose/commit next.
//
// Flow: sleep 500 ms to let the fresh connection settle, then stream
// blocks from pid under a 30 s overall timeout, appending each block that
// is above our current tip.
func syncOnConnect(ctx context.Context, h *p2p.Host, chain *blockchain.Chain, stats *node.Tracker, pid libp2ppeer.ID, engineSyncFn func(uint64)) {
	time.Sleep(500 * time.Millisecond)
	syncCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	// localCount = number of blocks already held locally (tip index + 1);
	// passed to the peer so it only streams what we are missing.
	var localCount uint64
	if tip := chain.Tip(); tip != nil {
		localCount = tip.Index + 1
	}
	n, err := h.SyncFromPeerFull(syncCtx, pid, localCount, func(b *blockchain.Block) error {
		// Skip blocks at or below our tip — the peer may resend overlap.
		if tip := chain.Tip(); tip != nil && b.Index <= tip.Index {
			return nil
		}
		if err := chain.AddBlock(b); err != nil {
			return err
		}
		log.Printf("[SYNC] applied block #%d hash=%s", b.Index, b.HashHex()[:8])
		return nil
	})
	if err != nil {
		log.Printf("[SYNC] sync from %s: %v", pid, err)
		return
	}
	if n > 0 {
		stats.RecordSyncFrom(pid, n)
		if tip := chain.Tip(); tip != nil {
			// Advance consensus to the sequence number following the new tip.
			engineSyncFn(tip.Index + 1)
			log.Printf("[SYNC] synced %d blocks from %s, chain now at #%d", n, pid, tip.Index)
		}
	} else if localCount == 0 {
		// No new blocks but we confirmed our genesis is current — still notify engine
		if tip := chain.Tip(); tip != nil {
			engineSyncFn(tip.Index + 1)
		}
	}
}
// connectWithRetry dials a bootstrap peer and keeps the connection alive.
//
// Behaviour:
//   - Retries indefinitely with exponential backoff (1 s → 2 s → … → 30 s max).
//   - After a successful connection it re-dials every 30 s.
//     libp2p.Connect is idempotent — if the peer is still connected the call
//     returns immediately. If it dropped, this transparently reconnects it.
//   - Stops only when ctx is cancelled (node shutdown).
//
// This ensures that even if a bootstrap peer restarts or is temporarily
// unreachable the node will reconnect without manual intervention.
//
// Fix: the previous version logged "connected to bootstrap" on EVERY 30 s
// keep-alive pass (backoff was already reset to minBackoff), despite its
// own comment claiming it logged only on first connect or reconnect. We now
// track connection state explicitly and log only on transitions.
func connectWithRetry(ctx context.Context, h *p2p.Host, addr string) {
	const (
		minBackoff = 1 * time.Second
		maxBackoff = 30 * time.Second
		keepAlive  = 30 * time.Second
	)
	backoff := minBackoff
	connected := false // whether the last Connect attempt succeeded
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		if err := h.Connect(ctx, addr); err != nil {
			connected = false
			log.Printf("[P2P] bootstrap %s unavailable: %v (retry in %s)", addr, err, backoff)
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
			if backoff < maxBackoff {
				backoff *= 2
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
			continue
		}
		// Connected (or still connected). Log only on the transition into
		// the connected state so the keep-alive loop doesn't spam the log.
		if !connected {
			if backoff > minBackoff {
				log.Printf("[P2P] reconnected to bootstrap %s", addr)
			} else {
				log.Printf("[P2P] connected to bootstrap %s", addr)
			}
			connected = true
		}
		backoff = minBackoff // reset for next reconnection cycle
		// Keep-alive: re-check every 30 s so we detect and repair drops.
		select {
		case <-ctx.Done():
			return
		case <-time.After(keepAlive):
		}
	}
}
// --- helpers ---
// keyJSON is the on-disk JSON shape of the node identity key file.
// PubKey/PrivKey are hex-encoded signing keys (decoded via
// identity.FromHexFull); the optional X25519 pair is used for relay
// envelope encryption and may be absent in files written by older builds —
// loadOrCreateIdentity backfills and re-saves it on load.
type keyJSON struct {
	PubKey     string `json:"pub_key"`
	PrivKey    string `json:"priv_key"`
	X25519Pub  string `json:"x25519_pub,omitempty"`
	X25519Priv string `json:"x25519_priv,omitempty"`
}
// loadOrCreateIdentity reads the node identity from keyFile, or generates
// and persists a fresh one when the file is missing or unreadable. Files
// from older builds that lack X25519 keys are backfilled and rewritten.
func loadOrCreateIdentity(keyFile string) *identity.Identity {
	if data, err := os.ReadFile(keyFile); err == nil {
		var kj keyJSON
		if json.Unmarshal(data, &kj) == nil {
			id, idErr := identity.FromHexFull(kj.PubKey, kj.PrivKey, kj.X25519Pub, kj.X25519Priv)
			if idErr == nil {
				if kj.X25519Pub == "" {
					// Legacy file: derive the X25519 pair and rewrite in place.
					kj.X25519Pub = id.X25519PubHex()
					kj.X25519Priv = id.X25519PrivHex()
					if out, mErr := json.MarshalIndent(kj, "", " "); mErr == nil {
						_ = os.WriteFile(keyFile, out, 0600)
					}
				}
				log.Printf("[NODE] loaded identity from %s", keyFile)
				return id
			}
		}
	}
	// No usable key file — mint a new identity and try to persist it.
	id, err := identity.Generate()
	if err != nil {
		log.Fatalf("generate identity: %v", err)
	}
	data, _ := json.MarshalIndent(keyJSON{
		PubKey:     id.PubKeyHex(),
		PrivKey:    id.PrivKeyHex(),
		X25519Pub:  id.X25519PubHex(),
		X25519Priv: id.X25519PrivHex(),
	}, "", " ")
	if err := os.WriteFile(keyFile, data, 0600); err != nil {
		log.Printf("[NODE] warning: could not save key: %v", err)
	} else {
		log.Printf("[NODE] new identity saved to %s", keyFile)
	}
	return id
}
// tipIndex returns the index of the chain tip, or 0 for an empty chain.
func tipIndex(c *blockchain.Chain) uint64 {
	tip := c.Tip()
	if tip == nil {
		return 0
	}
	return tip.Index
}
// tipHash returns the first 12 hex characters of the tip hash for compact
// log lines, or "(empty)" when the chain has no blocks yet.
//
// Fix: the previous version sliced tip.HashHex()[:12] unconditionally,
// which panics if the hex string is ever shorter than 12 characters; we now
// guard the slice and return the full (short) string in that case.
func tipHash(c *blockchain.Chain) string {
	tip := c.Tip()
	if tip == nil {
		return "(empty)"
	}
	h := tip.HashHex()
	if len(h) < 12 {
		return h
	}
	return h[:12]
}
// autoRegisterRelay submits a REGISTER_RELAY tx and retries until the registration
// is confirmed on-chain. This handles the common case where the initial gossip has
// no peers (node2 hasn't started yet) or the leader hasn't included the tx yet.
//
// Parameters:
//   - id: the node's signing identity (tx sender).
//   - relayKP: X25519 keypair advertised in the relay registration payload.
//   - feePerMsgUT: per-message relay fee published in the payload.
//   - addrs: node multiaddrs; only the first (if any) is advertised.
//   - chain: used to check whether the registration has been committed.
//   - engine: local mempool entry point for the tx.
//   - h: gossip publisher for the tx.
//
// Loop shape: first attempt after 5 s, then one attempt every 10 s until
// either the registration appears in chain.RegisteredRelays() or ctx is
// cancelled.
func autoRegisterRelay(
	ctx context.Context,
	id *identity.Identity,
	relayKP *relay.KeyPair,
	feePerMsgUT uint64,
	addrs []string,
	chain *blockchain.Chain,
	engine *consensus.Engine,
	h *p2p.Host,
) {
	// Advertise only the first multiaddr (empty string when none known).
	var multiaddr string
	if len(addrs) > 0 {
		multiaddr = addrs[0]
	}
	payload := blockchain.RegisterRelayPayload{
		X25519PubKey: relayKP.PubHex(),
		FeePerMsgUT:  feePerMsgUT,
		Multiaddr:    multiaddr,
	}
	pb, _ := json.Marshal(payload)
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	// First attempt after 5 seconds; subsequent retries every 10 seconds.
	select {
	case <-ctx.Done():
		return
	case <-time.After(5 * time.Second):
	}
	for {
		// Stop if already confirmed on-chain.
		if relays, err := chain.RegisteredRelays(); err == nil {
			for _, r := range relays {
				if r.PubKey == id.PubKeyHex() && r.Relay.X25519PubKey == relayKP.PubHex() {
					log.Printf("[NODE] relay registration confirmed on-chain (x25519=%s)", relayKP.PubHex()[:16])
					return
				}
			}
		}
		// Build and sign a fresh tx each attempt (unique ID, fresh timestamp).
		now := time.Now().UTC()
		tx := &blockchain.Transaction{
			ID:   fmt.Sprintf("relay-reg-%d", now.UnixNano()),
			Type: blockchain.EventRegisterRelay,
			From: id.PubKeyHex(),
			// Pay the standard MinFee. The original code set this to 0 with the
			// note "REGISTER_RELAY is free"; that's incompatible with validateTx
			// in consensus/pbft.go which rejects any tx below MinFee, causing
			// the auto-register retry loop to spam the logs forever without
			// ever getting the tx into the mempool.
			Fee:       blockchain.MinFee,
			Payload:   pb,
			Memo:      "Register relay service",
			Timestamp: now,
		}
		tx.Signature = id.Sign(identity.TxSignBytes(tx))
		// Queue locally first; only gossip if the local mempool accepted it.
		if err := engine.AddTransaction(tx); err != nil {
			log.Printf("[NODE] REGISTER_RELAY tx rejected: %v", err)
		} else if err := h.PublishTx(tx); err != nil {
			log.Printf("[NODE] REGISTER_RELAY publish failed: %v", err)
		} else {
			log.Printf("[NODE] submitted REGISTER_RELAY (x25519=%s fee=%dµT), waiting for commit...",
				relayKP.PubHex()[:16], feePerMsgUT)
		}
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
// shortKeys abbreviates each key to its first 8 characters plus an ellipsis
// for compact logging; keys of 8 characters or fewer pass through unchanged.
func shortKeys(keys []string) []string {
	out := make([]string, len(keys))
	for i, k := range keys {
		out[i] = k
		if len(k) > 8 {
			out[i] = k[:8] + "…"
		}
	}
	return out
}
// setupLogging installs the slog handler selected by format and reroutes
// Go's standard log package through it, so existing log.Printf calls emit
// in the chosen format without a file-by-file migration.
//
// "json" emits one JSON object per line with `time/level/msg` + any
// key=value attrs — what Loki/ELK ingest natively. Any other value falls
// back to the human-readable text handler, same as bare log.Printf.
func setupLogging(format string) {
	opts := &slog.HandlerOptions{Level: slog.LevelInfo}
	var handler slog.Handler
	if strings.ToLower(format) == "json" {
		handler = slog.NewJSONHandler(os.Stderr, opts)
	} else {
		handler = slog.NewTextHandler(os.Stderr, opts)
	}
	slog.SetDefault(slog.New(handler))
	// Send std-log output through slog so `log.Printf("…")` calls elsewhere
	// in the codebase land in the same stream/format.
	log.SetFlags(0)
	log.SetOutput(slogWriter{slog.Default()})
}
// slogWriter adapts an io.Writer-style byte stream onto slog. Each line
// becomes one Info-level record. The whole line is stored in `msg` — we
// don't try to parse out attrs; that can happen at callsite by switching
// from log.Printf to slog.Info.
type slogWriter struct{ l *slog.Logger }
func (w slogWriter) Write(p []byte) (int, error) {
msg := strings.TrimRight(string(p), "\r\n")
w.l.Info(msg)
return len(p), nil
}
// ─── Env-var helpers for flag defaults ──────────────────────────────────────
// Pattern: `flag.String(name, envOr("DCHAIN_X", default), help)` lets the
// same binary be driven by either CLI flags (dev) or env-vars (Docker) with
// CLI winning. Flag.Parse() is called once; by that time os.Getenv has
// already populated the fallback via these helpers.
func envOr(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func envBoolOr(key string, def bool) bool {
v := strings.ToLower(strings.TrimSpace(os.Getenv(key)))
switch v {
case "":
return def
case "1", "true", "yes", "on":
return true
case "0", "false", "no", "off":
return false
default:
log.Printf("[NODE] warn: env %s=%q is not a valid bool, using default %v", key, v, def)
return def
}
}
func envUint64Or(key string, def uint64) uint64 {
v := os.Getenv(key)
if v == "" {
return def
}
var out uint64
if _, err := fmt.Sscanf(v, "%d", &out); err != nil {
log.Printf("[NODE] warn: env %s=%q is not a valid uint64, using default %d", key, v, def)
return def
}
return out
}
// ─── Multi-seed --join support ──────────────────────────────────────────────
// persistedSeeds is the on-disk shape of <db>/seeds.json. Kept minimal so
// format churn doesn't break old files; unknown fields are ignored on read.
type persistedSeeds struct {
	ChainID     string   `json:"chain_id,omitempty"`     // network identifier from the seed
	GenesisHash string   `json:"genesis_hash,omitempty"` // hash of block 0 advertised by the seed
	URLs        []string `json:"urls"`                   // seed HTTP base URLs; head of list is tried first
	Peers       string   `json:"peers,omitempty"`        // comma-separated multiaddrs
	SavedAt     string   `json:"saved_at,omitempty"`     // timestamp the file was written (informational)
}
// parseSeedList splits a comma-separated --join value into trimmed,
// non-empty URL entries, preserving order so the first entry is tried first.
// Returns nil for an empty input.
func parseSeedList(raw string) []string {
	if raw == "" {
		return nil
	}
	var urls []string
	for _, part := range strings.Split(raw, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		urls = append(urls, part)
	}
	return urls
}
// tryJoinSeeds walks the seed URL list in order and returns the first
// network info that was fetched successfully, together with the URL that
// served it (so the caller can persist it as the known-good head of the
// list). Returns (nil, "") when every seed failed.
func tryJoinSeeds(urls []string) (*seedNetworkInfo, string) {
	for _, seedURL := range urls {
		info, err := fetchNetworkInfo(seedURL)
		if err == nil {
			return info, seedURL
		}
		log.Printf("[NODE] --join: %s failed: %v (trying next)", seedURL, err)
	}
	return nil, ""
}
// loadSeedsFile reads and decodes the persisted seeds file at path.
func loadSeedsFile(path string) (*persistedSeeds, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	seeds := new(persistedSeeds)
	if err := json.Unmarshal(raw, seeds); err != nil {
		return nil, err
	}
	return seeds, nil
}
// saveSeedsFile writes p to path as indented JSON, creating the parent
// directory when needed.
func saveSeedsFile(path string, p *persistedSeeds) error {
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return err
	}
	out, err := json.MarshalIndent(p, "", " ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, out, 0o600) // 0600: may contain private IPs of peers
}
// seedNetworkInfo is a minimal subset of /api/network-info needed for
// bootstrap. We intentionally DO NOT import the node package's response
// types here to keep the struct stable across schema evolution; missing
// fields simply unmarshal to zero values.
type seedNetworkInfo struct {
	ChainID     string   `json:"chain_id"`     // network identifier reported by the seed
	GenesisHash string   `json:"genesis_hash"` // hash of block 0 (verified by startGenesisVerifier)
	TipHeight   uint64   `json:"tip_height"`   // seed's current chain tip
	Validators  []string `json:"validators"`   // validator public keys
	// Peers lists the seed's connected libp2p peers for bootstrap dialing.
	Peers []struct {
		ID    string   `json:"id"`
		Addrs []string `json:"addrs"`
	} `json:"peers"`
	// Contracts maps well-known contract names to their deployed IDs so a
	// joining node/client can discover canonical contracts without config.
	Contracts map[string]struct {
		ContractID string `json:"contract_id"`
		Name       string `json:"name"`
		Version    string `json:"version"`
	} `json:"contracts"`
}
// fetchNetworkInfo performs a GET /api/network-info on the given HTTP base
// URL (e.g. "http://seed.example:8080") and returns the parsed response.
// A 10 s client timeout makes a misconfigured --join URL fail fast instead
// of hanging node startup.
func fetchNetworkInfo(baseURL string) (*seedNetworkInfo, error) {
	endpoint := strings.TrimRight(baseURL, "/") + "/api/network-info"
	httpClient := &http.Client{Timeout: 10 * time.Second}
	resp, err := httpClient.Get(endpoint)
	if err != nil {
		return nil, fmt.Errorf("fetch: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		return nil, fmt.Errorf("status %d", resp.StatusCode)
	}
	info := &seedNetworkInfo{}
	if err := json.NewDecoder(resp.Body).Decode(info); err != nil {
		return nil, fmt.Errorf("decode: %w", err)
	}
	return info, nil
}