DChain single-node blockchain + React Native messenger client. Core: - PBFT consensus with multi-sig validator admission + equivocation slashing - BadgerDB + schema migration scaffold (CurrentSchemaVersion=0) - libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1) - Native Go contracts (username_registry) alongside WASM (wazero) - WebSocket gateway with topic-based fanout + Ed25519-nonce auth - Relay mailbox with NaCl envelope encryption (X25519 + Ed25519) - Prometheus /metrics, per-IP rate limit, body-size cap Deployment: - Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus - 3-node dev compose (docker-compose.yml) with mocked internet topology - 3-validator prod compose (deploy/prod/) for federation - Auto-update from Gitea via /api/update-check + systemd timer - Build-time version injection (ldflags → node --version) - UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER) Client (client-app/): - Expo / React Native / NativeWind - E2E NaCl encryption, typing indicator, contact requests - Auto-discovery of canonical contracts, chain_id aware, WS reconnect on node switch Documentation: - README.md, CHANGELOG.md, CONTEXT.md - deploy/single/README.md with 6 operator scenarios - deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design - docs/contracts/*.md per contract
266 lines
8.4 KiB
Go
266 lines
8.4 KiB
Go
package relay
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
badger "github.com/dgraph-io/badger/v4"
|
|
)
|
|
|
|
const (
|
|
// mailboxTTL is how long undelivered envelopes are retained.
|
|
mailboxTTL = 7 * 24 * time.Hour
|
|
|
|
// mailboxPrefix is the BadgerDB key prefix for stored envelopes.
|
|
// Key format: mail:<recipientX25519Hex>:<sentAt_20d>:<envelopeID>
|
|
mailboxPrefix = "mail:"
|
|
|
|
// MailboxMaxLimit caps the number of envelopes returned per single query.
|
|
MailboxMaxLimit = 200
|
|
|
|
// MailboxPerRecipientCap is the maximum number of envelopes stored per
|
|
// recipient key. When the cap is reached, the oldest envelope is evicted
|
|
// before the new one is written (sliding window, FIFO).
|
|
// At 64 KB max per envelope, one recipient occupies at most ~32 MB.
|
|
MailboxPerRecipientCap = 500
|
|
|
|
// MailboxMaxEnvelopeSize is the maximum allowed ciphertext length in bytes.
|
|
// Rejects oversized envelopes before writing to disk.
|
|
MailboxMaxEnvelopeSize = 64 * 1024 // 64 KB
|
|
)
|
|
|
|
// ErrEnvelopeTooLarge is returned by Store when the envelope exceeds the size limit.
|
|
var ErrEnvelopeTooLarge = errors.New("envelope ciphertext exceeds maximum allowed size")
|
|
|
|
// ErrMailboxFull is never returned externally (oldest entry is evicted instead),
|
|
// but kept as a sentinel for internal logic.
|
|
var errMailboxFull = errors.New("recipient mailbox is at capacity")
|
|
|
|
// Mailbox is a BadgerDB-backed store for relay envelopes awaiting pickup.
|
|
// Every received envelope is stored with a 7-day TTL regardless of whether
|
|
// the recipient is currently online. Recipients poll GET /relay/inbox to fetch
|
|
// and DELETE /relay/inbox/{id} to acknowledge delivery.
|
|
//
|
|
// Anti-spam guarantees:
|
|
// - Envelopes larger than MailboxMaxEnvelopeSize (64 KB) are rejected.
|
|
// - At most MailboxPerRecipientCap (500) envelopes per recipient are stored;
|
|
// when the cap is hit the oldest entry is silently evicted (FIFO).
|
|
// - All entries expire automatically after 7 days (BadgerDB TTL).
|
|
//
|
|
// Messages are stored encrypted — the relay cannot read their contents.
|
|
type Mailbox struct {
|
|
db *badger.DB
|
|
|
|
// onStore, if set, is invoked after every successful Store. Used by the
|
|
// node to push a WebSocket `inbox` event to subscribers of the
|
|
// recipient's x25519 pubkey so the mobile client stops polling
|
|
// /relay/inbox every 3 seconds.
|
|
//
|
|
// The callback MUST NOT block — it runs on the writer goroutine. Long
|
|
// work should be fanned out to a goroutine by the callback itself.
|
|
onStore func(*Envelope)
|
|
}
|
|
|
|
// SetOnStore registers a post-Store hook. Pass nil to clear. Safe to call
|
|
// before accepting traffic (wired once at node startup in main.go).
|
|
func (m *Mailbox) SetOnStore(cb func(*Envelope)) {
|
|
m.onStore = cb
|
|
}
|
|
|
|
// NewMailbox creates a Mailbox backed by the given BadgerDB instance.
|
|
func NewMailbox(db *badger.DB) *Mailbox {
|
|
return &Mailbox{db: db}
|
|
}
|
|
|
|
// OpenMailbox opens (or creates) a dedicated BadgerDB at dbPath for the mailbox.
|
|
//
|
|
// Storage tuning matches blockchain/chain.NewChain — 64 MiB vlog files
|
|
// (instead of 1 GiB default) so GC can actually shrink the DB, and single
|
|
// version retention since envelopes are either present or deleted.
|
|
func OpenMailbox(dbPath string) (*Mailbox, error) {
|
|
opts := badger.DefaultOptions(dbPath).
|
|
WithLogger(nil).
|
|
WithValueLogFileSize(64 << 20).
|
|
WithNumVersionsToKeep(1).
|
|
WithCompactL0OnClose(true)
|
|
db, err := badger.Open(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open mailbox db: %w", err)
|
|
}
|
|
return &Mailbox{db: db}, nil
|
|
}
|
|
|
|
// Close closes the underlying database.
|
|
func (m *Mailbox) Close() error { return m.db.Close() }
|
|
|
|
// Store persists an envelope with a 7-day TTL.
|
|
//
|
|
// Anti-spam checks (in order):
|
|
// 1. Ciphertext > MailboxMaxEnvelopeSize → returns ErrEnvelopeTooLarge.
|
|
// 2. Duplicate envelope ID → silently overwritten (idempotent).
|
|
// 3. Recipient already has MailboxPerRecipientCap entries → oldest evicted first.
|
|
func (m *Mailbox) Store(env *Envelope) error {
|
|
if len(env.Ciphertext) > MailboxMaxEnvelopeSize {
|
|
return ErrEnvelopeTooLarge
|
|
}
|
|
|
|
key := mailboxKey(env.RecipientPub, env.SentAt, env.ID)
|
|
val, err := json.Marshal(env)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal envelope: %w", err)
|
|
}
|
|
|
|
// Track whether this was a fresh insert (vs. duplicate) so we can skip
|
|
// firing the WS hook for idempotent resubmits — otherwise a misbehaving
|
|
// sender could amplify events by spamming the same envelope ID.
|
|
fresh := false
|
|
err = m.db.Update(func(txn *badger.Txn) error {
|
|
// Check if this exact envelope is already stored (idempotent).
|
|
if _, err := txn.Get([]byte(key)); err == nil {
|
|
return nil // already present, no-op
|
|
}
|
|
|
|
// Count existing envelopes for this recipient and collect the oldest key.
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, env.RecipientPub))
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Prefix = prefix
|
|
|
|
var count int
|
|
var oldestKey []byte
|
|
it := txn.NewIterator(opts)
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
if count == 0 {
|
|
oldestKey = it.Item().KeyCopy(nil) // first = oldest (sorted by sentAt)
|
|
}
|
|
count++
|
|
}
|
|
it.Close()
|
|
|
|
// Evict the oldest envelope if cap is reached.
|
|
if count >= MailboxPerRecipientCap && oldestKey != nil {
|
|
if err := txn.Delete(oldestKey); err != nil {
|
|
return fmt.Errorf("evict oldest envelope: %w", err)
|
|
}
|
|
}
|
|
|
|
e := badger.NewEntry([]byte(key), val).WithTTL(mailboxTTL)
|
|
if err := txn.SetEntry(e); err != nil {
|
|
return err
|
|
}
|
|
fresh = true
|
|
return nil
|
|
})
|
|
if err == nil && fresh && m.onStore != nil {
|
|
m.onStore(env)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// List returns up to limit envelopes for recipientPubHex, ordered oldest-first.
|
|
// Pass since > 0 to skip envelopes with SentAt < since (unix timestamp).
|
|
func (m *Mailbox) List(recipientPubHex string, since int64, limit int) ([]*Envelope, error) {
|
|
if limit <= 0 || limit > MailboxMaxLimit {
|
|
limit = MailboxMaxLimit
|
|
}
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex))
|
|
var out []*Envelope
|
|
|
|
err := m.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.Prefix = prefix
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
for it.Rewind(); it.Valid() && len(out) < limit; it.Next() {
|
|
if err := it.Item().Value(func(val []byte) error {
|
|
var env Envelope
|
|
if err := json.Unmarshal(val, &env); err != nil {
|
|
return nil // skip corrupt entries
|
|
}
|
|
if since > 0 && env.SentAt < since {
|
|
return nil
|
|
}
|
|
out = append(out, &env)
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return out, err
|
|
}
|
|
|
|
// Delete removes an envelope by ID.
|
|
// It scans the recipient's prefix to locate the full key (sentAt is not known by caller).
|
|
// Returns nil if the envelope is not found (already expired or never stored).
|
|
func (m *Mailbox) Delete(recipientPubHex, envelopeID string) error {
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex))
|
|
var found []byte
|
|
|
|
err := m.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Prefix = prefix
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
suffix := ":" + envelopeID
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
key := it.Item().KeyCopy(nil)
|
|
if strings.HasSuffix(string(key), suffix) {
|
|
found = key
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil || found == nil {
|
|
return err
|
|
}
|
|
|
|
return m.db.Update(func(txn *badger.Txn) error {
|
|
return txn.Delete(found)
|
|
})
|
|
}
|
|
|
|
// Count returns the number of stored envelopes for a recipient.
|
|
func (m *Mailbox) Count(recipientPubHex string) (int, error) {
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex))
|
|
count := 0
|
|
err := m.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Prefix = prefix
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
count++
|
|
}
|
|
return nil
|
|
})
|
|
return count, err
|
|
}
|
|
|
|
// RunGC periodically runs BadgerDB value log garbage collection.
|
|
// Call in a goroutine — blocks until cancelled via channel close or process exit.
|
|
func (m *Mailbox) RunGC() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
for m.db.RunValueLogGC(0.5) == nil {
|
|
// drain until nothing left to collect
|
|
}
|
|
}
|
|
}
|
|
|
|
func mailboxKey(recipientPubHex string, sentAt int64, envelopeID string) string {
|
|
// Zero-padded sentAt keeps lexicographic order == chronological order.
|
|
// Oldest entry = first key in iterator — used for FIFO eviction.
|
|
return fmt.Sprintf("%s%s:%020d:%s", mailboxPrefix, recipientPubHex, sentAt, envelopeID)
|
|
}
|