memory leaks

This commit is contained in:
firestar5683
2026-04-01 12:13:37 -05:00
parent f6377538e9
commit f91eed0586
46 changed files with 228 additions and 36 deletions
+50 -7
View File
@@ -109,7 +109,10 @@ Params::~Params() {
if (future.valid()) {
future.wait();
}
std::scoped_lock lk(pending_writes_lock);
assert(queue.empty());
assert(pending_writes.empty());
assert(!writer_running);
}
std::vector<std::string> Params::allKeys() const {
@@ -245,19 +248,59 @@ void Params::clearAll(ParamKeyFlag key_flag) {
}
void Params::putNonBlocking(const std::string &key, const std::string &val) {
queue.push(std::make_pair(key, val));
// start thread on demand
if (!future.valid() || future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) {
bool should_enqueue = false;
bool should_start_thread = false;
{
std::scoped_lock lk(pending_writes_lock);
auto it = pending_writes.find(key);
if (it == pending_writes.end()) {
pending_writes.emplace(key, val);
should_enqueue = true;
} else {
it->second = val;
}
if (!writer_running) {
writer_running = true;
should_start_thread = true;
}
}
if (should_enqueue) {
queue.push(key);
}
if (should_start_thread) {
future = std::async(std::launch::async, &Params::asyncWriteThread, this);
}
}
void Params::asyncWriteThread() {
// TODO: write the latest one if a key has multiple values in the queue.
std::pair<std::string, std::string> p;
while (queue.try_pop(p, 0)) {
std::string key;
while (true) {
if (!queue.try_pop(key, 0)) {
std::scoped_lock lk(pending_writes_lock);
if (queue.empty() && pending_writes.empty()) {
writer_running = false;
return;
}
continue;
}
std::string val;
{
std::scoped_lock lk(pending_writes_lock);
auto it = pending_writes.find(key);
if (it == pending_writes.end()) {
continue;
}
val = std::move(it->second);
pending_writes.erase(it);
}
// Params::put is Thread-Safe
put(p.first, p.second);
put(key, val);
}
}
+6 -1
View File
@@ -2,9 +2,11 @@
#include <future>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -120,7 +122,10 @@ private:
// for nonblocking write
std::future<void> future;
SafeQueue<std::pair<std::string, std::string>> queue;
SafeQueue<std::string> queue;
std::mutex pending_writes_lock;
std::unordered_map<std::string, std::string> pending_writes;
bool writer_running = false;
// StarPilot variables
std::string cache_path;
Binary file not shown.
+10
View File
@@ -101,6 +101,16 @@ class TestParams:
assert q.get("CarParams") is None
assert q.get("CarParams", True) == b"1"
def test_put_nonblocking_latest_value_wins(self, tmp_path):
q = Params(str(tmp_path))
for i in range(100):
q.put_nonblocking("CarParams", f"value-{i}".encode())
del q
r = Params(str(tmp_path))
assert r.get("CarParams") == b"value-99"
def test_params_all_keys(self):
keys = Params().all_keys()
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+1 -1
View File
@@ -1,2 +1,2 @@
extern const uint8_t gitversion[19];
const uint8_t gitversion[19] = "DEV-1e79d379-DEBUG";
const uint8_t gitversion[19] = "DEV-f6377538-DEBUG";
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+1 -1
View File
@@ -1 +1 @@
DEV-1e79d379-DEBUG
DEV-f6377538-DEBUG
Binary file not shown.
BIN
View File
Binary file not shown.
@@ -25,12 +25,55 @@ class CurveSpeedController:
self.training_timer = 0
curvature_data = self.starpilot_planner.params.get("CurvatureData")
self.curvature_data = curvature_data if isinstance(curvature_data, dict) else {}
self.curvature_data = self._normalize_curvature_data(curvature_data)
self.required_curvatures = [str(round(road_curvature, ROUNDING_PRECISION)) for road_curvature in np.arange(MIN_CURVATURE, MAX_CURVATURE + STEP, STEP)]
self.update_lateral_acceleration()
@staticmethod
def _bucket_curvature(road_curvature):
clipped_curvature = float(np.clip(road_curvature, MIN_CURVATURE, MAX_CURVATURE))
bucket_index = round((clipped_curvature - MIN_CURVATURE) / STEP)
bucketed_curvature = MIN_CURVATURE + (bucket_index * STEP)
return str(round(bucketed_curvature, ROUNDING_PRECISION))
@classmethod
def _normalize_curvature_data(cls, curvature_data):
if not isinstance(curvature_data, dict):
return {}
normalized = {}
for key, value in curvature_data.items():
if not isinstance(value, dict):
continue
try:
raw_curvature = abs(float(key))
average = float(value["average"])
count = int(value["count"])
except (KeyError, TypeError, ValueError):
continue
if count <= 0:
continue
bucket = cls._bucket_curvature(raw_curvature)
if bucket in normalized:
existing = normalized[bucket]
total_count = existing["count"] + count
normalized[bucket] = {
"average": ((existing["average"] * existing["count"]) + (average * count)) / total_count,
"count": total_count,
}
else:
normalized[bucket] = {
"average": average,
"count": count,
}
return normalized
def log_data(self, v_ego, sm):
self.enable_training = v_ego > CRUISING_SPEED
self.enable_training &= not self.starpilot_planner.tracking_lead
@@ -41,9 +84,9 @@ class CurveSpeedController:
if self.training_timer >= PLANNER_TIME and self.starpilot_planner.driving_in_curve and not (sm["carState"].leftBlinker or sm["carState"].rightBlinker):
lateral_acceleration = abs(self.starpilot_planner.lateral_acceleration)
road_curvature = abs(round(self.starpilot_planner.road_curvature, ROUNDING_PRECISION))
road_curvature = self._bucket_curvature(abs(self.starpilot_planner.road_curvature))
key = str(road_curvature)
key = road_curvature
if key in self.curvature_data:
data = self.curvature_data[key]
@@ -70,11 +70,16 @@ class SpeedLimitController:
self.previous_target = self.starpilot_planner.params.get_float("PreviousSpeedLimit")
self.executor = ThreadPoolExecutor(max_workers=1)
self.mapbox_future = None
self.session = requests.Session()
self.session.headers.update({"Accept-Language": "en"})
self.session.headers.update({"User-Agent": "starpilot-mapbox-speed-limit-retriever/1.0 (https://github.com/FrogAi/StarPilot)"})
def shutdown(self):
self.executor.shutdown(wait=False, cancel_futures=True)
self.session.close()
@property
def experimental_mode(self):
return self.target == 0 and bool(getattr(self.starpilot_toggles, "slc_fallback_experimental_mode", False))
@@ -113,11 +118,9 @@ class SpeedLimitController:
return
def make_request():
successful = False
response_data = None
try:
self.calling_mapbox = True
successful = False
if not is_url_pingable(self.mapbox_host):
self.segment_distance = 1000
return None
@@ -160,8 +163,7 @@ class SpeedLimitController:
response.raise_for_status()
successful = True
return response.json()
response_data = response.json()
except Exception as exception:
print(f"Unexpected error in Mapbox request: {exception}")
finally:
@@ -170,8 +172,7 @@ class SpeedLimitController:
if not successful:
self.mapbox_limit = 0
self.segment_distance = v_ego
return None
return response_data
def complete_request(future):
try:
@@ -212,8 +213,18 @@ class SpeedLimitController:
print(f"Mapbox Callback Error: {exception}")
self.mapbox_limit = 0
self.segment_distance = v_ego
finally:
self.mapbox_future = None
future = self.executor.submit(make_request)
self.calling_mapbox = True
try:
future = self.executor.submit(make_request)
except RuntimeError:
self.calling_mapbox = False
self.segment_distance = v_ego
return
self.mapbox_future = future
future.add_done_callback(complete_request)
def handle_limit_change(self, desired_source, desired_target, sm):
+1 -1
View File
@@ -55,7 +55,7 @@ class StarPilotPlanner:
self.tracking_lead_filter = FirstOrderFilter(0, 0.5, DT_MDL)
def shutdown(self):
self.starpilot_vcruise.slc.executor.shutdown(wait=False, cancel_futures=True)
self.starpilot_vcruise.slc.shutdown()
self.starpilot_weather.executor.shutdown(wait=False, cancel_futures=True)
def update(self, now, time_validated, sm, starpilot_toggles):
+93 -13
View File
@@ -19,9 +19,12 @@ NetworkType = log.DeviceState.NetworkType
BOUNDING_BOX_RADIUS_DEGREE = 0.1
MAX_ENTRIES = 1_000_000
MAX_PENDING_ADDITIONS = 2_000
MAX_OVERPASS_DATA_BYTES = 1_073_741_824
MAX_OVERPASS_REQUESTS = 10_000
METERS_PER_DEG_LAT = 111_320
PENDING_FLUSH_BATCH_SIZE = 1_000
PENDING_FLUSH_INTERVAL_S = 60.0
VETTING_INTERVAL_DAYS = 7
ACTIVE_SEGMENT_BUFFER_METERS = 25
MAX_BEARING_DELTA = 45
@@ -40,8 +43,9 @@ class MapSpeedLogger:
self.cached_segments = {}
self.dataset_additions = deque(maxlen=MAX_ENTRIES)
self.dataset_additions = deque(maxlen=MAX_PENDING_ADDITIONS)
self.filtered_dataset = []
self.last_dataset_flush = time.monotonic()
self.overpass_requests = self.params.get("OverpassRequests")
self.overpass_requests.setdefault("day", datetime.now(timezone.utc).day)
@@ -80,6 +84,8 @@ class MapSpeedLogger:
entry_copy = item.copy()
entry_copy.pop("last_vetted", None)
if "last_vetted" in item:
entry_copy = {key: entry_copy[key] for key in required if key != "last_vetted"}
key = json.dumps(entry_copy, sort_keys=True)
cleaned_data[key] = item
@@ -118,13 +124,68 @@ class MapSpeedLogger:
if filtered_dataset is None:
filtered_dataset = self.params.get("SpeedLimitsFiltered") or []
live_match_entries = [entry for entry in filtered_dataset if self.has_live_match_fields(entry)]
if live_match_entries:
self.filtered_dataset = live_match_entries
return
if filtered_dataset and not any(self.has_live_match_fields(entry) for entry in filtered_dataset):
dataset = self.cleanup_dataset(self.params.get("SpeedLimits"))
filtered_dataset = self.enrich_filtered_dataset(dataset, filtered_dataset)
fallback_dataset = self.params.get("SpeedLimits") or []
self.filtered_dataset = list(self.cleanup_dataset(fallback_dataset))
self.filtered_dataset = [entry for entry in filtered_dataset if self.has_live_match_fields(entry)]
@staticmethod
def get_entry_identity(entry):
start_coordinates = entry.get("start_coordinates")
if not isinstance(start_coordinates, dict):
return None
try:
return (
bool(entry.get("incorrect_limit")),
round(float(start_coordinates["latitude"]), 6),
round(float(start_coordinates["longitude"]), 6),
str(entry.get("source", "")),
round(float(entry.get("speed_limit", 0)), 3),
)
except (KeyError, TypeError, ValueError):
return None
def enrich_filtered_dataset(self, dataset, filtered_dataset):
dataset_by_identity = {}
for entry in dataset:
if not self.has_live_match_fields(entry):
continue
identity = self.get_entry_identity(entry)
if identity is None or identity in dataset_by_identity:
continue
dataset_by_identity[identity] = entry
if not dataset_by_identity:
return filtered_dataset
filtered_entries = list(filtered_dataset)
updated = False
for index, entry in enumerate(filtered_entries):
if self.has_live_match_fields(entry):
continue
identity = self.get_entry_identity(entry)
matching_entry = dataset_by_identity.get(identity)
if matching_entry is None:
continue
filtered_entries[index] = {
**entry,
"bearing": matching_entry["bearing"],
"end_coordinates": matching_entry["end_coordinates"],
"road_name": matching_entry["road_name"],
"road_width": matching_entry["road_width"],
}
updated = True
if not updated:
return filtered_dataset
return self.cleanup_dataset(filtered_entries)
def get_speed_limit_source(self):
vision_speed_limit = self.params_memory.get_float("VisionSpeedLimit") if self.params.get_bool("VisionSpeedLimitDetection") else 0
@@ -171,6 +232,28 @@ class MapSpeedLogger:
self.params.put("SpeedLimits", list(dataset))
self.params.put("SpeedLimitsFiltered", list(filtered_dataset))
def flush_pending_dataset_additions(self, force=False):
if not self.dataset_additions:
if force:
self.last_dataset_flush = time.monotonic()
return
now = time.monotonic()
should_flush = force
should_flush |= len(self.dataset_additions) >= PENDING_FLUSH_BATCH_SIZE
should_flush |= (now - self.last_dataset_flush) >= PENDING_FLUSH_INTERVAL_S
if not should_flush:
return
existing_dataset = self.params.get("SpeedLimits") or []
existing_dataset.extend(self.dataset_additions)
new_dataset = self.cleanup_dataset(existing_dataset)
self.params.put("SpeedLimits", list(new_dataset))
self.dataset_additions.clear()
self.last_dataset_flush = now
def find_current_speed_limit_entry(self, latitude, longitude, road_name, current_bearing):
best_match = None
best_score = None
@@ -371,6 +454,7 @@ class MapSpeedLogger:
"speed_limit": speed_limit,
"start_coordinates": self.previous_coordinates,
})
self.flush_pending_dataset_additions()
self.previous_coordinates = {"latitude": current_latitude, "longitude": current_longitude}
@@ -469,6 +553,7 @@ class MapSpeedLogger:
dataset = self.cleanup_dataset(self.params.get("SpeedLimits"))
filtered_dataset = self.cleanup_dataset(self.params.get("SpeedLimitsFiltered"))
filtered_dataset = self.enrich_filtered_dataset(dataset, filtered_dataset)
filtered_dataset = self.vet_entries(filtered_dataset)
self.update_params(dataset, filtered_dataset)
@@ -547,16 +632,11 @@ def main():
previously_started = True
elif previously_started:
existing_dataset = logger.params.get("SpeedLimits") or []
existing_dataset.extend(logger.dataset_additions)
new_dataset = logger.cleanup_dataset(existing_dataset)
logger.params.put("SpeedLimits", list(new_dataset))
logger.flush_pending_dataset_additions(force=True)
if logger.sm["deviceState"].networkType in (NetworkType.ethernet, NetworkType.wifi):
logger.params_memory.put_bool("UpdateSpeedLimits", True)
logger.dataset_additions.clear()
logger.clear_live_speed_limits()
previously_started = False
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.