Files
dchain/node/sse.go
vsecoder 7e7393e4f8 chore: initial commit for v0.0.1
DChain single-node blockchain + React Native messenger client.

Core:
- PBFT consensus with multi-sig validator admission + equivocation slashing
- BadgerDB + schema migration scaffold (CurrentSchemaVersion=0)
- libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1)
- Native Go contracts (username_registry) alongside WASM (wazero)
- WebSocket gateway with topic-based fanout + Ed25519-nonce auth
- Relay mailbox with NaCl envelope encryption (X25519 + Ed25519)
- Prometheus /metrics, per-IP rate limit, body-size cap

Deployment:
- Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus
- 3-node dev compose (docker-compose.yml) with mocked internet topology
- 3-validator prod compose (deploy/prod/) for federation
- Auto-update from Gitea via /api/update-check + systemd timer
- Build-time version injection (ldflags → node --version)
- UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER)

Client (client-app/):
- Expo / React Native / NativeWind
- E2E NaCl encryption, typing indicator, contact requests
- Auto-discovery of canonical contracts, chain_id aware, WS reconnect on node switch

Documentation:
- README.md, CHANGELOG.md, CONTEXT.md
- deploy/single/README.md with 6 operator scenarios
- deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design
- docs/contracts/*.md per contract
2026-04-17 14:16:44 +03:00

187 lines
5.5 KiB
Go

// Package node — Server-Sent Events hub for the block explorer.
//
// Clients connect to GET /api/events and receive a real-time stream of:
//
//	event: block        — every committed block
//	event: tx           — every confirmed transaction (synthetic BLOCK_REWARD excluded)
//	event: contract_log — every log entry written by a smart contract
//
// The stream uses the standard text/event-stream format so the browser's
// native EventSource API works without any library.
package node
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"go-blockchain/blockchain"
)
// ── event payload types ──────────────────────────────────────────────────────
// SSEBlockEvent is the JSON payload of the "block" SSE event. EmitBlock builds
// one from a block and broadcasts it to every subscriber.
type SSEBlockEvent struct {
// Index is copied from the block's Index field.
Index uint64 `json:"index"`
// Hash is the string returned by the block's HashHex method.
Hash string `json:"hash"`
// TxCount is len(b.Transactions); no entries are filtered out of the count.
TxCount int `json:"tx_count"`
// Validator is copied from the block's Validator field.
Validator string `json:"validator"`
// Timestamp is the block's Timestamp converted to UTC and formatted as RFC 3339.
Timestamp string `json:"timestamp"`
}
// SSETxEvent is the JSON payload of the "tx" SSE event. EmitTx builds one from
// a single transaction. To, Amount and Fee are tagged omitempty, so they are
// absent from the JSON whenever they hold their zero value.
type SSETxEvent struct {
// ID is copied from the transaction's ID field.
ID string `json:"id"`
// TxType is the transaction's Type. Its JSON encoding is decided by
// blockchain.EventType, which is declared outside this file.
TxType blockchain.EventType `json:"tx_type"`
// From is copied from the transaction's From field.
From string `json:"from"`
// To is copied from the transaction's To field.
To string `json:"to,omitempty"`
// Amount is copied from the transaction's Amount field.
// NOTE(review): the unit is not visible in this file — confirm before documenting it for clients.
Amount uint64 `json:"amount,omitempty"`
// Fee is copied from the transaction's Fee field (same unit caveat as Amount).
Fee uint64 `json:"fee,omitempty"`
}
// SSEContractLogEvent is the payload of the "contract_log" SSE event. It is a
// type alias of blockchain.ContractLogEntry, so its JSON shape is whatever
// that type defines.
// NOTE(review): the previous comment stated that one event is emitted each time
// a contract calls env.log(); the call site is outside this file — confirm.
type SSEContractLogEvent = blockchain.ContractLogEntry
// ── hub ───────────────────────────────────────────────────────────────────────
// SSEHub manages all active SSE client connections.
// It is safe for concurrent use from multiple goroutines: every access to the
// clients map in this file is made while holding mu.
// Because it embeds a mutex by value, use it through a pointer (see NewSSEHub).
type SSEHub struct {
// mu guards clients. emit and Clients take the read lock; subscribe and
// unsubscribe take the write lock.
mu sync.RWMutex
// clients is the set of subscriber channels. Each channel carries fully
// formatted SSE frames ready to be written to a response.
clients map[chan string]struct{}
}
// NewSSEHub allocates a hub with an empty subscriber set. The zero SSEHub has a
// nil clients map and cannot accept subscribers, so always construct hubs here.
func NewSSEHub() *SSEHub {
	hub := new(SSEHub)
	hub.clients = map[chan string]struct{}{}
	return hub
}
// Clients reports how many subscriber channels are registered right now.
// The count is read under the read lock and may be stale by the time the
// caller uses it.
func (h *SSEHub) Clients() int {
	h.mu.RLock()
	count := len(h.clients)
	h.mu.RUnlock()
	return count
}
// subscribe registers a new subscriber and returns the channel on which it
// will receive formatted SSE frames. The channel is buffered (64 frames); once
// the buffer is full, emit drops further frames for that subscriber instead of
// blocking.
func (h *SSEHub) subscribe() chan string {
	sub := make(chan string, 64)

	h.mu.Lock()
	defer h.mu.Unlock()
	h.clients[sub] = struct{}{}
	return sub
}
// unsubscribe removes ch from the subscriber set and then closes it.
//
// Deferred calls run last-in-first-out, so the order is: delete under the
// write lock, release the lock, close the channel. Because emit only sends to
// channels it finds in the set while holding the read lock, nothing can send
// on ch once it has been removed. Call it exactly once per subscription.
func (h *SSEHub) unsubscribe(ch chan string) {
	defer close(ch)

	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.clients, ch)
}
// emit JSON-encodes data, wraps it in an SSE frame named eventName and offers
// the frame to every registered subscriber.
//
// Frame layout: "event: <name>\ndata: <json>\n\n".
//
// If data cannot be marshalled the event is dropped without notice. Delivery
// is non-blocking: a subscriber whose buffer is full simply misses the frame.
func (h *SSEHub) emit(eventName string, data any) {
	body, err := json.Marshal(data)
	if err != nil {
		return
	}
	frame := "event: " + eventName + "\ndata: " + string(body) + "\n\n"

	h.mu.RLock()
	defer h.mu.RUnlock()
	for sub := range h.clients {
		select {
		case sub <- frame:
		default:
			// Buffer full: skip this subscriber rather than stall the broadcaster.
		}
	}
}
// ── public emit methods ───────────────────────────────────────────────────────
// EmitBlock publishes a "block" event describing b: its index, hex hash,
// transaction count, validator and UTC timestamp in RFC 3339 form.
// b must be non-nil.
func (h *SSEHub) EmitBlock(b *blockchain.Block) {
	ev := SSEBlockEvent{
		Index:     b.Index,
		Hash:      b.HashHex(),
		TxCount:   len(b.Transactions),
		Validator: b.Validator,
	}
	ev.Timestamp = b.Timestamp.UTC().Format(time.RFC3339)
	h.emit("block", ev)
}
// EmitTx publishes a "tx" event for tx, unless its type is
// blockchain.EventBlockReward, in which case nothing is sent.
// tx must be non-nil.
func (h *SSEHub) EmitTx(tx *blockchain.Transaction) {
	if tx.Type == blockchain.EventBlockReward {
		return
	}

	var ev SSETxEvent
	ev.ID = tx.ID
	ev.TxType = tx.Type
	ev.From = tx.From
	ev.To = tx.To
	ev.Amount = tx.Amount
	ev.Fee = tx.Fee
	h.emit("tx", ev)
}
// EmitContractLog publishes entry to all subscribers as a "contract_log"
// event; the entry is JSON-encoded as is.
func (h *SSEHub) EmitContractLog(entry blockchain.ContractLogEntry) {
	const eventName = "contract_log"
	h.emit(eventName, entry)
}
// EmitBlockWithTxs publishes the "block" event for b first and then one "tx"
// event per entry of b.Transactions, in slice order. EmitTx decides which
// transactions are actually sent.
func (h *SSEHub) EmitBlockWithTxs(b *blockchain.Block) {
	h.EmitBlock(b)

	txs := b.Transactions
	for i := 0; i < len(txs); i++ {
		h.EmitTx(txs[i])
	}
}
// ── HTTP handler ──────────────────────────────────────────────────────────────
// ServeHTTP handles GET /api/events.
//
// It registers the caller as a subscriber and streams every frame the hub
// broadcasts (block, tx and contract_log events) as text/event-stream. The
// stream ends when the request context is cancelled or when a write to the
// client fails; in both cases the subscription is removed before returning.
//
// If w does not implement http.Flusher, events cannot be pushed incrementally
// and the handler answers 500 Internal Server Error instead.
func (h *SSEHub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported by this server", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	hdr.Set("X-Accel-Buffering", "no") // tell nginx not to buffer

	ch := h.subscribe()
	defer h.unsubscribe(ch)

	// send writes one frame and flushes it. It reports false when the write
	// failed, which means the stream is dead: continuing would keep this
	// goroutine and its subscription alive while every later write fails too.
	send := func(frame string) bool {
		if _, err := fmt.Fprint(w, frame); err != nil {
			return false
		}
		flusher.Flush()
		return true
	}

	// Opening comment — confirms the connection to the client.
	if !send(": connected to DChain event stream\n\n") {
		return
	}

	// Keepalive comments every 20 s prevent proxy/load-balancer timeouts.
	keepalive := time.NewTicker(20 * time.Second)
	defer keepalive.Stop()

	done := r.Context().Done()
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				return
			}
			if !send(msg) {
				return
			}
		case <-keepalive.C:
			if !send(": keepalive\n\n") {
				return
			}
		case <-done:
			return
		}
	}
}