mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-13 19:15:02 +08:00
110 lines
3.3 KiB
Python
110 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
import datetime
|
|
import os
|
|
import subprocess
|
|
import time
|
|
from typing import NoReturn
|
|
|
|
import cereal.messaging as messaging
|
|
from openpilot.common.time_helpers import min_date, system_time_valid
|
|
from openpilot.common.swaglog import cloudlog
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.gps import get_gps_location_service
|
|
from openpilot.system.hardware import AGNOS
|
|
|
|
try:
|
|
from timezonefinder import TimezoneFinder
|
|
except Exception:
|
|
TimezoneFinder = None
|
|
|
|
|
|
def set_time(new_time):
|
|
diff = datetime.datetime.now() - new_time
|
|
if abs(diff) < datetime.timedelta(seconds=10):
|
|
cloudlog.debug(f"Time diff too small: {diff}")
|
|
return
|
|
|
|
cloudlog.debug(f"Setting time to {new_time}")
|
|
try:
|
|
subprocess.run(f"TZ=UTC date -s '{new_time}'", shell=True, check=True)
|
|
except subprocess.CalledProcessError:
|
|
cloudlog.exception("timed.failed_setting_time")
|
|
|
|
|
|
# StarPilot variables
|
|
def set_timezone(timezone):
|
|
valid_timezones = subprocess.check_output("timedatectl list-timezones", shell=True, encoding="utf8").strip().split("\n")
|
|
if timezone not in valid_timezones:
|
|
cloudlog.error(f"Timezone not supported {timezone}")
|
|
return
|
|
|
|
cloudlog.debug(f"Setting timezone to {timezone}")
|
|
try:
|
|
if AGNOS:
|
|
tzpath = os.path.join("/usr/share/zoneinfo/", timezone)
|
|
subprocess.check_call(f"sudo su -c 'ln -snf {tzpath} /data/etc/tmptime && mv /data/etc/tmptime /data/etc/localtime'", shell=True)
|
|
subprocess.check_call(f"sudo su -c 'echo \'{timezone}\' > /data/etc/timezone'", shell=True)
|
|
else:
|
|
subprocess.check_call(f"sudo timedatectl set-timezone {timezone}", shell=True)
|
|
except subprocess.CalledProcessError:
|
|
cloudlog.exception(f"Error setting timezone to {timezone}")
|
|
|
|
|
|
def main() -> NoReturn:
|
|
"""
|
|
timed has two responsibilities:
|
|
- getting the current time from GPS
|
|
- publishing the time in the logs
|
|
|
|
AGNOS will also use NTP to update the time.
|
|
"""
|
|
|
|
params = Params()
|
|
gps_location_service = get_gps_location_service(params)
|
|
|
|
pm = messaging.PubMaster(['clocks'])
|
|
sm = messaging.SubMaster([gps_location_service])
|
|
|
|
# StarPilot variables
|
|
tf = TimezoneFinder() if TimezoneFinder is not None else None
|
|
timezonefinder_logged = False
|
|
|
|
last_timezone = params.get("Timezone")
|
|
if last_timezone is not None:
|
|
set_timezone(last_timezone)
|
|
|
|
while True:
|
|
sm.update(1000)
|
|
|
|
msg = messaging.new_message('clocks')
|
|
msg.valid = system_time_valid()
|
|
msg.clocks.wallTimeNanos = time.time_ns()
|
|
pm.send('clocks', msg)
|
|
|
|
gps = sm[gps_location_service]
|
|
gps_time = datetime.datetime.fromtimestamp(gps.unixTimestampMillis / 1000.)
|
|
if not sm.updated[gps_location_service] or (time.monotonic() - sm.logMonoTime[gps_location_service] / 1e9) > 2.0:
|
|
continue
|
|
if not gps.hasFix:
|
|
continue
|
|
if gps_time < min_date():
|
|
continue
|
|
|
|
set_time(gps_time)
|
|
|
|
# StarPilot variables
|
|
if tf is not None:
|
|
timezone = tf.timezone_at(lng=gps.longitude, lat=gps.latitude)
|
|
if timezone is not None and timezone != last_timezone:
|
|
set_timezone(timezone)
|
|
params.put_nonblocking("Timezone", timezone)
|
|
last_timezone = timezone
|
|
elif not timezonefinder_logged:
|
|
cloudlog.warning("TimezoneFinder unavailable, skipping automatic timezone updates")
|
|
timezonefinder_logged = True
|
|
|
|
time.sleep(10)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|