// Package node — Server-Sent Events hub for the block explorer.
//
// Clients connect to GET /api/events and receive a real-time stream of:
//
//	event: block        — every committed block
//	event: tx           — every confirmed transaction (synthetic BLOCK_REWARD excluded)
//	event: contract_log — every log entry written by a smart contract
//
// The stream uses the standard text/event-stream format so the browser's
// native EventSource API works without any library.
package node

import (
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"go-blockchain/blockchain"
)

// ── event payload types ──────────────────────────────────────────────────────

// SSEBlockEvent is the JSON payload of a "block" event, emitted when a block
// is committed.
type SSEBlockEvent struct {
	Index     uint64 `json:"index"`
	Hash      string `json:"hash"`
	TxCount   int    `json:"tx_count"` // number of transactions in the block
	Validator string `json:"validator"`
	Timestamp string `json:"timestamp"` // EmitBlock fills this with the block time in UTC, RFC 3339
}

// SSETxEvent is the JSON payload of a "tx" event, emitted for each confirmed
// transaction. To, Amount and Fee are left out of the JSON when they hold
// their zero value.
type SSETxEvent struct {
	ID     string               `json:"id"`
	TxType blockchain.EventType `json:"tx_type"`
	From   string               `json:"from"`
	To     string               `json:"to,omitempty"`
	Amount uint64               `json:"amount,omitempty"`
	Fee    uint64               `json:"fee,omitempty"`
}

// SSEContractLogEvent is emitted each time a contract calls env.log().
// It is an alias, so the payload is serialised exactly as
// blockchain.ContractLogEntry is.
type SSEContractLogEvent = blockchain.ContractLogEntry

// ── hub ───────────────────────────────────────────────────────────────────────

// SSEHub manages all active SSE client connections.
// It is safe for concurrent use from multiple goroutines.
type SSEHub struct {
	mu sync.RWMutex // guards clients
	// clients is the set of subscriber channels, one per connected client.
	// Each channel carries fully formatted SSE messages.
	clients map[chan string]struct{}
}

// NewSSEHub returns an initialised hub ready to accept connections.
func NewSSEHub() *SSEHub {
	return &SSEHub{clients: make(map[chan string]struct{})}
}

// Clients returns the current number of connected SSE clients.
func (h *SSEHub) Clients() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } func (h *SSEHub) subscribe() chan string { ch := make(chan string, 64) // buffered: drop events for slow clients h.mu.Lock() h.clients[ch] = struct{}{} h.mu.Unlock() return ch } func (h *SSEHub) unsubscribe(ch chan string) { h.mu.Lock() delete(h.clients, ch) h.mu.Unlock() close(ch) } // emit serialises data and broadcasts an SSE message to all subscribers. func (h *SSEHub) emit(eventName string, data any) { payload, err := json.Marshal(data) if err != nil { return } // SSE wire format: "event: \ndata: \n\n" msg := fmt.Sprintf("event: %s\ndata: %s\n\n", eventName, payload) h.mu.RLock() for ch := range h.clients { select { case ch <- msg: default: // drop silently — client is too slow } } h.mu.RUnlock() } // ── public emit methods ─────────────────────────────────────────────────────── // EmitBlock broadcasts a "block" event for the committed block b. func (h *SSEHub) EmitBlock(b *blockchain.Block) { h.emit("block", SSEBlockEvent{ Index: b.Index, Hash: b.HashHex(), TxCount: len(b.Transactions), Validator: b.Validator, Timestamp: b.Timestamp.UTC().Format(time.RFC3339), }) } // EmitTx broadcasts a "tx" event for each confirmed transaction. // Synthetic BLOCK_REWARD records are skipped. func (h *SSEHub) EmitTx(tx *blockchain.Transaction) { if tx.Type == blockchain.EventBlockReward { return } h.emit("tx", SSETxEvent{ ID: tx.ID, TxType: tx.Type, From: tx.From, To: tx.To, Amount: tx.Amount, Fee: tx.Fee, }) } // EmitContractLog broadcasts a "contract_log" event. func (h *SSEHub) EmitContractLog(entry blockchain.ContractLogEntry) { h.emit("contract_log", entry) } // EmitBlockWithTxs calls EmitBlock then EmitTx for each transaction in b. func (h *SSEHub) EmitBlockWithTxs(b *blockchain.Block) { h.EmitBlock(b) for _, tx := range b.Transactions { h.EmitTx(tx) } } // ── HTTP handler ────────────────────────────────────────────────────────────── // ServeHTTP handles GET /api/events. 
// The response is a text/event-stream that streams block, tx, and // contract_log events until the client disconnects. func (h *SSEHub) ServeHTTP(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported by this server", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") // tell nginx not to buffer ch := h.subscribe() defer h.unsubscribe(ch) // Opening comment — confirms the connection to the client. fmt.Fprintf(w, ": connected to DChain event stream\n\n") flusher.Flush() // Keepalive comments every 20 s prevent proxy/load-balancer timeouts. keepalive := time.NewTicker(20 * time.Second) defer keepalive.Stop() for { select { case msg, ok := <-ch: if !ok { return } fmt.Fprint(w, msg) flusher.Flush() case <-keepalive.C: fmt.Fprintf(w, ": keepalive\n\n") flusher.Flush() case <-r.Context().Done(): return } } }