DChain single-node blockchain + React Native messenger client. Core: - PBFT consensus with multi-sig validator admission + equivocation slashing - BadgerDB + schema migration scaffold (CurrentSchemaVersion=0) - libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1) - Native Go contracts (username_registry) alongside WASM (wazero) - WebSocket gateway with topic-based fanout + Ed25519-nonce auth - Relay mailbox with NaCl envelope encryption (X25519 + Ed25519) - Prometheus /metrics, per-IP rate limit, body-size cap Deployment: - Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus - 3-node dev compose (docker-compose.yml) with mocked internet topology - 3-validator prod compose (deploy/prod/) for federation - Auto-update from Gitea via /api/update-check + systemd timer - Build-time version injection (ldflags → node --version) - UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER) Client (client-app/): - Expo / React Native / NativeWind - E2E NaCl encryption, typing indicator, contact requests - Auto-discovery of canonical contracts, chain_id aware, WS reconnect on node switch Documentation: - README.md, CHANGELOG.md, CONTEXT.md - deploy/single/README.md with 6 operator scenarios - deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design - docs/contracts/*.md per contract
697 lines
23 KiB
Go
697 lines
23 KiB
Go
// Package node — WebSocket gateway.
|
|
//
|
|
// Clients connect to GET /api/ws and maintain a persistent bidirectional
|
|
// connection. The gateway eliminates HTTP polling for balance, messages,
|
|
// and contact requests by pushing events as soon as they are committed.
|
|
//
|
|
// Protocol (JSON, one frame per line):
|
|
//
|
|
// Client → Server:
|
|
//
|
|
// { "op": "subscribe", "topic": "tx" }
|
|
// { "op": "subscribe", "topic": "blocks" }
|
|
// { "op": "subscribe", "topic": "addr:<hex_pubkey>" } // txs involving this address
|
|
// { "op": "unsubscribe", "topic": "..." }
|
|
// { "op": "ping" }
|
|
//
|
|
// Server → Client:
|
|
//
|
|
// { "event": "hello", "chain_id": "dchain-...", "tip_height": 1234 }
|
|
// { "event": "block", "data": { index, hash, tx_count, validator, timestamp } }
|
|
// { "event": "tx", "data": { id, tx_type, from, to, amount, fee } }
|
|
// { "event": "contract_log", "data": { ... } }
|
|
// { "event": "pong" }
|
|
// { "event": "error", "msg": "..." }
|
|
// { "event": "subscribed", "topic": "..." }
|
|
//
|
|
// Design notes:
|
|
// - Each connection has a bounded outbox (64 frames). If the client is
|
|
// slower than the producer, oldest frames are dropped and a
|
|
// `{"event":"lag"}` notice is sent so the UI can trigger a resync.
|
|
// - Subscriptions are per-connection and kept in memory only. Reconnection
|
|
// requires re-subscribing; this is cheap because the client's Zustand
|
|
// store remembers what it needs.
|
|
// - The hub reuses the same event sources as the SSE hub so both transports
|
|
// stay in sync; any caller that emits to SSE also emits here.
|
|
package node
|
|
|
|
import (
|
|
"crypto/ed25519"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"go-blockchain/blockchain"
|
|
)
|
|
|
|
// ── wire types ───────────────────────────────────────────────────────────────
|
|
|
|
type wsClientCmd struct {
|
|
Op string `json:"op"`
|
|
Topic string `json:"topic,omitempty"`
|
|
// submit_tx fields — carries a full signed transaction + a client-assigned
|
|
// request id so the ack frame can be correlated on the client.
|
|
Tx json.RawMessage `json:"tx,omitempty"`
|
|
ID string `json:"id,omitempty"`
|
|
// auth fields — client proves ownership of an Ed25519 pubkey by signing
|
|
// the server-supplied nonce (sent in the hello frame as auth_nonce).
|
|
PubKey string `json:"pubkey,omitempty"`
|
|
Signature string `json:"sig,omitempty"`
|
|
// typing fields — `to` is the recipient's X25519 pubkey (same key used
|
|
// for the inbox topic). Server fans out to subscribers of
|
|
// `typing:<to>` so the recipient's client can show an indicator.
|
|
// Purely ephemeral: never stored, never gossiped across nodes.
|
|
To string `json:"to,omitempty"`
|
|
}
|
|
|
|
type wsServerFrame struct {
|
|
Event string `json:"event"`
|
|
Data json.RawMessage `json:"data,omitempty"`
|
|
Topic string `json:"topic,omitempty"`
|
|
Msg string `json:"msg,omitempty"`
|
|
ChainID string `json:"chain_id,omitempty"`
|
|
TipHeight uint64 `json:"tip_height,omitempty"`
|
|
// Sent with the hello frame. The client signs it with their Ed25519
|
|
// private key and replies via the `auth` op; the server binds the
|
|
// connection to the authenticated pubkey for scoped subscriptions.
|
|
AuthNonce string `json:"auth_nonce,omitempty"`
|
|
// submit_ack fields.
|
|
ID string `json:"id,omitempty"`
|
|
Status string `json:"status,omitempty"`
|
|
Reason string `json:"reason,omitempty"`
|
|
}
|
|
|
|
// WSSubmitTxHandler is the hook the hub calls on receiving a `submit_tx` op.
|
|
// Implementations should do the same validation as the HTTP /api/tx handler
|
|
// (timestamp window + signature verify + mempool add) and return an error on
|
|
// rejection. A nil error means the tx has been accepted into the mempool.
|
|
type WSSubmitTxHandler func(txJSON []byte) (txID string, err error)
|
|
|
|
// ── hub ──────────────────────────────────────────────────────────────────────
|
|
|
|
// X25519ForPubFn, if set, lets the hub map an authenticated Ed25519 pubkey
|
|
// to the X25519 key the same identity uses for relay encryption. This lets
|
|
// the auth'd client subscribe to their own inbox without revealing that
|
|
// mapping to unauthenticated clients. Returns ("", nil) if unknown.
|
|
type X25519ForPubFn func(ed25519PubHex string) (x25519PubHex string, err error)
|
|
|
|
// WSHub tracks every open websocket connection and fans out events based on
|
|
// per-connection topic filters.
|
|
type WSHub struct {
|
|
mu sync.RWMutex
|
|
clients map[*wsClient]struct{}
|
|
upgrader websocket.Upgrader
|
|
chainID func() string
|
|
tip func() uint64
|
|
// submitTx is the optional handler for `submit_tx` client ops. If nil,
|
|
// the hub replies with an error so clients know to fall back to HTTP.
|
|
submitTx WSSubmitTxHandler
|
|
// x25519For maps Ed25519 pubkey → X25519 pubkey (from the identity
|
|
// registry) so the hub can validate `inbox:<x25519>` subscriptions
|
|
// against the authenticated identity. Optional — unset disables
|
|
// inbox auth (subscribe just requires auth but not identity lookup).
|
|
x25519For X25519ForPubFn
|
|
|
|
// Per-IP connection counter. Guards against a single host opening
|
|
// unbounded sockets (memory exhaustion, descriptor exhaustion).
|
|
// Counter mutates on connect/disconnect; protected by perIPMu rather
|
|
// than the hub's main mu so fanout isn't blocked by bookkeeping.
|
|
perIPMu sync.Mutex
|
|
perIP map[string]int
|
|
maxPerIP int
|
|
}
|
|
|
|
// WSMaxConnectionsPerIP caps concurrent websocket connections from one IP.
|
|
// Chosen to comfortably fit a power user with multiple devices (phone, web,
|
|
// load-test script) while still bounding a DoS. Override via SetMaxPerIP
|
|
// on the hub if a specific deployment needs different limits.
|
|
const WSMaxConnectionsPerIP = 10
|
|
|
|
// WSMaxSubsPerConnection caps subscriptions per single connection. A real
|
|
// client needs 3-5 topics (addr, inbox, blocks). 32 is generous headroom
|
|
// without letting one conn hold thousands of topic entries.
|
|
const WSMaxSubsPerConnection = 32
|
|
|
|
// NewWSHub constructs a hub. `chainID` and `tip` are optional snapshot
|
|
// functions used only for the hello frame.
|
|
func NewWSHub(chainID func() string, tip func() uint64) *WSHub {
|
|
return &WSHub{
|
|
clients: make(map[*wsClient]struct{}),
|
|
upgrader: websocket.Upgrader{
|
|
// The node may sit behind a reverse proxy; allow cross-origin
|
|
// upgrades. Rate limiting happens separately via api_guards.
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
ReadBufferSize: 4 * 1024,
|
|
WriteBufferSize: 4 * 1024,
|
|
},
|
|
chainID: chainID,
|
|
tip: tip,
|
|
perIP: make(map[string]int),
|
|
maxPerIP: WSMaxConnectionsPerIP,
|
|
}
|
|
}
|
|
|
|
// SetMaxPerIP overrides the default per-IP connection cap. Must be called
|
|
// before Upgrade starts accepting — otherwise racy with new connections.
|
|
func (h *WSHub) SetMaxPerIP(n int) {
|
|
if n <= 0 {
|
|
return
|
|
}
|
|
h.perIPMu.Lock()
|
|
h.maxPerIP = n
|
|
h.perIPMu.Unlock()
|
|
}
|
|
|
|
// SetSubmitTxHandler installs the handler for `submit_tx` ops. Pass nil to
|
|
// disable WS submission (clients will need to keep using HTTP POST /api/tx).
|
|
func (h *WSHub) SetSubmitTxHandler(fn WSSubmitTxHandler) {
|
|
h.submitTx = fn
|
|
}
|
|
|
|
// SetX25519ForPub installs the Ed25519→X25519 lookup used to validate
|
|
// inbox subscriptions post-auth. Pass nil to disable the check.
|
|
func (h *WSHub) SetX25519ForPub(fn X25519ForPubFn) {
|
|
h.x25519For = fn
|
|
}
|
|
|
|
// Clients returns the number of active websocket connections.
|
|
func (h *WSHub) Clients() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
// ServeHTTP handles GET /api/ws and upgrades to a websocket.
|
|
func (h *WSHub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
ip := clientIP(r)
|
|
|
|
// Access-token gating (if configured). Private nodes require the token
|
|
// for the upgrade itself. Public-with-token-for-writes nodes check it
|
|
// here too — tokenOK is stored on the client so submit_tx can rely on
|
|
// the upgrade-time decision without re-reading headers per op.
|
|
tokenOK := true
|
|
if tok, _ := AccessTokenForWS(); tok != "" {
|
|
if err := checkAccessToken(r); err != nil {
|
|
if _, private := AccessTokenForWS(); private {
|
|
w.Header().Set("WWW-Authenticate", `Bearer realm="dchain"`)
|
|
http.Error(w, "ws: "+err.Error(), http.StatusUnauthorized)
|
|
return
|
|
}
|
|
// Public-with-token mode: upgrade allowed but write ops gated.
|
|
tokenOK = false
|
|
}
|
|
}
|
|
|
|
// Per-IP quota check. Reject BEFORE upgrade so we never hold an open
|
|
// socket for a client we're going to kick. 429 Too Many Requests is the
|
|
// closest status code — TCP is fine, they just opened too many.
|
|
h.perIPMu.Lock()
|
|
if h.perIP[ip] >= h.maxPerIP {
|
|
h.perIPMu.Unlock()
|
|
w.Header().Set("Retry-After", "30")
|
|
http.Error(w, fmt.Sprintf("too many websocket connections from %s (max %d)", ip, h.maxPerIP), http.StatusTooManyRequests)
|
|
return
|
|
}
|
|
h.perIP[ip]++
|
|
h.perIPMu.Unlock()
|
|
|
|
conn, err := h.upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
// Upgrade already wrote an HTTP error response. Release the reservation.
|
|
h.perIPMu.Lock()
|
|
h.perIP[ip]--
|
|
if h.perIP[ip] <= 0 {
|
|
delete(h.perIP, ip)
|
|
}
|
|
h.perIPMu.Unlock()
|
|
return
|
|
}
|
|
// Generate a fresh 32-byte nonce per connection. Client signs this
|
|
// with its Ed25519 private key to prove identity; binding is per-
|
|
// connection (reconnect → new nonce → new auth).
|
|
nonceBytes := make([]byte, 32)
|
|
if _, err := rand.Read(nonceBytes); err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
nonce := hex.EncodeToString(nonceBytes)
|
|
|
|
c := &wsClient{
|
|
conn: conn,
|
|
send: make(chan []byte, 64),
|
|
subs: make(map[string]struct{}),
|
|
authNonce: nonce,
|
|
remoteIP: ip,
|
|
tokenOK: tokenOK,
|
|
}
|
|
h.mu.Lock()
|
|
h.clients[c] = struct{}{}
|
|
h.mu.Unlock()
|
|
MetricWSConnections.Inc()
|
|
|
|
// Send hello with chain metadata so the client can show connection state
|
|
// and verify it's on the expected chain.
|
|
var chainID string
|
|
var tip uint64
|
|
if h.chainID != nil {
|
|
chainID = h.chainID()
|
|
}
|
|
if h.tip != nil {
|
|
tip = h.tip()
|
|
}
|
|
helloBytes, _ := json.Marshal(wsServerFrame{
|
|
Event: "hello",
|
|
ChainID: chainID,
|
|
TipHeight: tip,
|
|
AuthNonce: nonce,
|
|
})
|
|
select {
|
|
case c.send <- helloBytes:
|
|
default:
|
|
}
|
|
|
|
// Reader + writer goroutines. When either returns, the other is signalled
|
|
// to shut down by closing the send channel.
|
|
go h.writeLoop(c)
|
|
h.readLoop(c) // blocks until the connection closes
|
|
}
|
|
|
|
func (h *WSHub) removeClient(c *wsClient) {
|
|
h.mu.Lock()
|
|
wasRegistered := false
|
|
if _, ok := h.clients[c]; ok {
|
|
delete(h.clients, c)
|
|
close(c.send)
|
|
wasRegistered = true
|
|
}
|
|
h.mu.Unlock()
|
|
if wasRegistered {
|
|
MetricWSConnections.Dec()
|
|
}
|
|
// Release the per-IP reservation so the client can reconnect without
|
|
// being rejected. Missing-or-zero counters are silently no-op'd.
|
|
if c.remoteIP != "" {
|
|
h.perIPMu.Lock()
|
|
if h.perIP[c.remoteIP] > 0 {
|
|
h.perIP[c.remoteIP]--
|
|
if h.perIP[c.remoteIP] == 0 {
|
|
delete(h.perIP, c.remoteIP)
|
|
}
|
|
}
|
|
h.perIPMu.Unlock()
|
|
}
|
|
_ = c.conn.Close()
|
|
}
|
|
|
|
// readLoop processes control frames + parses JSON commands from the client.
|
|
func (h *WSHub) readLoop(c *wsClient) {
|
|
defer h.removeClient(c)
|
|
c.conn.SetReadLimit(16 * 1024) // reject oversized frames
|
|
_ = c.conn.SetReadDeadline(time.Now().Add(90 * time.Second))
|
|
c.conn.SetPongHandler(func(string) error {
|
|
_ = c.conn.SetReadDeadline(time.Now().Add(90 * time.Second))
|
|
return nil
|
|
})
|
|
for {
|
|
_, data, err := c.conn.ReadMessage()
|
|
if err != nil {
|
|
return
|
|
}
|
|
var cmd wsClientCmd
|
|
if err := json.Unmarshal(data, &cmd); err != nil {
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "invalid JSON"})
|
|
continue
|
|
}
|
|
switch cmd.Op {
|
|
case "auth":
|
|
// Verify signature over the connection's auth_nonce. On success,
|
|
// bind this connection to the declared pubkey; scoped subs are
|
|
// limited to topics this identity owns.
|
|
pub, err := hex.DecodeString(cmd.PubKey)
|
|
if err != nil || len(pub) != ed25519.PublicKeySize {
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "auth: invalid pubkey"})
|
|
continue
|
|
}
|
|
sig, err := hex.DecodeString(cmd.Signature)
|
|
if err != nil || len(sig) != ed25519.SignatureSize {
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "auth: invalid signature"})
|
|
continue
|
|
}
|
|
if !ed25519.Verify(ed25519.PublicKey(pub), []byte(c.authNonce), sig) {
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "auth: signature mismatch"})
|
|
continue
|
|
}
|
|
var x25519 string
|
|
if h.x25519For != nil {
|
|
x25519, _ = h.x25519For(cmd.PubKey) // non-fatal on error
|
|
}
|
|
c.mu.Lock()
|
|
c.authPubKey = cmd.PubKey
|
|
c.authX25519 = x25519
|
|
c.mu.Unlock()
|
|
c.sendFrame(wsServerFrame{Event: "subscribed", Topic: "auth:" + cmd.PubKey[:12] + "…"})
|
|
case "subscribe":
|
|
topic := strings.TrimSpace(cmd.Topic)
|
|
if topic == "" {
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "topic required"})
|
|
continue
|
|
}
|
|
// Scoped topics require matching auth. Unauthenticated clients
|
|
// get global streams (`blocks`, `tx`, `contract_log`, …) only.
|
|
if err := h.authorizeSubscribe(c, topic); err != nil {
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "forbidden: " + err.Error()})
|
|
continue
|
|
}
|
|
c.mu.Lock()
|
|
if len(c.subs) >= WSMaxSubsPerConnection {
|
|
c.mu.Unlock()
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: fmt.Sprintf("subscription limit exceeded (%d)", WSMaxSubsPerConnection)})
|
|
continue
|
|
}
|
|
c.subs[topic] = struct{}{}
|
|
c.mu.Unlock()
|
|
c.sendFrame(wsServerFrame{Event: "subscribed", Topic: topic})
|
|
case "unsubscribe":
|
|
c.mu.Lock()
|
|
delete(c.subs, cmd.Topic)
|
|
c.mu.Unlock()
|
|
case "ping":
|
|
c.sendFrame(wsServerFrame{Event: "pong"})
|
|
case "typing":
|
|
// Ephemeral signal: A is typing to B. Requires auth so the
|
|
// "from" claim is verifiable; anon clients can't spoof
|
|
// typing indicators for other identities.
|
|
c.mu.RLock()
|
|
fromX := c.authX25519
|
|
c.mu.RUnlock()
|
|
to := strings.TrimSpace(cmd.To)
|
|
if fromX == "" || to == "" {
|
|
// Silently drop — no need to error, typing is best-effort.
|
|
continue
|
|
}
|
|
data, _ := json.Marshal(map[string]string{
|
|
"from": fromX,
|
|
"to": to,
|
|
})
|
|
h.fanout(wsServerFrame{Event: "typing", Data: data},
|
|
[]string{"typing:" + to})
|
|
case "submit_tx":
|
|
// Low-latency transaction submission over the existing WS
|
|
// connection. Avoids the HTTP round-trip and delivers a
|
|
// submit_ack correlated by the client-supplied id so callers
|
|
// don't have to poll for inclusion status.
|
|
c.mu.RLock()
|
|
tokOK := c.tokenOK
|
|
c.mu.RUnlock()
|
|
if !tokOK {
|
|
c.sendFrame(wsServerFrame{
|
|
Event: "submit_ack",
|
|
ID: cmd.ID,
|
|
Status: "rejected",
|
|
Reason: "submit requires access token; pass ?token= at /api/ws upgrade",
|
|
})
|
|
continue
|
|
}
|
|
if h.submitTx == nil {
|
|
c.sendFrame(wsServerFrame{
|
|
Event: "submit_ack",
|
|
ID: cmd.ID,
|
|
Status: "rejected",
|
|
Reason: "submit_tx over WS not available on this node",
|
|
})
|
|
continue
|
|
}
|
|
if len(cmd.Tx) == 0 {
|
|
c.sendFrame(wsServerFrame{
|
|
Event: "submit_ack", ID: cmd.ID,
|
|
Status: "rejected", Reason: "missing tx",
|
|
})
|
|
continue
|
|
}
|
|
txID, err := h.submitTx(cmd.Tx)
|
|
if err != nil {
|
|
c.sendFrame(wsServerFrame{
|
|
Event: "submit_ack", ID: cmd.ID,
|
|
Status: "rejected", Reason: err.Error(),
|
|
})
|
|
continue
|
|
}
|
|
// Echo back the server-assigned tx id for confirmation. The
|
|
// client already knows it (it generated it), but this lets
|
|
// proxies / middleware log a proper request/response pair.
|
|
c.sendFrame(wsServerFrame{
|
|
Event: "submit_ack", ID: cmd.ID,
|
|
Status: "accepted", Msg: txID,
|
|
})
|
|
default:
|
|
c.sendFrame(wsServerFrame{Event: "error", Msg: "unknown op: " + cmd.Op})
|
|
}
|
|
}
|
|
}
|
|
|
|
// writeLoop pumps outbound frames and sends periodic pings.
|
|
func (h *WSHub) writeLoop(c *wsClient) {
|
|
ping := time.NewTicker(30 * time.Second)
|
|
defer ping.Stop()
|
|
for {
|
|
select {
|
|
case msg, ok := <-c.send:
|
|
if !ok {
|
|
return
|
|
}
|
|
_ = c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
return
|
|
}
|
|
case <-ping.C:
|
|
_ = c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// authorizeSubscribe gates which topics a connection is allowed to join.
|
|
//
|
|
// Rules:
|
|
// - Global topics (blocks, tx, contract_log, contract:*, inbox) are
|
|
// open to anyone — `tx` leaks only public envelope fields, and the
|
|
// whole-`inbox` firehose is encrypted per-recipient anyway.
|
|
// - Scoped topics addressed at a specific identity require the client
|
|
// to have authenticated as that identity:
|
|
// addr:<ed25519_pub> — only the owning Ed25519 pubkey
|
|
// inbox:<x25519_pub> — only the identity whose registered X25519
|
|
// key equals this (looked up via x25519For)
|
|
//
|
|
// Without this check any curl client could subscribe to any address and
|
|
// watch incoming transactions in real time — a significant metadata leak.
|
|
func (h *WSHub) authorizeSubscribe(c *wsClient, topic string) error {
|
|
// Open topics — always allowed.
|
|
switch topic {
|
|
case "blocks", "tx", "contract_log", "inbox", "$system":
|
|
return nil
|
|
}
|
|
if strings.HasPrefix(topic, "contract:") {
|
|
return nil // contract-wide log streams are public
|
|
}
|
|
|
|
c.mu.RLock()
|
|
authed := c.authPubKey
|
|
authX := c.authX25519
|
|
c.mu.RUnlock()
|
|
|
|
if strings.HasPrefix(topic, "addr:") {
|
|
if authed == "" {
|
|
return fmt.Errorf("addr:* requires auth")
|
|
}
|
|
want := strings.TrimPrefix(topic, "addr:")
|
|
if want != authed {
|
|
return fmt.Errorf("addr:* only for your own pubkey")
|
|
}
|
|
return nil
|
|
}
|
|
if strings.HasPrefix(topic, "inbox:") {
|
|
if authed == "" {
|
|
return fmt.Errorf("inbox:* requires auth")
|
|
}
|
|
// If we have an x25519 mapping, enforce it; otherwise accept
|
|
// (best-effort — identity may not be registered yet).
|
|
if authX != "" {
|
|
want := strings.TrimPrefix(topic, "inbox:")
|
|
if want != authX {
|
|
return fmt.Errorf("inbox:* only for your own x25519")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
if strings.HasPrefix(topic, "typing:") {
|
|
// Same rule as inbox: you can only listen for "who's typing to ME".
|
|
if authed == "" {
|
|
return fmt.Errorf("typing:* requires auth")
|
|
}
|
|
if authX != "" {
|
|
want := strings.TrimPrefix(topic, "typing:")
|
|
if want != authX {
|
|
return fmt.Errorf("typing:* only for your own x25519")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Unknown scoped form — default-deny.
|
|
return fmt.Errorf("topic %q not recognised", topic)
|
|
}
|
|
|
|
// fanout sends frame to every client subscribed to any of the given topics.
|
|
func (h *WSHub) fanout(frame wsServerFrame, topics []string) {
|
|
buf, err := json.Marshal(frame)
|
|
if err != nil {
|
|
return
|
|
}
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
for c := range h.clients {
|
|
c.mu.RLock()
|
|
matched := false
|
|
for _, t := range topics {
|
|
if _, ok := c.subs[t]; ok {
|
|
matched = true
|
|
break
|
|
}
|
|
}
|
|
c.mu.RUnlock()
|
|
if !matched {
|
|
continue
|
|
}
|
|
select {
|
|
case c.send <- buf:
|
|
default:
|
|
// Outbox full — drop and notify once.
|
|
lagFrame, _ := json.Marshal(wsServerFrame{Event: "lag"})
|
|
select {
|
|
case c.send <- lagFrame:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── public emit methods ───────────────────────────────────────────────────────
|
|
|
|
// EmitBlock notifies subscribers of the `blocks` topic.
|
|
func (h *WSHub) EmitBlock(b *blockchain.Block) {
|
|
data, _ := json.Marshal(SSEBlockEvent{
|
|
Index: b.Index,
|
|
Hash: b.HashHex(),
|
|
TxCount: len(b.Transactions),
|
|
Validator: b.Validator,
|
|
Timestamp: b.Timestamp.UTC().Format(time.RFC3339),
|
|
})
|
|
h.fanout(wsServerFrame{Event: "block", Data: data}, []string{"blocks"})
|
|
}
|
|
|
|
// EmitTx notifies:
|
|
// - `tx` topic (firehose)
|
|
// - `addr:<from>` topic
|
|
// - `addr:<to>` topic (if distinct from from)
|
|
//
|
|
// Synthetic BLOCK_REWARD transactions use `addr:<to>` only (the validator).
|
|
func (h *WSHub) EmitTx(tx *blockchain.Transaction) {
|
|
data, _ := json.Marshal(SSETxEvent{
|
|
ID: tx.ID,
|
|
TxType: tx.Type,
|
|
From: tx.From,
|
|
To: tx.To,
|
|
Amount: tx.Amount,
|
|
Fee: tx.Fee,
|
|
})
|
|
topics := []string{"tx"}
|
|
if tx.From != "" {
|
|
topics = append(topics, "addr:"+tx.From)
|
|
}
|
|
if tx.To != "" && tx.To != tx.From {
|
|
topics = append(topics, "addr:"+tx.To)
|
|
}
|
|
h.fanout(wsServerFrame{Event: "tx", Data: data}, topics)
|
|
}
|
|
|
|
// EmitContractLog fans out to the `contract_log` and `contract:<id>` topics.
|
|
func (h *WSHub) EmitContractLog(entry blockchain.ContractLogEntry) {
|
|
data, _ := json.Marshal(entry)
|
|
topics := []string{"contract_log", "contract:" + entry.ContractID}
|
|
h.fanout(wsServerFrame{Event: "contract_log", Data: data}, topics)
|
|
}
|
|
|
|
// EmitInbox pushes a relay envelope summary to subscribers of the recipient's
|
|
// inbox topic. `envelope` is the full relay.Envelope but we only serialise a
|
|
// minimal shape here — the client can refetch from /api/relay/inbox for the
|
|
// ciphertext if it missed a frame.
|
|
//
|
|
// Called from relay.Mailbox.onStore (wired in cmd/node/main.go). Avoids
|
|
// importing the relay package here to keep the hub dependency-light.
|
|
func (h *WSHub) EmitInbox(recipientX25519 string, envelopeSummary any) {
|
|
data, err := json.Marshal(envelopeSummary)
|
|
if err != nil {
|
|
return
|
|
}
|
|
topics := []string{"inbox", "inbox:" + recipientX25519}
|
|
h.fanout(wsServerFrame{Event: "inbox", Data: data}, topics)
|
|
}
|
|
|
|
// EmitBlockWithTxs is the matching convenience method of SSEHub.
|
|
func (h *WSHub) EmitBlockWithTxs(b *blockchain.Block) {
|
|
h.EmitBlock(b)
|
|
for _, tx := range b.Transactions {
|
|
h.EmitTx(tx)
|
|
}
|
|
}
|
|
|
|
// ── per-client bookkeeping ───────────────────────────────────────────────────
|
|
|
|
type wsClient struct {
|
|
conn *websocket.Conn
|
|
send chan []byte
|
|
mu sync.RWMutex
|
|
subs map[string]struct{}
|
|
|
|
// auth state. authNonce is set when the connection opens; the client
|
|
// signs it and sends via the `auth` op. On success we store the
|
|
// pubkey and (if available) the matching X25519 key so scoped
|
|
// subscriptions can be validated without a DB lookup on each op.
|
|
authNonce string
|
|
authPubKey string // Ed25519 pubkey hex, empty = unauthenticated
|
|
authX25519 string // X25519 pubkey hex, empty if not looked up
|
|
|
|
// remoteIP is stored so removeClient can decrement the per-IP counter.
|
|
remoteIP string
|
|
|
|
// tokenOK is set at upgrade time: true if the connection passed the
|
|
// access-token check (or no token was required). submit_tx ops are
|
|
// rejected when tokenOK is false — matches the HTTP POST /api/tx
|
|
// behaviour so a private node's write surface is uniform across
|
|
// transports.
|
|
tokenOK bool
|
|
}
|
|
|
|
func (c *wsClient) sendFrame(f wsServerFrame) {
|
|
buf, err := json.Marshal(f)
|
|
if err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case c.send <- buf:
|
|
default:
|
|
// Client outbox full; drop. The lag indicator in fanout handles
|
|
// recovery notifications; control-plane frames (errors/acks) from
|
|
// readLoop are best-effort.
|
|
_ = fmt.Sprint // silence unused import on trimmed builds
|
|
}
|
|
}
|