From 2f00fc348d95e62a2ca2e867d8a70b28015bfb9d Mon Sep 17 00:00:00 2001 From: James <91348155+FrogAi@users.noreply.github.com> Date: Tue, 9 Dec 2025 05:28:24 -0700 Subject: [PATCH] FrogPilot utilities --- .../common/frogpilot_download_utilities.py | 162 ++++++++++++++++ frogpilot/common/frogpilot_utilities.py | 175 ++++++++++++++++++ frogpilot/frogpilot_process.py | 17 +- 3 files changed, 347 insertions(+), 7 deletions(-) create mode 100644 frogpilot/common/frogpilot_download_utilities.py create mode 100644 frogpilot/common/frogpilot_utilities.py diff --git a/frogpilot/common/frogpilot_download_utilities.py b/frogpilot/common/frogpilot_download_utilities.py new file mode 100644 index 00000000..b14c4014 --- /dev/null +++ b/frogpilot/common/frogpilot_download_utilities.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +import requests + +from datetime import datetime, timezone + +from openpilot.frogpilot.common.frogpilot_utilities import delete_file, is_url_pingable +from openpilot.frogpilot.common.frogpilot_variables import RESOURCES_REPO + +GITHUB_URL = f"https://raw.githubusercontent.com/{RESOURCES_REPO}" +GITLAB_URL = f"https://gitlab.com/{RESOURCES_REPO}/-/raw" + +def download_file(cancel_param, destination, download_param, params_memory, progress_param, session, url, offset_bytes=0, total_bytes=0): + try: + destination.parent.mkdir(parents=True, exist_ok=True) + + with session.get(url, stream=True, timeout=10) as response: + if response.status_code == 404 and url.endswith(".gif"): + print(f"GIF download failed (404). Attempting fallback to PNG for {destination.name}") + return download_file(cancel_param, destination.with_suffix(".png"), download_param, params_memory, progress_param, session, url.replace(".gif", ".png"), offset_bytes, total_bytes) + + response.raise_for_status() + + total_size = int(response.headers.get("Content-Length", 0)) + if total_size == 0: + handle_error(None, download_param, "Download invalid...", "Download invalid...", params_memory, progress_param) + return + + temp_file_path = destination.with_suffix(destination.suffix + ".tmp") + + try: + with temp_file_path.open("wb") as temp_file: + downloaded_size = 0 + + for chunk in response.iter_content(chunk_size=16384): + if params_memory.get_bool(cancel_param): + raise InterruptedError + + if chunk: + temp_file.write(chunk) + downloaded_size += len(chunk) + + if total_bytes: + overall_progress = (offset_bytes + downloaded_size) / total_bytes * 100 + elif total_size > 0: + overall_progress = downloaded_size / total_size * 100 + else: + overall_progress = 0 + + if overall_progress < 100: + params_memory.put(progress_param, f"{overall_progress:.0f}%") + else: + params_memory.put(progress_param, "Verifying authenticity...") + + temp_file_path.replace(destination) + + except InterruptedError: + temp_file_path.unlink(missing_ok=True) + handle_error(None, download_param, "Download cancelled...", "Download cancelled...", params_memory, progress_param) + return + except Exception: + temp_file_path.unlink(missing_ok=True) + raise + + except Exception as exception: + handle_request_error(destination, download_param, exception, params_memory, progress_param) + + +def get_remote_file_size(params_memory, session, url): + try: + response = session.head(url, headers={"Accept-Encoding": "identity"}, timeout=10) + response.raise_for_status() + size = int(response.headers.get("Content-Length", 0)) + + if size == 0: + with session.get(url, headers={"Accept-Encoding": "identity"}, stream=True, timeout=10) as response: + response.raise_for_status() + size = int(response.headers.get("Content-Length", 0)) + + return size + except Exception as exception: + handle_request_error(None, None, exception, params_memory, None) + return 0 + + +def get_repository_url(session): + if is_url_pingable("https://github.com") and not github_rate_limited(session): + return GITHUB_URL + if is_url_pingable("https://gitlab.com"): + return GITLAB_URL + return None + + +def github_rate_limited(session): + try: + response = session.get("https://api.github.com/rate_limit", timeout=10) + response.raise_for_status() + rate_limit_info = response.json() + + rate_info = rate_limit_info.get("resources", {}).get("core", {}) + remaining = rate_info.get("remaining", 0) + print(f"GitHub API Requests Remaining: {remaining}") + + if remaining <= 0: + reset_timestamp = rate_info.get("reset", 0) + reset_time = datetime.fromtimestamp(reset_timestamp, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + print("GitHub rate limit reached") + print(f"GitHub Rate Limit Resets At (UTC): {reset_time}") + return True + return False + + except requests.exceptions.RequestException as exception: + print(f"Error checking GitHub rate limit: {exception}") + return True + + +def handle_error(destination, download_param, error, error_message, params_memory, progress_param): + if destination: + delete_file(destination) + + if progress_param and "404" not in error_message: + print(f"Error occurred: {error}") + params_memory.put(progress_param, error_message) + params_memory.remove(download_param) + + +def handle_request_error(destination, download_param, error, params_memory, progress_param): + if isinstance(error, requests.exceptions.HTTPError) and error.response is not None: + error_message = f"Server error ({error.response.status_code})" + elif isinstance(error, (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError)): + error_message = "Connection dropped" + elif isinstance(error, requests.exceptions.ReadTimeout): + error_message = "Read timed out" + elif isinstance(error, requests.exceptions.RequestException): + error_message = "Network request error. Check connection" + elif isinstance(error, requests.exceptions.Timeout): + error_message = "Download timed out" + else: + error_message = "Unexpected error" + + handle_error(destination, download_param, error, f"Failed: {error_message}", params_memory, progress_param) + + +def verify_download(file_path, params_memory, session, url): + if not file_path.is_file(): + print(f"File not found: {file_path}") + return False + + if file_path.suffix == ".png" and url.endswith(".gif"): + url = url.replace(".gif", ".png") + + remote_file_size = get_remote_file_size(params_memory, session, url) + + if remote_file_size == 0: + print(f"Error fetching remote size for {file_path}") + return False + + local_size = file_path.stat().st_size + if remote_file_size != local_size: + print(f"File size mismatch for {file_path}: Remote {remote_file_size} vs Local {local_size}") + return False + + return True diff --git a/frogpilot/common/frogpilot_utilities.py b/frogpilot/common/frogpilot_utilities.py new file mode 100644 index 00000000..b19eac31 --- /dev/null +++ b/frogpilot/common/frogpilot_utilities.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +import json +import math +import os +import requests +import subprocess +import threading +import zipfile + +from pathlib import Path + +import openpilot.system.sentry as sentry + +from cereal import messaging + +from openpilot.frogpilot.common.frogpilot_variables import EARTH_RADIUS + +class ThreadManager: + def __init__(self): + self.thread_lock = threading.Lock() + + self.running_threads = {} + + def run_with_lock(self, target, args=(), report=True): + name = target.__name__ + + if not isinstance(args, (tuple, list)): + args = (args,) + + with self.thread_lock: + dead_threads = [key for key, thread in self.running_threads.items() if not thread.is_alive()] + for key in dead_threads: + del self.running_threads[key] + + if name in self.running_threads and self.running_threads[name].is_alive(): + return + + def wrapped_target(*t_args): + try: + target(*t_args) + except Exception as exception: + print(f"Error in thread '{name}': {exception}") + if report: + sentry.capture_exception(exception) + + thread = threading.Thread(args=args, daemon=True, target=wrapped_target) + thread.start() + self.running_threads[name] = thread + + def is_thread_alive(self, name): + with self.thread_lock: + thread = self.running_threads.get(name) + return thread is not None and thread.is_alive() + + +def calculate_bearing_offset(latitude, longitude, current_bearing, distance): + bearing = math.radians(current_bearing) + lat_rad = math.radians(latitude) + lon_rad = math.radians(longitude) + + delta = distance / EARTH_RADIUS + + new_lat = math.asin(math.sin(lat_rad) * math.cos(delta) + math.cos(lat_rad) * math.sin(delta) * math.cos(bearing)) + new_lon = lon_rad + math.atan2(math.sin(bearing) * math.sin(delta) * math.cos(lat_rad), math.cos(delta) - math.sin(lat_rad) * math.sin(new_lat)) + return math.degrees(new_lat), math.degrees(new_lon) + + +def calculate_distance_to_point(lat1, lon1, lat2, lon2): + lat1_rad = math.radians(lat1) + lon1_rad = math.radians(lon1) + lat2_rad = math.radians(lat2) + lon2_rad = math.radians(lon2) + + delta_lat = lat2_rad - lat1_rad + delta_lon = lon2_rad - lon1_rad + + a = (math.sin(delta_lat / 2) ** 2) + math.cos(lat1_rad) * math.cos(lat2_rad) * (math.sin(delta_lon / 2) ** 2) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + return EARTH_RADIUS * c + + +def contains_event_type(events, frogpilot_events, *event_types): + return any(events.contains(event_type) or frogpilot_events.contains(event_type) for event_type in event_types) + + +def delete_file(path, print_error=True, report=True): + path = Path(path) + if path.is_file() or path.is_symlink(): + run_cmd(["sudo", "rm", "-f", str(path)], f"Deleted file: {path}", f"Failed to delete file: {path}", report=report) + elif path.is_dir(): + run_cmd(["sudo", "rm", "-rf", str(path)], f"Deleted directory: {path}", f"Failed to delete directory: {path}", report=report) + elif print_error: + print(f"File not found: {path}") + + +def extract_zip(zip_file, extract_path): + with zipfile.ZipFile(zip_file, "r") as zip: + print(f"Extracting {zip_file} to {extract_path}") + zip.extractall(extract_path) + + zip_file.unlink() + print(f"Extraction completed!") + + +def is_url_pingable(url): + if not url: + return False + + if not hasattr(is_url_pingable, "session"): + is_url_pingable.session = requests.Session() + is_url_pingable.session.headers.update({"User-Agent": "frogpilot-ping-test/1.0 (https://github.com/FrogAi/FrogPilot)"}) + + try: + response = is_url_pingable.session.head(url, timeout=10, allow_redirects=True) + if response.status_code in (405, 501): + response = is_url_pingable.session.get(url, timeout=10, allow_redirects=True, stream=True) + + is_accessible = response.ok + response.close() + return is_accessible + + except (requests.exceptions.ConnectionError, requests.exceptions.SSLError): + return False + except requests.exceptions.RequestException as error: + print(f"{error.__class__.__name__} while pinging {url}: {error}") + return False + except Exception as exception: + print(f"Unexpected error while pinging {url}: {exception}") + return False + + +def load_json_file(path): + if path.is_file(): + try: + with open(path) as file: + return json.load(file) + except json.JSONDecodeError: + print(f"Failed to load JSON file: {path}") + return {} + return {} + + +def run_cmd(cmd, success_message, fail_message, env=None, report=True): + try: + result = subprocess.run(cmd, capture_output=True, check=True, env=env, text=True) + print(success_message) + return result.stdout.strip() + except subprocess.CalledProcessError as exception: + print(f"Command failed with error: {exception.stderr}") + print(fail_message) + if report: + sentry.capture_exception(exception.stderr) + return None + except Exception as exception: + print(f"Unexpected error occurred: {exception}") + print(fail_message) + if report: + sentry.capture_exception(exception) + return None + + +def update_can_parser(can_parser, can_sock): + can_msgs = messaging.drain_sock(can_sock, wait_for_one=True) + can_parser.update([(msg.logMonoTime, [[frame.address, frame.dat, frame.src] for frame in msg.can]) for msg in can_msgs if msg.which() == "can"]) + + +def update_json_file(path, data): + temp_path = f"{path}.tmp" + with open(temp_path, "w") as file: + json.dump(data, file, indent=2, sort_keys=True) + file.flush() + os.fsync(file.fileno()) + + os.replace(temp_path, path) diff --git a/frogpilot/frogpilot_process.py b/frogpilot/frogpilot_process.py index 610f4d84..0a73326c 100644 --- a/frogpilot/frogpilot_process.py +++ b/frogpilot/frogpilot_process.py @@ -8,15 +8,16 @@ from openpilot.common.params import Params from openpilot.common.realtime import DT_MDL, Priority, Ratekeeper, config_realtime_process from openpilot.common.time_helpers import system_time_valid +from openpilot.frogpilot.common.frogpilot_utilities import ThreadManager, is_url_pingable from openpilot.frogpilot.controls.frogpilot_planner import FrogPilotPlanner from openpilot.frogpilot.system.frogpilot_stats import send_stats from openpilot.frogpilot.system.frogpilot_tracking import FrogPilotTracking ASSET_CHECK_RATE = (1 / DT_MDL) -def check_assets(params_memory): +def check_assets(thread_manager, params_memory): -def transition_offroad(frogpilot_planner, time_validated, sm, params): +def transition_offroad(frogpilot_planner, thread_manager, time_validated, sm, params): params.put("LastGPSPosition", json.dumps(frogpilot_planner.gps_position)) if time_validated: @@ -24,7 +25,7 @@ def transition_offroad(frogpilot_planner, time_validated, sm, params): def transition_onroad(): -def update_checks(now, params, params_memory, boot_run=False): +def update_checks(now, thread_manager, params, params_memory, boot_run=False): while not (is_url_pingable("https://github.com") or is_url_pingable("https://gitlab.com")): time.sleep(60) @@ -45,6 +46,8 @@ def frogpilot_thread(): params = Params(return_defaults=True) params_memory = Params(memory=True) + thread_manager = ThreadManager() + run_update_checks = False started_previously = False time_validated = False @@ -57,7 +60,7 @@ def frogpilot_thread(): started = sm["deviceState"].started if not started and started_previously: - transition_offroad(frogpilot_planner, time_validated, sm, params) + transition_offroad(frogpilot_planner, thread_manager, time_validated, sm, params) run_update_checks = True elif started and not started_previously: @@ -78,13 +81,13 @@ def frogpilot_thread(): started_previously = started if rate_keeper.frame % ASSET_CHECK_RATE == 0: - check_assets(params_memory) + check_assets(thread_manager, params_memory) run_update_checks |= now.second == 0 and (now.minute % 60 == 0) run_update_checks &= time_validated if run_update_checks: - thread_manager.run_with_lock(update_checks, (now, params, params_memory)) + thread_manager.run_with_lock(update_checks, (now, thread_manager, params, params_memory)) run_update_checks = False elif not time_validated: @@ -93,7 +96,7 @@ def frogpilot_thread(): continue thread_manager.run_with_lock(send_stats, (params)) - thread_manager.run_with_lock(update_checks, (now, params, params_memory, True)) + thread_manager.run_with_lock(update_checks, (now, thread_manager, params, params_memory, True)) rate_keeper.keep_time()