mirror of
https://gitlvb.teallvbs.xyz/IQ.Lvbs/IQ.Pilot.git
synced 2026-06-12 00:05:03 +08:00
63 lines
1.9 KiB
Python
63 lines
1.9 KiB
Python
"""
|
|
Copyright © IQ.Lvbs, apart of Project Teal Lvbs, All Rights Reserved, licensed under https://konn3kt.com/tos
|
|
"""
|
|
from abc import abstractmethod, ABC
|
|
|
|
import cereal.messaging as messaging
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.constants import CV
|
|
from openpilot.selfdrive.car.cruise import V_CRUISE_UNSET
|
|
from openpilot.iqpilot.navd.helpers import coordinate_from_param
|
|
|
|
MAX_SPEED_LIMIT = V_CRUISE_UNSET * CV.KPH_TO_MS
|
|
|
|
|
|
class BaseMapData(ABC):
|
|
def __init__(self):
|
|
self.params = Params()
|
|
|
|
self.sm = messaging.SubMaster(['liveLocationKalman'])
|
|
self.pm = messaging.PubMaster(['iqLiveMapData'])
|
|
|
|
self.localizer_valid = False
|
|
self.last_bearing = None
|
|
self.last_position = coordinate_from_param("LastGPSPositionLLK", self.params)
|
|
|
|
@abstractmethod
|
|
def update_location(self) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_current_speed_limit(self) -> float:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_next_speed_limit_and_distance(self) -> tuple[float, float]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_current_road_name(self) -> str:
|
|
pass
|
|
|
|
def publish(self) -> None:
|
|
speed_limit = self.get_current_speed_limit()
|
|
next_speed_limit, next_speed_limit_distance = self.get_next_speed_limit_and_distance()
|
|
|
|
mapd_send = messaging.new_message('iqLiveMapData')
|
|
mapd_send.valid = self.sm['liveLocationKalman'].gpsOK
|
|
live_map_data = mapd_send.iqLiveMapData
|
|
|
|
live_map_data.speedLimitValid = bool(MAX_SPEED_LIMIT > speed_limit > 0)
|
|
live_map_data.speedLimit = speed_limit
|
|
live_map_data.speedLimitAheadValid = bool(MAX_SPEED_LIMIT > next_speed_limit > 0)
|
|
live_map_data.speedLimitAhead = next_speed_limit
|
|
live_map_data.speedLimitAheadDistance = next_speed_limit_distance
|
|
live_map_data.roadName = self.get_current_road_name()
|
|
|
|
self.pm.send('iqLiveMapData', mapd_send)
|
|
|
|
def tick(self) -> None:
|
|
self.sm.update(0)
|
|
self.update_location()
|
|
self.publish()
|