// Package node — WebSocket gateway.
//
// Clients connect to GET /api/ws and maintain a persistent bidirectional
// connection. The gateway eliminates HTTP polling for balance, messages,
// and contact requests by pushing events as soon as they are committed.
//
// Protocol (JSON, one object per WebSocket text frame):
//
// Client → Server:
//
//	{ "op": "subscribe",   "topic": "tx" }
//	{ "op": "subscribe",   "topic": "blocks" }
//	{ "op": "subscribe",   "topic": "addr:<pubkey>" } // txs involving this address
//	{ "op": "unsubscribe", "topic": "..." }
//	{ "op": "ping" }
//	{ "op": "auth",        "pubkey": "<ed25519 hex>", "sig": "<hex signature over auth_nonce>" }
//	{ "op": "typing",      "to": "<x25519 hex>" }
//	{ "op": "submit_tx",   "id": "<request id>", "tx": { ... } }
//
// Server → Client:
//
//	{ "event": "hello", "chain_id": "dchain-...", "tip_height": 1234, "auth_nonce": "<hex>" }
//	{ "event": "block", "data": { index, hash, tx_count, validator, timestamp } }
//	{ "event": "tx", "data": { id, tx_type, from, to, amount, fee } }
//	{ "event": "contract_log", "data": { ... } }
//	{ "event": "inbox", "data": { ... } }
//	{ "event": "typing", "data": { from, to } }
//	{ "event": "submit_ack", "id": "...", "status": "accepted" | "rejected", "reason": "..." }
//	{ "event": "lag" }
//	{ "event": "pong" }
//	{ "event": "error", "msg": "..." }
//	{ "event": "subscribed", "topic": "..." }
//
// Design notes:
//   - Each connection has a bounded outbox (64 frames). If the client is
//     slower than the producer, frames are dropped and the hub tries to
//     enqueue a `{"event":"lag"}` notice so the UI can trigger a resync.
//   - Subscriptions are per-connection and kept in memory only. Reconnection
//     requires re-subscribing; this is cheap because the client's Zustand
//     store remembers what it needs.
//   - The hub reuses the same event sources as the SSE hub so both transports
//     stay in sync; any caller that emits to SSE also emits here.
package node

import (
	"crypto/ed25519"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"

	"go-blockchain/blockchain"
)

// ── wire types ───────────────────────────────────────────────────────────────

// wsClientCmd is the JSON command decoded from every client → server frame.
// Op selects the operation; the remaining fields are only meaningful for the
// ops named in the comments below.
type wsClientCmd struct {
	Op    string `json:"op"`
	Topic string `json:"topic,omitempty"`

	// submit_tx fields — carries a full signed transaction + a client-assigned
	// request id so the ack frame can be correlated on the client.
	Tx json.RawMessage `json:"tx,omitempty"`
	ID string          `json:"id,omitempty"`

	// auth fields — client proves ownership of an Ed25519 pubkey by signing
	// the server-supplied nonce (sent in the hello frame as auth_nonce).
	// Both values are hex-encoded.
	PubKey    string `json:"pubkey,omitempty"`
	Signature string `json:"sig,omitempty"`

	// typing fields — `to` is the recipient's X25519 pubkey (same key used
	// for the inbox topic). Server fans out to subscribers of
	// `typing:<to>` so the recipient's client can show an indicator.
	// Purely ephemeral: never stored, never gossiped across nodes.
	To string `json:"to,omitempty"`
}

// wsServerFrame is the JSON envelope for every server → client frame. Event
// names the frame kind; all other fields are optional and omitted when empty.
type wsServerFrame struct {
	Event     string          `json:"event"`
	Data      json.RawMessage `json:"data,omitempty"`
	Topic     string          `json:"topic,omitempty"`
	Msg       string          `json:"msg,omitempty"`
	ChainID   string          `json:"chain_id,omitempty"`
	TipHeight uint64          `json:"tip_height,omitempty"`

	// Sent with the hello frame. The client signs it with their Ed25519
	// private key and replies via the `auth` op; the server binds the
	// connection to the authenticated pubkey for scoped subscriptions.
	AuthNonce string `json:"auth_nonce,omitempty"`

	// submit_ack fields. ID echoes the client-supplied request id, Status is
	// "accepted" or "rejected", Reason explains a rejection. An accepted ack
	// additionally carries the transaction id in Msg.
	ID     string `json:"id,omitempty"`
	Status string `json:"status,omitempty"`
	Reason string `json:"reason,omitempty"`
}

// WSSubmitTxHandler is the hook the hub calls on receiving a `submit_tx` op.
// Implementations should do the same validation as the HTTP /api/tx handler
// (timestamp window + signature verify + mempool add) and return an error on
// rejection. A nil error means the tx has been accepted into the mempool.
type WSSubmitTxHandler func(txJSON []byte) (txID string, err error)

// ── hub ──────────────────────────────────────────────────────────────────────

// X25519ForPubFn, if set, lets the hub map an authenticated Ed25519 pubkey
// to the X25519 key the same identity uses for relay encryption. This lets
// the auth'd client subscribe to their own inbox without revealing that
// mapping to unauthenticated clients. Returns ("", nil) if unknown.
type X25519ForPubFn func(ed25519PubHex string) (x25519PubHex string, err error)

// WSHub tracks every open websocket connection and fans out events based on
// per-connection topic filters.
type WSHub struct {
	// mu guards clients.
	mu      sync.RWMutex
	clients map[*wsClient]struct{}

	// upgrader performs the HTTP → websocket handshake.
	upgrader websocket.Upgrader

	// chainID and tip are optional snapshot functions; they are consulted
	// only when building the hello frame and may be nil.
	chainID func() string
	tip     func() uint64

	// submitTx is the optional handler for `submit_tx` client ops. If nil,
	// the hub replies with an error so clients know to fall back to HTTP.
	submitTx WSSubmitTxHandler

	// x25519For maps Ed25519 pubkey → X25519 pubkey (from the identity
	// registry) so the hub can validate `inbox:<x25519>` subscriptions
	// against the authenticated identity. Optional — unset disables
	// inbox auth (subscribe just requires auth but not identity lookup).
	x25519For X25519ForPubFn

	// Per-IP connection counter. Guards against a single host opening
	// unbounded sockets (memory exhaustion, descriptor exhaustion).
	// Counter mutates on connect/disconnect; protected by perIPMu rather
	// than the hub's main mu so fanout isn't blocked by bookkeeping.
	perIPMu  sync.Mutex
	perIP    map[string]int
	maxPerIP int
}

// WSMaxConnectionsPerIP caps concurrent websocket connections from one IP.
// Chosen to comfortably fit a power user with multiple devices (phone, web,
// load-test script) while still bounding a DoS. Override via SetMaxPerIP
// on the hub if a specific deployment needs different limits.
const WSMaxConnectionsPerIP = 10

// WSMaxSubsPerConnection caps subscriptions per single connection. A real
// client needs 3-5 topics (addr, inbox, blocks). 32 is generous headroom
// without letting one conn hold thousands of topic entries.
const WSMaxSubsPerConnection = 32

// NewWSHub constructs a hub. `chainID` and `tip` are optional snapshot
// functions used only for the hello frame.
func NewWSHub(chainID func() string, tip func() uint64) *WSHub { return &WSHub{ clients: make(map[*wsClient]struct{}), upgrader: websocket.Upgrader{ // The node may sit behind a reverse proxy; allow cross-origin // upgrades. Rate limiting happens separately via api_guards. CheckOrigin: func(r *http.Request) bool { return true }, ReadBufferSize: 4 * 1024, WriteBufferSize: 4 * 1024, }, chainID: chainID, tip: tip, perIP: make(map[string]int), maxPerIP: WSMaxConnectionsPerIP, } } // SetMaxPerIP overrides the default per-IP connection cap. Must be called // before Upgrade starts accepting — otherwise racy with new connections. func (h *WSHub) SetMaxPerIP(n int) { if n <= 0 { return } h.perIPMu.Lock() h.maxPerIP = n h.perIPMu.Unlock() } // SetSubmitTxHandler installs the handler for `submit_tx` ops. Pass nil to // disable WS submission (clients will need to keep using HTTP POST /api/tx). func (h *WSHub) SetSubmitTxHandler(fn WSSubmitTxHandler) { h.submitTx = fn } // SetX25519ForPub installs the Ed25519→X25519 lookup used to validate // inbox subscriptions post-auth. Pass nil to disable the check. func (h *WSHub) SetX25519ForPub(fn X25519ForPubFn) { h.x25519For = fn } // Clients returns the number of active websocket connections. func (h *WSHub) Clients() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // ServeHTTP handles GET /api/ws and upgrades to a websocket. func (h *WSHub) ServeHTTP(w http.ResponseWriter, r *http.Request) { ip := clientIP(r) // Access-token gating (if configured). Private nodes require the token // for the upgrade itself. Public-with-token-for-writes nodes check it // here too — tokenOK is stored on the client so submit_tx can rely on // the upgrade-time decision without re-reading headers per op. 
tokenOK := true if tok, _ := AccessTokenForWS(); tok != "" { if err := checkAccessToken(r); err != nil { if _, private := AccessTokenForWS(); private { w.Header().Set("WWW-Authenticate", `Bearer realm="dchain"`) http.Error(w, "ws: "+err.Error(), http.StatusUnauthorized) return } // Public-with-token mode: upgrade allowed but write ops gated. tokenOK = false } } // Per-IP quota check. Reject BEFORE upgrade so we never hold an open // socket for a client we're going to kick. 429 Too Many Requests is the // closest status code — TCP is fine, they just opened too many. h.perIPMu.Lock() if h.perIP[ip] >= h.maxPerIP { h.perIPMu.Unlock() w.Header().Set("Retry-After", "30") http.Error(w, fmt.Sprintf("too many websocket connections from %s (max %d)", ip, h.maxPerIP), http.StatusTooManyRequests) return } h.perIP[ip]++ h.perIPMu.Unlock() conn, err := h.upgrader.Upgrade(w, r, nil) if err != nil { // Upgrade already wrote an HTTP error response. Release the reservation. h.perIPMu.Lock() h.perIP[ip]-- if h.perIP[ip] <= 0 { delete(h.perIP, ip) } h.perIPMu.Unlock() return } // Generate a fresh 32-byte nonce per connection. Client signs this // with its Ed25519 private key to prove identity; binding is per- // connection (reconnect → new nonce → new auth). nonceBytes := make([]byte, 32) if _, err := rand.Read(nonceBytes); err != nil { conn.Close() return } nonce := hex.EncodeToString(nonceBytes) c := &wsClient{ conn: conn, send: make(chan []byte, 64), subs: make(map[string]struct{}), authNonce: nonce, remoteIP: ip, tokenOK: tokenOK, } h.mu.Lock() h.clients[c] = struct{}{} h.mu.Unlock() MetricWSConnections.Inc() // Send hello with chain metadata so the client can show connection state // and verify it's on the expected chain. 
var chainID string var tip uint64 if h.chainID != nil { chainID = h.chainID() } if h.tip != nil { tip = h.tip() } helloBytes, _ := json.Marshal(wsServerFrame{ Event: "hello", ChainID: chainID, TipHeight: tip, AuthNonce: nonce, }) select { case c.send <- helloBytes: default: } // Reader + writer goroutines. When either returns, the other is signalled // to shut down by closing the send channel. go h.writeLoop(c) h.readLoop(c) // blocks until the connection closes } func (h *WSHub) removeClient(c *wsClient) { h.mu.Lock() wasRegistered := false if _, ok := h.clients[c]; ok { delete(h.clients, c) close(c.send) wasRegistered = true } h.mu.Unlock() if wasRegistered { MetricWSConnections.Dec() } // Release the per-IP reservation so the client can reconnect without // being rejected. Missing-or-zero counters are silently no-op'd. if c.remoteIP != "" { h.perIPMu.Lock() if h.perIP[c.remoteIP] > 0 { h.perIP[c.remoteIP]-- if h.perIP[c.remoteIP] == 0 { delete(h.perIP, c.remoteIP) } } h.perIPMu.Unlock() } _ = c.conn.Close() } // readLoop processes control frames + parses JSON commands from the client. func (h *WSHub) readLoop(c *wsClient) { defer h.removeClient(c) c.conn.SetReadLimit(16 * 1024) // reject oversized frames _ = c.conn.SetReadDeadline(time.Now().Add(90 * time.Second)) c.conn.SetPongHandler(func(string) error { _ = c.conn.SetReadDeadline(time.Now().Add(90 * time.Second)) return nil }) for { _, data, err := c.conn.ReadMessage() if err != nil { return } var cmd wsClientCmd if err := json.Unmarshal(data, &cmd); err != nil { c.sendFrame(wsServerFrame{Event: "error", Msg: "invalid JSON"}) continue } switch cmd.Op { case "auth": // Verify signature over the connection's auth_nonce. On success, // bind this connection to the declared pubkey; scoped subs are // limited to topics this identity owns. 
pub, err := hex.DecodeString(cmd.PubKey) if err != nil || len(pub) != ed25519.PublicKeySize { c.sendFrame(wsServerFrame{Event: "error", Msg: "auth: invalid pubkey"}) continue } sig, err := hex.DecodeString(cmd.Signature) if err != nil || len(sig) != ed25519.SignatureSize { c.sendFrame(wsServerFrame{Event: "error", Msg: "auth: invalid signature"}) continue } if !ed25519.Verify(ed25519.PublicKey(pub), []byte(c.authNonce), sig) { c.sendFrame(wsServerFrame{Event: "error", Msg: "auth: signature mismatch"}) continue } var x25519 string if h.x25519For != nil { x25519, _ = h.x25519For(cmd.PubKey) // non-fatal on error } c.mu.Lock() c.authPubKey = cmd.PubKey c.authX25519 = x25519 c.mu.Unlock() c.sendFrame(wsServerFrame{Event: "subscribed", Topic: "auth:" + cmd.PubKey[:12] + "…"}) case "subscribe": topic := strings.TrimSpace(cmd.Topic) if topic == "" { c.sendFrame(wsServerFrame{Event: "error", Msg: "topic required"}) continue } // Scoped topics require matching auth. Unauthenticated clients // get global streams (`blocks`, `tx`, `contract_log`, …) only. if err := h.authorizeSubscribe(c, topic); err != nil { c.sendFrame(wsServerFrame{Event: "error", Msg: "forbidden: " + err.Error()}) continue } c.mu.Lock() if len(c.subs) >= WSMaxSubsPerConnection { c.mu.Unlock() c.sendFrame(wsServerFrame{Event: "error", Msg: fmt.Sprintf("subscription limit exceeded (%d)", WSMaxSubsPerConnection)}) continue } c.subs[topic] = struct{}{} c.mu.Unlock() c.sendFrame(wsServerFrame{Event: "subscribed", Topic: topic}) case "unsubscribe": c.mu.Lock() delete(c.subs, cmd.Topic) c.mu.Unlock() case "ping": c.sendFrame(wsServerFrame{Event: "pong"}) case "typing": // Ephemeral signal: A is typing to B. Requires auth so the // "from" claim is verifiable; anon clients can't spoof // typing indicators for other identities. c.mu.RLock() fromX := c.authX25519 c.mu.RUnlock() to := strings.TrimSpace(cmd.To) if fromX == "" || to == "" { // Silently drop — no need to error, typing is best-effort. 
continue } data, _ := json.Marshal(map[string]string{ "from": fromX, "to": to, }) h.fanout(wsServerFrame{Event: "typing", Data: data}, []string{"typing:" + to}) case "submit_tx": // Low-latency transaction submission over the existing WS // connection. Avoids the HTTP round-trip and delivers a // submit_ack correlated by the client-supplied id so callers // don't have to poll for inclusion status. c.mu.RLock() tokOK := c.tokenOK c.mu.RUnlock() if !tokOK { c.sendFrame(wsServerFrame{ Event: "submit_ack", ID: cmd.ID, Status: "rejected", Reason: "submit requires access token; pass ?token= at /api/ws upgrade", }) continue } if h.submitTx == nil { c.sendFrame(wsServerFrame{ Event: "submit_ack", ID: cmd.ID, Status: "rejected", Reason: "submit_tx over WS not available on this node", }) continue } if len(cmd.Tx) == 0 { c.sendFrame(wsServerFrame{ Event: "submit_ack", ID: cmd.ID, Status: "rejected", Reason: "missing tx", }) continue } txID, err := h.submitTx(cmd.Tx) if err != nil { c.sendFrame(wsServerFrame{ Event: "submit_ack", ID: cmd.ID, Status: "rejected", Reason: err.Error(), }) continue } // Echo back the server-assigned tx id for confirmation. The // client already knows it (it generated it), but this lets // proxies / middleware log a proper request/response pair. c.sendFrame(wsServerFrame{ Event: "submit_ack", ID: cmd.ID, Status: "accepted", Msg: txID, }) default: c.sendFrame(wsServerFrame{Event: "error", Msg: "unknown op: " + cmd.Op}) } } } // writeLoop pumps outbound frames and sends periodic pings. 
func (h *WSHub) writeLoop(c *wsClient) { ping := time.NewTicker(30 * time.Second) defer ping.Stop() for { select { case msg, ok := <-c.send: if !ok { return } _ = c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil { return } case <-ping.C: _ = c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // authorizeSubscribe gates which topics a connection is allowed to join. // // Rules: // - Global topics (blocks, tx, contract_log, contract:*, inbox) are // open to anyone — `tx` leaks only public envelope fields, and the // whole-`inbox` firehose is encrypted per-recipient anyway. // - Scoped topics addressed at a specific identity require the client // to have authenticated as that identity: // addr: — only the owning Ed25519 pubkey // inbox: — only the identity whose registered X25519 // key equals this (looked up via x25519For) // // Without this check any curl client could subscribe to any address and // watch incoming transactions in real time — a significant metadata leak. func (h *WSHub) authorizeSubscribe(c *wsClient, topic string) error { // Open topics — always allowed. switch topic { case "blocks", "tx", "contract_log", "inbox", "$system": return nil } if strings.HasPrefix(topic, "contract:") { return nil // contract-wide log streams are public } c.mu.RLock() authed := c.authPubKey authX := c.authX25519 c.mu.RUnlock() if strings.HasPrefix(topic, "addr:") { if authed == "" { return fmt.Errorf("addr:* requires auth") } want := strings.TrimPrefix(topic, "addr:") if want != authed { return fmt.Errorf("addr:* only for your own pubkey") } return nil } if strings.HasPrefix(topic, "inbox:") { if authed == "" { return fmt.Errorf("inbox:* requires auth") } // If we have an x25519 mapping, enforce it; otherwise accept // (best-effort — identity may not be registered yet). 
if authX != "" { want := strings.TrimPrefix(topic, "inbox:") if want != authX { return fmt.Errorf("inbox:* only for your own x25519") } } return nil } if strings.HasPrefix(topic, "typing:") { // Same rule as inbox: you can only listen for "who's typing to ME". if authed == "" { return fmt.Errorf("typing:* requires auth") } if authX != "" { want := strings.TrimPrefix(topic, "typing:") if want != authX { return fmt.Errorf("typing:* only for your own x25519") } } return nil } // Unknown scoped form — default-deny. return fmt.Errorf("topic %q not recognised", topic) } // fanout sends frame to every client subscribed to any of the given topics. func (h *WSHub) fanout(frame wsServerFrame, topics []string) { buf, err := json.Marshal(frame) if err != nil { return } h.mu.RLock() defer h.mu.RUnlock() for c := range h.clients { c.mu.RLock() matched := false for _, t := range topics { if _, ok := c.subs[t]; ok { matched = true break } } c.mu.RUnlock() if !matched { continue } select { case c.send <- buf: default: // Outbox full — drop and notify once. lagFrame, _ := json.Marshal(wsServerFrame{Event: "lag"}) select { case c.send <- lagFrame: default: } } } } // ── public emit methods ─────────────────────────────────────────────────────── // EmitBlock notifies subscribers of the `blocks` topic. func (h *WSHub) EmitBlock(b *blockchain.Block) { data, _ := json.Marshal(SSEBlockEvent{ Index: b.Index, Hash: b.HashHex(), TxCount: len(b.Transactions), Validator: b.Validator, Timestamp: b.Timestamp.UTC().Format(time.RFC3339), }) h.fanout(wsServerFrame{Event: "block", Data: data}, []string{"blocks"}) } // EmitTx notifies: // - `tx` topic (firehose) // - `addr:` topic // - `addr:` topic (if distinct from from) // // Synthetic BLOCK_REWARD transactions use `addr:` only (the validator). 
func (h *WSHub) EmitTx(tx *blockchain.Transaction) { data, _ := json.Marshal(SSETxEvent{ ID: tx.ID, TxType: tx.Type, From: tx.From, To: tx.To, Amount: tx.Amount, Fee: tx.Fee, }) topics := []string{"tx"} if tx.From != "" { topics = append(topics, "addr:"+tx.From) } if tx.To != "" && tx.To != tx.From { topics = append(topics, "addr:"+tx.To) } h.fanout(wsServerFrame{Event: "tx", Data: data}, topics) } // EmitContractLog fans out to the `contract_log` and `contract:` topics. func (h *WSHub) EmitContractLog(entry blockchain.ContractLogEntry) { data, _ := json.Marshal(entry) topics := []string{"contract_log", "contract:" + entry.ContractID} h.fanout(wsServerFrame{Event: "contract_log", Data: data}, topics) } // EmitInbox pushes a relay envelope summary to subscribers of the recipient's // inbox topic. `envelope` is the full relay.Envelope but we only serialise a // minimal shape here — the client can refetch from /api/relay/inbox for the // ciphertext if it missed a frame. // // Called from relay.Mailbox.onStore (wired in cmd/node/main.go). Avoids // importing the relay package here to keep the hub dependency-light. func (h *WSHub) EmitInbox(recipientX25519 string, envelopeSummary any) { data, err := json.Marshal(envelopeSummary) if err != nil { return } topics := []string{"inbox", "inbox:" + recipientX25519} h.fanout(wsServerFrame{Event: "inbox", Data: data}, topics) } // EmitBlockWithTxs is the matching convenience method of SSEHub. func (h *WSHub) EmitBlockWithTxs(b *blockchain.Block) { h.EmitBlock(b) for _, tx := range b.Transactions { h.EmitTx(tx) } } // ── per-client bookkeeping ─────────────────────────────────────────────────── type wsClient struct { conn *websocket.Conn send chan []byte mu sync.RWMutex subs map[string]struct{} // auth state. authNonce is set when the connection opens; the client // signs it and sends via the `auth` op. 
On success we store the // pubkey and (if available) the matching X25519 key so scoped // subscriptions can be validated without a DB lookup on each op. authNonce string authPubKey string // Ed25519 pubkey hex, empty = unauthenticated authX25519 string // X25519 pubkey hex, empty if not looked up // remoteIP is stored so removeClient can decrement the per-IP counter. remoteIP string // tokenOK is set at upgrade time: true if the connection passed the // access-token check (or no token was required). submit_tx ops are // rejected when tokenOK is false — matches the HTTP POST /api/tx // behaviour so a private node's write surface is uniform across // transports. tokenOK bool } func (c *wsClient) sendFrame(f wsServerFrame) { buf, err := json.Marshal(f) if err != nil { return } select { case c.send <- buf: default: // Client outbox full; drop. The lag indicator in fanout handles // recovery notifications; control-plane frames (errors/acks) from // readLoop are best-effort. _ = fmt.Sprint // silence unused import on trimmed builds } }