Reference: Pi ↔ Dashboard API
Every Convex mutation and query the Pi needs to call — heartbeat, missions, captures, diagnostics, commands — with Python examples
What this guide covers: Every Convex mutation and query the Raspberry Pi needs to call, with Python examples. This is the contract between the rover software and the web dashboard.
Overview
The Tick Slayer 3000 dashboard (Next.js + Convex) exposes a set of mutations and queries that the Pi calls to:
- Report its status (heartbeat every few seconds)
- Start and complete missions
- Upload captures (image + GPS + battery per stop)
- Log diagnostics (pre-flight checks, runtime events)
- Poll for commands (start, stop, return home, take photo, pause, resume)
All communication goes through the Convex client. The Pi uses the convex Python package to call mutations directly — no REST API needed.
Setup on the Pi
Install the Convex Python client
pip install convex
Initialize the client
from convex import ConvexClient
client = ConvexClient("https://YOUR_DEPLOYMENT.convex.cloud")
Store the deployment URL in an environment variable or config file on the Pi:
# ~/rover/.env
CONVEX_URL=https://YOUR_DEPLOYMENT.convex.cloud
import os
from dotenv import load_dotenv
from convex import ConvexClient
load_dotenv()
client = ConvexClient(os.environ["CONVEX_URL"])
API Reference
1. Rover Heartbeat
Mutation: rover:heartbeat
The Pi calls this every 3-5 seconds to report its current state. The dashboard uses this to show live telemetry and determine if the rover is online (last seen < 30s).
def send_heartbeat(client, gps, battery, wifi_dbm, disk_gb, cpu_temp, mission_id=None):
    """Send a heartbeat to the dashboard."""
    client.mutation("rover:heartbeat", {
        "position": {"lat": gps.latitude, "lon": gps.longitude} if gps.has_fix else None,
        "batteryVoltage": battery.voltage,
        "wifiSignal": wifi_dbm,  # dBm, e.g. -58
        "diskFreeGb": disk_gb,
        "cpuTemp": cpu_temp,  # °C
        "gpsSatellites": gps.satellites,
        "currentMissionId": mission_id,  # Convex document ID, or None if idle
        "state": "running" if mission_id else "idle",
        "uptimeSeconds": get_uptime(),
    })
Fields:
| Field | Type | Description |
|---|---|---|
| position | {lat, lon} or None | Current GPS position |
| batteryVoltage | float or None | Crawler battery voltage from ADC |
| wifiSignal | int or None | WiFi signal strength in dBm |
| diskFreeGb | float or None | Free disk space in GB |
| cpuTemp | float or None | Pi CPU temperature in °C |
| gpsSatellites | int or None | Number of GPS satellites in view |
| currentMissionId | string or None | Convex ID of active mission |
| state | string | One of: "idle", "running", "returning", "error", "offline" |
| uptimeSeconds | int or None | Pi uptime in seconds |
Dashboard behavior:
- If lastSeen is more than 30 seconds old, the dashboard shows the rover as "Offline"
- The pulsing green dot appears when heartbeats are fresh
- Telemetry values update in real-time on the Control Center dashboard
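The 30-second offline rule above can be expressed as a small check. This is a minimal sketch, not the dashboard's actual code: `is_online` and `OFFLINE_AFTER_MS` are hypothetical names, and it assumes `lastSeen` is stored in milliseconds since epoch, as the schema section describes.

```python
import time

# Assumed threshold from the text: the rover counts as offline after 30 s.
OFFLINE_AFTER_MS = 30_000

def is_online(last_seen_ms, now_ms=None):
    """Return True if the last heartbeat is recent enough to count as online."""
    if last_seen_ms is None:
        return False  # no heartbeat has ever been recorded
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - last_seen_ms) <= OFFLINE_AFTER_MS
```

With a 3-5 second heartbeat interval, the rover can miss several consecutive heartbeats before this check flips to offline.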
2. Mission Lifecycle
Start a mission
Mutation: missions:start
import time

def start_mission(client, route_name, gps, battery):
    """Called when the rover begins a new mission. Returns the Convex document ID."""
    mission_id = client.mutation("missions:start", {
        "missionId": f"mission_{int(time.time())}",
        "route": route_name,
        "startPosition": {"lat": gps.latitude, "lon": gps.longitude},
        "batteryStartV": battery.voltage,
    })
    return mission_id  # Use this ID for captures, diagnostics, and heartbeats
Returns: The Convex document ID (string like "jn74djn8y1kwse8w3ke373346s84pzzv"). Store this — you'll pass it to every subsequent capture, diagnostic, and heartbeat during the mission.
Side effect: The missions:start mutation automatically schedules a weather snapshot. A Convex action (weather:snapWeatherForMission) fetches current conditions from Open-Meteo and patches the mission with a structured weatherSnapshot object. No Pi-side code needed for weather.
Complete a mission
Mutation: missions:complete
def complete_mission(client, mission_id, gps, battery, capture_count, duration_s, aborted=False):
    """Called when the rover finishes or aborts a mission."""
    client.mutation("missions:complete", {
        "id": mission_id,
        "status": "aborted" if aborted else "completed",
        "endPosition": {"lat": gps.latitude, "lon": gps.longitude},
        "batteryEndV": battery.voltage,
        "captureCount": capture_count,
        "durationSeconds": duration_s,
    })
Fields:
| Field | Type | Description |
|---|---|---|
| id | string | Convex document ID from missions:start |
| status | string | "completed" or "aborted" |
| endPosition | {lat, lon} | Final GPS position |
| batteryEndV | float | Final battery voltage |
| captureCount | int | Total captures during mission |
| durationSeconds | int | Mission duration in seconds |
3. Captures
Each time the rover stops and takes a photo, it logs a capture.
Upload an image
Mutation: captures:generateUploadUrl + HTTP POST
import requests

def upload_image(client, filepath):
    """Upload an image to Convex storage. Returns the storage ID."""
    # Step 1: Get a signed upload URL
    upload_url = client.mutation("captures:generateUploadUrl", {})
    # Step 2: POST the file
    with open(filepath, "rb") as f:
        response = requests.post(
            upload_url,
            headers={"Content-Type": "image/jpeg"},
            data=f.read(),
        )
    response.raise_for_status()
    return response.json()["storageId"]
Log a capture
Mutation: captures:ingest
def log_capture(client, mission_id, gps, battery, storage_id=None, filename=None):
    """Log a capture with GPS + battery data."""
    client.mutation("captures:ingest", {
        "missionId": mission_id,
        "capturedAt": int(time.time() * 1000),  # milliseconds
        "latitude": gps.latitude,
        "longitude": gps.longitude,
        "batteryVoltage": battery.voltage,
        "imageStorageId": storage_id,  # From upload_image(), or None
        "imageFilename": filename,  # e.g. "capture_001.jpg"
    })
Fields:
| Field | Type | Description |
|---|---|---|
| missionId | string | Convex document ID of the active mission |
| capturedAt | int | Timestamp in milliseconds (JS convention) |
| latitude | float | GPS latitude |
| longitude | float | GPS longitude |
| batteryVoltage | float | Battery voltage at capture time |
| imageStorageId | string or None | Convex storage ID from upload |
| imageFilename | string or None | Original filename for reference |
Offline queue pattern:
def do_capture(self):
    """Capture, save locally, and try to upload."""
    filename = f"capture_{self.capture_count:03d}.jpg"
    filepath = os.path.join(self.capture_dir, filename)
    # Always save locally first
    self.camera.capture(filepath)
    metadata = {
        "missionId": self.mission_id,
        "capturedAt": int(time.time() * 1000),
        "latitude": self.gps.latitude,
        "longitude": self.gps.longitude,
        "batteryVoltage": self.battery.voltage,
        "imageFilename": filename,
    }
    save_json(filepath.replace(".jpg", ".json"), metadata)
    # Try to upload immediately
    try:
        storage_id = upload_image(self.client, filepath)
        metadata["imageStorageId"] = storage_id
        self.client.mutation("captures:ingest", metadata)
        move_to_uploaded(filepath)
    except Exception as e:
        log.warning(f"Upload failed, queued for later: {e}")
4. Diagnostics
Log pre-flight checks and runtime events.
Mutation: diagnostics:log
def log_diagnostic(client, diag_type, status, message, value=None, mission_id=None):
    """Log a diagnostic event."""
    client.mutation("diagnostics:log", {
        "missionId": mission_id,  # None for pre-flight checks without a mission
        "type": diag_type,  # "camera", "gps", "i2c", "battery", "disk_space", "wifi", "general"
        "status": status,  # "pass", "warning", "fail"
        "message": message,
        "value": value,  # Optional numeric value
    })
Pre-flight check example:
def preflight(client, mission_id=None):
    """Run all pre-flight checks and log results."""
    # Assumes camera, gps, and battery are module-level driver objects.
    # Camera
    try:
        camera.capture_test()
        log_diagnostic(client, "camera", "pass", "Camera module detected, preview OK", mission_id=mission_id)
    except Exception as e:
        log_diagnostic(client, "camera", "fail", f"Camera error: {e}", mission_id=mission_id)
    # GPS
    sats = gps.satellites
    if sats >= 4:
        log_diagnostic(client, "gps", "pass", f"GPS fix acquired ({sats} satellites)", value=sats, mission_id=mission_id)
    elif sats > 0:
        log_diagnostic(client, "gps", "warning", f"Weak GPS fix ({sats} satellites)", value=sats, mission_id=mission_id)
    else:
        log_diagnostic(client, "gps", "fail", "No GPS fix", value=0, mission_id=mission_id)
    # I2C devices
    devices = scan_i2c()
    if 0x40 in devices and 0x48 in devices:
        log_diagnostic(client, "i2c", "pass", "PCA9685 at 0x40, ADS1115 at 0x48", mission_id=mission_id)
    else:
        log_diagnostic(client, "i2c", "fail", f"Missing I2C devices. Found: {[hex(d) for d in devices]}", mission_id=mission_id)
    # Battery
    voltage = battery.voltage
    if voltage > 7.0:
        log_diagnostic(client, "battery", "pass", "Battery voltage nominal", value=voltage, mission_id=mission_id)
    elif voltage > 6.5:
        log_diagnostic(client, "battery", "warning", "Battery voltage low", value=voltage, mission_id=mission_id)
    else:
        log_diagnostic(client, "battery", "fail", "Battery voltage critical", value=voltage, mission_id=mission_id)
    # Disk space
    free_gb = get_disk_free_gb()
    if free_gb > 5:
        log_diagnostic(client, "disk_space", "pass", f"{free_gb:.0f}GB free on /home", value=free_gb, mission_id=mission_id)
    else:
        log_diagnostic(client, "disk_space", "warning", f"Low disk: {free_gb:.1f}GB free", value=free_gb, mission_id=mission_id)
    # WiFi
    signal = get_wifi_signal()
    if signal > -60:
        log_diagnostic(client, "wifi", "pass", f"WiFi signal strong ({signal} dBm)", value=signal, mission_id=mission_id)
    elif signal > -75:
        log_diagnostic(client, "wifi", "warning", f"WiFi signal weak ({signal} dBm)", value=signal, mission_id=mission_id)
    else:
        log_diagnostic(client, "wifi", "fail", f"WiFi signal very weak ({signal} dBm)", value=signal, mission_id=mission_id)
Diagnostic types:
| Type | When | What to log |
|---|---|---|
| camera | Pre-flight, runtime | Camera detection, capture success/failure |
| gps | Pre-flight, per-capture | Fix quality, satellite count |
| i2c | Pre-flight | PCA9685 + ADS1115 detection |
| battery | Pre-flight, runtime | Voltage level checks |
| disk_space | Pre-flight | Free space on SD card |
| wifi | Pre-flight, runtime | Signal strength |
| general | Anytime | Errors, warnings, state changes |
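The battery and WiFi checks in the pre-flight example share one pattern: a reading is compared against two lower bounds and mapped to a status. A minimal sketch of that pattern follows; `classify` is a hypothetical helper, not part of the rover code, and the thresholds are copied from the example.

```python
def classify(value, pass_above, warn_above):
    """Map a reading to "pass", "warning", or "fail" using two lower bounds."""
    if value > pass_above:
        return "pass"
    if value > warn_above:
        return "warning"
    return "fail"

# Thresholds copied from the pre-flight example.
battery_status = classify(6.8, pass_above=7.0, warn_above=6.5)
wifi_status = classify(-80, pass_above=-60, warn_above=-75)
print(battery_status, wifi_status)  # warning fail
```

The result can be passed straight to `log_diagnostic` as the `status` argument. Because satellite counts are integers, the GPS check (`>= 4`, `> 0`) corresponds to `pass_above=3, warn_above=0`.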
5. Command Polling
The dashboard can send commands to the rover. The Pi polls for pending commands and executes them.
Poll for commands
Query: rover:pendingCommands
def poll_commands(client):
    """Check for pending commands from the dashboard."""
    return client.query("rover:pendingCommands", {})
Acknowledge a command
Mutation: rover:ackCommand
def ack_command(client, command_id):
    """Tell the dashboard we received the command."""
    client.mutation("rover:ackCommand", {"id": command_id})
Complete a command
Mutation: rover:completeCommand
def complete_command(client, command_id, success=True, result=None):
    """Report command completion."""
    client.mutation("rover:completeCommand", {
        "id": command_id,
        "status": "completed" if success else "failed",
        "result": result,
    })
Full command loop
import json
import time

def command_loop(client, rover):
    """Main loop: heartbeat + poll commands."""
    while True:
        # Send heartbeat
        send_heartbeat(client, rover.gps, rover.battery,
                       rover.wifi_signal, rover.disk_free,
                       rover.cpu_temp, rover.current_mission_id)
        # Poll for commands
        commands = poll_commands(client)
        for cmd in commands:
            ack_command(client, cmd["_id"])
            try:
                if cmd["command"] == "start_mission":
                    payload = json.loads(cmd.get("payload", "{}"))
                    rover.start_mission(payload.get("route", "default"))
                    complete_command(client, cmd["_id"], True, "Mission started")
                elif cmd["command"] == "stop":
                    rover.stop()
                    complete_command(client, cmd["_id"], True, "Stopped")
                elif cmd["command"] == "return_home":
                    rover.return_home()
                    complete_command(client, cmd["_id"], True, "Returning home")
                elif cmd["command"] == "take_photo":
                    filename = rover.take_photo()
                    complete_command(client, cmd["_id"], True, f"Captured {filename}")
                elif cmd["command"] == "pause":
                    rover.pause()
                    complete_command(client, cmd["_id"], True, "Paused")
                elif cmd["command"] == "resume":
                    rover.resume()
                    complete_command(client, cmd["_id"], True, "Resumed")
            except Exception as e:
                complete_command(client, cmd["_id"], False, str(e))
        time.sleep(3)  # Heartbeat interval
Available commands:
| Command | Description | When available |
|---|---|---|
| start_mission | Begin a new patrol mission | When idle |
| stop | Emergency stop, abort current mission | When running |
| return_home | Navigate back to start position | When running |
| take_photo | Capture an image immediately | Anytime |
| pause | Pause current mission (stay in place) | When running |
| resume | Resume a paused mission | When idle (paused) |
Command lifecycle:
Dashboard sends → pending → Pi acks → acknowledged → Pi executes → completed/failed
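The execute step of this lifecycle can be isolated into a function that always produces a completed or failed outcome, including for command names the Pi does not recognize. The sketch below is one possible shape, not the rover's actual code: `run_command` is a hypothetical helper, and it assumes the rover object exposes the same methods used in the full command loop.

```python
import json

def run_command(rover, cmd):
    """Execute one command document; return (success, result message)."""
    name = cmd["command"]
    try:
        if name == "start_mission":
            # payload is optional, so treat a missing or null value as "{}"
            payload = json.loads(cmd.get("payload") or "{}")
            rover.start_mission(payload.get("route", "default"))
            return True, "Mission started"
        if name == "take_photo":
            return True, f"Captured {rover.take_photo()}"
        # Commands that map directly to a no-argument rover method
        simple = {
            "stop": "Stopped",
            "return_home": "Returning home",
            "pause": "Paused",
            "resume": "Resumed",
        }
        if name in simple:
            getattr(rover, name)()
            return True, simple[name]
        return False, f"Unknown command: {name}"
    except Exception as e:
        return False, str(e)
```

Inside the polling loop, the returned pair would be forwarded as `complete_command(client, cmd["_id"], success, result)`, so no acknowledged command is left without a final status.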
Data Flow Summary
┌──────────────────────────────────────────────────────────────┐
│  Raspberry Pi (Rover)                                        │
│                                                              │
│  ┌──────────┐  heartbeat   ┌──────────────────────────────┐  │
│  │ Sensors  │─────────────→│                              │  │
│  │ GPS,ADC  │              │  Convex Cloud Backend        │  │
│  │ Camera   │  captures    │                              │  │
│  │ WiFi     │─────────────→│  rover_status (singleton)    │  │
│  └──────────┘  diagnostics │  missions                    │  │
│                ───────────→│  captures                    │  │
│  ┌──────────┐              │  diagnostics                 │  │
│  │ Command  │  poll cmds   │  rover_commands              │  │
│  │ Loop     │←─────────────│  tick_counts                 │  │
│  │          │  ack/done    │                              │  │
│  │          │─────────────→│                              │  │
│  └──────────┘              └──────────┬───────────────────┘  │
│                                       │                      │
└───────────────────────────────────────│──────────────────────┘
                                        │ real-time subscriptions
                                        ▼
                             ┌──────────────────────┐
                             │  Next.js Dashboard   │
                             │                      │
                             │  Control Center      │
                             │  ├─ Rover Live       │
                             │  ├─ New Mission      │
                             │  │   (pre-flight +   │
                             │  │    weather check) │
                             │  ├─ Missions         │
                             │  │  ├─ Detail        │
                             │  │  └─ Captures      │
                             │  ├─ Diagnostics      │
                             │  └─ Tick Log         │
                             └──────────────────────┘
Convex Schema Reference
rover_status (singleton — Pi upserts this)
| Field | Type | Notes |
|---|---|---|
| position | {lat, lon}? | Current GPS position |
| batteryVoltage | float? | Crawler battery from ADC |
| wifiSignal | int? | dBm |
| diskFreeGb | float? | Free disk on Pi |
| cpuTemp | float? | Pi CPU temperature °C |
| gpsSatellites | int? | Satellites in view |
| currentMissionId | Id<missions>? | Active mission, or null |
| state | enum | idle, running, returning, error, offline |
| lastSeen | int | Auto-set by heartbeat mutation |
| uptimeSeconds | int? | Pi uptime |
rover_commands (queue — dashboard writes, Pi reads)
| Field | Type | Notes |
|---|---|---|
| command | enum | start_mission, stop, return_home, take_photo, pause, resume |
| status | enum | pending, acknowledged, completed, failed |
| payload | string? | JSON for command-specific data |
| createdAt | int | Auto-set |
| acknowledgedAt | int? | Set by ackCommand |
| completedAt | int? | Set by completeCommand |
| result | string? | Completion message |
missions
| Field | Type | Notes |
|---|---|---|
| missionId | string | Human-readable ID like mission_1712345678 |
| route | string? | Route name |
| status | enum | planned, in_progress, completed, aborted |
| startedAt | int? | Timestamp ms |
| endedAt | int? | Timestamp ms |
| durationSeconds | int? | Total mission duration |
| captureCount | int? | Total captures |
| startPosition | {lat, lon}? | GPS at mission start |
| endPosition | {lat, lon}? | GPS at mission end |
| batteryStartV | float? | Battery at start |
| batteryEndV | float? | Battery at end |
| notes | string? | Admin-editable notes |
| weather | string? | Legacy text field (admin-editable) |
| weatherSnapshot | object? | Auto-captured at mission start (see below) |
weatherSnapshot structure (auto-populated by convex/weather.ts action):
| Field | Type | Description |
|---|---|---|
| temperature | float | °F at mission start |
| humidity | float | % relative humidity |
| windSpeed | float | mph |
| precipitation | float | inches |
| weatherCode | int | WMO weather code |
| condition | string | Human-readable label (e.g. "Partly Cloudy") |
| fetchedAt | int | Timestamp ms |
| forecast | array? | Next 4 hours: {time, temperature, precipitation, weatherCode} |
captures
| Field | Type | Notes |
|---|---|---|
| missionId | Id<missions> | Parent mission |
| capturedAt | int | Timestamp ms |
| latitude | float | GPS latitude |
| longitude | float | GPS longitude |
| batteryVoltage | float | Battery at capture |
| imageStorageId | Id<_storage>? | Convex file storage ID |
| imageFilename | string? | Original filename |
| notes | string? | Optional notes |
diagnostics
| Field | Type | Notes |
|---|---|---|
| missionId | Id<missions>? | Parent mission (null for standalone checks) |
| type | enum | camera, gps, i2c, battery, disk_space, wifi, general |
| timestamp | int | Auto-set by log mutation |
| status | enum | pass, warning, fail |
| message | string? | Human-readable detail |
| value | float? | Numeric value (voltage, dBm, GB, etc.) |
tick_counts (manual — logged from dashboard)
| Field | Type | Notes |
|---|---|---|
| missionId | Id<missions> | Parent mission |
| loggedAt | int | Auto-set |
| count | int | Number of ticks found |
| notes | string? | Observations |
| clothCondition | string? | e.g. "good", "worn" |
| temperature | float? | °F |
| humidity | float? | % |
| timeOfDay | enum? | morning, afternoon, evening |
Timestamps
All timestamps in Convex are milliseconds since epoch (JavaScript convention). When sending from Python:
import time
timestamp_ms = int(time.time() * 1000)
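When reading timestamps back on the Pi (for example, to print a mission's start time in a log), the same convention applies in reverse. A small sketch using only the standard library follows; `to_convex_ms` and `from_convex_ms` are hypothetical helper names.

```python
from datetime import datetime, timezone

def to_convex_ms(dt):
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    return int(dt.timestamp() * 1000)

def from_convex_ms(ms):
    """Convert a millisecond timestamp back to a UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

started = from_convex_ms(1712345678000)
print(started.isoformat())  # 2024-04-05T19:34:38+00:00
```

Passing second-based values (such as a raw `time.time()`) where milliseconds are expected would place every record in January 1970, so it is worth keeping the conversion in one place.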
Error Handling & Offline Queue
The Pi should always save data locally first, then try to upload:
- Heartbeat — fire and forget, skip on failure
- Captures — save image + JSON sidecar locally, queue for upload
- Diagnostics — log locally, upload when possible
- Commands — poll when online, ignore when offline
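For the fire-and-forget cases in the list above, one option is a thin wrapper that swallows failures and reports them in the local log. This is a sketch under assumptions: `try_mutation` is a hypothetical helper, and the only client call it relies on is `client.mutation(name, args)` as used throughout this guide.

```python
import logging

log = logging.getLogger("rover")

def try_mutation(client, name, args):
    """Call a mutation; return True on success, False on any failure."""
    try:
        client.mutation(name, args)
        return True
    except Exception as e:  # network down, deployment unreachable, etc.
        log.warning("Mutation %s failed, skipping: %s", name, e)
        return False
```

A heartbeat could then be sent as `try_mutation(client, "rover:heartbeat", payload)` without risking a crash in the main loop, while captures and diagnostics can use the boolean result to decide whether to keep the item queued locally.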
The upload script (upload.py from Guide 9) handles the offline queue:
- Scans ~/rover/captures/ for un-uploaded files
- Uploads images, logs captures
- Moves to uploaded/ on success
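The scan, upload, and move steps can be sketched as follows. This is not the actual upload.py from Guide 9; `drain_queue` is a hypothetical function, and it assumes each image sits next to a JSON sidecar with the same base name, as in the offline queue pattern from the Captures section. The upload and ingest steps are passed in as callables so the logic can be tested without a Convex deployment.

```python
import json
import shutil
from pathlib import Path

def drain_queue(capture_dir, upload_image, ingest):
    """Upload every capture that still has a JSON sidecar in capture_dir.

    upload_image(path) returns a storage ID; ingest(metadata) logs the capture.
    Returns the number of captures uploaded.
    """
    capture_dir = Path(capture_dir)
    uploaded_dir = capture_dir / "uploaded"
    uploaded_dir.mkdir(exist_ok=True)
    done = 0
    # glob is non-recursive, so files already in uploaded/ are not rescanned
    for sidecar in sorted(capture_dir.glob("*.json")):
        image = sidecar.with_suffix(".jpg")
        if not image.exists():
            continue  # sidecar without an image; leave it for inspection
        metadata = json.loads(sidecar.read_text())
        try:
            metadata["imageStorageId"] = upload_image(str(image))
            ingest(metadata)
        except Exception:
            break  # probably still offline; retry the rest on the next run
        # Move both files only after the capture has been logged
        shutil.move(str(image), str(uploaded_dir / image.name))
        shutil.move(str(sidecar), str(uploaded_dir / sidecar.name))
        done += 1
    return done
```

On the Pi this might be called as `drain_queue(os.path.expanduser("~/rover/captures"), lambda p: upload_image(client, p), lambda m: client.mutation("captures:ingest", m))`. Stopping at the first failure avoids hammering an unreachable backend. If the ingest step fails after an upload succeeds, the next run uploads that image again, so orphaned storage objects are possible.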
Testing the API
You can test mutations from the command line:
# Send a test heartbeat
npx convex run rover:heartbeat '{"state":"idle","batteryVoltage":7.8}'
# Start a test mission
npx convex run missions:start '{"missionId":"mission_test","route":"Test Route"}'
# Log a diagnostic
npx convex run diagnostics:log '{"type":"general","status":"pass","message":"API test"}'
Or from the Convex dashboard at dashboard.convex.dev.
