DChain single-node blockchain + React Native messenger client.

Core:
- PBFT consensus with multi-sig validator admission + equivocation slashing
- BadgerDB + schema migration scaffold (CurrentSchemaVersion=0)
- libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1)
- Native Go contracts (username_registry) alongside WASM (wazero)
- WebSocket gateway with topic-based fanout + Ed25519-nonce auth
- Relay mailbox with NaCl envelope encryption (X25519 + Ed25519)
- Prometheus /metrics, per-IP rate limit, body-size cap

Deployment:
- Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus
- 3-node dev compose (docker-compose.yml) with mocked internet topology
- 3-validator prod compose (deploy/prod/) for federation
- Auto-update from Gitea via /api/update-check + systemd timer
- Build-time version injection (ldflags → node --version)
- UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER)

Client (client-app/):
- Expo / React Native / NativeWind
- E2E NaCl encryption, typing indicator, contact requests
- Auto-discovery of canonical contracts, chain_id aware, WS reconnect on node switch

Documentation:
- README.md, CHANGELOG.md, CONTEXT.md
- deploy/single/README.md with 6 operator scenarios
- deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design
- docs/contracts/*.md per contract
120 lines
4.4 KiB
Go
120 lines
4.4 KiB
Go
// Package node — unified event bus for SSE, WebSocket, and any future
// subscriber of block / tx / contract-log / inbox events.
//
// Before this file, emit code was duplicated at every commit call site:
//
//	go sseHub.EmitBlockWithTxs(b)
//	go wsHub.EmitBlockWithTxs(b)
//	go emitContractLogs(sseHub, wsHub, chain, b)
//
// With the bus, callers do one thing:
//
//	go bus.EmitBlockWithTxs(b)
//
// Adding a new subscriber (metrics sampler, WAL replicator, IPFS mirror…)
// means registering once at startup — no edits at every call site.
package node
|
|
|
|
import (
|
|
"encoding/json"
|
|
|
|
"go-blockchain/blockchain"
|
|
)
|
|
|
|
// EventConsumer is what the bus calls for each event. Implementations are
|
|
// registered once at startup via Bus.Register; fanout happens inside
|
|
// Emit* methods.
|
|
//
|
|
// Methods may be called from multiple goroutines concurrently — consumers
|
|
// must be safe for concurrent use.
|
|
type EventConsumer interface {
|
|
OnBlock(*blockchain.Block)
|
|
OnTx(*blockchain.Transaction)
|
|
OnContractLog(blockchain.ContractLogEntry)
|
|
OnInbox(recipientX25519 string, summary json.RawMessage)
|
|
}
|
|
|
|
// EventBus fans events out to every registered consumer. Zero value is a
|
|
// valid empty bus (Emit* are no-ops until someone Register()s).
|
|
type EventBus struct {
|
|
consumers []EventConsumer
|
|
}
|
|
|
|
// NewEventBus returns a fresh bus with no consumers.
|
|
func NewEventBus() *EventBus { return &EventBus{} }
|
|
|
|
// Register appends a consumer. Not thread-safe — call once at startup
|
|
// before any Emit* is invoked.
|
|
func (b *EventBus) Register(c EventConsumer) {
|
|
b.consumers = append(b.consumers, c)
|
|
}
|
|
|
|
// EmitBlock notifies every consumer of a freshly-committed block.
|
|
// Does NOT iterate transactions — use EmitBlockWithTxs for that.
|
|
func (b *EventBus) EmitBlock(blk *blockchain.Block) {
|
|
for _, c := range b.consumers {
|
|
c.OnBlock(blk)
|
|
}
|
|
}
|
|
|
|
// EmitTx notifies every consumer of a single committed transaction.
|
|
// Synthetic BLOCK_REWARD records are skipped by the implementations that
|
|
// care (SSE already filters); the bus itself doesn't second-guess.
|
|
func (b *EventBus) EmitTx(tx *blockchain.Transaction) {
|
|
for _, c := range b.consumers {
|
|
c.OnTx(tx)
|
|
}
|
|
}
|
|
|
|
// EmitContractLog notifies every consumer of a contract log entry.
|
|
func (b *EventBus) EmitContractLog(entry blockchain.ContractLogEntry) {
|
|
for _, c := range b.consumers {
|
|
c.OnContractLog(entry)
|
|
}
|
|
}
|
|
|
|
// EmitInbox notifies every consumer of a new relay envelope stored for
|
|
// the given recipient. Summary is the minimal JSON the WS gateway ships
|
|
// to subscribers so the client can refresh on push instead of polling.
|
|
func (b *EventBus) EmitInbox(recipientX25519 string, summary json.RawMessage) {
|
|
for _, c := range b.consumers {
|
|
c.OnInbox(recipientX25519, summary)
|
|
}
|
|
}
|
|
|
|
// EmitBlockWithTxs is the common path invoked on commit: one block +
|
|
// every tx in it, so each consumer can index/fan out appropriately.
|
|
func (b *EventBus) EmitBlockWithTxs(blk *blockchain.Block) {
|
|
b.EmitBlock(blk)
|
|
for _, tx := range blk.Transactions {
|
|
b.EmitTx(tx)
|
|
}
|
|
}
|
|
|
|
// ─── Adapter: wrap the existing SSEHub in an EventConsumer ──────────────────
|
|
|
|
type sseEventAdapter struct{ h *SSEHub }
|
|
|
|
func (a sseEventAdapter) OnBlock(b *blockchain.Block) { a.h.EmitBlock(b) }
|
|
func (a sseEventAdapter) OnTx(tx *blockchain.Transaction) { a.h.EmitTx(tx) }
|
|
func (a sseEventAdapter) OnContractLog(e blockchain.ContractLogEntry) { a.h.EmitContractLog(e) }
|
|
// SSE has no inbox topic today — the existing hub doesn't expose one. The
|
|
// adapter silently drops it; when we add an inbox SSE event, this is the
|
|
// one place that needs an update.
|
|
func (a sseEventAdapter) OnInbox(string, json.RawMessage) {}
|
|
|
|
// WrapSSE converts an SSEHub into an EventConsumer for the bus.
|
|
func WrapSSE(h *SSEHub) EventConsumer { return sseEventAdapter{h} }
|
|
|
|
// ─── Adapter: wrap the WSHub ─────────────────────────────────────────────────
|
|
|
|
type wsEventAdapter struct{ h *WSHub }
|
|
|
|
func (a wsEventAdapter) OnBlock(b *blockchain.Block) { a.h.EmitBlock(b) }
|
|
func (a wsEventAdapter) OnTx(tx *blockchain.Transaction) { a.h.EmitTx(tx) }
|
|
func (a wsEventAdapter) OnContractLog(e blockchain.ContractLogEntry) { a.h.EmitContractLog(e) }
|
|
func (a wsEventAdapter) OnInbox(to string, sum json.RawMessage) { a.h.EmitInbox(to, sum) }
|
|
|
|
// WrapWS converts a WSHub into an EventConsumer for the bus.
|
|
func WrapWS(h *WSHub) EventConsumer { return wsEventAdapter{h} }
|