DChain single-node blockchain + React Native messenger client. Core: - PBFT consensus with multi-sig validator admission + equivocation slashing - BadgerDB + schema migration scaffold (CurrentSchemaVersion=0) - libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1) - Native Go contracts (username_registry) alongside WASM (wazero) - WebSocket gateway with topic-based fanout + Ed25519-nonce auth - Relay mailbox with NaCl envelope encryption (X25519 + Ed25519) - Prometheus /metrics, per-IP rate limit, body-size cap Deployment: - Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus - 3-node dev compose (docker-compose.yml) with mocked internet topology - 3-validator prod compose (deploy/prod/) for federation - Auto-update from Gitea via /api/update-check + systemd timer - Build-time version injection (ldflags → node --version) - UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER) Client (client-app/): - Expo / React Native / NativeWind - E2E NaCl encryption, typing indicator, contact requests - Auto-discovery of canonical contracts, chain_id aware, WS reconnect on node switch Documentation: - README.md, CHANGELOG.md, CONTEXT.md - deploy/single/README.md with 6 operator scenarios - deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design - docs/contracts/*.md per contract
187 lines
5.5 KiB
Go
187 lines
5.5 KiB
Go
// Package node — Server-Sent Events hub for the block explorer.
//
// Clients connect to GET /api/events and receive a real-time stream of:
//
//	event: block        — every committed block
//	event: tx           — every confirmed transaction (synthetic BLOCK_REWARD excluded)
//	event: contract_log — every log entry written by a smart contract
//
// The stream uses the standard text/event-stream format so the browser's
// native EventSource API works without any library.
|
|
package node
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"go-blockchain/blockchain"
|
|
)
|
|
|
|
// ── event payload types ──────────────────────────────────────────────────────
|
|
|
|
// SSEBlockEvent is the JSON payload of a "block" event, broadcast once for
// every committed block.
type SSEBlockEvent struct {
	// Index is the block's index as reported by the block itself.
	Index uint64 `json:"index"`
	// Hash is the block hash rendered as a hex string.
	Hash string `json:"hash"`
	// TxCount is the number of transactions carried by the block.
	TxCount int `json:"tx_count"`
	// Validator identifies the validator recorded on the block.
	Validator string `json:"validator"`
	// Timestamp is the block time as a preformatted string
	// (EmitBlock fills it with a UTC RFC 3339 value).
	Timestamp string `json:"timestamp"`
}
|
|
|
|
// SSETxEvent is emitted for each confirmed transaction.
|
|
type SSETxEvent struct {
|
|
ID string `json:"id"`
|
|
TxType blockchain.EventType `json:"tx_type"`
|
|
From string `json:"from"`
|
|
To string `json:"to,omitempty"`
|
|
Amount uint64 `json:"amount,omitempty"`
|
|
Fee uint64 `json:"fee,omitempty"`
|
|
}
|
|
|
|
// SSEContractLogEvent is emitted each time a contract calls env.log().
|
|
type SSEContractLogEvent = blockchain.ContractLogEntry
|
|
|
|
// ── hub ───────────────────────────────────────────────────────────────────────
|
|
|
|
// SSEHub tracks every connected SSE client and fans events out to them.
// All methods may be called concurrently from multiple goroutines.
type SSEHub struct {
	// mu guards clients.
	mu sync.RWMutex
	// clients is the set of live subscriber channels; each channel carries
	// fully framed SSE messages destined for one HTTP connection.
	clients map[chan string]struct{}
}
|
|
|
|
// NewSSEHub returns an initialised hub ready to accept connections.
|
|
func NewSSEHub() *SSEHub {
|
|
return &SSEHub{clients: make(map[chan string]struct{})}
|
|
}
|
|
|
|
// Clients returns the current number of connected SSE clients.
|
|
func (h *SSEHub) Clients() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
func (h *SSEHub) subscribe() chan string {
|
|
ch := make(chan string, 64) // buffered: drop events for slow clients
|
|
h.mu.Lock()
|
|
h.clients[ch] = struct{}{}
|
|
h.mu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
func (h *SSEHub) unsubscribe(ch chan string) {
|
|
h.mu.Lock()
|
|
delete(h.clients, ch)
|
|
h.mu.Unlock()
|
|
close(ch)
|
|
}
|
|
|
|
// emit serialises data and broadcasts an SSE message to all subscribers.
|
|
func (h *SSEHub) emit(eventName string, data any) {
|
|
payload, err := json.Marshal(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
// SSE wire format: "event: <name>\ndata: <json>\n\n"
|
|
msg := fmt.Sprintf("event: %s\ndata: %s\n\n", eventName, payload)
|
|
h.mu.RLock()
|
|
for ch := range h.clients {
|
|
select {
|
|
case ch <- msg:
|
|
default: // drop silently — client is too slow
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
}
|
|
|
|
// ── public emit methods ───────────────────────────────────────────────────────
|
|
|
|
// EmitBlock broadcasts a "block" event for the committed block b.
|
|
func (h *SSEHub) EmitBlock(b *blockchain.Block) {
|
|
h.emit("block", SSEBlockEvent{
|
|
Index: b.Index,
|
|
Hash: b.HashHex(),
|
|
TxCount: len(b.Transactions),
|
|
Validator: b.Validator,
|
|
Timestamp: b.Timestamp.UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
// EmitTx broadcasts a "tx" event for each confirmed transaction.
|
|
// Synthetic BLOCK_REWARD records are skipped.
|
|
func (h *SSEHub) EmitTx(tx *blockchain.Transaction) {
|
|
if tx.Type == blockchain.EventBlockReward {
|
|
return
|
|
}
|
|
h.emit("tx", SSETxEvent{
|
|
ID: tx.ID,
|
|
TxType: tx.Type,
|
|
From: tx.From,
|
|
To: tx.To,
|
|
Amount: tx.Amount,
|
|
Fee: tx.Fee,
|
|
})
|
|
}
|
|
|
|
// EmitContractLog broadcasts a "contract_log" event.
|
|
func (h *SSEHub) EmitContractLog(entry blockchain.ContractLogEntry) {
|
|
h.emit("contract_log", entry)
|
|
}
|
|
|
|
// EmitBlockWithTxs calls EmitBlock then EmitTx for each transaction in b.
|
|
func (h *SSEHub) EmitBlockWithTxs(b *blockchain.Block) {
|
|
h.EmitBlock(b)
|
|
for _, tx := range b.Transactions {
|
|
h.EmitTx(tx)
|
|
}
|
|
}
|
|
|
|
// ── HTTP handler ──────────────────────────────────────────────────────────────
|
|
|
|
// ServeHTTP handles GET /api/events.
|
|
// The response is a text/event-stream that streams block, tx, and
|
|
// contract_log events until the client disconnects.
|
|
func (h *SSEHub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming unsupported by this server", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no") // tell nginx not to buffer
|
|
|
|
ch := h.subscribe()
|
|
defer h.unsubscribe(ch)
|
|
|
|
// Opening comment — confirms the connection to the client.
|
|
fmt.Fprintf(w, ": connected to DChain event stream\n\n")
|
|
flusher.Flush()
|
|
|
|
// Keepalive comments every 20 s prevent proxy/load-balancer timeouts.
|
|
keepalive := time.NewTicker(20 * time.Second)
|
|
defer keepalive.Stop()
|
|
|
|
for {
|
|
select {
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
fmt.Fprint(w, msg)
|
|
flusher.Flush()
|
|
case <-keepalive.C:
|
|
fmt.Fprintf(w, ": keepalive\n\n")
|
|
flusher.Flush()
|
|
case <-r.Context().Done():
|
|
return
|
|
}
|
|
}
|
|
}
|