Testing Ground: framework and UI scaffolding (unused slots)

This commit is contained in:
firestar5683
2026-03-06 01:20:52 -06:00
parent cd7cb862fd
commit cca4a277f1
7 changed files with 971 additions and 0 deletions
+224
View File
@@ -0,0 +1,224 @@
import json
import threading
import time
from pathlib import Path
from openpilot.system.hardware import PC
TESTING_GROUNDS_SCHEMA_VERSION = 1
TESTING_GROUND_1 = "1"
TESTING_GROUND_2 = "2"
TESTING_GROUND_3 = "3"
TESTING_GROUND_4 = "4"
TESTING_GROUND_5 = "5"
TESTING_GROUND_6 = "6"
TESTING_GROUND_7 = "7"
TESTING_GROUND_IDS = (
TESTING_GROUND_1,
TESTING_GROUND_2,
TESTING_GROUND_3,
TESTING_GROUND_4,
TESTING_GROUND_5,
TESTING_GROUND_6,
TESTING_GROUND_7,
)
TESTING_GROUNDS_STATE_PATH = Path("/tmp/the_pond_testing_grounds_slots.json") if PC else Path("/data/testing_grounds/slots.json")
# Edit slot names/descriptions once here. The Pond and runtime checks share this table.
# Slots named "Unused" are hidden from the dropdown.
# Adding cLabel/dLabel/etc. automatically adds more mode buttons for that slot in Testing Ground.
TESTING_GROUNDS_SLOT_DEFINITIONS = (
{
"id": TESTING_GROUND_1,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
{
"id": TESTING_GROUND_2,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
{
"id": TESTING_GROUND_3,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
{
"id": TESTING_GROUND_4,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
{
"id": TESTING_GROUND_5,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
{
"id": TESTING_GROUND_6,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
{
"id": TESTING_GROUND_7,
"name": "Unused",
"description": "",
"aLabel": "A",
"bLabel": "B",
},
)
_DEFAULT_ACTIVE_SLOT = TESTING_GROUND_1
DEFAULT_TESTING_GROUND_VARIANT = "A"
TESTING_GROUND_TEST_VARIANT = "B"
_CACHE_LOCK = threading.Lock()
_CACHE_LAST_REFRESH = 0.0
_CACHE_LAST_MTIME_NS = -1
_CACHE_ACTIVE_SLOT = _DEFAULT_ACTIVE_SLOT
_CACHE_ACTIVE_VARIANT = DEFAULT_TESTING_GROUND_VARIANT
def _extract_variant_labels(slot_definition):
labels = {}
for key, value in dict(slot_definition).items():
if not isinstance(key, str) or not key.endswith("Label"):
continue
variant = key[:-5].upper()
if len(variant) != 1 or not variant.isalpha():
continue
label = str(value or "").strip()
if label:
labels[variant] = label
if DEFAULT_TESTING_GROUND_VARIANT not in labels:
labels[DEFAULT_TESTING_GROUND_VARIANT] = DEFAULT_TESTING_GROUND_VARIANT
return dict(sorted(labels.items()))
TESTING_GROUND_VARIANT_LABELS = {
str(slot.get("id") or "").strip(): _extract_variant_labels(slot)
for slot in TESTING_GROUNDS_SLOT_DEFINITIONS
}
TESTING_GROUND_VARIANTS = tuple(
sorted({
variant
for labels in TESTING_GROUND_VARIANT_LABELS.values()
for variant in labels.keys()
})
)
def _normalize_variant(value, slot_id=None):
normalized_slot_id = str(slot_id or "").strip()
allowed_variants = set(TESTING_GROUND_VARIANT_LABELS.get(normalized_slot_id, {}).keys())
if not allowed_variants:
allowed_variants = set(TESTING_GROUND_VARIANTS) or {DEFAULT_TESTING_GROUND_VARIANT}
variant = str(value or "").strip().upper()
return variant if variant in allowed_variants else DEFAULT_TESTING_GROUND_VARIANT
def get_testing_ground_selection(refresh_interval_s=0.5):
global _CACHE_LAST_REFRESH, _CACHE_LAST_MTIME_NS, _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
now = time.monotonic()
with _CACHE_LOCK:
if (now - _CACHE_LAST_REFRESH) < refresh_interval_s:
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
_CACHE_LAST_REFRESH = now
try:
stat_result = TESTING_GROUNDS_STATE_PATH.stat()
except FileNotFoundError:
_CACHE_LAST_MTIME_NS = -1
_CACHE_ACTIVE_SLOT = _DEFAULT_ACTIVE_SLOT
_CACHE_ACTIVE_VARIANT = DEFAULT_TESTING_GROUND_VARIANT
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
except OSError:
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
if stat_result.st_mtime_ns == _CACHE_LAST_MTIME_NS:
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
try:
payload = json.loads(TESTING_GROUNDS_STATE_PATH.read_text(encoding="utf-8"))
except Exception:
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
if not isinstance(payload, dict):
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
slot_id = str(payload.get("activeSlot") or _DEFAULT_ACTIVE_SLOT).strip()
if slot_id not in TESTING_GROUND_IDS:
slot_id = _DEFAULT_ACTIVE_SLOT
_CACHE_ACTIVE_SLOT = slot_id
_CACHE_ACTIVE_VARIANT = _normalize_variant(payload.get("activeVariant"), slot_id)
_CACHE_LAST_MTIME_NS = stat_result.st_mtime_ns
return _CACHE_ACTIVE_SLOT, _CACHE_ACTIVE_VARIANT
def is_testing_ground_active(slot_id, variant=TESTING_GROUND_TEST_VARIANT, refresh_interval_s=0.5):
active_slot, active_variant = get_testing_ground_selection(refresh_interval_s=refresh_interval_s)
normalized_slot_id = str(slot_id or "").strip()
target_variant = _normalize_variant(variant, normalized_slot_id)
return normalized_slot_id == active_slot and active_variant == target_variant
class _TestingGroundFlag:
__slots__ = ("slot_id", "variant")
def __init__(self, slot_id, variant=TESTING_GROUND_TEST_VARIANT):
self.slot_id = str(slot_id or "").strip()
self.variant = _normalize_variant(variant, self.slot_id)
def __bool__(self):
return is_testing_ground_active(self.slot_id, self.variant)
def __repr__(self):
state = "active" if bool(self) else "inactive"
return f"<TestingGroundFlag slot={self.slot_id} variant={self.variant} {state}>"
use_testing_ground_1 = _TestingGroundFlag(TESTING_GROUND_1)
use_testing_ground_2 = _TestingGroundFlag(TESTING_GROUND_2)
use_testing_ground_3 = _TestingGroundFlag(TESTING_GROUND_3)
use_testing_ground_4 = _TestingGroundFlag(TESTING_GROUND_4)
use_testing_ground_5 = _TestingGroundFlag(TESTING_GROUND_5)
use_testing_ground_6 = _TestingGroundFlag(TESTING_GROUND_6)
use_testing_ground_7 = _TestingGroundFlag(TESTING_GROUND_7)
class _TestingGroundNamespace:
__slots__ = ()
id_1 = TESTING_GROUND_1
id_2 = TESTING_GROUND_2
id_3 = TESTING_GROUND_3
id_4 = TESTING_GROUND_4
id_5 = TESTING_GROUND_5
id_6 = TESTING_GROUND_6
id_7 = TESTING_GROUND_7
use_1 = use_testing_ground_1
use_2 = use_testing_ground_2
use_3 = use_testing_ground_3
use_4 = use_testing_ground_4
use_5 = use_testing_ground_5
use_6 = use_testing_ground_6
use_7 = use_testing_ground_7
def use(self, slot_id, variant=TESTING_GROUND_TEST_VARIANT):
return is_testing_ground_active(slot_id, variant)
def selection(self):
return get_testing_ground_selection()
testing_ground = _TestingGroundNamespace()
@@ -16,6 +16,7 @@ import { SpeedLimits } from "/assets/components/tools/speed_limits.js"
import { ModelManager } from "/assets/components/tools/model_manager.js?v=20260303t"
import { LivePlots } from "/assets/components/tools/plots.js"
import { ThemeMaker } from "/assets/components/tools/theme_maker.js"
import { TestingGround } from "/assets/components/tools/testing_ground.js"
import { TmuxLog } from "/assets/components/tools/tmux.js"
import { ToggleControl } from "/assets/components/tools/toggles.js"
import { UpdateManager } from "/assets/components/tools/update_manager.js"
@@ -46,6 +47,7 @@ function Root() {
createRoute("model_manager", "/manage_models", ModelManager),
createRoute("plots", "/plots", LivePlots),
createRoute("thememaker", "/theme_maker", ThemeMaker),
createRoute("testing_ground", "/testing_ground", TestingGround),
createRoute("tmux", "/manage_tmux", TmuxLog),
createRoute("toggles", "/manage_toggles", ToggleControl),
createRoute("updates", "/manage_updates", UpdateManager),
@@ -21,6 +21,7 @@ const MenuItems = {
{ name: "Galaxy", link: "/galaxy", icon: "bi-globe2" },
{ name: "Model Manager", link: "/manage_models", icon: "bi-cpu" },
{ name: "Plots", link: "/plots", icon: "bi-graph-up-arrow" },
{ name: "Testing Ground", link: "/testing_ground", icon: "bi-bezier2" },
{ name: "Theme Maker", link: "/theme_maker", icon: "bi-palette-fill" },
{ name: "Tmux Log", link: "/manage_tmux", icon: "bi-terminal" },
{ name: "Toggles", link: "/manage_toggles", icon: "bi-toggle-on" },
@@ -0,0 +1,152 @@
.testingGroundPage {
max-width: var(--width-xxl);
}
.testingGroundCard {
background-color: var(--secondary-bg);
border-radius: var(--border-radius-md);
color: var(--text-color);
max-width: 90%;
padding: var(--border-radius-xl);
}
.testingGroundIntro {
margin-top: 0;
}
.testingGroundTopList {
margin-top: var(--margin-base);
}
.testingGroundTopList ol {
margin: var(--margin-xs) 0 0;
padding-left: 1.4rem;
}
.testingGroundTopList li {
font-size: var(--font-size-sm);
margin-bottom: 0.25rem;
}
.testingGroundStatusGrid {
display: grid;
gap: 0.5em 1em;
grid-template-columns: repeat(2, minmax(0, 1fr));
margin-top: var(--margin-base);
}
.testingGroundStatusGrid p {
font-size: var(--font-size-sm);
margin: 0;
}
.testingGroundSelectionRow {
align-items: center;
display: flex;
gap: var(--gap-sm);
margin-top: var(--margin-base);
}
.testingGroundSelect {
background-color: var(--main-bg);
border: 1px solid var(--track-color);
border-radius: var(--border-radius-sm);
color: var(--text-color);
flex: 1;
font-size: var(--font-size-sm);
min-width: 0;
padding: var(--padding-sm);
}
.testingGroundModeButton:hover {
transform: var(--hover-scale-sm);
transition: transform var(--transition-fast), background-color var(--transition-fast);
}
.testingGroundDetails {
margin-top: var(--margin-base);
}
.testingGroundDetails h3 {
margin-bottom: var(--margin-xs);
}
.testingGroundDetails p {
font-size: var(--font-size-sm);
margin: 0 0 var(--margin-xs);
}
.testingGroundMuted {
color: var(--text-muted);
}
.testingGroundActionGrid {
display: grid;
gap: var(--gap-sm);
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
margin-top: var(--margin-base);
}
.testingGroundModeButton {
align-items: center;
background: #2a2752;
border: none;
border-radius: var(--border-radius-sm);
color: var(--text-color);
display: flex;
flex-direction: column;
gap: var(--gap-xs);
min-height: 120px;
padding: var(--padding-base);
}
.testingGroundModeButtonB {
background: #27434b;
}
.testingGroundModeButtonC {
background: #413029;
}
.testingGroundModeButton.active {
box-shadow: 0 0 0 2px var(--main-fg) inset;
}
.testingGroundModeLetter {
font-size: clamp(2rem, 4vw, 2.75rem);
font-weight: var(--font-weight-bold);
line-height: 1;
}
.testingGroundModeLabel {
font-size: var(--font-size-sm);
text-align: center;
}
.testingGroundError {
color: var(--danger-fg);
margin-top: var(--margin-base);
}
.testingGroundActiveSummary {
margin-top: var(--margin-base);
}
@media only screen and (max-width: 768px) and (orientation: portrait) {
.testingGroundCard {
max-width: 100%;
}
.testingGroundStatusGrid {
grid-template-columns: 1fr;
}
.testingGroundSelectionRow {
align-items: stretch;
flex-direction: column;
}
.testingGroundActionGrid {
grid-template-columns: 1fr;
}
}
@@ -0,0 +1,288 @@
import { html, reactive } from "https://esm.sh/@arrow-js/core"
const state = reactive({
loading: true,
error: "",
busy: false,
data: null,
selectedSlot: "",
})
let initialized = false
function slotId(slot) {
return String(slot?.id || "").trim()
}
function isUnusedSlot(slot) {
const name = String(slot?.name || "").trim().toLowerCase()
return name === "unused" || name.startsWith("unused ")
}
function getSelectableSlots() {
const selectable = Array.isArray(state.data?.selectableSlots) ? state.data.selectableSlots : []
if (selectable.length) return selectable
const slots = Array.isArray(state.data?.slots) ? state.data.slots : []
return slots.filter((slot) => !isUnusedSlot(slot))
}
function getVisibleSlotLines() {
return getSelectableSlots().map((slot) => `${slot.id}. ${slot.name}`)
}
function getSelectedSlot() {
const slots = Array.isArray(state.data?.slots) ? state.data.slots : []
const selectedId = String(state.selectedSlot || "").trim()
if (!selectedId) return null
return slots.find((slot) => slotId(slot) === selectedId) || null
}
function getActiveSlot() {
const slots = Array.isArray(state.data?.slots) ? state.data.slots : []
const activeId = String(state.data?.activeSlot || "").trim()
if (!activeId) return null
return slots.find((slot) => slotId(slot) === activeId) || null
}
function getVariantLabels(slot) {
if (!slot || typeof slot !== "object") {
return { A: "A" }
}
const labels = {}
const rawVariantLabels = slot.variantLabels
if (rawVariantLabels && typeof rawVariantLabels === "object") {
Object.entries(rawVariantLabels).forEach(([rawMode, rawLabel]) => {
const mode = String(rawMode || "").trim().toUpperCase()
const label = String(rawLabel || "").trim()
if (mode.length === 1 && /^[A-Z]$/.test(mode) && label) {
labels[mode] = label
}
})
}
const aLabel = String(slot.aLabel || "").trim()
const bLabel = String(slot.bLabel || "").trim()
if (aLabel) labels.A = labels.A || aLabel
if (bLabel) labels.B = labels.B || bLabel
if (!labels.A) labels.A = "A"
return Object.keys(labels).sort().reduce((acc, key) => {
acc[key] = labels[key]
return acc
}, {})
}
function getVariantModes(slot) {
return Object.keys(getVariantLabels(slot))
}
function getDefaultMode(slot) {
const modes = getVariantModes(slot)
if (modes.includes("A")) return "A"
return modes[0] || "A"
}
function toModeLabel(slot, mode) {
const labels = getVariantLabels(slot)
return labels[String(mode || "").trim().toUpperCase()] || String(mode || "").trim().toUpperCase() || "A"
}
function isModeActive(mode) {
const normalizedMode = String(mode || "").trim().toUpperCase()
return String(state.data?.activeSlot || "").trim() === String(state.selectedSlot || "").trim() && String(state.data?.activeVariant || "").trim().toUpperCase() === normalizedMode
}
function getSelectedMode() {
const selectedSlot = getSelectedSlot()
if (!selectedSlot) return "A"
if (String(state.data?.activeSlot || "").trim() === String(state.selectedSlot || "").trim()) {
return String(state.data?.activeVariant || "").trim().toUpperCase() || getDefaultMode(selectedSlot)
}
return getDefaultMode(selectedSlot)
}
function modeButtonClass(mode) {
const normalizedMode = String(mode || "").trim().toUpperCase()
return [
"testingGroundModeButton",
`testingGroundModeButton${normalizedMode}`,
isModeActive(normalizedMode) ? "active" : "",
].filter(Boolean).join(" ")
}
async function fetchTestingGrounds() {
try {
const response = await fetch("/api/testing_grounds")
const payload = await response.json()
if (!response.ok) {
throw new Error(payload.error || response.statusText || "Failed to load testing grounds")
}
state.data = payload
state.error = ""
const selectable = Array.isArray(payload.selectableSlots) ? payload.selectableSlots : []
const hasCurrentSelection = selectable.some((slot) => slotId(slot) === String(state.selectedSlot || "").trim())
if (!hasCurrentSelection) {
const activeSlot = String(payload.activeSlot || "").trim()
const activeSelectable = selectable.some((slot) => slotId(slot) === activeSlot)
state.selectedSlot = activeSelectable ? activeSlot : slotId(selectable[0] || {})
}
} catch (error) {
const message = error?.message || "Failed to load testing grounds"
state.error = message
}
}
async function applySelection(slotValue, mode, showToast = true) {
const normalizedSlot = String(slotValue || "").trim()
const normalizedMode = String(mode || "").trim().toUpperCase()
if (!normalizedSlot || !normalizedMode) return false
state.busy = true
try {
const response = await fetch("/api/testing_grounds/select", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ slotId: normalizedSlot, variant: normalizedMode }),
})
const payload = await response.json()
if (!response.ok) {
throw new Error(payload.error || response.statusText || "Failed to update testing ground mode")
}
state.data = payload
state.error = ""
state.selectedSlot = normalizedSlot
if (showToast) {
showSnackbar(payload.message || `Testing Ground ${normalizedSlot} set to ${normalizedMode}.`)
}
return true
} catch (error) {
const message = error?.message || "Failed to update testing ground mode"
state.error = message
if (showToast) {
showSnackbar(message, "error")
}
return false
} finally {
state.busy = false
}
}
async function selectMode(mode) {
if (state.busy) return
const slot = getSelectedSlot()
const slotValue = slotId(slot)
if (!slotValue) {
showSnackbar("Select a testing ground first.", "error")
return
}
await applySelection(slotValue, mode, true)
}
async function selectSlot(slotValue) {
const normalizedSlot = String(slotValue || "").trim()
state.selectedSlot = normalizedSlot
if (!normalizedSlot || state.busy) return
const activeSlot = String(state.data?.activeSlot || "").trim()
if (normalizedSlot === activeSlot) return
const success = await applySelection(normalizedSlot, "A", false)
if (!success) {
state.selectedSlot = activeSlot
if (state.error) {
showSnackbar(state.error, "error")
}
}
}
function initialize() {
if (initialized) return
initialized = true
fetchTestingGrounds().finally(() => {
state.loading = false
})
}
export function TestingGround() {
initialize()
return html`
<div class="testingGroundPage">
<h2>Testing Ground</h2>
${() => state.loading ? html`<div class="testingGroundCard">Loading testing ground state...</div>` : ""}
${() => !state.loading ? html`
<div class="testingGroundCard">
<p class="testingGroundIntro">
A/B Tuning Sandbox. If you don't know what this is, you probably shouldn't be here ;)
</p>
<div class="testingGroundTopList">
<strong>Current Test Slots</strong>
<ol>
${getVisibleSlotLines().map((line) => html`<li>${line}</li>`)}
</ol>
</div>
<div class="testingGroundStatusGrid">
<p><strong>Selected Slot:</strong> ${getSelectedSlot()?.name || "Unknown"}</p>
<p><strong>Selected Mode:</strong> ${getSelectedMode()}</p>
<p><strong>Onroad:</strong> ${state.data?.isOnroad ? "Yes" : "No"}</p>
<p><strong>Selectable Slots:</strong> ${getSelectableSlots().length}</p>
</div>
<div class="testingGroundSelectionRow">
<select
class="testingGroundSelect"
?disabled="${state.busy || getSelectableSlots().length === 0}"
@change="${(event) => {
selectSlot(String(event.target.value || ""))
}}">
${() => getSelectableSlots().length
? getSelectableSlots().map((slot) => html`<option value="${slotId(slot)}" ${slotId(slot) === state.selectedSlot ? "selected" : ""}>${slot.id}. ${slot.name}</option>`)
: html`<option value="">No active test slots</option>`
}
</select>
</div>
${() => getSelectedSlot() ? html`
<div class="testingGroundDetails">
<h3>${getSelectedSlot().name}</h3>
${() => getSelectedSlot().description ? html`<p>${getSelectedSlot().description}</p>` : ""}
</div>
` : ""}
${() => getSelectedSlot() ? html`
<div class="testingGroundActionGrid">
${() => getVariantModes(getSelectedSlot()).map((mode) => html`
<button
class="${modeButtonClass(mode)}"
?disabled="${state.busy}"
@click="${() => selectMode(mode)}">
<span class="testingGroundModeLetter">${mode}</span>
<span class="testingGroundModeLabel">${toModeLabel(getSelectedSlot(), mode)}</span>
</button>
`)}
</div>
` : ""}
${() => state.error ? html`<p class="testingGroundError"><strong>Error:</strong> ${state.error}</p>` : ""}
${() => getActiveSlot() ? html`
<p class="testingGroundActiveSummary">
Currently active: <strong>${getActiveSlot().id}. ${getActiveSlot().name}</strong> in mode <strong>${state.data?.activeVariant || "A"}</strong>.
</p>
` : ""}
</div>
` : ""}
</div>
`
}
@@ -31,6 +31,7 @@
<link rel="stylesheet" href="/assets/components/tools/plots.css">
<link rel="stylesheet" href="/assets/components/tools/speed_limits.css">
<link rel="stylesheet" href="/assets/components/tools/theme_maker.css">
<link rel="stylesheet" href="/assets/components/tools/testing_ground.css">
<link rel="stylesheet" href="/assets/components/tools/tmux.css">
<link rel="stylesheet" href="/assets/components/tools/toggles.css">
<link rel="stylesheet" href="/assets/components/tools/update_manager.css">
+303
View File
@@ -37,6 +37,14 @@ from openpilot.frogpilot.assets.theme_manager import HOLIDAY_THEME_PATH, THEME_C
from openpilot.frogpilot.common.frogpilot_utilities import delete_file, get_lock_status, run_cmd, extract_tar
from openpilot.frogpilot.common.frogpilot_variables import ACTIVE_THEME_PATH, ERROR_LOGS_PATH, EXCLUDED_KEYS, RESOURCES_REPO, SCREEN_RECORDINGS_PATH, THEME_SAVE_PATH,\
frogpilot_default_params, params, params_memory, update_frogpilot_toggles
from openpilot.frogpilot.common.testing_grounds import (
DEFAULT_TESTING_GROUND_VARIANT as SHARED_DEFAULT_TESTING_GROUND_VARIANT,
TESTING_GROUND_VARIANT_LABELS as SHARED_TESTING_GROUND_VARIANT_LABELS,
TESTING_GROUND_VARIANTS as SHARED_TESTING_GROUND_VARIANTS,
TESTING_GROUNDS_SCHEMA_VERSION as SHARED_TESTING_GROUNDS_SCHEMA_VERSION,
TESTING_GROUNDS_SLOT_DEFINITIONS as SHARED_TESTING_GROUNDS_SLOT_DEFINITIONS,
TESTING_GROUNDS_STATE_PATH as SHARED_TESTING_GROUNDS_STATE_PATH,
)
from openpilot.frogpilot.system.the_pond import utilities
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
@@ -145,6 +153,18 @@ _FAST_UPDATE_FETCH_TIMEOUT_S = 60
_FAST_BRANCH_SWITCH_FETCH_TIMEOUT_S = 60
_GIT_PROGRESS_PERCENT_RE = re.compile(r'([A-Za-z][A-Za-z /_-]+):\s*([0-9]{1,3})%')
_GIT_SUBMODULE_SECTION_RE = re.compile(r'^\s*\[submodule\s+"[^"]+"\]\s*$', re.MULTILINE)
_TESTING_GROUNDS_SCHEMA_VERSION = SHARED_TESTING_GROUNDS_SCHEMA_VERSION
_TESTING_GROUNDS_SLOT_COUNT = len(SHARED_TESTING_GROUNDS_SLOT_DEFINITIONS)
_TESTING_GROUNDS_DEFAULT_VARIANT = SHARED_DEFAULT_TESTING_GROUND_VARIANT
_TESTING_GROUNDS_VARIANTS = set(SHARED_TESTING_GROUND_VARIANTS) or {_TESTING_GROUNDS_DEFAULT_VARIANT}
_TESTING_GROUNDS_LOCK = threading.Lock()
_TESTING_GROUNDS_STATE_PATH = SHARED_TESTING_GROUNDS_STATE_PATH
# Slot labels live in frogpilot/common/testing_grounds.py.
_TESTING_GROUNDS_SLOT_DEFINITIONS = [dict(slot) for slot in SHARED_TESTING_GROUNDS_SLOT_DEFINITIONS]
_TESTING_GROUNDS_VARIANT_LABELS_BY_SLOT = {
str(slot_id or "").strip(): dict(labels or {})
for slot_id, labels in SHARED_TESTING_GROUND_VARIANT_LABELS.items()
}
_fast_update_state = {
"running": False,
"stage": "idle",
@@ -1043,6 +1063,256 @@ def _get_param_type_info():
_cached_param_types = types
return _cached_allowed_keys, _cached_param_types
def _extract_testing_ground_variant_labels(slot_data, include_default=True):
labels = {}
if not isinstance(slot_data, dict):
slot_data = {}
raw_variant_labels = slot_data.get("variantLabels")
if isinstance(raw_variant_labels, dict):
for raw_variant, raw_label in raw_variant_labels.items():
variant = str(raw_variant or "").strip().upper()
label = str(raw_label or "").strip()
if len(variant) == 1 and variant.isalpha() and label:
labels[variant] = label
for key, value in slot_data.items():
if not isinstance(key, str) or not key.endswith("Label"):
continue
variant = key[:-5].strip().upper()
if len(variant) != 1 or not variant.isalpha():
continue
label = str(value or "").strip()
if label:
labels[variant] = label
if include_default and _TESTING_GROUNDS_DEFAULT_VARIANT not in labels:
labels[_TESTING_GROUNDS_DEFAULT_VARIANT] = _TESTING_GROUNDS_DEFAULT_VARIANT
return dict(sorted(labels.items()))
def _get_testing_ground_variant_labels(slot_id, slot=None):
normalized_slot_id = str(slot_id or "").strip()
labels = {}
shared_labels = _TESTING_GROUNDS_VARIANT_LABELS_BY_SLOT.get(normalized_slot_id, {})
if shared_labels:
labels.update({
str(variant or "").strip().upper(): str(label or "").strip()
for variant, label in shared_labels.items()
if len(str(variant or "").strip().upper()) == 1 and str(variant or "").strip().upper().isalpha() and str(label or "").strip()
})
labels.update(_extract_testing_ground_variant_labels(slot if isinstance(slot, dict) else {}, include_default=False))
if _TESTING_GROUNDS_DEFAULT_VARIANT not in labels:
labels[_TESTING_GROUNDS_DEFAULT_VARIANT] = _TESTING_GROUNDS_DEFAULT_VARIANT
return dict(sorted(labels.items()))
def _normalize_testing_ground_variant(slot_id, variant, slot=None):
allowed_variants = set(_get_testing_ground_variant_labels(slot_id, slot).keys()) or set(_TESTING_GROUNDS_VARIANTS)
normalized_variant = str(variant or "").strip().upper()
return normalized_variant if normalized_variant in allowed_variants else _TESTING_GROUNDS_DEFAULT_VARIANT
def _build_testing_ground_fallback_slots():
definitions_by_id = {}
for definition in _TESTING_GROUNDS_SLOT_DEFINITIONS:
if not isinstance(definition, dict):
continue
slot_id = str(definition.get("id") or "").strip()
if not slot_id:
continue
variant_labels = _get_testing_ground_variant_labels(slot_id, definition)
definitions_by_id[slot_id] = {
"id": slot_id,
"name": str(definition.get("name") or "Unused").strip() or "Unused",
"description": str(definition.get("description") or "").strip(),
"variantLabels": variant_labels,
"aLabel": variant_labels.get("A", "A"),
"bLabel": variant_labels.get("B", "B"),
}
slots = []
for slot_number in range(1, _TESTING_GROUNDS_SLOT_COUNT + 1):
slot_id = str(slot_number)
default_variant_labels = {
_TESTING_GROUNDS_DEFAULT_VARIANT: _TESTING_GROUNDS_DEFAULT_VARIANT,
"B": "B",
}
fallback_slot = definitions_by_id.get(slot_id, {
"id": slot_id,
"name": "Unused",
"description": "",
"variantLabels": default_variant_labels,
"aLabel": "A",
"bLabel": "B",
})
slot = dict(fallback_slot)
slot_variant_labels = _get_testing_ground_variant_labels(slot_id, slot)
slot["variantLabels"] = slot_variant_labels
slot["aLabel"] = slot_variant_labels.get("A", slot.get("aLabel", "A"))
slot["bLabel"] = slot_variant_labels.get("B", slot.get("bLabel", "B"))
slots.append(slot)
return slots
def _default_testing_grounds_state():
return {
"schemaVersion": _TESTING_GROUNDS_SCHEMA_VERSION,
"activeSlot": "1",
"activeVariant": _TESTING_GROUNDS_DEFAULT_VARIANT,
"slots": _build_testing_ground_fallback_slots(),
}
def _normalize_testing_ground_slot(raw_slot, fallback_slot):
slot = dict(fallback_slot)
if not isinstance(raw_slot, dict):
return slot
name = str(raw_slot.get("name") or "").strip()
slot["name"] = name or slot["name"]
slot["description"] = str(raw_slot.get("description") or slot["description"]).strip()
variant_labels = _get_testing_ground_variant_labels(slot.get("id"), raw_slot)
if not variant_labels:
variant_labels = _get_testing_ground_variant_labels(slot.get("id"), slot)
slot["variantLabels"] = variant_labels
slot["aLabel"] = variant_labels.get("A", slot.get("aLabel", "A"))
slot["bLabel"] = variant_labels.get("B", slot.get("bLabel", "B"))
return slot
def _load_testing_grounds_state_unlocked():
state = _default_testing_grounds_state()
fallback_slots = state["slots"]
fallback_slot_ids = {slot["id"] for slot in fallback_slots}
needs_write = False
raw_state = {}
try:
raw_state = json.loads(_TESTING_GROUNDS_STATE_PATH.read_text(encoding="utf-8"))
if not isinstance(raw_state, dict):
raw_state = {}
needs_write = True
except FileNotFoundError:
needs_write = True
except Exception:
needs_write = True
if raw_state.get("schemaVersion") != _TESTING_GROUNDS_SCHEMA_VERSION:
needs_write = True
raw_slots = raw_state.get("slots")
if isinstance(raw_slots, list):
raw_by_id = {}
for index, raw_slot in enumerate(raw_slots, start=1):
if not isinstance(raw_slot, dict):
needs_write = True
continue
slot_id = str(raw_slot.get("id") or "").strip() or str(index)
if slot_id not in fallback_slot_ids:
needs_write = True
continue
raw_by_id[slot_id] = raw_slot
normalized_slots = []
for fallback_slot in fallback_slots:
slot_id = fallback_slot["id"]
normalized_slots.append(_normalize_testing_ground_slot(raw_by_id.get(slot_id), fallback_slot))
if slot_id not in raw_by_id:
needs_write = True
state["slots"] = normalized_slots
else:
needs_write = True
active_slot = str(raw_state.get("activeSlot") or "").strip()
if active_slot not in fallback_slot_ids:
active_slot = state["activeSlot"]
needs_write = True
state["activeSlot"] = active_slot
active_slot_data = _find_testing_ground_slot(state, active_slot)
raw_active_variant = str(raw_state.get("activeVariant") or "").strip().upper()
active_variant = _normalize_testing_ground_variant(active_slot, raw_active_variant, active_slot_data)
if raw_active_variant != active_variant:
needs_write = True
state["activeVariant"] = active_variant
return state, needs_write
def _write_testing_grounds_state_unlocked(state):
_TESTING_GROUNDS_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
tmp_path = _TESTING_GROUNDS_STATE_PATH.with_name(f".tmp_{_TESTING_GROUNDS_STATE_PATH.name}")
tmp_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
os.replace(tmp_path, _TESTING_GROUNDS_STATE_PATH)
def _get_testing_grounds_state():
with _TESTING_GROUNDS_LOCK:
state, needs_write = _load_testing_grounds_state_unlocked()
if needs_write:
try:
_write_testing_grounds_state_unlocked(state)
except Exception:
pass
return state
def _is_unused_testing_ground_slot(slot):
name = str(slot.get("name") or "").strip().lower()
return name == "unused" or name.startswith("unused ")
def _find_testing_ground_slot(state, slot_id):
for slot in state.get("slots", []):
if str(slot.get("id") or "").strip() == slot_id:
return slot
return {}
def _serialize_testing_grounds_state(state):
slots = state.get("slots", [])
active_slot_id = str(state.get("activeSlot") or "").strip()
active_slot = _find_testing_ground_slot(state, active_slot_id)
active_variant = _normalize_testing_ground_variant(active_slot_id, state.get("activeVariant"), active_slot)
active_variant_labels = _get_testing_ground_variant_labels(active_slot_id, active_slot)
return {
"schemaVersion": state.get("schemaVersion", _TESTING_GROUNDS_SCHEMA_VERSION),
"activeSlot": active_slot_id,
"activeVariant": active_variant,
"activeVariantLabel": active_variant_labels.get(active_variant, active_variant),
"activeSlotName": active_slot.get("name", active_slot_id),
"slots": slots,
"slotSummaryLines": [f"{slot.get('id', '?')}. {slot.get('name', 'Unused')}" for slot in slots],
"selectableSlots": [slot for slot in slots if not _is_unused_testing_ground_slot(slot)],
}
def _set_testing_ground_selection(slot_id, variant):
normalized_slot_id = str(slot_id or "").strip()
requested_variant = str(variant or "").strip().upper()
with _TESTING_GROUNDS_LOCK:
state, _ = _load_testing_grounds_state_unlocked()
slot_ids = {slot["id"] for slot in state["slots"]}
if normalized_slot_id not in slot_ids:
raise ValueError(f"Unknown testing ground slot '{normalized_slot_id}'.")
slot = _find_testing_ground_slot(state, normalized_slot_id)
allowed_variant_labels = _get_testing_ground_variant_labels(normalized_slot_id, slot)
if requested_variant not in allowed_variant_labels:
allowed_variants = ", ".join(sorted(allowed_variant_labels.keys()))
raise ValueError(f"Variant must be one of: {allowed_variants}.")
normalized_variant = _normalize_testing_ground_variant(normalized_slot_id, requested_variant, slot)
state["activeSlot"] = normalized_slot_id
state["activeVariant"] = normalized_variant
_write_testing_grounds_state_unlocked(state)
return state
def setup(app):
model_status_debug = {
"last_signature": None,
@@ -2135,6 +2405,39 @@ def setup(app):
"stale": age_seconds > _PLOTS_SAMPLE_STALE_AFTER_S,
}), 200
@app.route("/api/testing_grounds", methods=["GET"])
def get_testing_grounds():
state = _get_testing_grounds_state()
return jsonify({
**_serialize_testing_grounds_state(state),
"isOnroad": params.get_bool("IsOnroad"),
}), 200
@app.route("/api/testing_grounds/select", methods=["POST"])
def select_testing_ground():
request_data = request.get_json() or {}
slot_id = str(request_data.get("slotId") or "").strip()
variant = str(request_data.get("variant") or "").strip().upper()
if not slot_id:
return jsonify({"error": "Missing 'slotId' in request body."}), 400
try:
state = _set_testing_ground_selection(slot_id, variant)
except ValueError as exception:
return jsonify({"error": str(exception)}), 400
except Exception as exception:
return jsonify({"error": str(exception)}), 500
slot = _find_testing_ground_slot(state, slot_id)
slot_name = slot.get("name", f"Testing Ground {slot_id}")
selected_variant = str(state.get("activeVariant") or _TESTING_GROUNDS_DEFAULT_VARIANT)
return jsonify({
"message": f"{slot_name} set to variant {selected_variant}.",
**_serialize_testing_grounds_state(state),
"isOnroad": params.get_bool("IsOnroad"),
}), 200
@app.route("/api/update/fast/status", methods=["GET"])
def get_fast_update_status():
state_data = _get_fast_update_state()