package blockchain import ( "context" "crypto/ed25519" "crypto/sha256" "encoding/base64" "encoding/binary" "encoding/hex" "encoding/json" "errors" "fmt" "log" "strings" "sync" "time" badger "github.com/dgraph-io/badger/v4" ) // RelayHeartbeatTTL is how long a relay registration stays "live" without a // refresh. Clients pick from the live list in /api/relays; anything with // last-heartbeat older than this is omitted. // // Set to 2 hours so a validator that heartbeats hourly (the default // heartbeatLoop interval) can miss ONE beat without being delisted — // tolerating a brief restart or network glitch. const RelayHeartbeatTTL int64 = 2 * 3600 // seconds // ErrTxFailed is a sentinel wrapped around any business-logic rejection inside // applyTx (bad fee, insufficient balance, missing fields, etc.). // AddBlock uses errors.Is(err, ErrTxFailed) to skip the individual transaction // rather than rejecting the entire block, preventing chain stalls caused by // a single malformed or untimely transaction. 
var ErrTxFailed = errors.New("tx failed")

// BadgerDB key prefixes. Each comment shows the key layout and the value
// stored under it.
const (
	prefixBlock         = "block:"       // block:<index> → Block JSON
	prefixHeight        = "height"       // height → uint64
	prefixBalance       = "balance:"     // balance:<pubkey> → uint64
	prefixIdentity      = "id:"          // id:<pubkey> → RegisterKeyPayload JSON
	prefixChannel       = "chan:"        // chan:<channelID> → CreateChannelPayload JSON
	prefixChanMember    = "chan-member:" // chan-member:<channelID>:<member> → "" (presence = member)
	prefixWalletBind    = "walletbind:"  // walletbind:<node pubkey> → wallet_pubkey (string)
	prefixReputation    = "rep:"         // rep:<pubkey> → RepStats JSON
	prefixPayChan       = "paychan:"     // paychan:<channelID> → PayChanState JSON
	prefixRelay         = "relay:"       // relay:<pubkey> → RegisterRelayPayload JSON
	prefixRelayHB       = "relayhb:"     // relayhb: → unix seconds (int64) of last HB
	prefixContactIn     = "contact_in:"  // contact_in:<recipient>:<requester> → contactRecord JSON
	prefixValidator     = "validator:"   // validator:<pubkey> → "" (presence = active)
	prefixContract      = "contract:"    // contract: → ContractRecord JSON
	prefixContractState = "cstate:"      // cstate:<contractID>:<key> → raw bytes
	prefixContractLog   = "clog:"        // clog::: → ContractLogEntry JSON
	prefixStake         = "stake:"       // stake: → uint64 staked amount
	prefixToken         = "token:"       // token: → TokenRecord JSON
	prefixTokenBal      = "tokbal:"      // tokbal:: → uint64 token balance
	prefixNFT           = "nft:"         // nft: → NFTRecord JSON
	prefixNFTOwner      = "nftowner:"    // nftowner:: → "" (index by owner)

	// prefixTxChron gives O(limit) recent-tx scans without walking empty blocks.
	// Key layout: txchron:: → tx_id (string).
	// Writes happen in indexBlock for every non-synthetic tx.
	prefixTxChron = "txchron:" // txchron:: → tx_id
)

// ContractVM abstracts the WASM runtime that applyTx uses to run contracts.
// The concrete implementation lives in the vm package; declaring the
// interface here avoids a circular import (vm imports blockchain/types, not
// blockchain/chain).
type ContractVM interface {
	// Validate compiles wasmBytes and reports an error when the module is
	// invalid. It runs during DEPLOY_CONTRACT so bad modules are rejected
	// before they are stored.
	Validate(ctx context.Context, wasmBytes []byte) error

	// Call runs method on a deployed contract. wasmBytes is the compiled
	// WASM and env supplies the host-function callbacks. It returns the gas
	// consumed, and ErrOutOfGas (wrapping ErrTxFailed) when gas runs out.
	Call(ctx context.Context, contractID string, wasmBytes []byte, method string, argsJSON []byte, gasLimit uint64, env VMHostEnv) (gasUsed uint64, err error)
}

// VMHostEnv is the set of host callbacks handed to ContractVM.Call.
// An implementation is created per transaction and wraps the live badger.Txn.
type VMHostEnv interface {
	GetState(key []byte) ([]byte, error)
	SetState(key, value []byte) error
	GetBalance(pubKeyHex string) (uint64, error)
	Transfer(from, to string, amount uint64) error
	GetCaller() string
	GetBlockHeight() uint64
	GetContractTreasury() string
	Log(msg string)

	// CallContract performs an inter-contract call: it executes method on
	// another deployed contract, with the current contract's ID as the
	// caller. gasLimit caps the sub-call and the gas actually consumed is
	// returned. It returns ErrTxFailed when the target contract is missing
	// or the call fails.
	CallContract(contractID, method string, argsJSON []byte, gasLimit uint64) (uint64, error)
}

// RepStats holds the per-public-key reputation counters, updated as blocks
// are committed.
type RepStats struct {
	BlocksProduced uint64 `json:"blocks_produced"`
	RelayProofs    uint64 `json:"relay_proofs"`
	SlashCount     uint64 `json:"slash_count"`
	Heartbeats     uint64 `json:"heartbeats"`
	// Score is re-computed on every read; stored for fast API queries.
	Score int64 `json:"score"`
}

// ComputeScore derives the reputation score from the raw counters:
// 10 points per produced block, 1 per relay proof, 1 per ten heartbeats
// (integer division), minus 500 per slash.
func (r RepStats) ComputeScore() int64 {
	earned := int64(r.BlocksProduced)*10 + int64(r.RelayProofs) + int64(r.Heartbeats)/10
	penalty := int64(r.SlashCount) * 500
	return earned - penalty
}

// Rank returns a human-readable tier string.
func (r RepStats) Rank() string { switch s := r.ComputeScore(); { case s >= 1000: return "Validator" case s >= 100: return "Trusted" case s >= 10: return "Active" default: return "Observer" } } // Chain is the canonical state machine backed by BadgerDB. type Chain struct { db *badger.DB mu sync.RWMutex tip *Block vm ContractVM // optional; set via SetVM before processing contract txs // govContractID and any other live-tunable config live under configMu, // NOT c.mu. Chain-config reads happen inside applyTx (e.g. // GetEffectiveGasPrice for CALL_CONTRACT), which runs under c.mu.Lock() // held by AddBlock. Re-locking c.mu for read would deadlock because // sync.RWMutex is not re-entrant on the same goroutine. configMu sync.RWMutex govContractID string // native maps contract ID → in-process Go handler. Registered via // RegisterNative once at startup (genesis or on-disk reload). When a // CALL_CONTRACT tx references an ID in this map, the dispatcher skips // the WASM VM entirely and calls the Go handler directly. // // Protected by its own mutex for the same reason as configMu above: // lookupNative is called from applyTx under c.mu.Lock(), and we must // not re-acquire c.mu. native map[string]NativeContract nativeMu sync.RWMutex } // SetVM wires a ContractVM implementation into the chain. // Must be called before any DEPLOY_CONTRACT or CALL_CONTRACT transactions are processed. func (c *Chain) SetVM(vm ContractVM) { c.mu.Lock() defer c.mu.Unlock() c.vm = vm } // SetGovernanceContract configures the governance contract ID used for // dynamic chain parameters (gas_price, relay_fee, etc.). Safe to call at any time. // Uses configMu (not c.mu) so it never blocks against in-flight AddBlock. func (c *Chain) SetGovernanceContract(id string) { c.configMu.Lock() defer c.configMu.Unlock() c.govContractID = id log.Printf("[CHAIN] governance contract linked: %s", id) } // GetGovParam reads a live parameter from the governance contract's state. 
// Returns ("", false) if no governance contract is configured or the key is not set. // Uses configMu so it's safe to call from within applyTx (where c.mu is held). func (c *Chain) GetGovParam(key string) (string, bool) { c.configMu.RLock() id := c.govContractID c.configMu.RUnlock() if id == "" { return "", false } var val []byte err := c.db.View(func(txn *badger.Txn) error { dbKey := []byte(prefixContractState + id + ":param:" + key) item, err := txn.Get(dbKey) if err != nil { return err } return item.Value(func(v []byte) error { val = make([]byte, len(v)) copy(val, v) return nil }) }) if err != nil { return "", false } return string(val), true } // GetEffectiveGasPrice returns the current gas price in µT per gas unit. // If a governance contract is configured and has set gas_price, that value is used. // Otherwise falls back to the DefaultGasPrice constant. func (c *Chain) GetEffectiveGasPrice() uint64 { if val, ok := c.GetGovParam("gas_price"); ok { var p uint64 if _, err := fmt.Sscanf(val, "%d", &p); err == nil && p > 0 { return p } } return GasPrice } // NewChain opens (or creates) the BadgerDB at dbPath and returns a Chain. // // Storage tuning rationale: // // - `WithValueLogFileSize(64 MiB)` — default is 1 GiB, which means every // value-log file reserves a full gigabyte on disk even when nearly // empty. On a low-traffic chain (tens of thousands of mostly-empty // blocks) that produced multi-GB databases that would never shrink. // 64 MiB files rotate more often so value-log GC can reclaim space. // // - `WithNumVersionsToKeep(1)` — we never read historical versions of a // key; every write overwrites the previous one. Telling Badger this // lets L0 compaction discard stale versions immediately instead of // waiting for the versions-kept quota to fill. // // - `WithCompactL0OnClose(true)` — finish outstanding compaction on a // clean shutdown so the next startup reads a tidy LSM. 
// // The caller SHOULD start the background value-log GC loop via // Chain.StartValueLogGC(ctx) — without it, reclaimable vlog bytes are never // actually freed and the DB grows monotonically. func NewChain(dbPath string) (*Chain, error) { opts := badger.DefaultOptions(dbPath). WithLogger(nil). WithValueLogFileSize(64 << 20). // 64 MiB per vlog (default 1 GiB) WithNumVersionsToKeep(1). // no multi-version reads, drop old WithCompactL0OnClose(true) db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("open badger: %w", err) } // Run any pending schema migrations BEFORE loadTip — migrations may // rewrite the very keys loadTip reads. See schema_migrations.go for the // versioning contract. if err := runMigrations(db); err != nil { _ = db.Close() return nil, fmt.Errorf("schema migrations: %w", err) } c := &Chain{db: db} tip, err := c.loadTip() if err != nil { return nil, err } c.tip = tip return c, nil } // CompactNow runs a one-shot aggressive value-log GC and L0 compaction. // Intended to be called at startup on nodes upgraded from a version that // had no background GC, so accumulated garbage (potentially gigabytes) can // be reclaimed without waiting for the periodic loop. // // Uses a lower discard ratio (0.25 vs 0.5 for the periodic loop) so even // mildly-fragmented vlog files get rewritten. Capped at 64 iterations so we // can never loop indefinitely — a 4 GiB DB at 64 MiB vlog-file-size has at // most 64 files, so this caps at the true theoretical maximum. func (c *Chain) CompactNow() { const maxPasses = 64 passes := 0 start := time.Now() for c.db.RunValueLogGC(0.25) == nil { passes++ if passes >= maxPasses { log.Printf("[CHAIN] CompactNow: reached pass cap (%d) after %s", maxPasses, time.Since(start)) return } } if passes > 0 { log.Printf("[CHAIN] CompactNow: reclaimed %d vlog file(s) in %s", passes, time.Since(start)) } } // StartValueLogGC runs Badger's value-log garbage collector in a background // goroutine for the lifetime of ctx. 
// // Without this the chain DB grows monotonically: every overwrite of a // small hot key like `height` or `netstats` leaves the old value pinned // in the active value-log file until GC reclaims it. After enough block // commits a node ends up multiple GB on disk even though actual live // chain state is a few megabytes. // // The loop runs every 5 minutes and drains GC cycles until Badger says // there is nothing more worth rewriting. `0.5` is the discard ratio: // Badger rewrites a vlog file only if at least 50% of its bytes are // garbage, which balances I/O cost against space reclamation. func (c *Chain) StartValueLogGC(ctx context.Context) { go func() { t := time.NewTicker(5 * time.Minute) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: // RunValueLogGC returns nil when it successfully rewrote // one file; keep draining until it returns an error // (typically badger.ErrNoRewrite). for c.db.RunValueLogGC(0.5) == nil { } } } }() } // Close closes the underlying BadgerDB. func (c *Chain) Close() error { return c.db.Close() } // Height returns index of the latest block (0 if empty). func (c *Chain) Height() uint64 { c.mu.RLock() defer c.mu.RUnlock() if c.tip == nil { return 0 } return c.tip.Index } // Tip returns the latest block or nil if chain is empty. func (c *Chain) Tip() *Block { c.mu.RLock() defer c.mu.RUnlock() return c.tip } // TipIndex reads the committed tip height directly from BadgerDB, bypassing // the chain mutex. Returns 0 if the chain is uninitialized. // // Use this from read-only API handlers (e.g. /api/blocks, /api/txs/recent) // that must not hang when AddBlock is holding the write lock — for example // during a slow contract call or an extended consensus round. A slightly // stale height is better than a stuck explorer. 
func (c *Chain) TipIndex() uint64 { var h uint64 _ = c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixHeight)) if err != nil { return nil // 0 is a valid "empty chain" result } return item.Value(func(val []byte) error { return json.Unmarshal(val, &h) }) }) return h } // AddBlock validates and appends a finalized block to the chain, // applying all state mutations atomically. // // Logs a warning if apply takes longer than slowApplyThreshold so we can see // in the logs exactly which block/tx is causing the chain to stall — a slow // CALL_CONTRACT that exhausts gas, a very large DEPLOY_CONTRACT, or genuine // BadgerDB contention. const slowApplyThreshold = 2 * time.Second func (c *Chain) AddBlock(b *Block) error { started := time.Now() c.mu.Lock() defer func() { c.mu.Unlock() if dt := time.Since(started); dt > slowApplyThreshold { log.Printf("[CHAIN] SLOW AddBlock idx=%d txs=%d took=%s — investigate applyTx path", b.Index, len(b.Transactions), dt) } }() var prevHash []byte if c.tip != nil { prevHash = c.tip.Hash } else { if b.Index != 0 { return errors.New("chain is empty but received non-genesis block") } prevHash = b.PrevHash } if err := b.Validate(prevHash); err != nil { return fmt.Errorf("block validation: %w", err) } if err := c.db.Update(func(txn *badger.Txn) error { // Persist block val, err := json.Marshal(b) if err != nil { return err } if err := txn.Set([]byte(blockKey(b.Index)), val); err != nil { return err } // Update height hv, err := json.Marshal(b.Index) if err != nil { return err } if err := txn.Set([]byte(prefixHeight), hv); err != nil { return err } // Apply transactions. // Business-logic failures (ErrTxFailed) skip the individual tx so that // a single bad transaction never causes the block — and the entire chain // height — to stall. Infrastructure failures (DB errors) still abort. 
// Only fees of SUCCESSFULLY applied txs are credited to the validator; // skipped txs contribute nothing (avoids minting tokens from thin air). var collectedFees uint64 gasUsedByTx := make(map[string]uint64) seenInBlock := make(map[string]bool, len(b.Transactions)) for _, tx := range b.Transactions { // Guard against duplicate tx IDs within the same block or already // committed in a previous block (defense-in-depth for mempool bugs). if seenInBlock[tx.ID] { log.Printf("[CHAIN] block %d: duplicate tx %s in same block — skipped", b.Index, tx.ID) continue } seenInBlock[tx.ID] = true if _, err := txn.Get([]byte(prefixTxRecord + tx.ID)); err == nil { log.Printf("[CHAIN] block %d: tx %s already committed — skipped", b.Index, tx.ID) continue } gasUsed, err := c.applyTx(txn, tx) if err != nil { if errors.Is(err, ErrTxFailed) { senderBal, _ := c.readBalance(txn, tx.From) log.Printf("[CHAIN] block %d: tx %s (%s) skipped — %v [sender %s balance: %d µT]", b.Index, tx.ID, tx.Type, err, tx.From[:min(8, len(tx.From))], senderBal) continue } return fmt.Errorf("apply tx %s: %w", tx.ID, err) } if gasUsed > 0 { gasUsedByTx[tx.ID] = gasUsed } collectedFees += tx.Fee } // Credit validator (or their bound wallet). // Genesis block (index 0): one-time allocation of fixed supply. // All other blocks: validator earns only the transaction fees — no minting. 
rewardTarget, err := c.resolveRewardTarget(txn, b.Validator) if err != nil { return fmt.Errorf("resolve reward target: %w", err) } if b.Index == 0 { if err := c.creditBalance(txn, rewardTarget, GenesisAllocation); err != nil { return fmt.Errorf("genesis allocation: %w", err) } } else if collectedFees > 0 { if err := c.creditBalance(txn, rewardTarget, collectedFees); err != nil { return fmt.Errorf("credit validator fees: %w", err) } } // Update validator reputation if err := c.incrementRep(txn, b.Validator, func(r *RepStats) { r.BlocksProduced++ }); err != nil { return err } // Index transactions and update network stats if err := c.indexBlock(txn, b, gasUsedByTx); err != nil { return err } return nil }); err != nil { return err } c.tip = b return nil } // GetBlock returns the block at the given index. func (c *Chain) GetBlock(index uint64) (*Block, error) { var b Block err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(blockKey(index))) if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &b) }) }) if err != nil { return nil, err } return &b, nil } // Balance returns µT balance for a public key. func (c *Chain) Balance(pubKeyHex string) (uint64, error) { var bal uint64 err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixBalance + pubKeyHex)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &bal) }) }) return bal, err } // Identity returns the RegisterKeyPayload for a public key, or nil. 
func (c *Chain) Identity(pubKeyHex string) (*RegisterKeyPayload, error) { var p RegisterKeyPayload err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixIdentity + pubKeyHex)) if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &p) }) }) if errors.Is(err, badger.ErrKeyNotFound) { return nil, nil } return &p, err } // Channel returns the CreateChannelPayload for a channel ID, or nil. func (c *Chain) Channel(channelID string) (*CreateChannelPayload, error) { var p CreateChannelPayload err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixChannel + channelID)) if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &p) }) }) if errors.Is(err, badger.ErrKeyNotFound) { return nil, nil } return &p, err } // ChannelMembers returns the public keys of all members added to channelID. func (c *Chain) ChannelMembers(channelID string) ([]string, error) { prefix := []byte(fmt.Sprintf("%s%s:", prefixChanMember, channelID)) var members []string err := c.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { key := string(it.Item().Key()) // key = "chan-member::" parts := strings.SplitN(key, ":", 3) if len(parts) == 3 { members = append(members, parts[2]) } } return nil }) return members, err } // WalletBinding returns the payout wallet pub key bound to a node, or "" if none. 
func (c *Chain) WalletBinding(nodePubKey string) (string, error) { var walletPubKey string err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixWalletBind + nodePubKey)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { walletPubKey = string(val) return nil }) }) return walletPubKey, err } // PayChannel returns the PayChanState for a channel ID, or nil if not found. func (c *Chain) PayChannel(channelID string) (*PayChanState, error) { var state PayChanState err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixPayChan + channelID)) if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &state) }) }) if errors.Is(err, badger.ErrKeyNotFound) { return nil, nil } return &state, err } // PayChanSigPayload returns the canonical bytes both parties sign to open a channel. // Use this from the wallet CLI to produce SigB before submitting an OPEN_PAY_CHAN tx. func PayChanSigPayload(channelID, partyA, partyB string, depositA, depositB, expiryBlock uint64) []byte { return payChanSigPayload(channelID, partyA, partyB, depositA, depositB, expiryBlock) } // PayChanCloseSigPayload returns the canonical bytes both parties sign to close a channel. func PayChanCloseSigPayload(channelID string, balanceA, balanceB, nonce uint64) []byte { return payChanCloseSigPayload(channelID, balanceA, balanceB, nonce) } // Reputation returns the reputation stats for a public key. 
func (c *Chain) Reputation(pubKeyHex string) (RepStats, error) { var r RepStats err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixReputation + pubKeyHex)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &r) }) }) if err != nil { return RepStats{}, err } r.Score = r.ComputeScore() return r, nil } // --- internal --- func blockKey(index uint64) string { return fmt.Sprintf("%s%020d", prefixBlock, index) } func (c *Chain) loadTip() (*Block, error) { var height uint64 err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixHeight)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &height) }) }) if err != nil { return nil, err } if height == 0 { // Check if genesis exists var genesis Block err2 := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(blockKey(0))) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &genesis) }) }) if err2 != nil { return nil, err2 } if genesis.Hash != nil { return &genesis, nil } return nil, nil } return c.GetBlock(height) } // resolveRewardTarget returns the wallet pub key to credit for a validator. // If the validator has a bound wallet, returns that; otherwise returns their own pub key. 
func (c *Chain) resolveRewardTarget(txn *badger.Txn, validatorPubKey string) (string, error) { item, err := txn.Get([]byte(prefixWalletBind + validatorPubKey)) if errors.Is(err, badger.ErrKeyNotFound) { return validatorPubKey, nil } if err != nil { return "", err } var target string err = item.Value(func(val []byte) error { target = string(val) return nil }) if err != nil || target == "" { return validatorPubKey, nil } return target, nil } // applyTx applies one transaction within txn. // Returns (gasUsed, error); gasUsed is non-zero only for CALL_CONTRACT. func (c *Chain) applyTx(txn *badger.Txn, tx *Transaction) (uint64, error) { switch tx.Type { case EventRegisterKey: var p RegisterKeyPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: REGISTER_KEY bad payload: %v", ErrTxFailed, err) } if tx.Fee < RegistrationFee { return 0, fmt.Errorf("%w: REGISTER_KEY fee %d µT below minimum %d µT", ErrTxFailed, tx.Fee, RegistrationFee) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("REGISTER_KEY debit: %w", err) } val, _ := json.Marshal(p) if err := txn.Set([]byte(prefixIdentity+tx.From), val); err != nil { return 0, err } case EventCreateChannel: var p CreateChannelPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: CREATE_CHANNEL bad payload: %v", ErrTxFailed, err) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("CREATE_CHANNEL debit: %w", err) } val, _ := json.Marshal(p) if err := txn.Set([]byte(prefixChannel+p.ChannelID), val); err != nil { return 0, err } case EventAddMember: var p AddMemberPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: ADD_MEMBER bad payload: %v", ErrTxFailed, err) } if p.ChannelID == "" { return 0, fmt.Errorf("%w: ADD_MEMBER: channel_id required", ErrTxFailed) } if _, err := txn.Get([]byte(prefixChannel + p.ChannelID)); err != nil { if errors.Is(err, 
badger.ErrKeyNotFound) { return 0, fmt.Errorf("%w: ADD_MEMBER: channel %q not found", ErrTxFailed, p.ChannelID) } return 0, err } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("ADD_MEMBER debit: %w", err) } member := tx.To if member == "" { member = tx.From } if err := txn.Set([]byte(fmt.Sprintf("%s%s:%s", prefixChanMember, p.ChannelID, member)), []byte{}); err != nil { return 0, err } case EventTransfer: senderBal, _ := c.readBalance(txn, tx.From) log.Printf("[CHAIN] TRANSFER %s→%s amount=%d fee=%d senderBal=%d", tx.From[:min(8, len(tx.From))], tx.To[:min(8, len(tx.To))], tx.Amount, tx.Fee, senderBal) if err := c.debitBalance(txn, tx.From, tx.Amount+tx.Fee); err != nil { return 0, fmt.Errorf("TRANSFER debit: %w", err) } if err := c.creditBalance(txn, tx.To, tx.Amount); err != nil { return 0, fmt.Errorf("credit recipient: %w", err) } case EventRelayProof: var p RelayProofPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: RELAY_PROOF bad payload: %v", ErrTxFailed, err) } if p.SenderPubKey == "" || p.FeeUT == 0 || len(p.FeeSig) == 0 { return 0, fmt.Errorf("%w: relay proof missing fee authorization fields", ErrTxFailed) } authBytes := FeeAuthBytes(p.EnvelopeID, p.FeeUT) ok, err := verifyEd25519(p.SenderPubKey, authBytes, p.FeeSig) if err != nil || !ok { return 0, fmt.Errorf("%w: invalid relay fee authorization signature", ErrTxFailed) } if err := c.debitBalance(txn, p.SenderPubKey, p.FeeUT); err != nil { return 0, fmt.Errorf("RELAY_PROOF debit: %w", err) } target, err := c.resolveRewardTarget(txn, p.RelayPubKey) if err != nil { return 0, err } if err := c.creditBalance(txn, target, p.FeeUT); err != nil { return 0, fmt.Errorf("credit relay fee: %w", err) } if err := c.incrementRep(txn, p.RelayPubKey, func(r *RepStats) { r.RelayProofs++ }); err != nil { return 0, err } case EventBindWallet: var p BindWalletPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: 
BIND_WALLET bad payload: %v", ErrTxFailed, err) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("BIND_WALLET debit: %w", err) } if err := txn.Set([]byte(prefixWalletBind+tx.From), []byte(p.WalletPubKey)); err != nil { return 0, err } case EventSlash: var p SlashPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: SLASH bad payload: %v", ErrTxFailed, err) } if p.OffenderPubKey == "" { return 0, fmt.Errorf("%w: SLASH: offender_pub_key required", ErrTxFailed) } // Sender must be a validator — non-validators can't trigger slashing // without gumming up the chain with spurious reports. fromIsValidator, err := c.isValidatorTxn(txn, tx.From) if err != nil { return 0, err } if !fromIsValidator { return 0, fmt.Errorf("%w: SLASH: sender is not a current validator", ErrTxFailed) } // Only "equivocation" is cryptographically verifiable on-chain; // reject other reasons until we implement their proofs (downtime // is handled via auto-removal, not slashing). if p.Reason != "equivocation" { return 0, fmt.Errorf("%w: SLASH: only reason=equivocation is supported on-chain, got %q", ErrTxFailed, p.Reason) } var ev EquivocationEvidence if err := json.Unmarshal(p.Evidence, &ev); err != nil { return 0, fmt.Errorf("%w: SLASH: bad evidence: %v", ErrTxFailed, err) } if err := ValidateEquivocation(p.OffenderPubKey, &ev); err != nil { return 0, fmt.Errorf("%w: SLASH: %v", ErrTxFailed, err) } // Pay the sender's tx fee (they did work to produce the evidence). if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("SLASH fee debit: %w", err) } // Burn offender's stake (preferred — bonded amount), fall back to // balance if stake < SlashAmount. Either way, the tokens are // destroyed — not redirected to the reporter, to keep incentives // clean (reporters profit only from healthier chain, not bounties). 
stake := c.readStake(txn, p.OffenderPubKey) if stake >= SlashAmount { if err := c.writeStake(txn, p.OffenderPubKey, stake-SlashAmount); err != nil { return 0, fmt.Errorf("SLASH stake burn: %w", err) } } else { if stake > 0 { if err := c.writeStake(txn, p.OffenderPubKey, 0); err != nil { return 0, fmt.Errorf("SLASH stake burn: %w", err) } } // Burn the rest from liquid balance (best-effort; ignore // insufficient-balance error so the slash still counts). remaining := SlashAmount - stake _ = c.debitBalance(txn, p.OffenderPubKey, remaining) } // Eject from the validator set — slashed validators are off the // committee permanently (re-admission requires a fresh // ADD_VALIDATOR with stake). if err := txn.Delete([]byte(prefixValidator + p.OffenderPubKey)); err != nil && err != badger.ErrKeyNotFound { return 0, fmt.Errorf("SLASH remove validator: %w", err) } if err := c.incrementRep(txn, p.OffenderPubKey, func(r *RepStats) { r.SlashCount++ }); err != nil { return 0, err } log.Printf("[CHAIN] SLASH: offender=%s reason=%s reporter=%s amount=%d µT", p.OffenderPubKey[:min(8, len(p.OffenderPubKey))], p.Reason, tx.From[:min(8, len(tx.From))], SlashAmount) case EventHeartbeat: var p HeartbeatPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: HEARTBEAT bad payload: %v", ErrTxFailed, err) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("HEARTBEAT debit: %w", err) } if err := c.incrementRep(txn, tx.From, func(r *RepStats) { r.Heartbeats++ }); err != nil { return 0, err } // Also refresh the relay-heartbeat timestamp if the sender is a // registered relay. This reuses the existing hourly HEARTBEAT tx // so relay-only nodes don't need to pay for a dedicated keep- // alive; one tx serves both purposes. 
if _, err := txn.Get([]byte(prefixRelay + tx.From)); err == nil { if err := c.writeRelayHeartbeat(txn, tx.From, tx.Timestamp.Unix()); err != nil { return 0, err } } case EventRegisterRelay: var p RegisterRelayPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: REGISTER_RELAY bad payload: %v", ErrTxFailed, err) } if p.X25519PubKey == "" { return 0, fmt.Errorf("%w: REGISTER_RELAY: x25519_pub_key is required", ErrTxFailed) } val, _ := json.Marshal(p) if err := txn.Set([]byte(prefixRelay+tx.From), val); err != nil { return 0, err } // Seed the heartbeat so the relay is immediately reachable via // /api/relays. Without this a fresh relay wouldn't appear until // its first heartbeat tx commits (~1 hour default), making the // register tx look silent. if err := c.writeRelayHeartbeat(txn, tx.From, tx.Timestamp.Unix()); err != nil { return 0, err } case EventContactRequest: var p ContactRequestPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: CONTACT_REQUEST bad payload: %v", ErrTxFailed, err) } if tx.To == "" { return 0, fmt.Errorf("%w: CONTACT_REQUEST: recipient (to) is required", ErrTxFailed) } if tx.Amount < MinContactFee { return 0, fmt.Errorf("%w: CONTACT_REQUEST: amount %d < MinContactFee %d", ErrTxFailed, tx.Amount, MinContactFee) } if err := c.debitBalance(txn, tx.From, tx.Amount+tx.Fee); err != nil { return 0, fmt.Errorf("CONTACT_REQUEST debit: %w", err) } if err := c.creditBalance(txn, tx.To, tx.Amount); err != nil { return 0, fmt.Errorf("credit contact target: %w", err) } rec := contactRecord{ Status: string(ContactPending), Intro: p.Intro, FeeUT: tx.Amount, TxID: tx.ID, CreatedAt: tx.Timestamp.Unix(), } val, _ := json.Marshal(rec) key := prefixContactIn + tx.To + ":" + tx.From if err := txn.Set([]byte(key), val); err != nil { return 0, fmt.Errorf("store contact record: %w", err) } case EventAcceptContact: if tx.To == "" { return 0, fmt.Errorf("%w: ACCEPT_CONTACT: requester (to) is 
required", ErrTxFailed) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("ACCEPT_CONTACT debit: %w", err) } key := prefixContactIn + tx.From + ":" + tx.To if err := c.updateContactStatus(txn, key, ContactAccepted); err != nil { return 0, fmt.Errorf("%w: accept contact: %v", ErrTxFailed, err) } case EventBlockContact: if tx.To == "" { return 0, fmt.Errorf("%w: BLOCK_CONTACT: sender (to) is required", ErrTxFailed) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("BLOCK_CONTACT debit: %w", err) } key := prefixContactIn + tx.From + ":" + tx.To var rec contactRecord item, err := txn.Get([]byte(key)) if err == nil { _ = item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }) } rec.Status = string(ContactBlocked) val, _ := json.Marshal(rec) if err := txn.Set([]byte(key), val); err != nil { return 0, fmt.Errorf("store block record: %w", err) } case EventAddValidator: if tx.To == "" { return 0, fmt.Errorf("%w: ADD_VALIDATOR: target pub key (to) is required", ErrTxFailed) } fromIsValidator, err := c.isValidatorTxn(txn, tx.From) if err != nil { return 0, err } if !fromIsValidator { return 0, fmt.Errorf("%w: ADD_VALIDATOR: %s is not a current validator", ErrTxFailed, tx.From) } // Decode admission payload early so we can read CoSignatures. var admitP AddValidatorPayload if len(tx.Payload) > 0 { if err := json.Unmarshal(tx.Payload, &admitP); err != nil { return 0, fmt.Errorf("%w: ADD_VALIDATOR bad payload: %v", ErrTxFailed, err) } } // ── Stake gate ───────────────────────────────────────────────── // Candidate must have locked at least MinValidatorStake before the // admission tx is accepted. Prevents sybil admissions. 
if stake := c.readStake(txn, tx.To); stake < MinValidatorStake { return 0, fmt.Errorf("%w: ADD_VALIDATOR: candidate has %d µT staked, need %d µT", ErrTxFailed, stake, MinValidatorStake) } // ── Multi-sig gate ───────────────────────────────────────────── // Count approvals: the sender (a validator, checked above) is 1. // Each valid CoSignature from a DISTINCT current validator adds 1. // Require ⌈2/3⌉ of the current validator set to admit. currentSet, err := c.validatorSetTxn(txn) if err != nil { return 0, err } required := (2*len(currentSet) + 2) / 3 // ceil(2N/3) if required < 1 { required = 1 } digest := AdmitDigest(tx.To) approvers := map[string]struct{}{tx.From: {}} for _, cs := range admitP.CoSignatures { // Reject cosigs from non-validators or signatures that don't // verify. Silently duplicates are dropped. if _, alreadyIn := approvers[cs.PubKey]; alreadyIn { continue } if !contains(currentSet, cs.PubKey) { continue } pubBytes, err := hex.DecodeString(cs.PubKey) if err != nil || len(pubBytes) != ed25519.PublicKeySize { continue } if !ed25519.Verify(ed25519.PublicKey(pubBytes), digest, cs.Signature) { continue } approvers[cs.PubKey] = struct{}{} } if len(approvers) < required { return 0, fmt.Errorf("%w: ADD_VALIDATOR: %d of %d approvals (need %d = ceil(2/3) of %d validators)", ErrTxFailed, len(approvers), len(currentSet), required, len(currentSet)) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("ADD_VALIDATOR debit: %w", err) } if err := txn.Set([]byte(prefixValidator+tx.To), []byte{}); err != nil { return 0, fmt.Errorf("store validator: %w", err) } log.Printf("[CHAIN] ADD_VALIDATOR: admitted %s (%d/%d approvals)", tx.To[:min(8, len(tx.To))], len(approvers), len(currentSet)) case EventRemoveValidator: if tx.To == "" { return 0, fmt.Errorf("%w: REMOVE_VALIDATOR: target pub key (to) is required", ErrTxFailed) } fromIsValidator, err := c.isValidatorTxn(txn, tx.From) if err != nil { return 0, err } if !fromIsValidator { 
return 0, fmt.Errorf("%w: REMOVE_VALIDATOR: %s is not a current validator", ErrTxFailed, tx.From) } // Self-removal is always allowed — a validator should be able to // leave the set gracefully without needing peers' approval. selfRemove := tx.From == tx.To if !selfRemove { // Forced removal requires ⌈2/3⌉ cosigs on RemoveDigest(target). // Same shape as ADD_VALIDATOR; keeps governance symmetric. var rmP RemoveValidatorPayload if len(tx.Payload) > 0 { if err := json.Unmarshal(tx.Payload, &rmP); err != nil { return 0, fmt.Errorf("%w: REMOVE_VALIDATOR bad payload: %v", ErrTxFailed, err) } } currentSet, err := c.validatorSetTxn(txn) if err != nil { return 0, err } required := (2*len(currentSet) + 2) / 3 if required < 1 { required = 1 } digest := RemoveDigest(tx.To) approvers := map[string]struct{}{tx.From: {}} for _, cs := range rmP.CoSignatures { if _, already := approvers[cs.PubKey]; already { continue } if !contains(currentSet, cs.PubKey) { continue } pubBytes, err := hex.DecodeString(cs.PubKey) if err != nil || len(pubBytes) != ed25519.PublicKeySize { continue } if !ed25519.Verify(ed25519.PublicKey(pubBytes), digest, cs.Signature) { continue } approvers[cs.PubKey] = struct{}{} } if len(approvers) < required { return 0, fmt.Errorf("%w: REMOVE_VALIDATOR: %d of %d approvals (need %d = ceil(2/3))", ErrTxFailed, len(approvers), len(currentSet), required) } } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("REMOVE_VALIDATOR debit: %w", err) } if err := txn.Delete([]byte(prefixValidator + tx.To)); err != nil && err != badger.ErrKeyNotFound { return 0, fmt.Errorf("remove validator: %w", err) } if selfRemove { log.Printf("[CHAIN] REMOVE_VALIDATOR: %s self-removed", tx.To[:min(8, len(tx.To))]) } else { log.Printf("[CHAIN] REMOVE_VALIDATOR: removed %s (multi-sig)", tx.To[:min(8, len(tx.To))]) } case EventOpenPayChan: if err := c.applyOpenPayChan(txn, tx); err != nil { return 0, fmt.Errorf("%w: open paychan: %v", ErrTxFailed, err) } case 
EventClosePayChan: if err := c.applyClosePayChan(txn, tx); err != nil { return 0, fmt.Errorf("%w: close paychan: %v", ErrTxFailed, err) } case EventDeployContract: if c.vm == nil { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT: VM not configured on this node", ErrTxFailed) } var p DeployContractPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT bad payload: %v", ErrTxFailed, err) } if p.WASMBase64 == "" || p.ABIJson == "" { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT: wasm_b64 and abi_json are required", ErrTxFailed) } if tx.Fee < MinDeployFee { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT fee %d < MinDeployFee %d", ErrTxFailed, tx.Fee, MinDeployFee) } import64 := func(s string) ([]byte, error) { buf := make([]byte, len(s)) n, err := decodeBase64(s, buf) return buf[:n], err } wasmBytes, err := import64(p.WASMBase64) if err != nil { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT: invalid base64 wasm: %v", ErrTxFailed, err) } if err := c.vm.Validate(context.Background(), wasmBytes); err != nil { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT: invalid WASM: %v", ErrTxFailed, err) } contractID := computeContractID(tx.From, wasmBytes) if _, dbErr := txn.Get([]byte(prefixContract + contractID)); dbErr == nil { return 0, fmt.Errorf("%w: DEPLOY_CONTRACT: contract %s already deployed", ErrTxFailed, contractID) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("DEPLOY_CONTRACT debit: %w", err) } var height uint64 if item, hErr := txn.Get([]byte(prefixHeight)); hErr == nil { _ = item.Value(func(val []byte) error { return json.Unmarshal(val, &height) }) } rec := ContractRecord{ ContractID: contractID, WASMBytes: wasmBytes, ABIJson: p.ABIJson, DeployerPub: tx.From, DeployedAt: height, } val, _ := json.Marshal(rec) if err := txn.Set([]byte(prefixContract+contractID), val); err != nil { return 0, fmt.Errorf("store contract: %w", err) } log.Printf("[CHAIN] DEPLOY_CONTRACT id=%s deployer=%s height=%d wasmSize=%d", 
contractID, tx.From[:min(8, len(tx.From))], height, len(wasmBytes)) case EventCallContract: var p CallContractPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: CALL_CONTRACT bad payload: %v", ErrTxFailed, err) } if p.ContractID == "" || p.Method == "" { return 0, fmt.Errorf("%w: CALL_CONTRACT: contract_id and method are required", ErrTxFailed) } if p.GasLimit == 0 { return 0, fmt.Errorf("%w: CALL_CONTRACT: gas_limit must be > 0", ErrTxFailed) } // ── Native dispatch ────────────────────────────────────────────── // System contracts (username_registry etc.) implemented in Go run // here, bypassing wazero entirely. This eliminates a whole class // of VM-hang bugs and cuts per-call latency ~100×. if nc := c.lookupNative(p.ContractID); nc != nil { gasPrice := c.GetEffectiveGasPrice() maxGasCost := p.GasLimit * gasPrice if err := c.debitBalance(txn, tx.From, tx.Fee+maxGasCost); err != nil { return 0, fmt.Errorf("CALL_CONTRACT debit: %w", err) } var height uint64 if hi, hErr := txn.Get([]byte(prefixHeight)); hErr == nil { _ = hi.Value(func(val []byte) error { return json.Unmarshal(val, &height) }) } nctx := &NativeContext{ Txn: txn, ContractID: p.ContractID, Caller: tx.From, TxID: tx.ID, BlockHeight: height, TxAmount: tx.Amount, // payment attached to this call chain: c, } gasUsed, callErr := nc.Call(nctx, p.Method, []byte(p.ArgsJSON)) if gasUsed > p.GasLimit { gasUsed = p.GasLimit } if callErr != nil { // Refund unused gas but keep fee debited — prevents spam. if refund := (p.GasLimit - gasUsed) * gasPrice; refund > 0 { _ = c.creditBalance(txn, tx.From, refund) } return 0, fmt.Errorf("%w: CALL_CONTRACT %s.%s: %v", ErrTxFailed, p.ContractID, p.Method, callErr) } // Success: refund remaining gas. 
if refund := (p.GasLimit - gasUsed) * gasPrice; refund > 0 { if err := c.creditBalance(txn, tx.From, refund); err != nil { log.Printf("[CHAIN] CALL_CONTRACT native gas refund failed: %v", err) } } log.Printf("[CHAIN] native CALL_CONTRACT id=%s method=%s caller=%s gasUsed=%d", p.ContractID, p.Method, tx.From[:min(8, len(tx.From))], gasUsed) return gasUsed, nil } // ── WASM path ──────────────────────────────────────────────────── if c.vm == nil { return 0, fmt.Errorf("%w: CALL_CONTRACT: VM not configured on this node", ErrTxFailed) } item, err := txn.Get([]byte(prefixContract + p.ContractID)) if err != nil { if errors.Is(err, badger.ErrKeyNotFound) { return 0, fmt.Errorf("%w: CALL_CONTRACT: contract %s not found", ErrTxFailed, p.ContractID) } return 0, err } var rec ContractRecord if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err != nil { return 0, fmt.Errorf("%w: CALL_CONTRACT: corrupt contract record: %v", ErrTxFailed, err) } // Use effective gas price (may be overridden by governance contract). gasPrice := c.GetEffectiveGasPrice() // Pre-charge fee + maximum possible gas cost upfront. maxGasCost := p.GasLimit * gasPrice if err := c.debitBalance(txn, tx.From, tx.Fee+maxGasCost); err != nil { return 0, fmt.Errorf("CALL_CONTRACT debit: %w", err) } var height uint64 if hi, hErr := txn.Get([]byte(prefixHeight)); hErr == nil { _ = hi.Value(func(val []byte) error { return json.Unmarshal(val, &height) }) } env := newChainHostEnv(txn, p.ContractID, tx.From, tx.ID, height, c) // Hard wall-clock budget per contract call. Even if gas metering // fails or the contract dodges the function-listener (tight loop of // unhooked opcodes), WithCloseOnContextDone(true) on the runtime // will abort the call once the deadline fires. Prevents a single // bad tx from freezing the entire chain — as happened with the // username_registry.register hang. 
callCtx, callCancel := context.WithTimeout(context.Background(), 30*time.Second) gasUsed, callErr := c.vm.Call( callCtx, p.ContractID, rec.WASMBytes, p.Method, []byte(p.ArgsJSON), p.GasLimit, env, ) callCancel() if callErr != nil { // Refund unused gas even on error (gas already consumed stays charged). if refund := (p.GasLimit - gasUsed) * gasPrice; refund > 0 { _ = c.creditBalance(txn, tx.From, refund) } return 0, fmt.Errorf("%w: CALL_CONTRACT %s.%s: %v", ErrTxFailed, p.ContractID, p.Method, callErr) } // Refund unused gas back to caller. if refund := (p.GasLimit - gasUsed) * gasPrice; refund > 0 { if err := c.creditBalance(txn, tx.From, refund); err != nil { log.Printf("[CHAIN] CALL_CONTRACT gas refund failed (refund=%d µT): %v", refund, err) } } log.Printf("[CHAIN] CALL_CONTRACT id=%s method=%s caller=%s gasUsed=%d/%d gasCost=%d µT refund=%d µT", p.ContractID, p.Method, tx.From[:min(8, len(tx.From))], gasUsed, p.GasLimit, gasUsed*gasPrice, (p.GasLimit-gasUsed)*gasPrice) return gasUsed, nil case EventStake: if tx.Amount == 0 { return 0, fmt.Errorf("%w: STAKE: amount must be > 0", ErrTxFailed) } if err := c.debitBalance(txn, tx.From, tx.Amount+tx.Fee); err != nil { return 0, fmt.Errorf("STAKE debit: %w", err) } current := c.readStake(txn, tx.From) if err := c.writeStake(txn, tx.From, current+tx.Amount); err != nil { return 0, fmt.Errorf("STAKE write: %w", err) } log.Printf("[CHAIN] STAKE pubkey=%s amount=%d µT total=%d µT", tx.From[:min(8, len(tx.From))], tx.Amount, current+tx.Amount) case EventUnstake: staked := c.readStake(txn, tx.From) if staked == 0 { return 0, fmt.Errorf("%w: UNSTAKE: no active stake", ErrTxFailed) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("UNSTAKE fee debit: %w", err) } if err := c.writeStake(txn, tx.From, 0); err != nil { return 0, fmt.Errorf("UNSTAKE write: %w", err) } if err := c.creditBalance(txn, tx.From, staked); err != nil { return 0, fmt.Errorf("UNSTAKE credit: %w", err) } 
log.Printf("[CHAIN] UNSTAKE pubkey=%s returned=%d µT", tx.From[:min(8, len(tx.From))], staked) case EventIssueToken: var p IssueTokenPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: ISSUE_TOKEN bad payload: %v", ErrTxFailed, err) } if p.Name == "" || p.Symbol == "" { return 0, fmt.Errorf("%w: ISSUE_TOKEN: name and symbol are required", ErrTxFailed) } if p.TotalSupply == 0 { return 0, fmt.Errorf("%w: ISSUE_TOKEN: total_supply must be > 0", ErrTxFailed) } if tx.Fee < MinIssueTokenFee { return 0, fmt.Errorf("%w: ISSUE_TOKEN fee %d < MinIssueTokenFee %d", ErrTxFailed, tx.Fee, MinIssueTokenFee) } tokenID := computeTokenID(tx.From, p.Symbol) if _, dbErr := txn.Get([]byte(prefixToken + tokenID)); dbErr == nil { return 0, fmt.Errorf("%w: ISSUE_TOKEN: token %s already exists", ErrTxFailed, tokenID) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("ISSUE_TOKEN debit: %w", err) } var height uint64 if hi, hErr := txn.Get([]byte(prefixHeight)); hErr == nil { _ = hi.Value(func(val []byte) error { return json.Unmarshal(val, &height) }) } tokenRec := TokenRecord{ TokenID: tokenID, Name: p.Name, Symbol: p.Symbol, Decimals: p.Decimals, TotalSupply: p.TotalSupply, Issuer: tx.From, IssuedAt: height, } val, _ := json.Marshal(tokenRec) if err := txn.Set([]byte(prefixToken+tokenID), val); err != nil { return 0, fmt.Errorf("store token record: %w", err) } if err := c.creditTokenBalance(txn, tokenID, tx.From, p.TotalSupply); err != nil { return 0, fmt.Errorf("ISSUE_TOKEN credit: %w", err) } log.Printf("[CHAIN] ISSUE_TOKEN id=%s symbol=%s supply=%d issuer=%s", tokenID, p.Symbol, p.TotalSupply, tx.From[:min(8, len(tx.From))]) case EventTransferToken: var p TransferTokenPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: TRANSFER_TOKEN bad payload: %v", ErrTxFailed, err) } if p.TokenID == "" || p.Amount == 0 { return 0, fmt.Errorf("%w: TRANSFER_TOKEN: token_id and amount are 
required", ErrTxFailed) } if tx.To == "" { return 0, fmt.Errorf("%w: TRANSFER_TOKEN: recipient (to) is required", ErrTxFailed) } if _, dbErr := txn.Get([]byte(prefixToken + p.TokenID)); dbErr != nil { return 0, fmt.Errorf("%w: TRANSFER_TOKEN: token %s not found", ErrTxFailed, p.TokenID) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("TRANSFER_TOKEN fee debit: %w", err) } if err := c.debitTokenBalance(txn, p.TokenID, tx.From, p.Amount); err != nil { return 0, fmt.Errorf("TRANSFER_TOKEN debit: %w", err) } if err := c.creditTokenBalance(txn, p.TokenID, tx.To, p.Amount); err != nil { return 0, fmt.Errorf("TRANSFER_TOKEN credit: %w", err) } case EventBurnToken: var p BurnTokenPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: BURN_TOKEN bad payload: %v", ErrTxFailed, err) } if p.TokenID == "" || p.Amount == 0 { return 0, fmt.Errorf("%w: BURN_TOKEN: token_id and amount are required", ErrTxFailed) } tokenItem, dbErr := txn.Get([]byte(prefixToken + p.TokenID)) if dbErr != nil { return 0, fmt.Errorf("%w: BURN_TOKEN: token %s not found", ErrTxFailed, p.TokenID) } var tokenRec TokenRecord if err := tokenItem.Value(func(v []byte) error { return json.Unmarshal(v, &tokenRec) }); err != nil { return 0, fmt.Errorf("%w: BURN_TOKEN: corrupt token record", ErrTxFailed) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("BURN_TOKEN fee debit: %w", err) } if err := c.debitTokenBalance(txn, p.TokenID, tx.From, p.Amount); err != nil { return 0, fmt.Errorf("BURN_TOKEN debit: %w", err) } // Reduce total supply. 
if tokenRec.TotalSupply >= p.Amount { tokenRec.TotalSupply -= p.Amount } else { tokenRec.TotalSupply = 0 } val, _ := json.Marshal(tokenRec) if err := txn.Set([]byte(prefixToken+p.TokenID), val); err != nil { return 0, fmt.Errorf("BURN_TOKEN update supply: %w", err) } log.Printf("[CHAIN] BURN_TOKEN id=%s amount=%d newSupply=%d burner=%s", p.TokenID, p.Amount, tokenRec.TotalSupply, tx.From[:min(8, len(tx.From))]) case EventMintNFT: var p MintNFTPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: MINT_NFT bad payload: %v", ErrTxFailed, err) } if p.Name == "" { return 0, fmt.Errorf("%w: MINT_NFT: name is required", ErrTxFailed) } if tx.Fee < MinMintNFTFee { return 0, fmt.Errorf("%w: MINT_NFT fee %d < MinMintNFTFee %d", ErrTxFailed, tx.Fee, MinMintNFTFee) } nftID := computeNFTID(tx.From, tx.ID) if _, dbErr := txn.Get([]byte(prefixNFT + nftID)); dbErr == nil { return 0, fmt.Errorf("%w: MINT_NFT: NFT %s already exists", ErrTxFailed, nftID) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("MINT_NFT debit: %w", err) } var height uint64 if hi, hErr := txn.Get([]byte(prefixHeight)); hErr == nil { _ = hi.Value(func(val []byte) error { return json.Unmarshal(val, &height) }) } nft := NFTRecord{ NFTID: nftID, Name: p.Name, Description: p.Description, URI: p.URI, Attributes: p.Attributes, Owner: tx.From, Issuer: tx.From, MintedAt: height, } val, _ := json.Marshal(nft) if err := txn.Set([]byte(prefixNFT+nftID), val); err != nil { return 0, fmt.Errorf("store NFT: %w", err) } if err := txn.Set([]byte(prefixNFTOwner+tx.From+":"+nftID), []byte{}); err != nil { return 0, fmt.Errorf("index NFT owner: %w", err) } log.Printf("[CHAIN] MINT_NFT id=%s name=%q owner=%s", nftID, p.Name, tx.From[:min(8, len(tx.From))]) case EventTransferNFT: var p TransferNFTPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: TRANSFER_NFT bad payload: %v", ErrTxFailed, err) } if p.NFTID == "" { return 0, 
fmt.Errorf("%w: TRANSFER_NFT: nft_id is required", ErrTxFailed) } if tx.To == "" { return 0, fmt.Errorf("%w: TRANSFER_NFT: recipient (to) is required", ErrTxFailed) } nftItem, dbErr := txn.Get([]byte(prefixNFT + p.NFTID)) if dbErr != nil { return 0, fmt.Errorf("%w: TRANSFER_NFT: NFT %s not found", ErrTxFailed, p.NFTID) } var nft NFTRecord if err := nftItem.Value(func(v []byte) error { return json.Unmarshal(v, &nft) }); err != nil { return 0, fmt.Errorf("%w: TRANSFER_NFT: corrupt NFT record", ErrTxFailed) } if nft.Burned { return 0, fmt.Errorf("%w: TRANSFER_NFT: NFT %s is burned", ErrTxFailed, p.NFTID) } if nft.Owner != tx.From { return 0, fmt.Errorf("%w: TRANSFER_NFT: %s is not the owner of NFT %s", ErrTxFailed, tx.From[:min(8, len(tx.From))], p.NFTID) } if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil { return 0, fmt.Errorf("TRANSFER_NFT fee debit: %w", err) } // Remove old owner index, add new one. _ = txn.Delete([]byte(prefixNFTOwner + tx.From + ":" + p.NFTID)) if err := txn.Set([]byte(prefixNFTOwner+tx.To+":"+p.NFTID), []byte{}); err != nil { return 0, fmt.Errorf("index new NFT owner: %w", err) } nft.Owner = tx.To val, _ := json.Marshal(nft) if err := txn.Set([]byte(prefixNFT+p.NFTID), val); err != nil { return 0, fmt.Errorf("update NFT owner: %w", err) } case EventBurnNFT: var p BurnNFTPayload if err := json.Unmarshal(tx.Payload, &p); err != nil { return 0, fmt.Errorf("%w: BURN_NFT bad payload: %v", ErrTxFailed, err) } if p.NFTID == "" { return 0, fmt.Errorf("%w: BURN_NFT: nft_id is required", ErrTxFailed) } nftItem, dbErr := txn.Get([]byte(prefixNFT + p.NFTID)) if dbErr != nil { return 0, fmt.Errorf("%w: BURN_NFT: NFT %s not found", ErrTxFailed, p.NFTID) } var nft NFTRecord if err := nftItem.Value(func(v []byte) error { return json.Unmarshal(v, &nft) }); err != nil { return 0, fmt.Errorf("%w: BURN_NFT: corrupt NFT record", ErrTxFailed) } if nft.Owner != tx.From { return 0, fmt.Errorf("%w: BURN_NFT: %s is not the owner", ErrTxFailed, tx.From[:min(8, 
len(tx.From))])
		}
		if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil {
			return 0, fmt.Errorf("BURN_NFT fee debit: %w", err)
		}
		_ = txn.Delete([]byte(prefixNFTOwner + tx.From + ":" + p.NFTID))
		nft.Burned = true
		nft.Owner = ""
		val, _ := json.Marshal(nft)
		if err := txn.Set([]byte(prefixNFT+p.NFTID), val); err != nil {
			return 0, fmt.Errorf("BURN_NFT update: %w", err)
		}
		log.Printf("[CHAIN] BURN_NFT id=%s burner=%s",
			p.NFTID, tx.From[:min(8, len(tx.From))])

	case EventBlockReward:
		return 0, fmt.Errorf("%w: BLOCK_REWARD is a synthetic event and cannot be included in blocks",
			ErrTxFailed)

	default:
		// Forward-compatibility: a tx with an EventType this binary doesn't
		// recognise is treated as a no-op rather than a hard error. This
		// lets newer clients include newer event kinds in blocks without
		// splitting the validator set every time a feature lands.
		//
		// Still charge the fee so the tx isn't a free spam vector: if an
		// attacker sends bogus-type txs, they pay for each one like any
		// other tx. The validator pockets the fee via the outer AddBlock
		// loop (collectedFees += tx.Fee).
		if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil {
			return 0, fmt.Errorf("%w: unknown-type fee debit: %v", ErrTxFailed, err)
		}
		log.Printf("[CHAIN] unknown event type %q in tx %s — applied as no-op (binary is older than this tx)",
			tx.Type, tx.ID)
	}
	return 0, nil
}

// applyOpenPayChan applies an open-payment-channel transaction inside txn.
//
// It decodes tx.Payload as OpenPayChanPayload, requires tx.From to be PartyA,
// verifies PartyB's signature over payChanSigPayload, rejects a ChannelID that
// is already stored, debits DepositA+tx.Fee from PartyA and DepositB from
// PartyB, and stores a PayChanState under prefixPayChan+ChannelID.
//
// All uint64 sums are checked for wraparound before any balance is touched:
// an unchecked DepositA+tx.Fee could wrap to a tiny debit while the full
// DepositA is recorded in the channel state.
//
// The returned error is not wrapped in ErrTxFailed here; the EventOpenPayChan
// case in applyTx does that.
//
// NOTE(review): if the PartyB debit fails after the PartyA debit succeeded,
// the PartyA write is already pending in txn — confirm the caller discards
// txn writes of a failed tx.
func (c *Chain) applyOpenPayChan(txn *badger.Txn, tx *Transaction) error {
	var p OpenPayChanPayload
	if err := json.Unmarshal(tx.Payload, &p); err != nil {
		return err
	}
	if p.ChannelID == "" || p.PartyA == "" || p.PartyB == "" {
		return fmt.Errorf("missing channel fields")
	}
	if tx.From != p.PartyA {
		return fmt.Errorf("tx.From must be PartyA")
	}
	// Test each deposit on its own: the uint64 sum of two non-zero deposits
	// can wrap around to exactly 0.
	if p.DepositA == 0 && p.DepositB == 0 {
		return fmt.Errorf("at least one deposit must be > 0")
	}
	// The channel total is recomputed at close time, so it must fit in uint64.
	if p.DepositA+p.DepositB < p.DepositA {
		return fmt.Errorf("deposit total overflows uint64")
	}
	// Amount actually taken from PartyA; must not wrap below the deposit.
	debitA := p.DepositA + tx.Fee
	if debitA < p.DepositA {
		return fmt.Errorf("deposit plus fee overflows uint64")
	}

	// Verify PartyB's counter-signature over the channel parameters.
	sigPayload := payChanSigPayload(p.ChannelID, p.PartyA, p.PartyB, p.DepositA, p.DepositB, p.ExpiryBlock)
	if ok, err := verifyEd25519(p.PartyB, sigPayload, p.SigB); err != nil || !ok {
		return fmt.Errorf("invalid PartyB signature")
	}

	// Check channel does not already exist. Only ErrKeyNotFound means "free";
	// any other read error must not be mistaken for an unused channel ID.
	if _, existErr := txn.Get([]byte(prefixPayChan + p.ChannelID)); existErr == nil {
		return fmt.Errorf("channel %s already exists", p.ChannelID)
	} else if !errors.Is(existErr, badger.ErrKeyNotFound) {
		return fmt.Errorf("lookup channel %s: %w", p.ChannelID, existErr)
	}

	// Lock deposits. PartyA always pays the fee, with or without a deposit.
	if p.DepositA > 0 {
		if err := c.debitBalance(txn, p.PartyA, debitA); err != nil {
			return fmt.Errorf("debit PartyA: %w", err)
		}
	} else {
		if err := c.debitBalance(txn, p.PartyA, tx.Fee); err != nil {
			return fmt.Errorf("debit PartyA fee: %w", err)
		}
	}
	if p.DepositB > 0 {
		if err := c.debitBalance(txn, p.PartyB, p.DepositB); err != nil {
			return fmt.Errorf("debit PartyB: %w", err)
		}
	}

	// Read current block height for OpenedBlock. Best effort: a missing or
	// undecodable height key leaves OpenedBlock at 0.
	var height uint64
	item, err := txn.Get([]byte(prefixHeight))
	if err == nil {
		_ = item.Value(func(val []byte) error { return json.Unmarshal(val, &height) })
	}

	state := PayChanState{
		ChannelID:   p.ChannelID,
		PartyA:      p.PartyA,
		PartyB:      p.PartyB,
		DepositA:    p.DepositA,
		DepositB:    p.DepositB,
		ExpiryBlock: p.ExpiryBlock,
		OpenedBlock: height,
		Nonce:       0,
	}
	val, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return txn.Set([]byte(prefixPayChan+p.ChannelID), val)
}

// applyClosePayChan applies a close-payment-channel transaction inside txn.
//
// It loads the PayChanState for the payload's ChannelID, rejects channels that
// are already closed or a nonce below the stored one, requires the final
// balances to add up to exactly the locked deposits, verifies both parties'
// signatures over payChanCloseSigPayload, credits each party its final
// balance, debits tx.Fee from tx.From, and stores the state with Closed set.
//
// The balance check is written without adding the two payload balances:
// BalanceA+BalanceB can wrap around uint64 and equal the total while each
// balance individually exceeds it, which would credit more than was locked.
//
// The fee is debited after the credits, as in the original ordering, so a
// closer who is a channel party can pay the fee out of the released funds.
//
// The returned error is not wrapped in ErrTxFailed here; the EventClosePayChan
// case in applyTx does that.
func (c *Chain) applyClosePayChan(txn *badger.Txn, tx *Transaction) error {
	var p ClosePayChanPayload
	if err := json.Unmarshal(tx.Payload, &p); err != nil {
		return err
	}

	// Load channel state.
	item, err := txn.Get([]byte(prefixPayChan + p.ChannelID))
	if err != nil {
		if errors.Is(err, badger.ErrKeyNotFound) {
			return fmt.Errorf("channel %s not found", p.ChannelID)
		}
		return fmt.Errorf("load channel %s: %w", p.ChannelID, err)
	}
	var state PayChanState
	if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &state) }); err != nil {
		return err
	}

	if state.Closed {
		return fmt.Errorf("channel %s already closed", p.ChannelID)
	}
	if p.Nonce < state.Nonce {
		return fmt.Errorf("stale state: nonce %d < current %d", p.Nonce, state.Nonce)
	}

	total := state.DepositA + state.DepositB
	if total < state.DepositA {
		// A stored state whose deposits do not fit in uint64 cannot be paid out.
		return fmt.Errorf("channel %s: deposit total overflows uint64", p.ChannelID)
	}
	// Overflow-safe form of BalanceA+BalanceB == total.
	if p.BalanceA > total || p.BalanceB != total-p.BalanceA {
		return fmt.Errorf("balances %d + %d != total deposits %d", p.BalanceA, p.BalanceB, total)
	}

	// Verify both parties' signatures over the final state.
	sigPayload := payChanCloseSigPayload(p.ChannelID, p.BalanceA, p.BalanceB, p.Nonce)
	if okA, err := verifyEd25519(state.PartyA, sigPayload, p.SigA); err != nil || !okA {
		return fmt.Errorf("invalid PartyA close signature")
	}
	if okB, err := verifyEd25519(state.PartyB, sigPayload, p.SigB); err != nil || !okB {
		return fmt.Errorf("invalid PartyB close signature")
	}

	// Distribute balances.
	if p.BalanceA > 0 {
		if err := c.creditBalance(txn, state.PartyA, p.BalanceA); err != nil {
			return fmt.Errorf("credit PartyA: %w", err)
		}
	}
	if p.BalanceB > 0 {
		if err := c.creditBalance(txn, state.PartyB, p.BalanceB); err != nil {
			return fmt.Errorf("credit PartyB: %w", err)
		}
	}
	// Deduct fee from submitter.
	if err := c.debitBalance(txn, tx.From, tx.Fee); err != nil {
		return fmt.Errorf("debit closer fee: %w", err)
	}

	// Mark channel closed.
	state.Closed = true
	state.Nonce = p.Nonce
	val, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return txn.Set([]byte(prefixPayChan+p.ChannelID), val)
}

// payChanSigPayload returns the bytes both parties sign when agreeing to open a channel.
func payChanSigPayload(channelID, partyA, partyB string, depositA, depositB, expiryBlock uint64) []byte { data, _ := json.Marshal(struct { ChannelID string `json:"channel_id"` PartyA string `json:"party_a"` PartyB string `json:"party_b"` DepositA uint64 `json:"deposit_a_ut"` DepositB uint64 `json:"deposit_b_ut"` ExpiryBlock uint64 `json:"expiry_block"` }{channelID, partyA, partyB, depositA, depositB, expiryBlock}) return data } // payChanCloseSigPayload returns the bytes both parties sign to close a channel. func payChanCloseSigPayload(channelID string, balanceA, balanceB, nonce uint64) []byte { data, _ := json.Marshal(struct { ChannelID string `json:"channel_id"` BalanceA uint64 `json:"balance_a_ut"` BalanceB uint64 `json:"balance_b_ut"` Nonce uint64 `json:"nonce"` }{channelID, balanceA, balanceB, nonce}) return data } // incrementRep reads, modifies, and writes a RepStats entry. func (c *Chain) incrementRep(txn *badger.Txn, pubKeyHex string, fn func(*RepStats)) error { key := []byte(prefixReputation + pubKeyHex) var r RepStats item, err := txn.Get(key) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return err } if err == nil { _ = item.Value(func(val []byte) error { return json.Unmarshal(val, &r) }) } fn(&r) r.Score = r.ComputeScore() val, err := json.Marshal(r) if err != nil { return err } return txn.Set(key, val) } func (c *Chain) readBalance(txn *badger.Txn, pubKeyHex string) (uint64, error) { item, err := txn.Get([]byte(prefixBalance + pubKeyHex)) if errors.Is(err, badger.ErrKeyNotFound) { return 0, nil } if err != nil { return 0, err } var bal uint64 err = item.Value(func(val []byte) error { return json.Unmarshal(val, &bal) }) return bal, err } func (c *Chain) writeBalance(txn *badger.Txn, pubKeyHex string, bal uint64) error { val, err := json.Marshal(bal) if err != nil { return err } return txn.Set([]byte(prefixBalance+pubKeyHex), val) } func (c *Chain) creditBalance(txn *badger.Txn, pubKeyHex string, amount uint64) error { bal, err := 
c.readBalance(txn, pubKeyHex) if err != nil { return err } return c.writeBalance(txn, pubKeyHex, bal+amount) } func (c *Chain) debitBalance(txn *badger.Txn, pubKeyHex string, amount uint64) error { bal, err := c.readBalance(txn, pubKeyHex) if err != nil { return err // DB error — not ErrTxFailed } if bal < amount { return fmt.Errorf("%w: insufficient balance for %s: have %d µT, need %d µT", ErrTxFailed, pubKeyHex[:min(8, len(pubKeyHex))], bal, amount) } return c.writeBalance(txn, pubKeyHex, bal-amount) } func min(a, b int) int { if a < b { return a } return b } // ── Stake helpers ───────────────────────────────────────────────────────────── func (c *Chain) readStake(txn *badger.Txn, pubKeyHex string) uint64 { item, err := txn.Get([]byte(prefixStake + pubKeyHex)) if err != nil { return 0 } var amount uint64 _ = item.Value(func(val []byte) error { return json.Unmarshal(val, &amount) }) return amount } func (c *Chain) writeStake(txn *badger.Txn, pubKeyHex string, amount uint64) error { if amount == 0 { err := txn.Delete([]byte(prefixStake + pubKeyHex)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } return err } val, _ := json.Marshal(amount) return txn.Set([]byte(prefixStake+pubKeyHex), val) } // Stake returns the staked amount for a public key (public query). 
func (c *Chain) Stake(pubKeyHex string) (uint64, error) { var amount uint64 err := c.db.View(func(txn *badger.Txn) error { amount = c.readStake(txn, pubKeyHex) return nil }) return amount, err } // ── Token balance helpers ────────────────────────────────────────────────────── func tokenBalKey(tokenID, pubKeyHex string) []byte { return []byte(prefixTokenBal + tokenID + ":" + pubKeyHex) } func (c *Chain) readTokenBalance(txn *badger.Txn, tokenID, pubKeyHex string) uint64 { item, err := txn.Get(tokenBalKey(tokenID, pubKeyHex)) if err != nil { return 0 } var bal uint64 _ = item.Value(func(val []byte) error { return json.Unmarshal(val, &bal) }) return bal } func (c *Chain) writeTokenBalance(txn *badger.Txn, tokenID, pubKeyHex string, bal uint64) error { if bal == 0 { err := txn.Delete(tokenBalKey(tokenID, pubKeyHex)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } return err } val, _ := json.Marshal(bal) return txn.Set(tokenBalKey(tokenID, pubKeyHex), val) } func (c *Chain) creditTokenBalance(txn *badger.Txn, tokenID, pubKeyHex string, amount uint64) error { bal := c.readTokenBalance(txn, tokenID, pubKeyHex) return c.writeTokenBalance(txn, tokenID, pubKeyHex, bal+amount) } func (c *Chain) debitTokenBalance(txn *badger.Txn, tokenID, pubKeyHex string, amount uint64) error { bal := c.readTokenBalance(txn, tokenID, pubKeyHex) if bal < amount { return fmt.Errorf("%w: insufficient token balance for %s: have %d, need %d", ErrTxFailed, pubKeyHex[:min(8, len(pubKeyHex))], bal, amount) } return c.writeTokenBalance(txn, tokenID, pubKeyHex, bal-amount) } // TokenBalance returns the token balance for a public key (public query). func (c *Chain) TokenBalance(tokenID, pubKeyHex string) (uint64, error) { var bal uint64 err := c.db.View(func(txn *badger.Txn) error { bal = c.readTokenBalance(txn, tokenID, pubKeyHex) return nil }) return bal, err } // Token returns a TokenRecord by ID. 
func (c *Chain) Token(tokenID string) (*TokenRecord, error) { var rec TokenRecord err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixToken + tokenID)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }) }) if err != nil || rec.TokenID == "" { return nil, err } return &rec, nil } // Tokens returns all issued tokens. func (c *Chain) Tokens() ([]TokenRecord, error) { var out []TokenRecord err := c.db.View(func(txn *badger.Txn) error { prefix := []byte(prefixToken) opts := badger.DefaultIteratorOptions opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { var rec TokenRecord if err := it.Item().Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err == nil { out = append(out, rec) } } return nil }) return out, err } // computeTokenID derives a deterministic token ID from issuer pubkey + symbol. func computeTokenID(issuerPub, symbol string) string { h := sha256.Sum256([]byte("token:" + issuerPub + ":" + symbol)) return hex.EncodeToString(h[:16]) } // computeNFTID derives a deterministic NFT ID from minter pubkey + tx ID. func computeNFTID(minterPub, txID string) string { h := sha256.Sum256([]byte("nft:" + minterPub + ":" + txID)) return hex.EncodeToString(h[:16]) } // NFT returns an NFTRecord by ID. func (c *Chain) NFT(nftID string) (*NFTRecord, error) { var rec NFTRecord err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixNFT + nftID)) if errors.Is(err, badger.ErrKeyNotFound) { return nil } if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }) }) if err != nil || rec.NFTID == "" { return nil, err } return &rec, nil } // NFTsByOwner returns all NFTs owned by a public key (excluding burned). 
func (c *Chain) NFTsByOwner(ownerPub string) ([]NFTRecord, error) { var out []NFTRecord err := c.db.View(func(txn *badger.Txn) error { prefix := []byte(prefixNFTOwner + ownerPub + ":") opts := badger.DefaultIteratorOptions opts.Prefix = prefix opts.PrefetchValues = false it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { key := string(it.Item().Key()) nftID := strings.TrimPrefix(key, prefixNFTOwner+ownerPub+":") item, err := txn.Get([]byte(prefixNFT + nftID)) if err != nil { continue } var rec NFTRecord if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err == nil && !rec.Burned { out = append(out, rec) } } return nil }) return out, err } // NFTs returns all minted NFTs (including burned for history). func (c *Chain) NFTs() ([]NFTRecord, error) { var out []NFTRecord err := c.db.View(func(txn *badger.Txn) error { prefix := []byte(prefixNFT) opts := badger.DefaultIteratorOptions opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { var rec NFTRecord if err := it.Item().Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err == nil { out = append(out, rec) } } return nil }) return out, err } // RegisteredRelayInfo wraps a relay's Ed25519 pub key with its registration payload. type RegisteredRelayInfo struct { PubKey string `json:"pub_key"` Address string `json:"address"` Relay RegisterRelayPayload `json:"relay"` LastHeartbeat int64 `json:"last_heartbeat,omitempty"` // unix seconds } // writeRelayHeartbeat stores the given unix timestamp as the relay's // last-heartbeat marker. Called from REGISTER_RELAY and from HEARTBEAT // txs originating from registered relays. 
func (c *Chain) writeRelayHeartbeat(txn *badger.Txn, nodePub string, unixSec int64) error { var buf [8]byte binary.BigEndian.PutUint64(buf[:], uint64(unixSec)) return txn.Set([]byte(prefixRelayHB+nodePub), buf[:]) } // readRelayHeartbeat returns the stored unix seconds, or 0 if missing. func (c *Chain) readRelayHeartbeat(txn *badger.Txn, nodePub string) int64 { item, err := txn.Get([]byte(prefixRelayHB + nodePub)) if err != nil { return 0 } var out int64 _ = item.Value(func(val []byte) error { if len(val) != 8 { return nil } out = int64(binary.BigEndian.Uint64(val)) return nil }) return out } // RegisteredRelays returns every relay node that has submitted EventRegisterRelay // AND whose last heartbeat is within RelayHeartbeatTTL. Dead relays (node // died, network went away) are filtered out so clients don't waste // bandwidth trying to deliver through them. // // A relay without any recorded heartbeat is treated as live — this covers // historical data from nodes upgraded to this version; they'll start // recording heartbeats on the next HEARTBEAT tx. func (c *Chain) RegisteredRelays() ([]RegisteredRelayInfo, error) { var out []RegisteredRelayInfo now := time.Now().Unix() err := c.db.View(func(txn *badger.Txn) error { prefix := []byte(prefixRelay) opts := badger.DefaultIteratorOptions opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { item := it.Item() key := string(item.Key()) pubKey := key[len(prefixRelay):] var p RegisterRelayPayload if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &p) }); err != nil { continue } hb := c.readRelayHeartbeat(txn, pubKey) if hb > 0 && now-hb > RelayHeartbeatTTL { continue // stale, delist } out = append(out, RegisteredRelayInfo{ PubKey: pubKey, Address: pubKeyToAddr(pubKey), Relay: p, LastHeartbeat: hb, }) } return nil }) return out, err } // contactRecord is the on-chain storage representation of a contact relationship. 
type contactRecord struct { Status string `json:"status"` Intro string `json:"intro,omitempty"` FeeUT uint64 `json:"fee_ut"` TxID string `json:"tx_id"` CreatedAt int64 `json:"created_at"` } // updateContactStatus updates the status of an existing contact record. // Only Pending records may be transitioned — re-accepting an already-accepted // or blocked request is a no-op error so an attacker cannot spam state changes // on another user's contact list by replaying ACCEPT_CONTACT txs. func (c *Chain) updateContactStatus(txn *badger.Txn, key string, status ContactStatus) error { item, err := txn.Get([]byte(key)) if err != nil { return fmt.Errorf("contact record not found") } var rec contactRecord if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err != nil { return err } // Allowed transitions: // Pending -> Accepted (ACCEPT_CONTACT) // Pending -> Blocked (BLOCK_CONTACT) // Accepted -> Blocked (BLOCK_CONTACT, recipient changes their mind) // Everything else is rejected. cur := ContactStatus(rec.Status) switch status { case ContactAccepted: if cur != ContactPending { return fmt.Errorf("cannot accept contact: current status is %q, must be %q", cur, ContactPending) } case ContactBlocked: if cur != ContactPending && cur != ContactAccepted { return fmt.Errorf("cannot block contact: current status is %q", cur) } } rec.Status = string(status) val, _ := json.Marshal(rec) return txn.Set([]byte(key), val) } // ContactRequests returns all incoming contact records for the given Ed25519 pubkey. // Results include pending, accepted, and blocked records. 
func (c *Chain) ContactRequests(targetPub string) ([]ContactInfo, error) { prefix := []byte(prefixContactIn + targetPub + ":") var out []ContactInfo err := c.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { item := it.Item() key := string(item.Key()) // key format: contact_in:: requesterPub := key[len(prefixContactIn)+len(targetPub)+1:] var rec contactRecord if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err != nil { continue } out = append(out, ContactInfo{ RequesterPub: requesterPub, RequesterAddr: pubKeyToAddr(requesterPub), Status: ContactStatus(rec.Status), Intro: rec.Intro, FeeUT: rec.FeeUT, TxID: rec.TxID, CreatedAt: rec.CreatedAt, }) } return nil }) return out, err } // IdentityInfo returns identity information for the given Ed25519 public key. // It works even if the key has never submitted a REGISTER_KEY transaction. func (c *Chain) IdentityInfo(pubKey string) (*IdentityInfo, error) { info := &IdentityInfo{ PubKey: pubKey, Address: pubKeyToAddr(pubKey), } err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixIdentity + pubKey)) if err != nil { return nil // not registered — return defaults } return item.Value(func(val []byte) error { var p RegisterKeyPayload if err := json.Unmarshal(val, &p); err != nil { return err } info.Registered = true info.Nickname = p.Nickname info.X25519Pub = p.X25519PubKey return nil }) }) return info, err } // InitValidators replaces the on-chain validator set with the given pub keys. // Any stale keys from previous runs are deleted first so that old Docker // volumes or leftover DB state can never inject phantom validators into PBFT. // Dynamic ADD_VALIDATOR / REMOVE_VALIDATOR transactions layer on top of this // base set for the lifetime of the running node. 
func (c *Chain) InitValidators(pubKeys []string) error { return c.db.Update(func(txn *badger.Txn) error { // Collect and delete all existing validator keys. opts := badger.DefaultIteratorOptions opts.PrefetchValues = false prefix := []byte(prefixValidator) it := txn.NewIterator(opts) var toDelete [][]byte for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { k := make([]byte, len(it.Item().Key())) copy(k, it.Item().Key()) toDelete = append(toDelete, k) } it.Close() for _, k := range toDelete { if err := txn.Delete(k); err != nil { return err } } // Write the authoritative set from CLI flags. for _, pk := range pubKeys { if err := txn.Set([]byte(prefixValidator+pk), []byte{}); err != nil { return err } } return nil }) } // ValidatorSet returns the current active validator pub keys (sorted by insertion order). func (c *Chain) ValidatorSet() ([]string, error) { var validators []string err := c.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false it := txn.NewIterator(opts) defer it.Close() prefix := []byte(prefixValidator) for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { key := string(it.Item().Key()) validators = append(validators, key[len(prefixValidator):]) } return nil }) return validators, err } // validatorSetTxn returns the full current validator set inside the given // txn. Used by ADD_VALIDATOR to compute the ⌈2/3⌉ approval threshold. // Ordering is iteration order (BadgerDB key order), stable enough for // threshold math. func (c *Chain) validatorSetTxn(txn *badger.Txn) ([]string, error) { prefix := []byte(prefixValidator) opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() var out []string for it.Rewind(); it.Valid(); it.Next() { out = append(out, string(it.Item().Key()[len(prefix):])) } return out, nil } // contains is a tiny generic-free helper: true if s is in haystack. 
func contains(haystack []string, s string) bool { for _, h := range haystack { if h == s { return true } } return false } // isValidatorTxn checks if pubKey is an active validator inside a read/write txn. func (c *Chain) isValidatorTxn(txn *badger.Txn, pubKey string) (bool, error) { _, err := txn.Get([]byte(prefixValidator + pubKey)) if err == badger.ErrKeyNotFound { return false, nil } if err != nil { return false, err } return true, nil } // verifyEd25519 verifies an Ed25519 signature without importing the identity package // (which would create a circular dependency). func verifyEd25519(pubKeyHex string, msg, sig []byte) (bool, error) { pubBytes, err := hex.DecodeString(pubKeyHex) if err != nil { return false, fmt.Errorf("invalid pub key hex: %w", err) } return ed25519.Verify(ed25519.PublicKey(pubBytes), msg, sig), nil } // --- contract helpers --- // computeContractID returns hex(sha256(deployerPub || wasmBytes)[:16]). // Stable and deterministic: same deployer + same WASM → same contract ID. func computeContractID(deployerPub string, wasmBytes []byte) string { import256 := sha256.New() import256.Write([]byte(deployerPub)) import256.Write(wasmBytes) return hex.EncodeToString(import256.Sum(nil)[:16]) } // decodeBase64 decodes a standard or raw base64 string into dst. // Returns the number of bytes written. func decodeBase64(s string, dst []byte) (int, error) { import64 := base64.StdEncoding // Try standard encoding first, then raw (no padding). n, err := import64.Decode(dst, []byte(s)) if err != nil { n, err = base64.RawStdEncoding.Decode(dst, []byte(s)) } return n, err } // Contracts returns all deployed contracts (WASM bytes omitted). 
func (c *Chain) Contracts() ([]ContractRecord, error) { var out []ContractRecord err := c.db.View(func(txn *badger.Txn) error { prefix := []byte(prefixContract) opts := badger.DefaultIteratorOptions opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { var rec ContractRecord if err := it.Item().Value(func(v []byte) error { return json.Unmarshal(v, &rec) }); err != nil { continue } rec.WASMBytes = nil // strip bytes from list response out = append(out, rec) } return nil }) return out, err } // GetContract returns the ContractRecord for the given contract ID, or nil if not found. func (c *Chain) GetContract(contractID string) (*ContractRecord, error) { var rec ContractRecord err := c.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(prefixContract + contractID)) if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }) }) if errors.Is(err, badger.ErrKeyNotFound) { return nil, nil } return &rec, err } // GetContractState returns the raw state value for a contract key, or nil if not set. func (c *Chain) GetContractState(contractID, key string) ([]byte, error) { var result []byte err := c.db.View(func(txn *badger.Txn) error { dbKey := []byte(prefixContractState + contractID + ":" + key) item, err := txn.Get(dbKey) if err != nil { return err } return item.Value(func(val []byte) error { result = make([]byte, len(val)) copy(result, val) return nil }) }) if errors.Is(err, badger.ErrKeyNotFound) { return nil, nil } return result, err } // ContractLogs returns the most recent log entries for a contract, newest first. // limit <= 0 returns up to 100 entries. 
func (c *Chain) ContractLogs(contractID string, limit int) ([]ContractLogEntry, error) { if limit <= 0 || limit > 100 { limit = 100 } prefix := []byte(prefixContractLog + contractID + ":") var entries []ContractLogEntry err := c.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.Reverse = true opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() // Seek to the end of the prefix range (prefix + 0xFF). seekKey := append(append([]byte{}, prefix...), 0xFF) for it.Seek(seekKey); it.ValidForPrefix(prefix) && len(entries) < limit; it.Next() { var entry ContractLogEntry if err := it.Item().Value(func(val []byte) error { return json.Unmarshal(val, &entry) }); err == nil { entries = append(entries, entry) } } return nil }) return entries, err } // maxContractCallDepth is the maximum nesting depth for inter-contract calls. // Prevents infinite recursion and stack overflows. const maxContractCallDepth = 8 // chainHostEnv implements VMHostEnv backed by a live badger.Txn. type chainHostEnv struct { txn *badger.Txn contractID string caller string blockHeight uint64 txID string logSeq int chain *Chain depth int // inter-contract call nesting depth (0 = top-level) } func newChainHostEnv(txn *badger.Txn, contractID, caller, txID string, blockHeight uint64, chain *Chain) *chainHostEnv { return newChainHostEnvDepth(txn, contractID, caller, txID, blockHeight, chain, 0) } func newChainHostEnvDepth(txn *badger.Txn, contractID, caller, txID string, blockHeight uint64, chain *Chain, depth int) *chainHostEnv { // Count existing log entries for this contract+block so that logs from // multiple TXs within the same block get unique sequence numbers and don't // overwrite each other. 
startSeq := countContractLogsInBlock(txn, contractID, blockHeight) return &chainHostEnv{ txn: txn, contractID: contractID, caller: caller, txID: txID, blockHeight: blockHeight, chain: chain, logSeq: startSeq, depth: depth, } } // countContractLogsInBlock counts how many log entries already exist for the // given contract at the given block height (used to pick the starting logSeq). func countContractLogsInBlock(txn *badger.Txn, contractID string, blockHeight uint64) int { prefix := []byte(fmt.Sprintf("%s%s:%020d:", prefixContractLog, contractID, blockHeight)) opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() n := 0 for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { n++ } return n } func (e *chainHostEnv) GetState(key []byte) ([]byte, error) { dbKey := []byte(prefixContractState + e.contractID + ":" + string(key)) item, err := e.txn.Get(dbKey) if errors.Is(err, badger.ErrKeyNotFound) { return nil, nil } if err != nil { return nil, err } var val []byte err = item.Value(func(v []byte) error { val = make([]byte, len(v)) copy(val, v) return nil }) return val, err } func (e *chainHostEnv) SetState(key, value []byte) error { dbKey := []byte(prefixContractState + e.contractID + ":" + string(key)) return e.txn.Set(dbKey, value) } func (e *chainHostEnv) GetBalance(pubKeyHex string) (uint64, error) { return e.chain.readBalance(e.txn, pubKeyHex) } func (e *chainHostEnv) Transfer(from, to string, amount uint64) error { if err := e.chain.debitBalance(e.txn, from, amount); err != nil { return err } return e.chain.creditBalance(e.txn, to, amount) } func (e *chainHostEnv) GetCaller() string { return e.caller } func (e *chainHostEnv) GetBlockHeight() uint64 { return e.blockHeight } // GetContractTreasury returns a deterministic ownerless address for this // contract derived as hex(sha256(contractID + ":treasury")). 
// No private key exists for this address; only the contract itself can spend // from it via the transfer host function. func (e *chainHostEnv) GetContractTreasury() string { h := sha256.Sum256([]byte(e.contractID + ":treasury")) return hex.EncodeToString(h[:]) } func (e *chainHostEnv) Log(msg string) { log.Printf("[CONTRACT %s] %s", e.contractID[:8], msg) entry := ContractLogEntry{ ContractID: e.contractID, BlockHeight: e.blockHeight, TxID: e.txID, Seq: e.logSeq, Message: msg, } val, _ := json.Marshal(entry) // Key: clog::: key := fmt.Sprintf("%s%s:%020d:%05d", prefixContractLog, e.contractID, e.blockHeight, e.logSeq) _ = e.txn.Set([]byte(key), val) e.logSeq++ } // CallContract executes a method on another deployed contract from within // the current contract execution. The caller seen by the sub-contract is // the current contract's ID. State changes share the same badger.Txn so // they are all committed or rolled back atomically with the parent call. func (e *chainHostEnv) CallContract(contractID, method string, argsJSON []byte, gasLimit uint64) (uint64, error) { if e.depth >= maxContractCallDepth { return 0, fmt.Errorf("%w: inter-contract call depth limit (%d) exceeded", ErrTxFailed, maxContractCallDepth) } if e.chain.vm == nil { return 0, fmt.Errorf("%w: VM not available for inter-contract call", ErrTxFailed) } item, err := e.txn.Get([]byte(prefixContract + contractID)) if err != nil { if errors.Is(err, badger.ErrKeyNotFound) { return 0, fmt.Errorf("%w: contract %s not found", ErrTxFailed, contractID) } return 0, err } var rec ContractRecord if err := item.Value(func(val []byte) error { return json.Unmarshal(val, &rec) }); err != nil { return 0, fmt.Errorf("%w: corrupt contract record for inter-contract call: %v", ErrTxFailed, err) } // Sub-contract sees the current contract as its caller. 
subEnv := newChainHostEnvDepth(e.txn, contractID, e.contractID, e.txID, e.blockHeight, e.chain, e.depth+1) // Same timeout guard as the top-level CALL_CONTRACT path; protects // against recursive contract calls that never return. subCtx, subCancel := context.WithTimeout(context.Background(), 30*time.Second) gasUsed, callErr := e.chain.vm.Call( subCtx, contractID, rec.WASMBytes, method, argsJSON, gasLimit, subEnv, ) subCancel() if callErr != nil { return gasUsed, fmt.Errorf("%w: sub-call %s.%s: %v", ErrTxFailed, contractID[:min(8, len(contractID))], method, callErr) } log.Printf("[CHAIN] inter-contract %s→%s.%s gasUsed=%d", e.contractID[:min(8, len(e.contractID))], contractID[:min(8, len(contractID))], method, gasUsed) return gasUsed, nil }