// Package node — unified event bus for SSE, WebSocket, and any future // subscriber of block / tx / contract-log / inbox events. // // Before this file, emit code was duplicated at every commit callsite: // // go sseHub.EmitBlockWithTxs(b) // go wsHub.EmitBlockWithTxs(b) // go emitContractLogs(sseHub, wsHub, chain, b) // // With the bus, callers do one thing: // // go bus.EmitBlockWithTxs(b) // // Adding a new subscriber (metrics sampler, WAL replicator, IPFS mirror…) // means registering once at startup — no edits at every call site. package node import ( "encoding/json" "go-blockchain/blockchain" ) // EventConsumer is what the bus calls for each event. Implementations are // registered once at startup via Bus.Register; fanout happens inside // Emit* methods. // // Methods may be called from multiple goroutines concurrently — consumers // must be safe for concurrent use. type EventConsumer interface { OnBlock(*blockchain.Block) OnTx(*blockchain.Transaction) OnContractLog(blockchain.ContractLogEntry) OnInbox(recipientX25519 string, summary json.RawMessage) } // EventBus fans events out to every registered consumer. Zero value is a // valid empty bus (Emit* are no-ops until someone Register()s). type EventBus struct { consumers []EventConsumer } // NewEventBus returns a fresh bus with no consumers. func NewEventBus() *EventBus { return &EventBus{} } // Register appends a consumer. Not thread-safe — call once at startup // before any Emit* is invoked. func (b *EventBus) Register(c EventConsumer) { b.consumers = append(b.consumers, c) } // EmitBlock notifies every consumer of a freshly-committed block. // Does NOT iterate transactions — use EmitBlockWithTxs for that. func (b *EventBus) EmitBlock(blk *blockchain.Block) { for _, c := range b.consumers { c.OnBlock(blk) } } // EmitTx notifies every consumer of a single committed transaction. // Synthetic BLOCK_REWARD records are skipped by the implementations that // care (SSE already filters); the bus itself doesn't second-guess. func (b *EventBus) EmitTx(tx *blockchain.Transaction) { for _, c := range b.consumers { c.OnTx(tx) } } // EmitContractLog notifies every consumer of a contract log entry. func (b *EventBus) EmitContractLog(entry blockchain.ContractLogEntry) { for _, c := range b.consumers { c.OnContractLog(entry) } } // EmitInbox notifies every consumer of a new relay envelope stored for // the given recipient. Summary is the minimal JSON the WS gateway ships // to subscribers so the client can refresh on push instead of polling. func (b *EventBus) EmitInbox(recipientX25519 string, summary json.RawMessage) { for _, c := range b.consumers { c.OnInbox(recipientX25519, summary) } } // EmitBlockWithTxs is the common path invoked on commit: one block + // every tx in it, so each consumer can index/fan out appropriately. func (b *EventBus) EmitBlockWithTxs(blk *blockchain.Block) { b.EmitBlock(blk) for _, tx := range blk.Transactions { b.EmitTx(tx) } } // ─── Adapter: wrap the existing SSEHub in an EventConsumer ────────────────── type sseEventAdapter struct{ h *SSEHub } func (a sseEventAdapter) OnBlock(b *blockchain.Block) { a.h.EmitBlock(b) } func (a sseEventAdapter) OnTx(tx *blockchain.Transaction) { a.h.EmitTx(tx) } func (a sseEventAdapter) OnContractLog(e blockchain.ContractLogEntry) { a.h.EmitContractLog(e) } // SSE has no inbox topic today — the existing hub doesn't expose one. The // adapter silently drops it; when we add an inbox SSE event, this is the // one place that needs an update. func (a sseEventAdapter) OnInbox(string, json.RawMessage) {} // WrapSSE converts an SSEHub into an EventConsumer for the bus. func WrapSSE(h *SSEHub) EventConsumer { return sseEventAdapter{h} } // ─── Adapter: wrap the WSHub ───────────────────────────────────────────────── type wsEventAdapter struct{ h *WSHub } func (a wsEventAdapter) OnBlock(b *blockchain.Block) { a.h.EmitBlock(b) } func (a wsEventAdapter) OnTx(tx *blockchain.Transaction) { a.h.EmitTx(tx) } func (a wsEventAdapter) OnContractLog(e blockchain.ContractLogEntry) { a.h.EmitContractLog(e) } func (a wsEventAdapter) OnInbox(to string, sum json.RawMessage) { a.h.EmitInbox(to, sum) } // WrapWS converts a WSHub into an EventConsumer for the bus. func WrapWS(h *WSHub) EventConsumer { return wsEventAdapter{h} }