EP-058: Real-time WebSocket Streaming Results

Live experiment results delivered over WebSocket — no polling required.

Overview

EP-058 adds a WebSocket endpoint that streams experiment result snapshots to connected clients. The frontend uses the useExperimentStream hook and the LiveResultsPanel component to display live data in the results dashboard.

Architecture

Browser (LiveResultsPanel)
    |
    | WebSocket ws://host/api/v1/ws/experiments/{id}/results
    |
ConnectionManager  ←──── broadcasts to all subscribers of this experiment
    |
ResultsStreamingService
    |
    | Queries assignments + events tables
    |
PostgreSQL (Aurora)

Backend

WebSocket Endpoint

ws://localhost:8000/api/v1/ws/experiments/{experiment_id}/results

On connect, the server sends an initial snapshot. It then sends a periodic update every 30 seconds by default (configurable via RESULTS_STREAM_INTERVAL_SECONDS).
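The send-then-repeat behaviour described above can be sketched as a small asyncio loop. This is an illustration only, not the project's handler: stream_snapshots, send_snapshot and the stop event are hypothetical names, and the real endpoint may structure its loop differently.

```python
import asyncio
from typing import Awaitable, Callable


async def stream_snapshots(
    send_snapshot: Callable[[], Awaitable[None]],
    interval_seconds: float,
    stop: asyncio.Event,
) -> int:
    """Send one snapshot immediately, then one per interval until `stop` is set.

    Returns the number of snapshots sent. All names here are hypothetical.
    """
    await send_snapshot()  # initial snapshot on connect
    sent = 1
    while not stop.is_set():
        try:
            # Wake as soon as `stop` is set instead of sleeping the full interval.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            await send_snapshot()  # interval elapsed without a stop request
            sent += 1
    return sent
```

Waiting on the stop event with a timeout, rather than calling asyncio.sleep, lets the loop exit promptly when the client disconnects.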

Client Message Protocol

Client sends          | Server responds
{"action": "ping"}    | {"event": "pong"}
{"action": "refresh"} | Fresh snapshot (same shape)
Any unknown action    | Ignored (no crash)
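The protocol table above maps onto a small dispatch function. The sketch below is one possible shape, not the actual handler: handle_client_message and get_snapshot are hypothetical names, and treating malformed JSON like an unknown action is an assumption the source does not state.

```python
import json
from typing import Any, Callable, Dict, Optional

Snapshot = Dict[str, Any]


def handle_client_message(
    raw: str, get_snapshot: Callable[[], Snapshot]
) -> Optional[Snapshot]:
    """Return the server's reply to one client text frame, or None for no reply."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None  # assumption: malformed JSON is ignored like an unknown action
    if not isinstance(message, dict):
        return None  # valid JSON, but not an object with an "action" key
    action = message.get("action")
    if action == "ping":
        return {"event": "pong"}
    if action == "refresh":
        return get_snapshot()  # fresh snapshot, same shape as the periodic ones
    return None  # unknown actions are ignored


# Usage: a ping yields a pong; an unknown action yields no reply.
reply = handle_client_message('{"action": "ping"}', lambda: {})
```

Returning None rather than raising keeps a misbehaving client from tearing down its own connection.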

Snapshot Schema

{
  "event": "results_update",
  "experiment_id": "uuid-string",
  "timestamp": "2026-03-07T12:00:00+00:00",
  "status": "active",
  "variants": [
    {
      "key": "control",
      "name": "Control",
      "participant_count": 500,
      "conversion_count": 50,
      "conversion_rate": 0.10,
      "relative_lift": 0.0,
      "p_value": null,
      "is_control": true
    },
    {
      "key": "variant_b",
      "name": "Variant B",
      "participant_count": 500,
      "conversion_count": 65,
      "conversion_rate": 0.13,
      "relative_lift": 0.30,
      "p_value": 0.031,
      "is_control": false
    }
  ],
  "total_participants": 1000,
  "days_running": 7,
  "is_significant": true
}
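The derived fields in this example are consistent with conversion_rate = conversion_count / participant_count, and with relative_lift measured against the control's rate. The sketch below reproduces that arithmetic. The formulas are inferred from the example values, not stated by the source, and the zero-denominator fallbacks are this sketch's own choice.

```python
def conversion_rate(conversions: int, participants: int) -> float:
    """Fraction of participants who converted (0.0 when there are none)."""
    return conversions / participants if participants else 0.0


def relative_lift(rate: float, control_rate: float) -> float:
    """Relative change versus control (0.0 when the control rate is zero)."""
    return (rate - control_rate) / control_rate if control_rate else 0.0


control = conversion_rate(50, 500)    # control row of the example
variant = conversion_rate(65, 500)    # variant_b row of the example
lift = relative_lift(variant, control)

print(round(control, 4), round(variant, 4), round(lift, 4))  # prints: 0.1 0.13 0.3
```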

HTTP Companion Endpoints

Method | Path                                            | Description
GET    | /api/v1/ws/experiments/{id}/results/subscribers | Active subscriber count
GET    | /api/v1/ws/active-experiments                   | All experiments with active subscribers

Key Classes

ConnectionManager (backend/app/services/websocket_manager.py):

  • Subscriber registry guarded by an asyncio.Lock (safe for concurrent coroutines on one event loop; asyncio.Lock itself is not thread-safe)
  • connect(ws, experiment_id) — accept and register
  • disconnect(ws, experiment_id) — deregister; cleans up empty experiment keys
  • broadcast(experiment_id, message) — sends to all; removes dead connections
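The three methods listed above can be sketched with only the standard library. This is an illustration under stated assumptions, not the contents of websocket_manager.py. The socket is assumed to expose async accept() and send_json(), as Starlette's WebSocket does. The return value of broadcast and the subscriber_count helper are hypothetical.

```python
import asyncio
from typing import Any, Dict, Protocol, Set


class SocketLike(Protocol):
    """The two methods this sketch relies on."""

    async def accept(self) -> None: ...
    async def send_json(self, data: Any) -> None: ...


class ConnectionManager:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()  # guards the registry between coroutines
        self._subscribers: Dict[str, Set[SocketLike]] = {}

    async def connect(self, ws: SocketLike, experiment_id: str) -> None:
        await ws.accept()
        async with self._lock:
            self._subscribers.setdefault(experiment_id, set()).add(ws)

    async def disconnect(self, ws: SocketLike, experiment_id: str) -> None:
        async with self._lock:
            sockets = self._subscribers.get(experiment_id)
            if sockets is None:
                return
            sockets.discard(ws)
            if not sockets:  # drop empty experiment keys
                del self._subscribers[experiment_id]

    async def broadcast(self, experiment_id: str, message: Any) -> int:
        """Send to every subscriber; return how many sends succeeded."""
        async with self._lock:
            targets = list(self._subscribers.get(experiment_id, ()))
        dead = []
        for ws in targets:  # send outside the lock so a slow client cannot block connects
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)  # treat any send failure as a dead connection
        for ws in dead:
            await self.disconnect(ws, experiment_id)
        return len(targets) - len(dead)

    def subscriber_count(self, experiment_id: str) -> int:
        # Hypothetical helper; reads without the lock, so the value is a snapshot.
        return len(self._subscribers.get(experiment_id, ()))
```

Copying the subscriber set under the lock and sending afterwards keeps the critical section short. The cost is that a socket removed mid-broadcast may still receive one last message.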

ResultsStreamingService (backend/app/services/results_streaming_service.py):

  • get_live_snapshot(experiment_id) — queries DB, returns snapshot dict
  • Uses a db_session_factory callable for fresh sessions per snapshot
  • Gracefully handles missing experiments and DB errors (returns error snapshot)
  • compute_and_broadcast(manager, experiment_id) — convenience wrapper
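The session-per-snapshot and error-handling behaviour listed above might look roughly like the following. Several details are assumptions, labelled in the code: the source does not specify the error snapshot's fields, the real method is likely async, and load_results stands in for the actual queries against the assignments and events tables.

```python
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Callable, ContextManager, Dict, Iterator, Optional

Snapshot = Dict[str, Any]


def error_snapshot(experiment_id: str, reason: str) -> Snapshot:
    # Hypothetical shape: the source only says an "error snapshot" is returned.
    return {
        "event": "error",
        "experiment_id": experiment_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "error": reason,
    }


def get_live_snapshot(
    experiment_id: str,
    db_session_factory: Callable[[], ContextManager[Any]],
    load_results: Callable[[Any, str], Optional[Snapshot]],  # hypothetical query hook
) -> Snapshot:
    try:
        # A new session per call, so one failed query cannot affect the next snapshot.
        with db_session_factory() as session:
            results = load_results(session, experiment_id)
    except Exception as exc:
        return error_snapshot(experiment_id, f"database error: {exc}")
    if results is None:
        return error_snapshot(experiment_id, "experiment not found")
    return results


@contextmanager
def fake_session_factory() -> Iterator[object]:
    """Stand-in for a real session factory, for demonstration only."""
    yield object()


missing = get_live_snapshot("exp-1", fake_session_factory, lambda session, eid: None)
```

Returning an error snapshot instead of raising means the WebSocket loop can forward the failure to clients and keep the connection open.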

Configuration

Environment Variable            | Default | Description
RESULTS_STREAM_INTERVAL_SECONDS | 30      | Periodic update interval, in seconds
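One way to read this setting is shown below. The function name is hypothetical, and falling back to the default on a missing, non-numeric or non-positive value is an assumption of this sketch; the source does not say how the service handles invalid input.

```python
import os
from typing import Mapping

DEFAULT_INTERVAL_SECONDS = 30.0


def stream_interval_seconds(environ: Mapping[str, str] = os.environ) -> float:
    """Return the periodic update interval, using the default when unset or invalid."""
    raw = environ.get("RESULTS_STREAM_INTERVAL_SECONDS")
    if raw is None:
        return DEFAULT_INTERVAL_SECONDS
    try:
        value = float(raw)
    except ValueError:
        return DEFAULT_INTERVAL_SECONDS  # assumption: ignore unparsable values
    # Assumption: a zero or negative interval would spin the loop, so reject it.
    return value if value > 0 else DEFAULT_INTERVAL_SECONDS


interval = stream_interval_seconds({"RESULTS_STREAM_INTERVAL_SECONDS": "5"})
```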

Frontend

useExperimentStream Hook

import { useExperimentStream } from '@/hooks/useExperimentStream';

function MyComponent({ experimentId }: { experimentId: string }) {
  const { snapshot, status, error, refresh, disconnect } =
    useExperimentStream(experimentId);

  // status: 'connecting' | 'connected' | 'disconnected' | 'error'
  // snapshot: ExperimentSnapshot | null
  return <p>Stream status: {status}</p>;
}

The hook auto-connects when experimentId is set and auto-reconnects up to 5 times (configurable) on unexpected disconnection.

The hook reads the WebSocket base URL from the NEXT_PUBLIC_WS_URL environment variable (default: ws://localhost:8000).

LiveResultsPanel Component

import { LiveResultsPanel } from '@/components/experiments/LiveResultsPanel';

<LiveResultsPanel experimentId="exp-uuid" />

Features:

  • Connection status badge (green/yellow/grey/red dot)
  • Pulsing "LIVE" indicator when connected
  • Variant results table: Name | Participants | Conversions | Rate | Lift | P-value | Status
  • Refresh button
  • Last updated timestamp
  • Auto-reconnect countdown when disconnected
  • Error banner with reconnect button

Results Dashboard Integration

The LiveResultsPanel is accessible via the ⚡ Live tab in the ResultsDashboard component on the /results/[id] page.

Tests

Running

source venv/bin/activate
export APP_ENV=test TESTING=true

# Unit tests
python -m pytest backend/tests/unit/services/test_websocket_manager.py -v
python -m pytest backend/tests/unit/services/test_results_streaming_service.py -v

# Integration tests
python -m pytest backend/tests/integration/api/test_websocket_results.py -v

# All together
python -m pytest \
  backend/tests/unit/services/test_websocket_manager.py \
  backend/tests/unit/services/test_results_streaming_service.py \
  backend/tests/integration/api/test_websocket_results.py \
  -v --tb=short

Test Coverage

Test file                         | Tests | Covers
test_websocket_manager.py         | 22    | ConnectionManager lifecycle, broadcast, concurrency
test_results_streaming_service.py | 23    | Snapshot schema, DB error handling, significance logic
test_websocket_results.py         | 34    | WebSocket protocol, HTTP endpoints, edge cases
Total                             | 79    |

All tests mock the DB session; no live PostgreSQL required.

Security Notes

  • The WebSocket endpoint is currently unauthenticated, so any client that can reach it can subscribe to an experiment's results. For production, validate a token in the stream_results handler, for example one passed as a query parameter (?token=...), before calling manager.connect().
  • HTTP companion endpoints are also unauthenticated (informational only).
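The token check suggested in the first note could start from a helper like the one below. It is a sketch under assumptions: is_authorized is a hypothetical name, a single shared token is assumed, and how the expected token is issued and stored is out of scope. With Starlette, the query parameters would come from websocket.query_params.

```python
import hmac
from typing import Mapping


def is_authorized(query_params: Mapping[str, str], expected_token: str) -> bool:
    """Return True only when a non-empty ?token=... matches the expected token."""
    supplied = query_params.get("token")
    if not supplied or not expected_token:
        return False  # a missing or empty token never authorizes
    # Constant-time comparison, so timing does not reveal how much of the token matched.
    return hmac.compare_digest(supplied.encode(), expected_token.encode())


# Where this might sit in the handler (illustrative, not the project's code):
#   if not is_authorized(websocket.query_params, expected_token):
#       await websocket.close(code=1008)  # 1008 = policy violation
#       return
#   await manager.connect(websocket, experiment_id)
allowed = is_authorized({"token": "s3cret"}, "s3cret")
```

Tokens in query strings can end up in access logs and proxies, so short-lived tokens are the safer fit for this approach.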