refactor appd

This commit is contained in:
dragonpilot
2020-01-14 14:17:32 +10:00
parent 27c2d13cb2
commit b4e991d714
3 changed files with 315 additions and 198 deletions
+313 -196
View File
@@ -1,249 +1,366 @@
#!/usr/bin/env python3
import time
import cereal.messaging as messaging
import subprocess
import cereal
import cereal.messaging as messaging
ThermalStatus = cereal.log.ThermalData.ThermalStatus
from selfdrive.swaglog import cloudlog
from common.params import Params, put_nonblocking
params = Params()
# v1.16.2
tomtom = "com.tomtom.speedcams.android.map"
tomtom_main = "com.tomtom.speedcams.android.activities.SpeedCamActivity"
class App():
# v4.3.0.600310 R2098NSLAE
autonavi = "com.autonavi.amapauto"
autonavi_main = "com.autonavi.amapauto.MainMapActivity"
# app type
TYPE_GPS = 0
TYPE_SERVICE = 1
TYPE_FULLSCREEN = 2
TYPE_UTIL = 3
# v6.40.3
mixplorer = "com.mixplorer"
mixplorer_main = "com.mixplorer.activities.BrowseActivity"
# frame app
FRAME = "ai.comma.plus.frame"
FRAME_MAIN = ".MainActivity"
gpsservice = "cn.dragonpilot.gpsservice"
gpsservice_main = "cn.dragonpilot.gpsservice.MainService"
# offroad app
OFFROAD = "ai.comma.plus.offroad"
OFFROAD_MAIN = ".MainActivity"
# v2.9.5 build 74
aegis = "tw.com.ainvest.outpack"
aegis_main = "tw.com.ainvest.outpack.ui.MainActivity"
# manual switch stats
MANUAL_OFF = "-1"
MANUAL_IDLE = "0"
MANUAL_ON = "1"
# v4.57.2.0
waze = "com.waze"
waze_main = "com.waze.MainActivity"
def appops_set(self, package, op, mode):
self.system(f"LD_LIBRARY_PATH= appops set {package} {op} {mode}")
frame = "ai.comma.plus.frame"
frame_main = ".MainActivity"
def pm_grant(self, package, permission):
self.system(f"pm grant {package} {permission}")
offroad = "ai.comma.plus.offroad"
offroad_main = ".MainActivity"
def set_package_permissions(self):
if self.permissions is not None:
for permission in self.permissions:
self.pm_grant(self.app, permission)
if self.opts is not None:
for opt in self.opts:
self.appops_set(self.app, opt, "allow")
def main(gctx=None):
def __init__(self, app, activity, enable_param, auto_run_param, manual_ctrl_param, app_type, permissions, opts):
self.app = app
# main activity
self.activity = activity
# read enable param
self.enable_param = enable_param
# read auto run param
self.auto_run_param = auto_run_param
# read manual run param
self.manual_ctrl_param = manual_ctrl_param
# if it's a service app, we do not kill if device is too hot
# if it's a full screen app, we need to do extra process on frame/offroad
self.app_type = app_type
# app permissions
self.permissions = permissions
# app options
self.opts = opts
dragon_enable_tomtom = True if params.get('DragonEnableTomTom', encoding='utf8') == "1" else False
dragon_enable_autonavi = True if params.get('DragonEnableAutonavi', encoding='utf8') == "1" else False
dragon_enable_aegis = True if params.get('DragonEnableAegis', encoding='utf8') == "1" else False
dragon_enable_mixplorer = True if params.get('DragonEnableMixplorer', encoding='utf8') == "1" else False
dragon_boot_tomtom = True if params.get("DragonBootTomTom", encoding='utf8') == "1" else False
dragon_boot_autonavi = True if params.get("DragonBootAutonavi", encoding='utf8') == "1" else False
dragon_boot_aegis = True if params.get("DragonBootAegis", encoding='utf8') == "1" else False
dragon_greypanda_mode = True if params.get("DragonGreyPandaMode", encoding='utf8') == "1" else False
dragon_waze_mode = True if params.get("DragonWazeMode", encoding='utf8') == "1" else False
if dragon_waze_mode:
dragon_enable_tomtom = False
dragon_enable_autonavi = False
dragon_enable_aegis = False
self.is_enabled = False
self.last_is_enabled = False
self.is_auto_runnable = False
self.is_running = False
self.manual_ctrl_status = self.MANUAL_IDLE
self.manually_ctrled = False
self.set_package_permissions()
self.system("pm disable %s" % self.app)
def read_params(self):
self.last_is_enabled = self.is_enabled
if self.enable_param is None:
self.is_enabled = False
else:
self.is_enabled = True if params.get(self.enable_param, encoding='utf8') == "1" else False
if self.is_enabled:
# a service app should run automatically and not manual controllable.
if self.app_type == App.TYPE_SERVICE:
self.is_auto_runnable = True
self.manual_ctrl_status = self.MANUAL_IDLE
else:
if self.manual_ctrl_param is None:
self.manual_ctrl_status = self.MANUAL_IDLE
else:
self.manual_ctrl_status = params.get(self.manual_ctrl_param, encoding='utf8')
if self.auto_run_param is None:
self.is_auto_runnable = False
else:
self.is_auto_runnable = True if params.get(self.auto_run_param, encoding='utf8') == "1" else False
else:
self.is_auto_runnable = False
self.manual_ctrl_status = self.MANUAL_IDLE
self.manually_ctrled = False
def run(self, force = False):
if force or self.is_enabled:
# app is manually ctrl, we record that
if self.manual_ctrl_param is not None and self.manual_ctrl_status == self.MANUAL_ON:
put_nonblocking(self.manual_ctrl_param, '0')
self.manually_ctrled = True
self.is_running = False
# only run app if it's not running
if force or not self.is_running:
# if it's a full screen app, we need to stop frame and offroad to get keyboard access
if self.app_type == self.TYPE_FULLSCREEN:
self.system("pm disable %s" % self.FRAME)
self.system("am start -n %s/%s" % (self.OFFROAD, self.OFFROAD_MAIN))
self.system("pm enable %s" % self.app)
if self.app_type == self.TYPE_SERVICE:
self.system("am startservice %s/%s" % (self.app, self.activity))
else:
self.system("am start -n %s/%s" % (self.app, self.activity))
self.is_running = True
def kill(self, force = False):
if force or self.is_enabled:
# app is manually ctrl, we record that
if self.manual_ctrl_param is not None and self.manual_ctrl_status == self.MANUAL_OFF:
put_nonblocking(self.manual_ctrl_param, '0')
self.manually_ctrled = True
self.is_running = True
# only kill app if it's running
if force or self.is_running:
# if it's a full screen app, we need to restart offroad and frame
if self.app_type == self.TYPE_FULLSCREEN:
self.system("pm disable %s" % self.OFFROAD)
self.system("pm enable %s" % self.OFFROAD)
self.system("pm enable %s" % self.FRAME)
self.system("am start -n %s/%s" % (self.FRAME, self.FRAME_MAIN))
self.system("pm disable %s" % self.app)
self.is_running = False
def system(self, cmd):
try:
# cloudlog.info("running %s" % cmd)
subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
cloudlog.event("running failed",
cmd=e.cmd,
output=e.output[-1024:],
returncode=e.returncode)
def init_apps(apps):
apps.append(App(
# v1.16.2
"com.tomtom.speedcams.android.map",
"com.tomtom.speedcams.android.activities.SpeedCamActivity",
"DragonEnableTomTom",
"DragonBootTomTom",
"DragonRunTomTom",
App.TYPE_GPS,
[
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.ACCESS_COARSE_LOCATION",
"android.permission.READ_EXTERNAL_STORAGE",
"android.permission.WRITE_EXTERNAL_STORAGE",
],
[
"SYSTEM_ALERT_WINDOW",
]
))
apps.append(App(
# v4.3.0.600310 R2098NSLAE
"com.autonavi.amapauto",
"com.autonavi.amapauto.MainMapActivity",
"DragonEnableAutonavi",
"DragonBootAutonavi",
"DragonRunAutonavi",
App.TYPE_GPS,
[
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.ACCESS_COARSE_LOCATION",
"android.permission.READ_EXTERNAL_STORAGE",
"android.permission.WRITE_EXTERNAL_STORAGE",
],
[
"SYSTEM_ALERT_WINDOW",
]
))
apps.append(App(
# v6.40.3
"com.mixplorer",
"com.mixplorer.activities.BrowseActivity",
"DragonEnableMixplorer",
None,
"DragonRunMixplorer",
App.TYPE_UTIL,
[
"android.permission.READ_EXTERNAL_STORAGE",
"android.permission.WRITE_EXTERNAL_STORAGE",
],
[],
))
apps.append(App(
# v2.9.5 build 74
"tw.com.ainvest.outpack",
"tw.com.ainvest.outpack.ui.MainActivity",
"DragonEnableAegis",
"DragonBootAegis",
"DragonRunAegis",
App.TYPE_GPS,
[
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.READ_EXTERNAL_STORAGE",
"android.permission.WRITE_EXTERNAL_STORAGE",
],
[
"SYSTEM_ALERT_WINDOW",
]
))
apps.append(App(
"cn.dragonpilot.gpsservice",
"cn.dragonpilot.gpsservice.MainService",
"DragonGreyPandaMode",
None,
None,
App.TYPE_SERVICE,
[],
[],
))
apps.append(App(
# v4.57.2.0
"com.waze",
"com.waze.MainActivity",
"DragonWazeMode",
None,
"DragonRunWaze",
App.TYPE_FULLSCREEN,
[
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.ACCESS_COARSE_LOCATION",
"android.permission.READ_EXTERNAL_STORAGE",
"android.permission.WRITE_EXTERNAL_STORAGE",
"android.permission.RECORD_AUDIO",
],
[],
))
def main():
apps = []
#system(f"pm enable com.android.settings")
#system(f"am start -n com.android.settings/.TetherSettings")
#time.sleep(1)
#system(f"LD_LIBRARY_PATH= monkey -f /data/monkey.script 1")
#system(f"pm disable com.android.settings")
#system(f"pm enable com.android.settings")
#system(f"settings put system accelerometer_rotation 0")
#system(f"settings put system user_rotation 1")
init_apps(apps)
dragon_grepanda_mode_started = False
tomtom_is_running = False
autonavi_is_running = False
aegis_is_running = False
mixplorer_is_running = False
waze_is_running = False
allow_auto_boot = True
manual_tomtom = False
manual_autonavi = False
manual_aegis = False
manual_waze = False
last_started = False
thermal_sock = messaging.sub_sock('thermal')
frame = 0
start_delay = None
stop_delay = None
allow_auto_run = True
last_thermal_status = None
thermal_status = None
put_nonblocking('DragonRunTomTom', '0')
put_nonblocking('DragonRunAutonavi', '0')
put_nonblocking('DragonRunMixplorer', '0')
put_nonblocking('DragonRunAegis', '0')
put_nonblocking('DragonRunWaze', '0')
while 1: #has_enabled_apps:
has_fullscreen_apps = False
# we want to disable all app when boot
system("pm disable %s" % tomtom)
system("pm disable %s" % autonavi)
system("pm disable %s" % mixplorer)
system("pm disable %s" % gpsservice)
system("pm disable %s" % aegis)
system("pm disable %s" % waze)
for app in apps:
# read params loop
app.read_params()
if app.last_is_enabled and not app.is_enabled:
app.kill(True)
thermal_sock = messaging.sub_sock('thermal')
if app.is_enabled:
if app.app_type == App.TYPE_FULLSCREEN:
has_fullscreen_apps = True
while dragon_enable_tomtom or dragon_enable_autonavi or dragon_enable_aegis or dragon_enable_mixplorer or dragon_greypanda_mode or dragon_waze_mode:
# allow user to manually start/stop app
if dragon_enable_tomtom:
status = params.get('DragonRunTomTom', encoding='utf8')
if not status == "0":
tomtom_is_running = exec_app(status, tomtom, tomtom_main)
put_nonblocking('DragonRunTomTom', '0')
manual_tomtom = status != "0"
if dragon_enable_autonavi:
status = params.get('DragonRunAutonavi', encoding='utf8')
if not status == "0":
autonavi_is_running = exec_app(status, autonavi, autonavi_main)
put_nonblocking('DragonRunAutonavi', '0')
manual_autonavi = status != "0"
if dragon_enable_aegis:
status = params.get('DragonRunAegis', encoding='utf8')
if not status == "0":
aegis_is_running = exec_app(status, aegis, aegis_main)
put_nonblocking('DragonRunAegis', '0')
manual_aegis = status != "0"
if dragon_enable_mixplorer:
status = params.get('DragonRunMixplorer', encoding='utf8')
if not status == "0":
mixplorer_is_running = exec_app(status, mixplorer, mixplorer_main)
put_nonblocking('DragonRunMixplorer', '0')
if dragon_waze_mode:
status = params.get('DragonRunWaze', encoding='utf8')
if not status == "0":
if status == "1":
start_waze_prep()
elif status == "-1":
stop_waze_prep()
waze_is_running = exec_app(status, waze, waze_main)
put_nonblocking('DragonRunWaze', '0')
manual_waze = status != "0"
# if manual control is set, we do not allow any of the auto actions
auto_tomtom = not manual_tomtom and dragon_enable_tomtom and dragon_boot_tomtom
auto_autonavi = not manual_autonavi and dragon_enable_autonavi and dragon_boot_autonavi
auto_aegis = not manual_aegis and dragon_enable_aegis and dragon_boot_aegis
# process manual ctrl apps
for app in apps:
if app.manual_ctrl_status != App.MANUAL_IDLE:
if app.manual_ctrl_status == App.MANUAL_ON:
app.run(True)
else:
app.kill(True)
msg = messaging.recv_sock(thermal_sock, wait=True)
started = msg.thermal.started
# car on
# when car is running
if started:
stop_delay = None
# apps start 5 secs later
if start_delay is None:
start_delay = frame + 3
start_delay = frame + 5
if dragon_greypanda_mode and not dragon_grepanda_mode_started:
dragon_grepanda_mode_started = True
system("pm enable %s" % gpsservice)
system("am startservice %s/%s" % (gpsservice, gpsservice_main))
thermal_status = msg.thermal.thermalStatus
if thermal_status <= ThermalStatus.yellow:
allow_auto_run = True
# when temp reduce from red to yellow, we add start up delay as well
# so apps will not start up immediately
if last_thermal_status == ThermalStatus.red:
start_delay = frame + 60
elif thermal_status >= ThermalStatus.red:
allow_auto_run = False
if dragon_waze_mode:
if not manual_waze and not waze_is_running:
start_waze_prep()
waze_is_running = exec_app('1', waze, waze_main)
else:
last_thermal_status = thermal_status
#
# Logic:
# if temp reach red, we disable all 3rd party apps.
# once the temp drop below yellow, we then re-enable them
#
# set allow_auto_boot back to True once the thermal status is < yellow
thermal_status = msg.thermal.thermalStatus
if not allow_auto_boot and thermal_status < ThermalStatus.yellow:
allow_auto_boot = True
if allow_auto_boot:
# only allow auto boot when thermal status is < red
if thermal_status < ThermalStatus.red:
if auto_tomtom and not tomtom_is_running and frame > start_delay:
tomtom_is_running = exec_app('1', tomtom, tomtom_main)
if auto_autonavi and not autonavi_is_running and frame > start_delay:
autonavi_is_running = exec_app('1', autonavi, autonavi_main)
if auto_aegis and not aegis_is_running and frame > start_delay:
aegis_is_running = exec_app('1', aegis, aegis_main)
# we run service apps and kill all util apps
# only run once
if last_started != started:
for app in apps:
if app.app_type == App.TYPE_SERVICE:
app.run()
elif app.app_type == App.TYPE_UTIL:
app.kill()
# only run apps that's not manually ctrled
for app in apps:
if not app.manually_ctrled:
if has_fullscreen_apps:
if app.app_type == App.TYPE_FULLSCREEN:
app.run()
elif app.app_type in [App.TYPE_GPS, App.TYPE_UTIL]:
app.kill()
else:
if auto_tomtom and tomtom_is_running:
tomtom_is_running = exec_app('-1', tomtom, tomtom_main)
if auto_autonavi and autonavi_is_running:
autonavi_is_running = exec_app('-1', autonavi, autonavi_main)
if auto_aegis and aegis_is_running:
aegis_is_running = exec_app('-1', aegis, aegis_main)
# set allow_auto_boot to False once the thermal status is >= red
allow_auto_boot = False
# kill mixplorer when car started
if mixplorer_is_running:
mixplorer_is_running = exec_app('-1', mixplorer, mixplorer_main)
# car off
if not allow_auto_run:
app.kill()
else:
if frame > start_delay and app.is_auto_runnable and app.app_type == App.TYPE_GPS:
app.run()
# when car is stopped
else:
start_delay = None
# set delay to 30 seconds
if stop_delay is None:
stop_delay = frame + 30
if dragon_greypanda_mode and dragon_grepanda_mode_started:
dragon_grepanda_mode_started = False
system("pm disable %s" % gpsservice)
for app in apps:
if app.is_running and not app.manually_ctrled:
if has_fullscreen_apps or frame > stop_delay:
app.kill()
if dragon_waze_mode:
if not manual_waze and waze_is_running:
stop_waze_prep()
waze_is_running = exec_app('-1', waze, waze_main)
else:
if auto_tomtom and tomtom_is_running and frame > stop_delay:
tomtom_is_running = exec_app('-1', tomtom, tomtom_main)
if auto_autonavi and autonavi_is_running and frame > stop_delay:
autonavi_is_running = exec_app('-1', autonavi, autonavi_main)
if auto_aegis and aegis_is_running and frame > stop_delay:
aegis_is_running = exec_app('-1', aegis, aegis_main)
# if car state changed, we remove manual control state
if not last_started == started:
manual_tomtom = False
manual_autonavi = False
manual_aegis = False
manual_waze = False
if last_started != started:
for app in apps:
app.manually_ctrled = False
last_started = started
frame += 3
# every 3 seconds, we re-check status
time.sleep(3)
# when starting waze, we want to disable frame
def start_waze_prep():
system("pm disable %s" % frame)
system("am start -n %s/%s" % (offroad, offroad_main))
# when stopping waze
# we want to disable offroad first and enable offroad (this way the offroad sits in the background waiting for launch.)
# then we re-enable frame and start frame app (the offroad app will start shortly after)
def stop_waze_prep():
system("pm disable %s" % offroad)
system("pm enable %s" % offroad)
system("pm enable %s" % frame)
system("am start -n %s/%s" % (frame, frame_main))
def exec_app(status, app, app_main):
if status == "1":
system("pm enable %s" % app)
system("am start -n %s/%s" % (app, app_main))
return True
if status == "-1":
system("pm disable %s" % app)
return False
def system(cmd):
try:
# cloudlog.info("running %s" % cmd)
cloudlog.info("running %s" % cmd)
subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
cloudlog.event("running failed",
+1 -1
View File
@@ -1,4 +1,4 @@
#!/usr/bin/env python2.7
#!/usr/bin/env python3
#
# courtesy of pjlao307 (https://github.com/pjlao307/)
# this is just his original implementation but
+1 -1
View File
@@ -1,4 +1,4 @@
#!/usr/bin/env python2.7
#!/usr/bin/env python3
import os
import time