Files
dchain/cmd/loadtest/main.go
vsecoder 7e7393e4f8 chore: initial commit for v0.0.1
DChain single-node blockchain + React Native messenger client.

Core:
- PBFT consensus with multi-sig validator admission + equivocation slashing
- BadgerDB + schema migration scaffold (CurrentSchemaVersion=0)
- libp2p gossipsub (tx/v1, blocks/v1, relay/v1, version/v1)
- Native Go contracts (username_registry) alongside WASM (wazero)
- WebSocket gateway with topic-based fanout + Ed25519-nonce auth
- Relay mailbox with NaCl envelope encryption (X25519 + Ed25519)
- Prometheus /metrics, per-IP rate limit, body-size cap

Deployment:
- Single-node compose (deploy/single/) with Caddy TLS + optional Prometheus
- 3-node dev compose (docker-compose.yml) with mocked internet topology
- 3-validator prod compose (deploy/prod/) for federation
- Auto-update from Gitea via /api/update-check + systemd timer
- Build-time version injection (ldflags → node --version)
- UI / Swagger toggle flags (DCHAIN_DISABLE_UI, DCHAIN_DISABLE_SWAGGER)

Client (client-app/):
- Expo / React Native / NativeWind
- E2E NaCl encryption, typing indicator, contact requests
- Auto-discovery of canonical contracts, chain_id aware, WS reconnect on node switch

Documentation:
- README.md, CHANGELOG.md, CONTEXT.md
- deploy/single/README.md with 6 operator scenarios
- deploy/UPDATE_STRATEGY.md with 4-layer forward-compat design
- docs/contracts/*.md per contract
2026-04-17 14:16:44 +03:00

403 lines
12 KiB
Go

// Command loadtest — probes a running DChain cluster with N concurrent
// WebSocket clients, each subscribing to its own address and submitting
// periodic TRANSFER transactions.
//
// Goal: smoke-test the WS gateway, submit_tx path, native contracts, and
// mempool fairness end-to-end. Catches deadlocks / leaks that unit tests
// miss because they don't run the full stack.
//
// Usage:
//
//	go run ./cmd/loadtest \
//	  --node http://localhost:8081 \
//	  --funder testdata/node1.json \
//	  --clients 50 \
//	  --duration 60s \
//	  --tx-per-client-per-sec 1
//
// Exits non-zero if:
//   - chain tip doesn't advance during the run (consensus stuck)
//   - any client's WS connection drops and fails to reconnect
//   - mempool-reject rate exceeds 10%
package main
import (
"bytes"
"context"
"crypto/ed25519"
"crypto/rand"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"go-blockchain/blockchain"
"go-blockchain/identity"
)
// main drives the whole load test: it parses flags, generates and funds the
// throw-away client identities, runs one goroutine per client for the
// requested duration while periodically logging chain progress, and finally
// prints a verdict. The process exits non-zero (via log.Fatalf) when the
// chain tip did not advance, when any WebSocket drop was recorded, or when
// more than 10% of submitted transactions were rejected.
func main() {
	nodeURL := flag.String("node", "http://localhost:8081", "node HTTP base URL")
	funderKey := flag.String("funder", "testdata/node1.json", "path to key file with balance used to fund the test clients")
	numClients := flag.Int("clients", 50, "number of concurrent clients")
	duration := flag.Duration("duration", 30*time.Second, "how long to run the load test")
	txRate := flag.Float64("tx-per-client-per-sec", 1.0, "how fast each client should submit TRANSFER txs")
	fundAmount := flag.Uint64("fund-amount", 100_000, "µT sent to each client before the test begins")
	flag.Parse()

	// Fail fast on inputs that would otherwise panic or make the run
	// vacuous: a negative count panics in make, a lone client has no peer
	// to transfer to, and a rate outside (0, 1e9] produces a ticker
	// interval that is not a positive duration. The negated comparison
	// also rejects NaN.
	if *numClients < 2 {
		log.Fatalf("--clients must be at least 2 (got %d): every client needs a peer to transfer to", *numClients)
	}
	if !(*txRate > 0) || *txRate > 1e9 {
		log.Fatalf("--tx-per-client-per-sec must be in (0, 1e9] (got %v)", *txRate)
	}

	funder := loadKeyFile(*funderKey)
	log.Printf("[loadtest] funder: %s", funder.PubKeyHex()[:12])

	// Overall budget: the traffic phase plus one minute for funding/setup.
	ctx, cancel := context.WithTimeout(context.Background(), *duration+1*time.Minute)
	defer cancel()

	// --- 1. Generate N throw-away client identities ---
	clients := make([]*identity.Identity, *numClients)
	for i := range clients {
		clients[i] = newEphemeralIdentity()
	}
	log.Printf("[loadtest] generated %d client identities", *numClients)

	// --- 2. Fund them all — throttle to stay below the node's per-IP
	// submit rate limiter (~10/s with burst 20). Loadtest runs from a
	// single IP so it'd hit that defence immediately otherwise.
	log.Printf("[loadtest] funding each client with %d µT…", *fundAmount)
	startHeight := mustNetstats(*nodeURL).TotalBlocks
	for _, c := range clients {
		if err := submitTransfer(*nodeURL, funder, c.PubKeyHex(), *fundAmount); err != nil {
			log.Fatalf("fund client: %v", err)
		}
		time.Sleep(120 * time.Millisecond)
	}

	// Wait for all funding txs to commit. We budget 60s at a conservative
	// 1 block / 3-5 s PBFT cadence — plenty for dozens of fundings to
	// round-robin into blocks. We only require ONE block of advance as
	// the "chain is alive" signal; real check is via balance query below.
	if err := waitTipAdvance(ctx, *nodeURL, startHeight, 1, 60*time.Second); err != nil {
		log.Fatalf("funding didn't commit: %v", err)
	}

	// Poll until every client has a non-zero balance — that's the real
	// signal that funding landed, independent of block-count guesses.
	if err := waitAllFunded(ctx, *nodeURL, clients, *fundAmount, 90*time.Second); err != nil {
		log.Fatalf("funding balance check: %v", err)
	}
	log.Printf("[loadtest] funding complete; starting traffic")

	// --- 3. Kick off N client goroutines ---
	var (
		accepted atomic.Uint64
		rejected atomic.Uint64
		wsDrops  atomic.Uint64
	)
	var wg sync.WaitGroup
	runCtx, runCancel := context.WithTimeout(ctx, *duration)
	defer runCancel()
	for _, c := range clients {
		wg.Add(1)
		go func(id *identity.Identity) {
			defer wg.Done()
			runClient(runCtx, *nodeURL, id, clients, *txRate, &accepted, &rejected, &wsDrops)
		}(c)
	}

	// --- 4. Monitor chain progression while the test runs ---
	monitorDone := make(chan struct{})
	go func() {
		defer close(monitorDone)
		lastHeight := startHeight
		lastTime := time.Now()
		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-runCtx.Done():
				return
			case <-t.C:
				s := mustNetstats(*nodeURL)
				blkPerSec := float64(s.TotalBlocks-lastHeight) / time.Since(lastTime).Seconds()
				log.Printf("[loadtest] tip=%d (%.1f blk/s) accepted=%d rejected=%d ws-drops=%d",
					s.TotalBlocks, blkPerSec,
					accepted.Load(), rejected.Load(), wsDrops.Load())
				lastHeight = s.TotalBlocks
				lastTime = time.Now()
			}
		}
	}()

	wg.Wait()
	// Clients are done; cancel the run context so the monitor exits too.
	runCancel()
	<-monitorDone

	// --- 5. Final verdict ---
	finalHeight := mustNetstats(*nodeURL).TotalBlocks
	acc := accepted.Load()
	rej := rejected.Load()
	drops := wsDrops.Load()
	total := acc + rej
	log.Printf("[loadtest] DONE: startHeight=%d endHeight=%d (Δ=%d blocks)",
		startHeight, finalHeight, finalHeight-startHeight)
	log.Printf("[loadtest] txs: accepted=%d rejected=%d (%.1f%% reject rate)",
		acc, rej, 100*float64(rej)/float64(max1(total)))
	log.Printf("[loadtest] ws-drops=%d", drops)

	if finalHeight <= startHeight {
		log.Fatalf("FAIL: chain did not advance during the test")
	}
	// A recorded drop means a client lost (or never established) its
	// WebSocket session; previously this was logged but never failed the run.
	if drops > 0 {
		log.Fatalf("FAIL: %d WebSocket connection drop(s) during the test", drops)
	}
	// Integer form of rej/total > 0.10; also false when total == 0.
	if rej*10 > total {
		log.Fatalf("FAIL: reject rate > 10%% (%d of %d)", rej, total)
	}
	log.Printf("PASS")
}
// ─── Client loop ──────────────────────────────────────────────────────────────
// runClient simulates a single client until ctx is done.
//
// It dials <nodeURL>/api/ws (scheme switched via toWSURL), reads the hello
// frame, answers with an "auth" op carrying an Ed25519 signature over the
// received auth_nonce, and subscribes to the "addr:<own pubkey>" topic. A
// background goroutine then drains incoming frames while the main loop
// submits a 1-unit transfer to a randomly chosen peer from all on every
// tick of a ticker derived from txRate (transactions per second).
//
// Counters: accepted/rejected are incremented per submitTransfer outcome;
// wsDrops is incremented when the dial, the hello read, the auth/subscribe
// writes, or a mid-run read fails. Transfers keep being submitted over HTTP
// even after a mid-run WebSocket drop.
func runClient(
	ctx context.Context,
	nodeURL string,
	self *identity.Identity,
	all []*identity.Identity,
	txRate float64,
	accepted, rejected, wsDrops *atomic.Uint64,
) {
	// time.NewTicker panics on a non-positive interval, and converting an
	// out-of-range float to a Duration is implementation-defined, so reject
	// rates whose per-tx interval is not within [1ns, max Duration).
	// The negated comparison also catches NaN.
	const maxInterval = time.Duration(1<<63 - 1)
	perTx := float64(time.Second) / txRate
	if !(perTx >= 1) || perTx >= float64(maxInterval) {
		log.Printf("[loadtest] invalid tx rate %v; client not started", txRate)
		return
	}
	interval := time.Duration(perTx)

	wsURL := toWSURL(nodeURL) + "/api/ws"
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, wsURL, nil)
	if err != nil {
		wsDrops.Add(1)
		return
	}
	defer conn.Close()

	// Read hello, then authenticate.
	var hello struct {
		Event     string `json:"event"`
		AuthNonce string `json:"auth_nonce"`
	}
	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
	if err := conn.ReadJSON(&hello); err != nil {
		wsDrops.Add(1)
		return
	}
	conn.SetReadDeadline(time.Time{}) // clear the handshake deadline

	selfHex := self.PubKeyHex()
	sig := ed25519.Sign(self.PrivKey, []byte(hello.AuthNonce))
	if err := conn.WriteJSON(map[string]any{
		"op":     "auth",
		"pubkey": selfHex,
		"sig":    hex.EncodeToString(sig),
	}); err != nil {
		wsDrops.Add(1)
		return
	}

	// Subscribe to our own addr topic.
	if err := conn.WriteJSON(map[string]any{
		"op":    "subscribe",
		"topic": "addr:" + selfHex,
	}); err != nil {
		wsDrops.Add(1)
		return
	}

	// Drain incoming frames in a background goroutine so the socket stays
	// alive while we submit. The goroutine ends when a read fails, which
	// also happens when the deferred conn.Close runs at shutdown — so a
	// read error only counts as a drop while the run is still active.
	go func() {
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				if ctx.Err() == nil {
					wsDrops.Add(1)
				}
				return
			}
		}
	}()

	// Submit txs at the requested rate.
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if len(all) == 0 {
				continue // no peers at all; randIndex(0) would divide by zero
			}
			peer := all[randIndex(len(all))]
			if peer.PubKeyHex() == selfHex {
				continue // don't transfer to self
			}
			if err := submitTransfer(nodeURL, self, peer.PubKeyHex(), 1); err != nil {
				rejected.Add(1)
			} else {
				accepted.Add(1)
			}
		}
	}
}
// ─── HTTP helpers ─────────────────────────────────────────────────────────────
// submitTransfer builds, signs and POSTs a TRANSFER transaction to
// <nodeURL>/api/tx.
//
// The transaction moves amount from the identity from to the address toHex,
// pays blockchain.MinFee, and gets a locally generated ID of the form
// "lt-<unix-nanos>-<4 random bytes in hex>". It returns nil only when the
// node answers 200; any other status is returned as an error that includes
// the response body. Each request is bounded by a fixed timeout so a stalled
// node cannot block the caller indefinitely.
func submitTransfer(nodeURL string, from *identity.Identity, toHex string, amount uint64) error {
	const requestTimeout = 15 * time.Second

	tx := &blockchain.Transaction{
		ID:        fmt.Sprintf("lt-%d-%x", time.Now().UnixNano(), randBytes(4)),
		Type:      blockchain.EventTransfer,
		From:      from.PubKeyHex(),
		To:        toHex,
		Amount:    amount,
		Fee:       blockchain.MinFee,
		Timestamp: time.Now().UTC(),
	}
	tx.Signature = from.Sign(identity.TxSignBytes(tx))

	body, err := json.Marshal(tx)
	if err != nil {
		return fmt.Errorf("encode tx: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, nodeURL+"/api/tx", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("build tx request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		b, _ := io.ReadAll(resp.Body) // best effort: body is only used for the error text
		return fmt.Errorf("status %d: %s", resp.StatusCode, string(b))
	}
	// Drain the body so the transport can reuse the keep-alive connection;
	// a failure here does not change the outcome of the submit.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// netStats is the subset of the /api/netstats JSON response that the load
// test reads: the total_blocks and total_txs counters.
type netStats struct {
	TotalBlocks uint64 `json:"total_blocks"`
	TotalTxs    uint64 `json:"total_txs"`
}

// mustNetstats fetches <nodeURL>/api/netstats and decodes it into netStats.
//
// Any failure — transport error, request timeout, non-200 status, or an
// undecodable body — terminates the process via log.Fatalf. The status check
// prevents an error response from being decoded into all-zero counters.
func mustNetstats(nodeURL string) netStats {
	const requestTimeout = 15 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, nodeURL+"/api/netstats", nil)
	if err != nil {
		log.Fatalf("netstats: %v", err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatalf("netstats: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		log.Fatalf("netstats: unexpected status %d", resp.StatusCode)
	}
	var s netStats
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		log.Fatalf("decode netstats: %v", err)
	}
	return s
}
// waitTipAdvance blocks until the node's total block count has reached
// from+minDelta, polling /api/netstats (through mustNetstats) every 500ms.
//
// It returns nil once the target height is observed, ctx.Err() if ctx is
// cancelled while waiting between polls, or an error if timeout elapses
// first.
func waitTipAdvance(ctx context.Context, nodeURL string, from, minDelta uint64, timeout time.Duration) error {
	target := from + minDelta
	giveUpAt := time.Now().Add(timeout)
	for {
		if !time.Now().Before(giveUpAt) {
			break
		}
		if tip := mustNetstats(nodeURL).TotalBlocks; tip >= target {
			return nil
		}
		pause := time.After(500 * time.Millisecond)
		select {
		case <-pause:
			// poll again
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("tip did not advance by %d within %s", minDelta, timeout)
}
// waitAllFunded polls /api/address/<pubkey> for each client until their
// balance reaches fundAmount. More reliable than block-count heuristics
// because it verifies the funding txs were actually applied (not just
// that SOME blocks committed — empty blocks wouldn't fund anyone).
//
// Clients are checked in order and a client that has been seen funded is not
// queried again, which keeps the number of requests per pass low. This
// assumes balances do not drop below fundAmount while waiting.
//
// Every request is bound to both ctx and timeout, so neither a cancelled
// context nor a stalled node can make the call outlive its budget. It
// returns nil when all clients are funded, ctx.Err() if ctx ended first, or
// an error when timeout elapses.
func waitAllFunded(ctx context.Context, nodeURL string, clients []*identity.Identity, fundAmount uint64, timeout time.Duration) error {
	pollCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	next := 0 // index of the first client not yet confirmed funded
	for pollCtx.Err() == nil {
		for next < len(clients) {
			balance, err := fetchBalanceUT(pollCtx, nodeURL, clients[next].PubKeyHex())
			if err != nil || balance < fundAmount {
				break // not there yet (or transient failure): retry after the pause
			}
			next++
		}
		if next == len(clients) {
			return nil
		}
		select {
		case <-pollCtx.Done():
		case <-time.After(1 * time.Second):
		}
	}
	// Distinguish caller cancellation from our own timeout.
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("not all clients funded within %s", timeout)
}

// fetchBalanceUT returns the balance_ut field reported by
// <nodeURL>/api/address/<pubKeyHex>, or an error on a transport failure,
// non-200 status, or undecodable body.
func fetchBalanceUT(ctx context.Context, nodeURL, pubKeyHex string) (uint64, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, nodeURL+"/api/address/"+pubKeyHex, nil)
	if err != nil {
		return 0, fmt.Errorf("build address request: %w", err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("address lookup: status %d", resp.StatusCode)
	}
	var body struct {
		BalanceUT uint64 `json:"balance_ut"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return 0, fmt.Errorf("decode address response: %w", err)
	}
	return body.BalanceUT, nil
}
// ─── Identity helpers ─────────────────────────────────────────────────────────
// newEphemeralIdentity returns a freshly generated identity from
// identity.Generate. A generation failure is fatal: the process exits via
// log.Fatalf.
func newEphemeralIdentity() *identity.Identity {
	fresh, genErr := identity.Generate()
	if genErr == nil {
		return fresh
	}
	log.Fatalf("genkey: %v", genErr)
	return nil // not reached: log.Fatalf exits the process
}
// loadKeyFile reads the JSON key file at path and builds an Identity from it
// via identity.FromHexFull. The file is expected to carry the string fields
// pub_key, priv_key, x25519_pub and x25519_priv. Failing to read, parse or
// load the key is fatal (log.Fatalf).
func loadKeyFile(path string) *identity.Identity {
	type keyFile struct {
		PubKey     string `json:"pub_key"`
		PrivKey    string `json:"priv_key"`
		X25519Pub  string `json:"x25519_pub"`
		X25519Priv string `json:"x25519_priv"`
	}

	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		log.Fatalf("read funder key %s: %v", path, readErr)
	}

	parsed := keyFile{}
	if parseErr := json.Unmarshal(raw, &parsed); parseErr != nil {
		log.Fatalf("parse funder key: %v", parseErr)
	}

	funder, loadErr := identity.FromHexFull(parsed.PubKey, parsed.PrivKey, parsed.X25519Pub, parsed.X25519Priv)
	if loadErr != nil {
		log.Fatalf("load funder identity: %v", loadErr)
	}
	return funder
}
// ─── Misc ─────────────────────────────────────────────────────────────────────
// toWSURL converts an HTTP base URL into the matching WebSocket base URL:
// "https" (and an already-secure "wss") becomes "wss", every other scheme
// becomes "ws". The rest of the URL is preserved.
//
// If httpURL cannot be parsed it is returned unchanged, so the subsequent
// dial fails with a regular error instead of this function panicking on a
// nil *url.URL.
func toWSURL(httpURL string) string {
	u, err := url.Parse(httpURL)
	if err != nil {
		return httpURL
	}
	switch u.Scheme {
	case "https", "wss":
		u.Scheme = "wss"
	default:
		u.Scheme = "ws"
	}
	return u.String()
}
// randBytes returns a new slice of n bytes filled from crypto/rand.
// The result of rand.Read is deliberately discarded: the bytes only feed
// load-test transaction IDs.
func randBytes(n int) []byte {
	fill := make([]byte, n)
	_, _ = rand.Read(fill)
	return fill
}
// randIndex returns an index in [0, n) derived from 8 bytes of crypto/rand.
// The bytes are folded big-endian into an int that is masked to 31 bits at
// every step (keeping it non-negative) and then reduced modulo n.
// n must be positive; n == 0 triggers a division by zero.
func randIndex(n int) int {
	var seed [8]byte
	_, _ = rand.Read(seed[:])
	acc := 0
	for i := 0; i < len(seed); i++ {
		acc = (acc<<8 | int(seed[i])) & 0x7fffffff
	}
	return acc % n
}
// max1 returns x, except that 0 is mapped to 1. It is used to keep a
// denominator non-zero.
func max1(x uint64) uint64 {
	if x != 0 {
		return x
	}
	return 1
}
// Blank reference that keeps the "os" import in use regardless of what the
// rest of the file does. loadKeyFile already calls os.ReadFile, so this line
// is redundant today; it is purely a safeguard against an unused-import
// build error.
var _ = os.Exit