#!/usr/bin/env python3 # PFEIFER - SLC - Modified by FrogAi for FrogPilot import calendar import json import math import numpy as np import requests from concurrent.futures import ThreadPoolExecutor from openpilot.common.conversions import Conversions as CV from openpilot.common.realtime import DT_MDL from openpilot.frogpilot.common.frogpilot_utilities import calculate_bearing_offset, calculate_distance_to_point, is_url_pingable from openpilot.frogpilot.common.frogpilot_variables import params, params_memory FREE_MAPBOX_REQUESTS = 100_000 class SpeedLimitController: def __init__(self): self.calling_mapbox = False self.override_slc = False self.denied_target = 0 self.map_speed_limit = 0 self.mapbox_limit = 0 self.next_speed_limit = 0 self.overridden_speed = 0 self.segment_distance = 0 self.speed_limit_changed_timer = 0 self.target = 0 self.unconfirmed_speed_limit = 0 self.previous_source = "None" self.source = "None" self.mapbox_requests = json.loads(params.get("MapBoxRequests") or "{}") self.mapbox_requests.setdefault("total_requests", 0) self.mapbox_requests.setdefault("max_requests", FREE_MAPBOX_REQUESTS - (28 * 100)) self.mapbox_host = "https://api.mapbox.com" self.mapbox_token = params.get("MapboxSecretKey", encoding="utf8") self.previous_target = params.get_float("PreviousSpeedLimit") self.executor = ThreadPoolExecutor(max_workers=1) self.session = requests.Session() self.session.headers.update({"Accept-Language": "en"}) self.session.headers.update({"User-Agent": "frogpilot-mapbox-speed-limit-retriever/1.0 (https://github.com/FrogAi/FrogPilot)"}) @property def experimental_mode(self): return self.frogpilot_toggles.slc_fallback_experimental_mode and self.target == 0 @property def offset(self): if self.frogpilot_toggles.is_metric: offset_map = [ (0, 8.1, self.frogpilot_toggles.speed_limit_offset1), # 0–29 km/h (8.1, 13.6, self.frogpilot_toggles.speed_limit_offset2), # 30–49 (13.6, 16.4, self.frogpilot_toggles.speed_limit_offset3), # 50–59 (16.4, 21.9, self.frogpilot_toggles.speed_limit_offset4), # 60–79 (21.9, 27.5, self.frogpilot_toggles.speed_limit_offset5), # 80–99 (27.5, 33.1, self.frogpilot_toggles.speed_limit_offset6), # 100–119 (33.1, 38.9, self.frogpilot_toggles.speed_limit_offset7), # 120–140 ] else: offset_map = [ (0, 11.2, self.frogpilot_toggles.speed_limit_offset1), # 0–24 mph (11.2, 15.2, self.frogpilot_toggles.speed_limit_offset2), # 25–34 (15.2, 19.6, self.frogpilot_toggles.speed_limit_offset3), # 35–44 (19.6, 24.1, self.frogpilot_toggles.speed_limit_offset4), # 45–54 (24.1, 28.6, self.frogpilot_toggles.speed_limit_offset5), # 55–64 (28.6, 33.1, self.frogpilot_toggles.speed_limit_offset6), # 65–74 (33.1, 44.2, self.frogpilot_toggles.speed_limit_offset7), # 75–99 ] return next((offset for low, high, offset in offset_map if low < self.target < high), 0) def get_mapbox_speed_limit(self, gps_position, now, time_validated, v_ego, sm): if not gps_position or not self.mapbox_token or (sm["carState"].steeringAngleDeg - sm["liveParameters"].angleOffsetDeg) >= 45: self.mapbox_limit = 0 self.segment_distance = 0 return if v_ego < 1: return if self.segment_distance > 0: self.segment_distance -= v_ego * DT_MDL return if self.calling_mapbox: self.segment_distance = v_ego return def make_request(): try: self.calling_mapbox = True successful = False if not is_url_pingable(self.mapbox_host): self.segment_distance = 1000 return None if time_validated: current_month = now.month if current_month != self.mapbox_requests.get("month"): self.mapbox_requests.update({ "month": current_month, "total_requests": 0, "max_requests": FREE_MAPBOX_REQUESTS - calendar.monthrange(now.year, current_month)[1] * 100, }) self.mapbox_requests["total_requests"] += 1 params.put_nonblocking("MapBoxRequests", json.dumps(self.mapbox_requests)) current_bearing = gps_position.get("bearing") current_latitude = gps_position.get("latitude") current_longitude = gps_position.get("longitude") future_latitude, future_longitude = calculate_bearing_offset(current_latitude, current_longitude, current_bearing, v_ego) url = ( f"{self.mapbox_host}/matching/v5/mapbox/driving/" f"{current_longitude},{current_latitude};" f"{future_longitude},{future_latitude}.json" ) mapbox_params = { "access_token": self.mapbox_token, "annotations": "maxspeed,distance", "geometries": "polyline6", "overview": "full", "steps": "false", "radiuses": "10;10", "tidy": "true", } response = self.session.get(url, params=mapbox_params, timeout=10) response.raise_for_status() successful = True return response.json() except Exception as exception: print(f"Unexpected error in Mapbox request: {exception}") finally: self.calling_mapbox = False if not successful: self.mapbox_limit = 0 self.segment_distance = v_ego return None def complete_request(future): try: data = future.result() if data: matchings = data.get("matchings") or [] if not matchings: self.mapbox_limit = 0 self.segment_distance = v_ego return legs = (matchings[0] or {}).get("legs") or [] if not legs: self.mapbox_limit = 0 self.segment_distance = v_ego return annotation = legs[0].get("annotation") or {} distances = annotation.get("distance") or [v_ego] segment_distance = distances[0] speed_data = annotation.get("maxspeed", []) speed_limit_kph = 0 if speed_data: first_segment_speed = speed_data[0] speed_limit_kph = (first_segment_speed.get("speed") if first_segment_speed.get("speed") != "none" else 0) or 0 if speed_limit_kph > 0: self.mapbox_limit = speed_limit_kph * CV.KPH_TO_MS self.segment_distance = segment_distance return self.mapbox_limit = 0 self.segment_distance = v_ego except Exception as exception: print(f"Mapbox Callback Error: {exception}") self.mapbox_limit = 0 self.segment_distance = v_ego future = self.executor.submit(make_request) future.add_done_callback(complete_request) def handle_limit_change(self, desired_source, desired_target, sm): self.speed_limit_changed_timer += DT_MDL speed_limit_accepted = (sm["frogpilotCarState"].accelPressed and sm["carControl"].longActive) or params_memory.get_bool("SpeedLimitAccepted") speed_limit_denied = sm["frogpilotCarState"].decelPressed or (self.speed_limit_changed_timer >= 30) if speed_limit_accepted: self.overridden_speed = 0 self.source = desired_source self.target = desired_target params_memory.remove("SpeedLimitAccepted") elif speed_limit_denied: self.denied_target = desired_target self.previous_source = desired_source self.previous_target = desired_target elif desired_target < self.target and not self.frogpilot_toggles.speed_limit_confirmation_lower: self.source = desired_source self.target = desired_target elif desired_target > self.target and not self.frogpilot_toggles.speed_limit_confirmation_higher: self.source = desired_source self.target = desired_target else: self.source = "None" self.unconfirmed_speed_limit = desired_target if self.target != self.previous_target and self.target > 0 and not speed_limit_denied: self.denied_target = 0 self.previous_source = self.source self.previous_target = self.target params.put_float_nonblocking("PreviousSpeedLimit", self.target) def update_limits(self, dashboard_speed_limit, gps_position, navigation_speed_limit, now, time_validated, v_cruise, v_ego, sm): self.update_map_speed_limit(gps_position, v_ego) limits = { "Dashboard": dashboard_speed_limit, "Map Data": self.map_speed_limit, "Navigation": navigation_speed_limit } filtered_limits = {source: limit for source, limit in limits.items() if limit >= 1} if self.frogpilot_toggles.speed_limit_priority_highest: desired_source = max(filtered_limits, key=filtered_limits.get, default="None") desired_target = filtered_limits.get(desired_source, 0) elif self.frogpilot_toggles.speed_limit_priority_lowest: desired_source = min(filtered_limits, key=filtered_limits.get, default="None") desired_target = filtered_limits.get(desired_source, 0) elif filtered_limits: for priority in [ self.frogpilot_toggles.speed_limit_priority1, self.frogpilot_toggles.speed_limit_priority2, self.frogpilot_toggles.speed_limit_priority3 ]: if priority in filtered_limits: desired_source = priority desired_target = filtered_limits[desired_source] break else: desired_source = "None" desired_target = 0 else: desired_source = "None" desired_target = 0 if desired_target == 0 or self.target == 0: if self.mapbox_requests["total_requests"] < self.mapbox_requests["max_requests"] and self.frogpilot_toggles.slc_mapbox_filler: self.get_mapbox_speed_limit(gps_position, now, time_validated, v_ego, sm) if self.mapbox_limit >= 1: desired_source = "Mapbox" desired_target = self.mapbox_limit if desired_target == 0 or self.target == 0: if self.denied_target != self.previous_target > 0 and self.frogpilot_toggles.slc_fallback_previous_speed_limit: desired_source = self.previous_source desired_target = self.previous_target self.target = desired_target elif sm["controlsState"].enabled and self.frogpilot_toggles.slc_fallback_set_speed: desired_source = "None" desired_target = v_cruise else: self.mapbox_limit = 0 self.segment_distance = 0 if abs(desired_target - self.previous_target) >= 1: self.handle_limit_change(desired_source, desired_target, sm) elif desired_source != self.source and abs(desired_target - self.target) < 1: self.source = desired_source else: self.speed_limit_changed_timer = 0 self.unconfirmed_speed_limit = 0 def update_map_speed_limit(self, gps_position, v_ego): if not gps_position: return self.map_speed_limit = params_memory.get_float("MapSpeedLimit") next_map_speed_limit = json.loads(params_memory.get("NextMapSpeedLimit") or "{}") self.next_speed_limit = next_map_speed_limit.get("speedlimit", 0) if self.next_speed_limit: current_latitude = gps_position.get("latitude") current_longitude = gps_position.get("longitude") next_latitude = next_map_speed_limit.get("latitude") next_longitude = next_map_speed_limit.get("longitude") distance_to_upcoming = calculate_distance_to_point(current_latitude * CV.DEG_TO_RAD, current_longitude * CV.DEG_TO_RAD, next_latitude * CV.DEG_TO_RAD, next_longitude * CV.DEG_TO_RAD) if self.map_speed_limit < self.next_speed_limit: max_lookahead = self.frogpilot_toggles.map_speed_lookahead_higher * v_ego elif self.map_speed_limit > self.next_speed_limit: max_lookahead = self.frogpilot_toggles.map_speed_lookahead_lower * v_ego else: max_lookahead = 0 if distance_to_upcoming < max_lookahead: self.map_speed_limit = self.next_speed_limit def update_override(self, v_cruise, v_cruise_diff, v_ego, v_ego_diff, sm): self.override_slc = self.overridden_speed > self.target + self.offset > 0 self.override_slc |= sm["carState"].gasPressed and v_ego > self.target + self.offset > 0 self.override_slc &= sm["controlsState"].enabled if self.override_slc: if self.frogpilot_toggles.speed_limit_controller_override_manual: if sm["carState"].gasPressed: self.overridden_speed = max(v_ego + v_ego_diff, self.overridden_speed) self.overridden_speed = float(np.clip(self.overridden_speed, self.target + self.offset, v_cruise + v_cruise_diff)) elif self.frogpilot_toggles.speed_limit_controller_override_set_speed: self.overridden_speed = v_cruise + v_cruise_diff self.source = "None" else: self.overridden_speed = 0