package relay import ( "encoding/json" "errors" "fmt" "strings" "time" badger "github.com/dgraph-io/badger/v4" ) const ( // mailboxTTL is how long undelivered envelopes are retained. mailboxTTL = 7 * 24 * time.Hour // mailboxPrefix is the BadgerDB key prefix for stored envelopes. // Key format: mail::: mailboxPrefix = "mail:" // MailboxMaxLimit caps the number of envelopes returned per single query. MailboxMaxLimit = 200 // MailboxPerRecipientCap is the maximum number of envelopes stored per // recipient key. When the cap is reached, the oldest envelope is evicted // before the new one is written (sliding window, FIFO). // At 64 KB max per envelope, one recipient occupies at most ~32 MB. MailboxPerRecipientCap = 500 // MailboxMaxEnvelopeSize is the maximum allowed ciphertext length in bytes. // Rejects oversized envelopes before writing to disk. MailboxMaxEnvelopeSize = 64 * 1024 // 64 KB ) // ErrEnvelopeTooLarge is returned by Store when the envelope exceeds the size limit. var ErrEnvelopeTooLarge = errors.New("envelope ciphertext exceeds maximum allowed size") // ErrMailboxFull is never returned externally (oldest entry is evicted instead), // but kept as a sentinel for internal logic. var errMailboxFull = errors.New("recipient mailbox is at capacity") // Mailbox is a BadgerDB-backed store for relay envelopes awaiting pickup. // Every received envelope is stored with a 7-day TTL regardless of whether // the recipient is currently online. Recipients poll GET /relay/inbox to fetch // and DELETE /relay/inbox/{id} to acknowledge delivery. // // Anti-spam guarantees: // - Envelopes larger than MailboxMaxEnvelopeSize (64 KB) are rejected. // - At most MailboxPerRecipientCap (500) envelopes per recipient are stored; // when the cap is hit the oldest entry is silently evicted (FIFO). // - All entries expire automatically after 7 days (BadgerDB TTL). // // Messages are stored encrypted — the relay cannot read their contents. type Mailbox struct { db *badger.DB // onStore, if set, is invoked after every successful Store. Used by the // node to push a WebSocket `inbox` event to subscribers of the // recipient's x25519 pubkey so the mobile client stops polling // /relay/inbox every 3 seconds. // // The callback MUST NOT block — it runs on the writer goroutine. Long // work should be fanned out to a goroutine by the callback itself. onStore func(*Envelope) } // SetOnStore registers a post-Store hook. Pass nil to clear. Safe to call // before accepting traffic (wired once at node startup in main.go). func (m *Mailbox) SetOnStore(cb func(*Envelope)) { m.onStore = cb } // NewMailbox creates a Mailbox backed by the given BadgerDB instance. func NewMailbox(db *badger.DB) *Mailbox { return &Mailbox{db: db} } // OpenMailbox opens (or creates) a dedicated BadgerDB at dbPath for the mailbox. // // Storage tuning matches blockchain/chain.NewChain — 64 MiB vlog files // (instead of 1 GiB default) so GC can actually shrink the DB, and single // version retention since envelopes are either present or deleted. func OpenMailbox(dbPath string) (*Mailbox, error) { opts := badger.DefaultOptions(dbPath). WithLogger(nil). WithValueLogFileSize(64 << 20). WithNumVersionsToKeep(1). WithCompactL0OnClose(true) db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("open mailbox db: %w", err) } return &Mailbox{db: db}, nil } // Close closes the underlying database. func (m *Mailbox) Close() error { return m.db.Close() } // Store persists an envelope with a 7-day TTL. // // Anti-spam checks (in order): // 1. Ciphertext > MailboxMaxEnvelopeSize → returns ErrEnvelopeTooLarge. // 2. Duplicate envelope ID → silently overwritten (idempotent). // 3. Recipient already has MailboxPerRecipientCap entries → oldest evicted first. func (m *Mailbox) Store(env *Envelope) error { if len(env.Ciphertext) > MailboxMaxEnvelopeSize { return ErrEnvelopeTooLarge } key := mailboxKey(env.RecipientPub, env.SentAt, env.ID) val, err := json.Marshal(env) if err != nil { return fmt.Errorf("marshal envelope: %w", err) } // Track whether this was a fresh insert (vs. duplicate) so we can skip // firing the WS hook for idempotent resubmits — otherwise a misbehaving // sender could amplify events by spamming the same envelope ID. fresh := false err = m.db.Update(func(txn *badger.Txn) error { // Check if this exact envelope is already stored (idempotent). if _, err := txn.Get([]byte(key)); err == nil { return nil // already present, no-op } // Count existing envelopes for this recipient and collect the oldest key. prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, env.RecipientPub)) opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Prefix = prefix var count int var oldestKey []byte it := txn.NewIterator(opts) for it.Rewind(); it.Valid(); it.Next() { if count == 0 { oldestKey = it.Item().KeyCopy(nil) // first = oldest (sorted by sentAt) } count++ } it.Close() // Evict the oldest envelope if cap is reached. if count >= MailboxPerRecipientCap && oldestKey != nil { if err := txn.Delete(oldestKey); err != nil { return fmt.Errorf("evict oldest envelope: %w", err) } } e := badger.NewEntry([]byte(key), val).WithTTL(mailboxTTL) if err := txn.SetEntry(e); err != nil { return err } fresh = true return nil }) if err == nil && fresh && m.onStore != nil { m.onStore(env) } return err } // List returns up to limit envelopes for recipientPubHex, ordered oldest-first. // Pass since > 0 to skip envelopes with SentAt < since (unix timestamp). func (m *Mailbox) List(recipientPubHex string, since int64, limit int) ([]*Envelope, error) { if limit <= 0 || limit > MailboxMaxLimit { limit = MailboxMaxLimit } prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex)) var out []*Envelope err := m.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid() && len(out) < limit; it.Next() { if err := it.Item().Value(func(val []byte) error { var env Envelope if err := json.Unmarshal(val, &env); err != nil { return nil // skip corrupt entries } if since > 0 && env.SentAt < since { return nil } out = append(out, &env) return nil }); err != nil { return err } } return nil }) return out, err } // Delete removes an envelope by ID. // It scans the recipient's prefix to locate the full key (sentAt is not known by caller). // Returns nil if the envelope is not found (already expired or never stored). func (m *Mailbox) Delete(recipientPubHex, envelopeID string) error { prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex)) var found []byte err := m.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() suffix := ":" + envelopeID for it.Rewind(); it.Valid(); it.Next() { key := it.Item().KeyCopy(nil) if strings.HasSuffix(string(key), suffix) { found = key return nil } } return nil }) if err != nil || found == nil { return err } return m.db.Update(func(txn *badger.Txn) error { return txn.Delete(found) }) } // Count returns the number of stored envelopes for a recipient. func (m *Mailbox) Count(recipientPubHex string) (int, error) { prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex)) count := 0 err := m.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false opts.Prefix = prefix it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { count++ } return nil }) return count, err } // RunGC periodically runs BadgerDB value log garbage collection. // Call in a goroutine — blocks until cancelled via channel close or process exit. func (m *Mailbox) RunGC() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for range ticker.C { for m.db.RunValueLogGC(0.5) == nil { // drain until nothing left to collect } } } func mailboxKey(recipientPubHex string, sentAt int64, envelopeID string) string { // Zero-padded sentAt keeps lexicographic order == chronological order. // Oldest entry = first key in iterator — used for FIFO eviction. return fmt.Sprintf("%s%s:%020d:%s", mailboxPrefix, recipientPubHex, sentAt, envelopeID) }