From 67f3371b45738aaa182c64e59f17689dd20bbd4f Mon Sep 17 00:00:00 2001 From: James <91348155+FrogAi@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:00:00 -0700 Subject: [PATCH] Speed Limit Filler --- frogpilot/system/speed_limit_filler.py | 409 +++++++++++++++++++++++++ system/manager/process_config.py | 4 + 2 files changed, 413 insertions(+) create mode 100644 frogpilot/system/speed_limit_filler.py diff --git a/frogpilot/system/speed_limit_filler.py b/frogpilot/system/speed_limit_filler.py new file mode 100644 index 00000000..a4277eb2 --- /dev/null +++ b/frogpilot/system/speed_limit_filler.py @@ -0,0 +1,409 @@ +#!/usr/bin/env python3 +import json +import math +import requests +import time + +from collections import OrderedDict, deque +from datetime import datetime, timedelta, timezone + +from cereal import log, messaging + +from openpilot.common.constants import CV +from openpilot.common.gps import get_gps_location_service +from openpilot.common.params import Params + +from openpilot.frogpilot.common.frogpilot_utilities import calculate_distance_to_point, calculate_lane_width, is_url_pingable + +NetworkType = log.DeviceState.NetworkType + +BOUNDING_BOX_RADIUS_DEGREE = 0.1 +MAX_ENTRIES = 1_000_000 +MAX_OVERPASS_DATA_BYTES = 1_073_741_824 +MAX_OVERPASS_REQUESTS = 10_000 +METERS_PER_DEG_LAT = 111_320 +VETTING_INTERVAL_DAYS = 7 + +OVERPASS_API_URL = "https://overpass-api.de/api/interpreter" +OVERPASS_STATUS_URL = "https://overpass-api.de/api/status" + +class MapSpeedLogger: + def __init__(self): + self.params = Params(return_defaults=True) + self.params_memory = Params(memory=True) + + self.cached_box = None + self.previous_coordinates = None + + self.cached_segments = {} + + self.dataset_additions = deque(maxlen=MAX_ENTRIES) + + self.overpass_requests = self.params.get("OverpassRequests") + self.overpass_requests.setdefault("day", datetime.now(timezone.utc).day) + self.overpass_requests.setdefault("total_bytes", 0) + self.overpass_requests.setdefault("total_requests", 0) + + self.session = requests.Session() + self.session.headers.update({"Accept-Language": "en"}) + self.session.headers.update({"User-Agent": "frogpilot-map-speed-logger/1.0 (https://github.com/FrogAi/FrogPilot)"}) + + self.gps_location_service = get_gps_location_service(self.params) + + self.sm = messaging.SubMaster(["deviceState", "frogpilotCarState", "frogpilotPlan", self.gps_location_service, "modelV2"]) + + @property + def can_make_overpass_request(self): + return self.overpass_requests["total_bytes"] < MAX_OVERPASS_DATA_BYTES and self.overpass_requests["total_requests"] < MAX_OVERPASS_REQUESTS + + @property + def should_stop_processing(self): + return self.sm["deviceState"].started or not self.params_memory.get_bool("UpdateSpeedLimits") + + @staticmethod + def cleanup_dataset(dataset): + cleaned_data = OrderedDict() + + for item in dataset: + if "last_vetted" in item: + required = {"incorrect_limit", "last_vetted", "segment_id", "source", "speed_limit", "start_coordinates"} + else: + required = {"bearing", "end_coordinates", "incorrect_limit", "road_name", "road_width", "source", "speed_limit", "start_coordinates"} + + if not required.issubset(item.keys()): + continue + + entry_copy = item.copy() + entry_copy.pop("last_vetted", None) + + key = json.dumps(entry_copy, sort_keys=True) + cleaned_data[key] = item + + return deque(cleaned_data.values(), maxlen=MAX_ENTRIES) + + @staticmethod + def meters_to_deg_lat(meters): + return meters / METERS_PER_DEG_LAT + + @staticmethod + def meters_to_deg_lon(meters, latitude): + return meters / (METERS_PER_DEG_LAT * math.cos(latitude * CV.DEG_TO_RAD)) + + def get_speed_limit_source(self): + sources = [ + (self.sm["frogpilotPlan"].slcMapboxSpeedLimit, "Mapbox"), + (self.sm["frogpilotCarState"].dashboardSpeedLimit, "Dashboard") + ] + for speed_limit, source in sources: + if speed_limit > 0: + return speed_limit, source + return None + + def is_in_cached_box(self, latitude, longitude): + if self.cached_box is None: + return False + return self.cached_box["min_latitude"] <= latitude <= self.cached_box["max_latitude"] and \ + self.cached_box["min_longitude"] <= longitude <= self.cached_box["max_longitude"] + + def record_overpass_request(self, content_bytes): + self.overpass_requests["total_bytes"] += content_bytes + self.overpass_requests["total_requests"] += 1 + + def reset_daily_api_limits(self): + current_day = datetime.now(timezone.utc).day + if current_day != self.overpass_requests["day"]: + self.overpass_requests.update({ + "day": current_day, + "total_requests": 0, + "total_bytes": 0, + }) + + def update_params(self, dataset, filtered_dataset): + self.params.put("OverpassRequests", self.overpass_requests) + self.params.put("SpeedLimits", list(dataset)) + self.params.put("SpeedLimitsFiltered", list(filtered_dataset)) + + def wait_for_api(self): + while not is_url_pingable(OVERPASS_STATUS_URL): + print("Waiting for Overpass API to be available...") + self.sm.update() + + if self.should_stop_processing: + return False + + time.sleep(5) + return True + + def fetch_from_overpass(self, latitude, longitude): + min_lat = latitude - BOUNDING_BOX_RADIUS_DEGREE + max_lat = latitude + BOUNDING_BOX_RADIUS_DEGREE + min_lon = longitude - BOUNDING_BOX_RADIUS_DEGREE + max_lon = longitude + BOUNDING_BOX_RADIUS_DEGREE + + self.cached_box = {"min_latitude": min_lat, "max_latitude": max_lat, "min_longitude": min_lon, "max_longitude": max_lon} + self.cached_segments.clear() + + query = ( + f"[out:json][timeout:90][maxsize:{MAX_OVERPASS_DATA_BYTES // 10}];" + f"way({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f})" + "[highway~'^(motorway|motorway_link|primary|primary_link|residential|" + "secondary|secondary_link|tertiary|tertiary_link|trunk|trunk_link)$'];" + "out geom qt;" + ) + + try: + response = self.session.post(OVERPASS_API_URL, data=query, timeout=90) + self.record_overpass_request(len(response.content)) + + if response.status_code == 429: + retry_after = int(response.headers.get("Retry-After", 10)) + print(f"Overpass API rate limit hit. Retrying in {retry_after} seconds.") + + time.sleep(retry_after) + + response = self.session.post(OVERPASS_API_URL, data=query, timeout=90) + self.record_overpass_request(len(response.content)) + + response.raise_for_status() + return response.json().get("elements", []) + except requests.exceptions.RequestException as exception: + print(f"Overpass API request failed: {exception}") + self.cached_segments.clear() + return [] + + def filter_segments_for_entry(self, entry): + bearing_rad = entry["bearing"] * CV.DEG_TO_RAD + start_lat, start_lon = entry["start_coordinates"]["latitude"], entry["start_coordinates"]["longitude"] + end_lat, end_lon = entry["end_coordinates"]["latitude"], entry["end_coordinates"]["longitude"] + mid_lat = (start_lat + end_lat) / 2 + + forward_buffer_lat = self.meters_to_deg_lat(entry["speed_limit"]) + forward_buffer_lon = self.meters_to_deg_lon(entry["speed_limit"], mid_lat) + side_buffer_lat = self.meters_to_deg_lat(entry["road_width"]) + side_buffer_lon = self.meters_to_deg_lon(entry["road_width"], mid_lat) + + delta_lat_fwd = forward_buffer_lat * math.cos(bearing_rad) + delta_lon_fwd = forward_buffer_lon * math.sin(bearing_rad) + delta_lat_side = side_buffer_lat * math.cos(bearing_rad + math.pi / 2) + delta_lon_side = side_buffer_lon * math.sin(bearing_rad + math.pi / 2) + + min_lat = min(start_lat, end_lat) - abs(delta_lat_fwd) - abs(delta_lat_side) + max_lat = max(start_lat, end_lat) + abs(delta_lat_fwd) + abs(delta_lat_side) + min_lon = min(start_lon, end_lon) - abs(delta_lon_fwd) - abs(delta_lon_side) + max_lon = max(start_lon, end_lon) + abs(delta_lon_fwd) + abs(delta_lon_side) + + relevant_segments = [] + for segment in self.cached_segments.values(): + if not segment or "nodes" not in segment: + continue + + latitudes = [node[0] for node in segment["nodes"]] + longitudes = [node[1] for node in segment["nodes"]] + + if not (max(latitudes) < min_lat or min(latitudes) > max_lat or max(longitudes) < min_lon or min(longitudes) > max_lon): + relevant_segments.append(segment) + + return relevant_segments + + def log_speed_limit(self): + if not self.sm.updated[self.gps_location_service]: + return + + gps_location = self.sm[self.gps_location_service] + current_latitude = gps_location.latitude + current_longitude = gps_location.longitude + + if self.previous_coordinates is None: + self.previous_coordinates = {"latitude": current_latitude, "longitude": current_longitude} + return + + current_speed_source = self.get_speed_limit_source() + valid_sources = {source[0] for source in [current_speed_source] if source and source[0] > 0} + + map_speed = self.params_memory.get("MapSpeedLimit") + is_incorrect_limit = bool(map_speed > 0 and valid_sources and all(abs(map_speed - source) > 1 for source in valid_sources)) + + if map_speed > 0 and not is_incorrect_limit: + self.previous_coordinates = None + return + + road_name = self.params_memory.get("RoadName") + if not road_name or not current_speed_source: + return + + distance = calculate_distance_to_point( + self.previous_coordinates["latitude"], + self.previous_coordinates["longitude"], + current_latitude, + current_longitude + ) + if distance < 1: + return + + speed_limit, source = current_speed_source + self.dataset_additions.append({ + "bearing": gps_location.bearingDeg, + "end_coordinates": {"latitude": current_latitude, "longitude": current_longitude}, + "incorrect_limit": is_incorrect_limit, + "road_name": road_name, + "road_width": calculate_lane_width(self.sm["modelV2"].laneLines[1], self.sm["modelV2"].laneLines[2]), + "source": source, + "speed_limit": speed_limit, + "start_coordinates": self.previous_coordinates, + }) + + self.previous_coordinates = {"latitude": current_latitude, "longitude": current_longitude} + + def process_new_entries(self, dataset, filtered_dataset): + existing_segment_ids = {entry["segment_id"] for entry in filtered_dataset if "segment_id" in entry} + entries_to_process = list(dataset) + total_entries = len(entries_to_process) + + for i, entry in enumerate(entries_to_process): + self.sm.update() + + if self.should_stop_processing: + break + + if not self.can_make_overpass_request: + self.params_memory.put("UpdateSpeedLimitsStatus", "Hit API limit...") + time.sleep(5) + break + + self.params_memory.put("UpdateSpeedLimitsStatus", f"Processing: {i + 1} / {total_entries}") + + start_coords = entry["start_coordinates"] + self.update_cached_segments(start_coords["latitude"], start_coords["longitude"]) + segments = self.filter_segments_for_entry(entry) + + dataset.remove(entry) + + for segment in segments: + segment_id = segment["segment_id"] + if segment_id in existing_segment_ids: + continue + if segment["maxspeed"] and not entry.get("incorrect_limit"): + continue + if segment["road_name"] != entry.get("road_name"): + continue + + filtered_dataset.append({ + "incorrect_limit": entry.get("incorrect_limit"), + "last_vetted": datetime.now(timezone.utc).isoformat(), + "segment_id": segment_id, + "source": entry["source"], + "speed_limit": entry["speed_limit"], + "start_coordinates": entry["start_coordinates"], + }) + existing_segment_ids.add(segment_id) + + if i % 100 == 0: + self.update_params(dataset, filtered_dataset) + + def process_speed_limits(self): + self.reset_daily_api_limits() + + if not self.wait_for_api(): + return + + self.cached_box, self.cached_segments = None, {} + + dataset = self.cleanup_dataset(self.params.get("SpeedLimits")) + filtered_dataset = self.cleanup_dataset(self.params.get("SpeedLimitsFiltered")) + + filtered_dataset = self.vet_entries(filtered_dataset) + self.update_params(dataset, filtered_dataset) + + if dataset and not self.should_stop_processing: + self.cached_box, self.cached_segments = None, {} + self.params_memory.put("UpdateSpeedLimitsStatus", "Calculating...") + self.process_new_entries(dataset, filtered_dataset) + + self.update_params(dataset, filtered_dataset) + self.params_memory.put("UpdateSpeedLimitsStatus", "Completed!") + + def update_cached_segments(self, latitude, longitude, vetting=False): + if not self.is_in_cached_box(latitude, longitude): + elements = self.fetch_from_overpass(latitude, longitude) + for way in elements: + if way.get("type") == "way" and (segment_id := way.get("id")): + tags = way.get("tags", {}) + if vetting: + self.cached_segments[segment_id] = tags.get("maxspeed") + elif "geometry" in way and (nodes := way["geometry"]): + self.cached_segments[segment_id] = { + "maxspeed": tags.get("maxspeed"), + "nodes": [(node["lat"], node["lon"]) for node in nodes], + "road_name": tags.get("name"), + "segment_id": segment_id, + } + + def vet_entries(self, filtered_dataset): + dataset_list = list(filtered_dataset) + total_to_vet = len(filtered_dataset) + vetted_entries = deque(maxlen=MAX_ENTRIES) + + for i, entry in enumerate(dataset_list): + self.sm.update() + + if self.should_stop_processing: + vetted_entries.extend(dataset_list[i:]) + break + + if not self.can_make_overpass_request: + self.params_memory.put("UpdateSpeedLimitsStatus", "Hit API limit...") + time.sleep(5) + vetted_entries.extend(dataset_list[i:]) + break + + self.params_memory.put("UpdateSpeedLimitsStatus", f"Vetting: {i + 1} / {total_to_vet}") + + last_vetted_time = datetime.fromisoformat(entry["last_vetted"]) + if datetime.now(timezone.utc) - last_vetted_time < timedelta(days=VETTING_INTERVAL_DAYS): + vetted_entries.append(entry) + continue + + start_coords = entry["start_coordinates"] + self.update_cached_segments(start_coords["latitude"], start_coords["longitude"], vetting=True) + + current_maxspeed = self.cached_segments.get(entry["segment_id"]) + if current_maxspeed is None or (entry.get("incorrect_limit") and current_maxspeed != entry.get("speed_limit")): + entry["last_vetted"] = datetime.now(timezone.utc).isoformat() + vetted_entries.append(entry) + + return self.cleanup_dataset(list(vetted_entries)) + +def main(): + logger = MapSpeedLogger() + + previously_started = False + + while True: + logger.sm.update() + + if logger.sm["deviceState"].started: + logger.log_speed_limit() + + previously_started = True + elif previously_started: + existing_dataset = logger.params.get("SpeedLimits") or [] + existing_dataset.extend(logger.dataset_additions) + + new_dataset = logger.cleanup_dataset(existing_dataset) + logger.params.put("SpeedLimits", list(new_dataset)) + + if logger.sm["deviceState"].networkType in (NetworkType.ethernet, NetworkType.wifi): + logger.params_memory.put_bool("UpdateSpeedLimits", True) + + logger.dataset_additions.clear() + + previously_started = False + elif logger.params_memory.get_bool("UpdateSpeedLimits"): + logger.process_speed_limits() + + logger.params_memory.remove("UpdateSpeedLimits") + else: + time.sleep(5) + +if __name__ == "__main__": + main() diff --git a/system/manager/process_config.py b/system/manager/process_config.py index d2c2e687..206728a7 100644 --- a/system/manager/process_config.py +++ b/system/manager/process_config.py @@ -70,6 +70,9 @@ def allow_logging(started: bool, params: Params, CP: car.CarParams, frogpilot_to def allow_uploads(started: bool, params: Params, CP: car.CarParams, frogpilot_toggles: SimpleNamespace) -> bool: return not frogpilot_toggles.no_uploads or frogpilot_toggles.no_onroad_uploads +def run_speed_limit_filler(started: bool, params: Params, CP: car.CarParams, frogpilot_toggles: SimpleNamespace) -> bool: + return frogpilot_toggles.speed_limit_filler + procs = [ DaemonProcess("manage_athenad", "system.athena.manage_athenad", "AthenadPid"), @@ -131,6 +134,7 @@ elif TICI: procs += [ PythonProcess("frogpilot_process", "frogpilot.frogpilot_process", always_run), PythonProcess("mapd", "frogpilot.navigation.mapd", always_run), + PythonProcess("speed_limit_filler", "frogpilot.system.speed_limit_filler", run_speed_limit_filler), ] managed_processes = {p.name: p for p in procs}