Reference: Pi ↔ Dashboard API

Every Convex mutation and query the Pi needs to call — heartbeat, missions, captures, diagnostics, commands — with Python examples

What this guide covers: Every Convex mutation and query the Raspberry Pi needs to call, with Python examples. This is the contract between the rover software and the web dashboard.


Overview

The Tick Slayer 3000 dashboard (Next.js + Convex) exposes a set of mutations and queries that the Pi calls to:

  1. Report its status (heartbeat every few seconds)
  2. Start and complete missions
  3. Upload captures (image + GPS + battery per stop)
  4. Log diagnostics (pre-flight checks, runtime events)
  5. Poll for commands (start, stop, return home, take photo, pause, resume)

All communication goes through the Convex client. The Pi uses the convex Python package to call mutations directly — no REST API needed.


Setup on the Pi

Install the Convex Python client

pip install convex

Initialize the client

from convex import ConvexClient

client = ConvexClient("https://YOUR_DEPLOYMENT.convex.cloud")

Store the deployment URL in an environment variable or config file on the Pi:

# ~/rover/.env
CONVEX_URL=https://YOUR_DEPLOYMENT.convex.cloud

# Rover startup code (load_dotenv comes from: pip install python-dotenv)
import os
from dotenv import load_dotenv
from convex import ConvexClient

load_dotenv()
client = ConvexClient(os.environ["CONVEX_URL"])

API Reference

1. Rover Heartbeat

Mutation: rover:heartbeat

The Pi calls this every 3-5 seconds to report its current state. The dashboard uses this to show live telemetry and determine if the rover is online (last seen < 30s).

def send_heartbeat(client, gps, battery, wifi_dbm, disk_gb, cpu_temp, mission_id=None):
    """Send a heartbeat to the dashboard."""
    client.mutation("rover:heartbeat", {
        "position": {"lat": gps.latitude, "lon": gps.longitude} if gps.has_fix else None,
        "batteryVoltage": battery.voltage,
        "wifiSignal": wifi_dbm,          # dBm, e.g. -58
        "diskFreeGb": disk_gb,
        "cpuTemp": cpu_temp,              # °C
        "gpsSatellites": gps.satellites,
        "currentMissionId": mission_id,   # Convex document ID, or None if idle
        "state": "running" if mission_id else "idle",
        "uptimeSeconds": get_uptime(),
    })

Fields:

| Field | Type | Description |
|---|---|---|
| position | {lat, lon} or None | Current GPS position |
| batteryVoltage | float or None | Crawler battery voltage from ADC |
| wifiSignal | int or None | WiFi signal strength in dBm |
| diskFreeGb | float or None | Free disk space in GB |
| cpuTemp | float or None | Pi CPU temperature in °C |
| gpsSatellites | int or None | Number of GPS satellites in view |
| currentMissionId | string or None | Convex ID of active mission |
| state | string | One of: "idle", "running", "returning", "error", "offline" |
| uptimeSeconds | int or None | Pi uptime in seconds |
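
The heartbeat example calls get_uptime() and takes disk_gb and cpu_temp as arguments without showing where those values come from. The following is a minimal sketch, assuming Raspberry Pi OS (Linux) exposes /proc/uptime and /sys/class/thermal/thermal_zone0/temp in their usual formats. The parse_* helpers and get_cpu_temp are hypothetical names introduced here for illustration; they are not part of the rover codebase described in this guide.

```python
import shutil


def parse_uptime(text):
    """Parse /proc/uptime contents ("<uptime> <idle>") into whole seconds."""
    return int(float(text.split()[0]))


def parse_cpu_temp(text):
    """Parse a thermal zone reading (millidegrees Celsius) into degrees Celsius."""
    return int(text.strip()) / 1000.0


def get_uptime(path="/proc/uptime"):
    """Pi uptime in seconds (assumes a Linux /proc filesystem)."""
    with open(path) as f:
        return parse_uptime(f.read())


def get_cpu_temp(path="/sys/class/thermal/thermal_zone0/temp"):
    """CPU temperature in degrees Celsius (assumes the usual Pi thermal zone path)."""
    with open(path) as f:
        return parse_cpu_temp(f.read())


def get_disk_free_gb(path="/"):
    """Free space in GB on the filesystem that contains `path`."""
    return shutil.disk_usage(path).free / 1e9
```

Keeping the parsing separate from the file reads makes the helpers easy to check on a development machine that has no /proc or thermal zone.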

Dashboard behavior:

  • If lastSeen is more than 30 seconds old, the dashboard shows the rover as "Offline"
  • The pulsing green dot appears when heartbeats are fresh
  • Telemetry values update in real-time on the Control Center dashboard
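
The 30-second rule can be mirrored on the Pi side, for example in a test harness or a local watchdog. This is only an illustration of the rule stated above; the dashboard's actual check lives in the web app and is not shown in this guide. The function name and the constant are assumptions.

```python
import time

OFFLINE_AFTER_MS = 30_000  # the 30 s threshold quoted in this guide


def is_online(last_seen_ms, now_ms=None, threshold_ms=OFFLINE_AFTER_MS):
    """Return True if the last heartbeat is no older than the threshold."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - last_seen_ms) <= threshold_ms
```

With heartbeats every 3 to 5 seconds, this threshold tolerates several consecutive missed heartbeats before the rover is reported offline.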

2. Mission Lifecycle

Start a mission

Mutation: missions:start

import time

def start_mission(client, route_name, gps, battery):
    """Called when the rover begins a new mission. Returns the Convex document ID."""
    mission_id = client.mutation("missions:start", {
        "missionId": f"mission_{int(time.time())}",
        "route": route_name,
        "startPosition": {"lat": gps.latitude, "lon": gps.longitude},
        "batteryStartV": battery.voltage,
    })
    return mission_id  # Use this ID for captures, diagnostics, and heartbeats

Returns: The Convex document ID (string like "jn74djn8y1kwse8w3ke373346s84pzzv"). Store this — you'll pass it to every subsequent capture, diagnostic, and heartbeat during the mission.

Side effect: The missions:start mutation automatically schedules a weather snapshot. A Convex action (weather:snapWeatherForMission) fetches current conditions from Open-Meteo and patches the mission with a structured weatherSnapshot object. No Pi-side code needed for weather.

Complete a mission

Mutation: missions:complete

def complete_mission(client, mission_id, gps, battery, capture_count, duration_s, aborted=False):
    """Called when the rover finishes or aborts a mission."""
    client.mutation("missions:complete", {
        "id": mission_id,
        "status": "aborted" if aborted else "completed",
        "endPosition": {"lat": gps.latitude, "lon": gps.longitude},
        "batteryEndV": battery.voltage,
        "captureCount": capture_count,
        "durationSeconds": duration_s,
    })

Fields:

| Field | Type | Description |
|---|---|---|
| id | string | Convex document ID from missions:start |
| status | string | "completed" or "aborted" |
| endPosition | {lat, lon} | Final GPS position |
| batteryEndV | float | Final battery voltage |
| captureCount | int | Total captures during mission |
| durationSeconds | int | Mission duration in seconds |
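
complete_mission() expects capture_count and duration_s, which the rover has to track itself. A small sketch of one way to do that, assuming a monotonic clock is acceptable for measuring duration. MissionStats and completion_args are hypothetical names, not part of the documented API.

```python
import time


class MissionStats:
    """Tracks the counters that missions:complete expects."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._started = clock()  # monotonic, so wall-clock jumps do not matter
        self.capture_count = 0

    def record_capture(self):
        self.capture_count += 1

    def duration_seconds(self):
        return int(self._clock() - self._started)


def completion_args(mission_id, lat, lon, battery_v, stats, aborted=False):
    """Build the argument dict for the missions:complete mutation."""
    return {
        "id": mission_id,
        "status": "aborted" if aborted else "completed",
        "endPosition": {"lat": lat, "lon": lon},
        "batteryEndV": battery_v,
        "captureCount": stats.capture_count,
        "durationSeconds": stats.duration_seconds(),
    }
```

A monotonic clock is used because the Pi's wall clock can jump when NTP or GPS time syncs in the middle of a mission.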

3. Captures

Each time the rover stops and takes a photo, it logs a capture.

Upload an image

Mutation: captures:generateUploadUrl + HTTP POST

import requests

def upload_image(client, filepath):
    """Upload an image to Convex storage. Returns the storage ID."""
    # Step 1: Get a signed upload URL
    upload_url = client.mutation("captures:generateUploadUrl", {})
    
    # Step 2: POST the file
    with open(filepath, "rb") as f:
        response = requests.post(
            upload_url,
            headers={"Content-Type": "image/jpeg"},
            data=f.read(),
        )
    response.raise_for_status()
    return response.json()["storageId"]
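
Uploads over field WiFi can fail transiently. A generic retry wrapper with exponential backoff is one way to handle that before falling back to the offline queue. This is a sketch, not something the guide prescribes: with_retries is a hypothetical helper, and the attempt count and delays are arbitrary defaults.

```python
import time


def with_retries(fn, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**n seconds and try again.

    Re-raises the last exception once all attempts are used up.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

Usage would look like storage_id = with_retries(lambda: upload_image(client, filepath)). Keep the attempt count low on the rover so a dead link does not stall the mission loop.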

Log a capture

Mutation: captures:ingest

def log_capture(client, mission_id, gps, battery, storage_id=None, filename=None):
    """Log a capture with GPS + battery data."""
    client.mutation("captures:ingest", {
        "missionId": mission_id,
        "capturedAt": int(time.time() * 1000),  # milliseconds
        "latitude": gps.latitude,
        "longitude": gps.longitude,
        "batteryVoltage": battery.voltage,
        "imageStorageId": storage_id,      # From upload_image(), or None
        "imageFilename": filename,         # e.g. "capture_001.jpg"
    })

Fields:

| Field | Type | Description |
|---|---|---|
| missionId | string | Convex document ID of the active mission |
| capturedAt | int | Timestamp in milliseconds (JS convention) |
| latitude | float | GPS latitude |
| longitude | float | GPS longitude |
| batteryVoltage | float | Battery voltage at capture time |
| imageStorageId | string or None | Convex storage ID from upload |
| imageFilename | string or None | Original filename for reference |
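
Several fields above are listed as "or None". How the backend treats a None (sent as null) depends on the mutation's argument validators, which this guide does not show. In Convex, a field declared only as optional may be omitted but does not accept null, so omitting unset keys is the conservative choice. A minimal sketch, with drop_none as a hypothetical helper:

```python
def drop_none(args):
    """Return a copy of args without the keys whose value is None."""
    return {key: value for key, value in args.items() if value is not None}


# Example: a capture logged before its image has been uploaded
payload = drop_none({
    "missionId": "example_mission_id",  # placeholder, not a real document ID
    "capturedAt": 1712345678000,
    "latitude": 42.0,
    "longitude": -71.0,
    "batteryVoltage": 7.6,
    "imageStorageId": None,
    "imageFilename": "capture_001.jpg",
})
```

If the validators do accept null, passing None directly as in the examples above works as well.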

Offline queue pattern:

def do_capture(self):
    """Capture, save locally, and try to upload."""
    filename = f"capture_{self.capture_count:03d}.jpg"
    filepath = os.path.join(self.capture_dir, filename)
    
    # Always save locally first
    self.camera.capture(filepath)
    metadata = {
        "missionId": self.mission_id,
        "capturedAt": int(time.time() * 1000),
        "latitude": self.gps.latitude,
        "longitude": self.gps.longitude,
        "batteryVoltage": self.battery.voltage,
        "imageFilename": filename,
    }
    save_json(filepath.replace(".jpg", ".json"), metadata)
    
    # Try to upload immediately
    try:
        storage_id = upload_image(self.client, filepath)
        metadata["imageStorageId"] = storage_id
        self.client.mutation("captures:ingest", metadata)
        move_to_uploaded(filepath)
    except Exception as e:
        log.warning(f"Upload failed, queued for later: {e}")

4. Diagnostics

Log pre-flight checks and runtime events.

Mutation: diagnostics:log

def log_diagnostic(client, diag_type, status, message, value=None, mission_id=None):
    """Log a diagnostic event."""
    client.mutation("diagnostics:log", {
        "missionId": mission_id,      # None for pre-flight checks without a mission
        "type": diag_type,            # "camera", "gps", "i2c", "battery", "disk_space", "wifi", "general"
        "status": status,             # "pass", "warning", "fail"
        "message": message,
        "value": value,               # Optional numeric value
    })

Pre-flight check example:

def preflight(client, mission_id=None):
    """Run all pre-flight checks and log results."""
    
    # Camera
    try:
        camera.capture_test()
        log_diagnostic(client, "camera", "pass", "Camera module detected, preview OK", mission_id=mission_id)
    except Exception as e:
        log_diagnostic(client, "camera", "fail", f"Camera error: {e}", mission_id=mission_id)
    
    # GPS
    sats = gps.satellites
    if sats >= 4:
        log_diagnostic(client, "gps", "pass", f"GPS fix acquired ({sats} satellites)", value=sats, mission_id=mission_id)
    elif sats > 0:
        log_diagnostic(client, "gps", "warning", f"Weak GPS fix ({sats} satellites)", value=sats, mission_id=mission_id)
    else:
        log_diagnostic(client, "gps", "fail", "No GPS fix", value=0, mission_id=mission_id)
    
    # I2C devices
    devices = scan_i2c()
    if 0x40 in devices and 0x48 in devices:
        log_diagnostic(client, "i2c", "pass", "PCA9685 at 0x40, ADS1115 at 0x48", mission_id=mission_id)
    else:
        log_diagnostic(client, "i2c", "fail", f"Missing I2C devices. Found: {[hex(d) for d in devices]}", mission_id=mission_id)
    
    # Battery
    voltage = battery.voltage
    if voltage > 7.0:
        log_diagnostic(client, "battery", "pass", f"Battery voltage nominal ({voltage:.2f} V)", value=voltage, mission_id=mission_id)
    elif voltage > 6.5:
        log_diagnostic(client, "battery", "warning", f"Battery voltage low ({voltage:.2f} V)", value=voltage, mission_id=mission_id)
    else:
        log_diagnostic(client, "battery", "fail", f"Battery voltage critical ({voltage:.2f} V)", value=voltage, mission_id=mission_id)
    
    # Disk space
    free_gb = get_disk_free_gb()
    if free_gb > 5:
        log_diagnostic(client, "disk_space", "pass", f"{free_gb:.0f}GB free on /home", value=free_gb, mission_id=mission_id)
    else:
        log_diagnostic(client, "disk_space", "warning", f"Low disk: {free_gb:.1f}GB free", value=free_gb, mission_id=mission_id)
    
    # WiFi
    signal = get_wifi_signal()
    if signal > -60:
        log_diagnostic(client, "wifi", "pass", f"WiFi signal strong ({signal} dBm)", value=signal, mission_id=mission_id)
    elif signal > -75:
        log_diagnostic(client, "wifi", "warning", f"WiFi signal weak ({signal} dBm)", value=signal, mission_id=mission_id)
    else:
        log_diagnostic(client, "wifi", "fail", f"WiFi signal very weak ({signal} dBm)", value=signal, mission_id=mission_id)

Diagnostic types:

| Type | When | What to log |
|---|---|---|
| camera | Pre-flight, runtime | Camera detection, capture success/failure |
| gps | Pre-flight, per-capture | Fix quality, satellite count |
| i2c | Pre-flight | PCA9685 + ADS1115 detection |
| battery | Pre-flight, runtime | Voltage level checks |
| disk_space | Pre-flight | Free space on SD card |
| wifi | Pre-flight, runtime | Signal strength |
| general | Anytime | Errors, warnings, state changes |
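
The battery, WiFi, and GPS checks in the pre-flight example all follow the same two-threshold pattern. A small classifier can express it once. This is a sketch; classify is a hypothetical helper, and the thresholds in the comments are the ones used in the pre-flight example.

```python
def classify(value, pass_above, warn_above):
    """Map a reading to "pass", "warning", or "fail" using two thresholds.

    A missing reading (None) is treated as a failure.
    """
    if value is None:
        return "fail"
    if value > pass_above:
        return "pass"
    if value > warn_above:
        return "warning"
    return "fail"


# Battery: pass above 7.0 V, warning above 6.5 V
# WiFi:    pass above -60 dBm, warning above -75 dBm
# GPS:     pass at 4 or more satellites (above 3), warning above 0
```

For example, log_diagnostic(client, "battery", classify(v, 7.0, 6.5), "Battery check", value=v) would replace the three-branch if statement.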

5. Command Polling

The dashboard can send commands to the rover. The Pi polls for pending commands and executes them.

Poll for commands

Query: rover:pendingCommands

def poll_commands(client):
    """Check for pending commands from the dashboard."""
    return client.query("rover:pendingCommands", {})

Acknowledge a command

Mutation: rover:ackCommand

def ack_command(client, command_id):
    """Tell the dashboard we received the command."""
    client.mutation("rover:ackCommand", {"id": command_id})

Complete a command

Mutation: rover:completeCommand

def complete_command(client, command_id, success=True, result=None):
    """Report command completion."""
    client.mutation("rover:completeCommand", {
        "id": command_id,
        "status": "completed" if success else "failed",
        "result": result,
    })

Full command loop

import json
import time

def command_loop(client, rover):
    """Main loop: heartbeat + poll commands."""
    while True:
        # Send heartbeat
        send_heartbeat(client, rover.gps, rover.battery, 
                       rover.wifi_signal, rover.disk_free,
                       rover.cpu_temp, rover.current_mission_id)
        
        # Poll for commands
        commands = poll_commands(client)
        for cmd in commands:
            ack_command(client, cmd["_id"])
            
            try:
                if cmd["command"] == "start_mission":
                    payload = json.loads(cmd.get("payload") or "{}")  # payload may be absent or null
                    rover.start_mission(payload.get("route", "default"))
                    complete_command(client, cmd["_id"], True, "Mission started")
                    
                elif cmd["command"] == "stop":
                    rover.stop()
                    complete_command(client, cmd["_id"], True, "Stopped")
                    
                elif cmd["command"] == "return_home":
                    rover.return_home()
                    complete_command(client, cmd["_id"], True, "Returning home")
                    
                elif cmd["command"] == "take_photo":
                    filename = rover.take_photo()
                    complete_command(client, cmd["_id"], True, f"Captured {filename}")
                    
                elif cmd["command"] == "pause":
                    rover.pause()
                    complete_command(client, cmd["_id"], True, "Paused")
                    
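                elif cmd["command"] == "resume":
                    rover.resume()
                    complete_command(client, cmd["_id"], True, "Resumed")

                else:
                    # Unknown command: report failure so it does not stay "acknowledged" forever
                    complete_command(client, cmd["_id"], False, f"Unknown command: {cmd['command']}")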
                    
            except Exception as e:
                complete_command(client, cmd["_id"], False, str(e))
        
        time.sleep(3)  # Heartbeat interval

Available commands:

| Command | Description | When available |
|---|---|---|
| start_mission | Begin a new patrol mission | When idle |
| stop | Emergency stop, abort current mission | When running |
| return_home | Navigate back to start position | When running |
| take_photo | Capture an image immediately | Anytime |
| pause | Pause current mission (stay in place) | When running |
| resume | Resume a paused mission | When idle (paused) |
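
The if/elif chain in the command loop can also be written as a dispatch table, which keeps the command list in one place and handles unknown commands explicitly. This is a sketch of that alternative, assuming a rover object with the same methods used in the loop above. HANDLERS and run_command are hypothetical names.

```python
import json

# Maps each command name to a callable taking (rover, payload_dict)
HANDLERS = {
    "start_mission": lambda rover, p: rover.start_mission(p.get("route", "default")),
    "stop": lambda rover, p: rover.stop(),
    "return_home": lambda rover, p: rover.return_home(),
    "take_photo": lambda rover, p: rover.take_photo(),
    "pause": lambda rover, p: rover.pause(),
    "resume": lambda rover, p: rover.resume(),
}


def run_command(rover, cmd):
    """Execute one command document and return (success, result_message)."""
    handler = HANDLERS.get(cmd["command"])
    if handler is None:
        return False, f"Unknown command: {cmd['command']}"
    try:
        payload = json.loads(cmd.get("payload") or "{}")
        value = handler(rover, payload)
    except Exception as exc:
        return False, str(exc)
    return True, ("ok" if value is None else str(value))
```

The loop body then reduces to ack_command(...), ok, msg = run_command(rover, cmd), and complete_command(client, cmd["_id"], ok, msg).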

Command lifecycle:

Dashboard sends → pending → Pi acks → acknowledged → Pi executes → completed/failed
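
The lifecycle above is a small state machine. The table below encodes it so the Pi can sanity-check a status change before sending it. Whether the backend itself enforces these transitions is not stated in this guide, so treat this as a client-side guard only. ALLOWED_TRANSITIONS and can_transition are hypothetical names.

```python
# Status transitions implied by the lifecycle line above
ALLOWED_TRANSITIONS = {
    "pending": {"acknowledged"},
    "acknowledged": {"completed", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}


def can_transition(current, new):
    """Return True if a command may move from status `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```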

Data Flow Summary

┌──────────────────────────────────────────────────────────────┐
│                    Raspberry Pi (Rover)                       │
│                                                              │
│  ┌──────────┐  heartbeat   ┌──────────────────────────────┐ │
│  │ Sensors  │─────────────→│                              │ │
│  │ GPS,ADC  │              │    Convex Cloud Backend      │ │
│  │ Camera   │  captures    │                              │ │
│  │ WiFi     │─────────────→│  rover_status (singleton)    │ │
│  └──────────┘  diagnostics │  missions                    │ │
│                ───────────→│  captures                    │ │
│  ┌──────────┐              │  diagnostics                 │ │
│  │ Command  │  poll cmds   │  rover_commands              │ │
│  │  Loop    │←─────────────│  tick_counts                 │ │
│  │          │  ack/done    │                              │ │
│  │          │─────────────→│                              │ │
│  └──────────┘              └──────────┬───────────────────┘ │
│                                       │                      │
└───────────────────────────────────────│──────────────────────┘
                                        │ real-time subscriptions
                                        ▼
                              ┌──────────────────────┐
                              │   Next.js Dashboard   │
                              │                      │
                              │  Control Center      │
                              │  ├─ Rover Live       │
                              │  ├─ New Mission      │
                              │  │   (pre-flight +   │
                              │  │    weather check)  │
                              │  ├─ Missions         │
                              │  │   ├─ Detail       │
                              │  │   └─ Captures     │
                              │  ├─ Diagnostics      │
                              │  └─ Tick Log         │
                              └──────────────────────┘

Convex Schema Reference

rover_status (singleton — Pi upserts this)

| Field | Type | Notes |
|---|---|---|
| position | {lat, lon}? | Current GPS position |
| batteryVoltage | float? | Crawler battery from ADC |
| wifiSignal | int? | dBm |
| diskFreeGb | float? | Free disk on Pi |
| cpuTemp | float? | Pi CPU temperature °C |
| gpsSatellites | int? | Satellites in view |
| currentMissionId | Id<missions>? | Active mission, or null |
| state | enum | idle, running, returning, error, offline |
| lastSeen | int | Auto-set by heartbeat mutation |
| uptimeSeconds | int? | Pi uptime |

rover_commands (queue — dashboard writes, Pi reads)

| Field | Type | Notes |
|---|---|---|
| command | enum | start_mission, stop, return_home, take_photo, pause, resume |
| status | enum | pending, acknowledged, completed, failed |
| payload | string? | JSON for command-specific data |
| createdAt | int | Auto-set |
| acknowledgedAt | int? | Set by ackCommand |
| completedAt | int? | Set by completeCommand |
| result | string? | Completion message |

missions

| Field | Type | Notes |
|---|---|---|
| missionId | string | Human-readable ID like mission_1712345678 |
| route | string? | Route name |
| status | enum | planned, in_progress, completed, aborted |
| startedAt | int? | Timestamp ms |
| endedAt | int? | Timestamp ms |
| durationSeconds | int? | Total mission duration |
| captureCount | int? | Total captures |
| startPosition | {lat, lon}? | GPS at mission start |
| endPosition | {lat, lon}? | GPS at mission end |
| batteryStartV | float? | Battery at start |
| batteryEndV | float? | Battery at end |
| notes | string? | Admin-editable notes |
| weather | string? | Legacy text field (admin-editable) |
| weatherSnapshot | object? | Auto-captured at mission start (see below) |

weatherSnapshot structure (auto-populated by convex/weather.ts action):

| Field | Type | Description |
|---|---|---|
| temperature | float | °F at mission start |
| humidity | float | % relative humidity |
| windSpeed | float | mph |
| precipitation | float | inches |
| weatherCode | int | WMO weather code |
| condition | string | Human-readable label (e.g. "Partly Cloudy") |
| fetchedAt | int | Timestamp ms |
| forecast | array? | Next 4 hours: {time, temperature, precipitation, weatherCode} |

captures

| Field | Type | Notes |
|---|---|---|
| missionId | Id<missions> | Parent mission |
| capturedAt | int | Timestamp ms |
| latitude | float | GPS latitude |
| longitude | float | GPS longitude |
| batteryVoltage | float | Battery at capture |
| imageStorageId | Id<_storage>? | Convex file storage ID |
| imageFilename | string? | Original filename |
| notes | string? | Optional notes |

diagnostics

| Field | Type | Notes |
|---|---|---|
| missionId | Id<missions>? | Parent mission (null for standalone checks) |
| type | enum | camera, gps, i2c, battery, disk_space, wifi, general |
| timestamp | int | Auto-set by log mutation |
| status | enum | pass, warning, fail |
| message | string? | Human-readable detail |
| value | float? | Numeric value (voltage, dBm, GB, etc.) |

tick_counts (manual — logged from dashboard)

| Field | Type | Notes |
|---|---|---|
| missionId | Id<missions> | Parent mission |
| loggedAt | int | Auto-set |
| count | int | Number of ticks found |
| notes | string? | Observations |
| clothCondition | string? | e.g. "good", "worn" |
| temperature | float? | °F |
| humidity | float? | % |
| timeOfDay | enum? | morning, afternoon, evening |

Timestamps

All timestamps in Convex are milliseconds since epoch (JavaScript convention). When sending from Python:

import time
timestamp_ms = int(time.time() * 1000)
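
Going the other way is useful when reading values such as lastSeen or capturedAt back on the Pi. A minimal sketch of both conversions using timezone-aware datetimes; the function names are assumptions.

```python
from datetime import datetime, timezone


def ms_to_datetime(ms):
    """Convert a Convex millisecond timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def datetime_to_ms(dt):
    """Convert a datetime to milliseconds since the epoch.

    Pass a timezone-aware datetime; a naive one is interpreted as local time.
    """
    return int(dt.timestamp() * 1000)
```

A common mistake is sending seconds instead of milliseconds, which makes every record appear to date from January 1970 on the dashboard.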

Error Handling & Offline Queue

The Pi should always save data locally first, then try to upload:

  1. Heartbeat — fire and forget, skip on failure
  2. Captures — save image + JSON sidecar locally, queue for upload
  3. Diagnostics — log locally, upload when possible
  4. Commands — poll when online, ignore when offline
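
The "skip on failure" and "ignore when offline" rules can be sketched as a wrapper that logs and swallows any exception. This is an illustration under the assumption that the client raises an exception on network failure; best_effort is a hypothetical helper, not part of the Convex client.

```python
import logging

log = logging.getLogger("rover.api")


def best_effort(fn, label, default=None):
    """Run fn() and return its result; on any exception, log it and return default."""
    try:
        return fn()
    except Exception as exc:
        log.warning("%s skipped: %s", label, exc)
        return default
```

In the command loop this would read best_effort(lambda: send_heartbeat(...), "heartbeat") and commands = best_effort(lambda: poll_commands(client), "poll", default=[]), so a dropped connection never crashes the loop.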

The upload script (upload.py from Guide 9) handles the offline queue:

  • Scans ~/rover/captures/ for un-uploaded files
  • Uploads images, logs captures
  • Moves to uploaded/ on success
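
The three steps above can be sketched as follows. This is not the upload.py from Guide 9, whose contents are not shown here. It is a minimal illustration that assumes each image has a .json sidecar as written by do_capture(), and that the upload and ingest steps are passed in as callables. drain_queue is a hypothetical name.

```python
import json
import os
import shutil


def drain_queue(capture_dir, upload, ingest):
    """Upload every queued capture in capture_dir; return how many succeeded.

    upload(image_path) must return a storage ID; ingest(metadata) logs the capture.
    """
    uploaded_dir = os.path.join(capture_dir, "uploaded")
    os.makedirs(uploaded_dir, exist_ok=True)
    done = 0
    for name in sorted(os.listdir(capture_dir)):
        if not name.endswith(".json"):
            continue
        sidecar = os.path.join(capture_dir, name)
        image = sidecar[: -len(".json")] + ".jpg"
        if not os.path.exists(image):
            continue  # sidecar without an image: leave it for manual inspection
        with open(sidecar) as f:
            metadata = json.load(f)
        try:
            metadata["imageStorageId"] = upload(image)
            ingest(metadata)
        except Exception:
            break  # probably offline: keep the remaining files queued
        shutil.move(image, uploaded_dir)
        shutil.move(sidecar, uploaded_dir)
        done += 1
    return done
```

With the helpers from this guide, a call might look like drain_queue(os.path.expanduser("~/rover/captures"), lambda p: upload_image(client, p), lambda m: client.mutation("captures:ingest", m)).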

Testing the API

You can test mutations from the command line:

# Send a test heartbeat
npx convex run rover:heartbeat '{"state":"idle","batteryVoltage":7.8}'

# Start a test mission
npx convex run missions:start '{"missionId":"mission_test","route":"Test Route"}'

# Log a diagnostic
npx convex run diagnostics:log '{"type":"general","status":"pass","message":"API test"}'

Or from the Convex dashboard at dashboard.convex.dev.