Tick Slayer 3000
← All Documents

build-guides

Guide 8: Data Pipeline & First Run

Mission controller, data upload, preflight checks, first autonomous run, and what comes next

Build Guide 8: Data Pipeline & First Run

Type: Build Guide

This is the final guide. Everything comes together: the rover follows a route autonomously, stops to photograph the cloth at intervals, monitors its battery, and uploads the data to your web app. Then you run it for real.

What You Need

  • Fully assembled rover with all software from Guides 1-7
  • A recorded GPS route (from Guide 7)
  • A treated drag cloth attached (from Guide 6)
  • Both batteries fully charged (TRX-4 LiPo and Pi battery)
  • WiFi coverage over the test area
  • Laptop monitoring via SSH
  • Clear weather, temperature >15°C if possible

Step 1: The Mission Controller

This is the main program — it orchestrates everything:

# ~/rover/mission.py
"""Main mission controller — autonomous drag with stop-and-capture."""
from adafruit_pca9685 import PCA9685
from adafruit_motor import servo
import board
import busio
import time
import sys
import os

sys.path.insert(0, '/home/pi/rover')
from config.rover_config import *
from capture import CaptureSystem
from gps_reader import GPSReader
from battery import BatteryMonitor
from navigator import Navigator
from heading import HeadingEstimator
from steering_controller import SteeringController
from watchdog import Watchdog

class Mission:
    def __init__(self, route_file):
        # Hardware
        i2c = busio.I2C(board.SCL, board.SDA)
        self.pca = PCA9685(i2c)
        self.pca.frequency = 50
        self.steer_servo = servo.Servo(self.pca.channels[STEERING_CHANNEL])
        self.esc = servo.ContinuousServo(self.pca.channels[THROTTLE_CHANNEL])

        # Subsystems
        self.capture = CaptureSystem(CAPTURE_RESOLUTION)
        self.gps = GPSReader(GPS_SERIAL_PORT, GPS_BAUD_RATE)
        self.battery = BatteryMonitor(
            BATTERY_DIVIDER_RATIO, BATTERY_LOW_THRESHOLD, BATTERY_CRITICAL
        )
        self.navigator = Navigator(route_file)
        self.heading_est = HeadingEstimator()
        self.steer_ctrl = SteeringController(
            STEERING_CENTER, STEERING_LEFT_MAX, STEERING_RIGHT_MAX
        )
        self.watchdog = Watchdog(timeout_seconds=2.0, on_timeout=self.emergency_stop)

        # State
        self.last_capture_pos = None
        self.state = "idle"  # idle, running, returning, stopped

    def emergency_stop(self):
        """Watchdog callback — kill throttle and steering."""
        print("WATCHDOG: Emergency stop!")
        self.esc.throttle = 0
        self.steer_servo.angle = STEERING_CENTER
        self.state = "stopped"

    def stop(self):
        """Graceful stop."""
        self.esc.throttle = 0
        self.steer_servo.angle = STEERING_CENTER

    def arm(self):
        """Arm the ESC and wait for GPS fix."""
        print("Arming ESC...")
        self.esc.throttle = 0
        self.steer_servo.angle = STEERING_CENTER
        time.sleep(3)
        print("ESC armed.")

        print("Starting GPS...")
        self.gps.start()
        print("Waiting for GPS fix...")
        timeout = 120  # 2 minutes max
        start = time.time()
        while not self.gps.has_fix:
            if time.time() - start > timeout:
                print("ERROR: No GPS fix after 2 minutes. Aborting.")
                return False
            time.sleep(1)
        print(f"GPS fix acquired! Satellites: {self.gps.satellites}")

        # Establish initial position
        self.last_capture_pos = self.gps.position
        return True

    def should_capture(self):
        """Check if we've traveled 15m since last capture."""
        if self.last_capture_pos is None:
            return True
        distance = GPSReader.distance_meters(
            self.last_capture_pos, self.gps.position
        )
        return distance >= CAPTURE_INTERVAL_METERS

    def do_capture(self):
        """Stop, capture image, resume."""
        print(f"  Capturing at {self.gps.position}...")
        self.stop()
        time.sleep(1)  # Let the cloth settle

        pos = self.gps.position
        self.capture.capture(pos[0], pos[1], self.battery.voltage)
        self.last_capture_pos = pos

        time.sleep(0.5)  # Brief pause before resuming
        print(f"  Capture #{self.capture.capture_count} done. Battery: {self.battery.voltage:.1f}V")

    def run(self):
        """Main mission loop."""
        if not self.arm():
            return

        self.watchdog.start()
        self.state = "running"
        print(f"Starting mission — {len(self.navigator.waypoints)} waypoints")

        # Initial capture at start position
        self.do_capture()

        try:
            while self.state == "running":
                self.watchdog.heartbeat()

                # Battery check
                if self.battery.is_critical:
                    print("CRITICAL: Battery critical! Stopping immediately.")
                    self.stop()
                    self.state = "stopped"
                    break
                elif self.battery.is_low:
                    print("WARNING: Battery low. Completing current waypoint then stopping.")
                    # Continue to next waypoint then stop
                    # (A real return-to-home would navigate back — future enhancement)

                # Navigation update
                pos = self.gps.position
                self.heading_est.update(pos)

                bearing, distance, wp_idx = self.navigator.update(pos)

                if wp_idx == -1:
                    print("Route complete!")
                    self.do_capture()  # Final capture
                    self.state = "stopped"
                    break

                # Steering
                steer_angle = self.steer_ctrl.compute_steering(
                    self.heading_est.heading, bearing
                )
                self.steer_servo.angle = steer_angle
                self.esc.throttle = THROTTLE_FORWARD_MIN

                # Capture check
                if self.should_capture():
                    self.do_capture()
                    self.esc.throttle = THROTTLE_FORWARD_MIN  # Resume after capture

                # Status (every 5 seconds-ish)
                print(
                    f"  WP {wp_idx}/{len(self.navigator.waypoints)} "
                    f"dist={distance:.1f}m "
                    f"hdg={self.heading_est.heading:.0f}° "
                    f"brg={bearing:.0f}° "
                    f"steer={steer_angle:.0f}° "
                    f"bat={self.battery.voltage:.1f}V"
                )

                time.sleep(0.5)  # Control loop at 2 Hz

        except KeyboardInterrupt:
            print("\nManual abort!")

        finally:
            self.stop()
            self.watchdog.stop()
            self.gps.stop()
            self.capture.shutdown()
            self.pca.deinit()
            print(f"Mission ended. {self.capture.capture_count} images captured.")


if __name__ == "__main__":
    route = sys.argv[1] if len(sys.argv) > 1 else "routes/test_route.json"
    mission = Mission(route)
    mission.run()

Step 2: Data Upload Script

After a mission, upload the captures to your Convex backend:

# ~/rover/upload.py
"""Upload captured images and metadata to the Convex backend."""
import os
import json
import requests
import glob

CAPTURE_DIR = "/home/pi/rover/captures"
UPLOADED_DIR = "/home/pi/rover/captures/uploaded"

def upload_captures(api_url):
    """Upload all un-uploaded captures."""
    os.makedirs(UPLOADED_DIR, exist_ok=True)

    meta_files = sorted(glob.glob(os.path.join(CAPTURE_DIR, "*.json")))
    print(f"Found {len(meta_files)} captures to upload.")

    for meta_path in meta_files:
        img_path = meta_path.replace(".json", ".jpg")
        if not os.path.exists(img_path):
            print(f"  Skipping {meta_path} — no matching image")
            continue

        with open(meta_path) as f:
            metadata = json.load(f)

        try:
            # Upload image
            with open(img_path, "rb") as img:
                response = requests.post(
                    f"{api_url}/upload",
                    files={"image": img},
                    data={"metadata": json.dumps(metadata)},
                    timeout=30
                )

            if response.status_code == 200:
                # Move to uploaded folder
                os.rename(img_path, os.path.join(UPLOADED_DIR, os.path.basename(img_path)))
                os.rename(meta_path, os.path.join(UPLOADED_DIR, os.path.basename(meta_path)))
                print(f"  Uploaded: {metadata['filename']}")
            else:
                print(f"  Failed ({response.status_code}): {metadata['filename']}")

        except requests.exceptions.RequestException as e:
            print(f"  Network error: {e}")
            print("  Remaining captures will retry next upload.")
            break

    remaining = len(glob.glob(os.path.join(CAPTURE_DIR, "*.json")))
    print(f"Done. {remaining} captures remaining in queue.")


if __name__ == "__main__":
    from config.rover_config import CONVEX_UPLOAD_URL
    if not CONVEX_UPLOAD_URL:
        print("Set CONVEX_UPLOAD_URL in rover_config.py first!")
    else:
        upload_captures(CONVEX_UPLOAD_URL)

Note: The upload endpoint on the Convex side needs to be built — we'll create Convex functions for image storage and mission logging as a follow-up. For now, the script queues everything locally and handles retries.

Step 3: Mission Log

Record mission metadata for the web app:

# ~/rover/mission_log.py
"""Log mission summary to a local file and (eventually) the backend."""
import json
import os
from datetime import datetime

LOG_DIR = "/home/pi/rover/logs"

def save_mission_log(route_name, capture_count, duration_seconds,
                     start_pos, end_pos, battery_start, battery_end):
    """Save a mission summary."""
    os.makedirs(LOG_DIR, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log = {
        "mission_id": f"mission_{timestamp}",
        "route": route_name,
        "started_at": datetime.now().isoformat(),
        "duration_seconds": duration_seconds,
        "captures": capture_count,
        "start_position": {"lat": start_pos[0], "lon": start_pos[1]},
        "end_position": {"lat": end_pos[0], "lon": end_pos[1]},
        "battery_start_v": battery_start,
        "battery_end_v": battery_end,
    }

    filepath = os.path.join(LOG_DIR, f"{log['mission_id']}.json")
    with open(filepath, "w") as f:
        json.dump(log, f, indent=2)

    print(f"Mission log saved: {filepath}")
    return log

Step 4: Pre-Flight Checklist Script

Before every run, this script checks that everything is working:

# ~/rover/preflight.py
"""Pre-flight checks — run before every mission."""
import sys
sys.path.insert(0, '/home/pi/rover')

def check_all():
    results = []

    # 1. Camera
    print("Checking camera...", end=" ")
    try:
        from picamera2 import Picamera2
        cam = Picamera2()
        config = cam.create_still_configuration()
        cam.configure(config)
        cam.start()
        cam.capture_file("/tmp/preflight_test.jpg")
        cam.stop()
        print("OK")
        results.append(("Camera", True))
    except Exception as e:
        print(f"FAIL: {e}")
        results.append(("Camera", False))

    # 2. GPS
    print("Checking GPS serial...", end=" ")
    try:
        import serial
        ser = serial.Serial('/dev/serial0', 9600, timeout=2)
        line = ser.readline().decode('ascii', errors='replace')
        ser.close()
        if '$GP' in line or '$GN' in line:
            print("OK (NMEA data flowing)")
            results.append(("GPS", True))
        else:
            print("WARNING: Serial open but no NMEA data (may need sky view)")
            results.append(("GPS", True))  # Hardware OK, just no fix
    except Exception as e:
        print(f"FAIL: {e}")
        results.append(("GPS", False))

    # 3. I2C devices
    print("Checking I2C bus...", end=" ")
    try:
        import board, busio
        i2c = busio.I2C(board.SCL, board.SDA)
        while not i2c.try_lock():
            pass
        devices = i2c.scan()
        i2c.unlock()
        pca_ok = 0x40 in devices
        ads_ok = 0x48 in devices
        print(f"PCA9685={'OK' if pca_ok else 'MISSING'}, ADS1115={'OK' if ads_ok else 'MISSING'}")
        results.append(("PCA9685", pca_ok))
        results.append(("ADS1115", ads_ok))
    except Exception as e:
        print(f"FAIL: {e}")
        results.append(("I2C", False))

    # 4. Battery
    print("Checking battery voltage...", end=" ")
    try:
        from battery import BatteryMonitor
        batt = BatteryMonitor()
        v = batt.voltage
        status = "OK" if v > 7.0 else "LOW" if v > 6.4 else "CRITICAL"
        print(f"{v:.1f}V ({status})")
        results.append(("Battery", v > 6.4))
    except Exception as e:
        print(f"FAIL: {e}")
        results.append(("Battery", False))

    # 5. Disk space
    print("Checking disk space...", end=" ")
    import shutil
    usage = shutil.disk_usage("/home/pi/rover/captures")
    free_gb = usage.free / (1024**3)
    print(f"{free_gb:.1f} GB free")
    results.append(("Disk", free_gb > 1.0))

    # Summary
    print("\n" + "="*40)
    all_pass = all(r[1] for r in results)
    for name, ok in results:
        print(f"  {'PASS' if ok else 'FAIL'}: {name}")
    print("="*40)

    if all_pass:
        print("All checks passed. Ready for mission.")
    else:
        print("Some checks failed. Fix issues before flying.")

    return all_pass


if __name__ == "__main__":
    check_all()

Step 5: First Autonomous Test (Short Run)

Don't go big yet. First run should be a tiny route:

  1. Record a simple route — walk a straight line, ~30 meters:

    python route_recorder.py first_test
    
  2. Run preflight:

    python preflight.py
    
  3. Set the rover at the start of the route with cloth attached

  4. Run the mission:

    python mission.py routes/first_test.json
    
  5. Walk alongside the rover — ready to pick it up if something goes wrong

What you're watching for:

  • Does it follow the route roughly? (Don't expect perfection — GPS is ±2-3m)
  • Does it stop for captures?
  • Does the cloth maintain ground contact during the run?
  • Does it stop at the end of the route?
  • Do you get images in ~/rover/captures/?

Common first-run issues:

ProblemLikely CauseFix
Rover drives in circlesHeading estimate wrong, or steering invertedCheck steering direction — left/right might be swapped
Rover overshoots waypointsMoving too fast for 2Hz control loopReduce throttle
No captures takenDistance calculation off, or camera timeoutCheck GPS fix quality during run
Rover doesn't stop at route endNavigation logic bugCheck waypoint count and completion detection
Cloth tangled after first turnTurning too sharplyReduce max steering angle during missions

Step 6: Full Test Run

Once the short run works, do a proper test:

  1. Record a route that covers actual yard terrain — include turns, edges, varied grass height
  2. Run the mission for the full route
  3. After the run:
# Check captures
ls -la ~/rover/captures/
# Should have images + JSON sidecars

# Check a metadata file
cat ~/rover/captures/capture_*_0001.json

# Try uploading (if Convex endpoint is set up)
python upload.py

Step 7: Post-Run Image Review

Transfer all captures to your laptop for review:

# From your laptop
scp -r pi@tickslayer.local:~/rover/captures/ ./mission_captures/

Look at the images:

  • Can you see the full cloth?
  • Is focus consistent?
  • Is there enough contrast to spot ticks (or small objects)?
  • Are there any captures that are blurry, obscured, or badly framed?

This is your baseline for what the vision system will need to process. If images are consistently bad, revisit camera mounting (Guide 3) or capture timing (stop duration might need to be longer).

Step 8: Connect to the Web App

The final step is connecting the rover data to your Tick Slayer 3000 web app. This requires building Convex functions for:

  • Image storage — Upload images via Convex file storage
  • Mission logging — Record mission summaries
  • Capture records — Individual capture entries with GPS, timestamp, and image reference
  • Dashboard data — Aggregations for the web UI

This is a natural next phase of development — the web app's Documentation and Software sections are ready for these features. The rover-side upload script (upload.py) is already structured to POST to an API endpoint.

What's Next

Congratulations — you have an autonomous tick-sampling rover. Here's what comes next:

Short term:

  • Build the Convex upload endpoint and connect upload.py
  • Add mission history and image gallery to the web app
  • Run regular sampling sessions and build a dataset

Medium term:

  • Tick detection — start with manual counting from images, then explore ML
  • Heatmap visualization — plot tick density by GPS location
  • Environmental correlation — log temperature, humidity, time-of-day alongside captures

Long term:

  • CO2 attractant system (documented in the research section — 44% capture rate improvement)
  • Obstacle avoidance (ultrasonic or camera-based)
  • Multi-route scheduling — autonomous daily runs
  • Return-to-home with actual GPS navigation (not just stopping)

Final Checklist

  • Mission controller runs a short autonomous route
  • Captures are taken at ~15m intervals
  • Battery monitoring works during the run (via ADC + voltage divider)
  • Watchdog stops the rover if control loop fails
  • Route replay follows waypoints within GPS accuracy
  • Preflight script passes all checks
  • Full test run completed on real terrain
  • Images reviewed and quality is acceptable
  • Upload script structured and ready for Convex endpoint
  • Mission logs saved locally

You built a thing. Time to go find some ticks.