mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-07-02 20:12:07 +08:00
cabana: split events into chunks and processed concurrently (#27562)
old-commit-hash: dc31c50aa7bda8d5aab43b0382342ae5162ed37d
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
#include "tools/cabana/streams/abstractstream.h"
|
||||
|
||||
#include <QtConcurrent>
|
||||
|
||||
AbstractStream *can = nullptr;
|
||||
|
||||
AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_live_streaming(is_live_streaming), QObject(parent) {
|
||||
@@ -59,39 +61,59 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) {
|
||||
return it != can_msgs.end() ? it.value() : empty_data;
|
||||
}
|
||||
|
||||
void AbstractStream::updateLastMsgsTo(double sec) {
|
||||
QHash<MessageId, CanData> last_msgs;
|
||||
last_msgs.reserve(can_msgs.size());
|
||||
double route_start_time = routeStartTime();
|
||||
uint64_t last_ts = (sec + route_start_time) * 1e9;
|
||||
auto evs = events();
|
||||
auto last = std::upper_bound(evs->rbegin(), evs->rend(), last_ts, [](uint64_t ts, auto &e) { return e->mono_time < ts; });
|
||||
for (auto it = last; it != evs->rend(); ++it) {
|
||||
static QHash<MessageId, CanData> parseEvents(std::vector<Event *>::const_reverse_iterator first,
|
||||
std::vector<Event *>::const_reverse_iterator last, double route_start_time) {
|
||||
QHash<MessageId, CanData> msgs;
|
||||
msgs.reserve(500);
|
||||
for (auto it = first; it != last; ++it) {
|
||||
if ((*it)->which == cereal::Event::Which::CAN) {
|
||||
for (const auto &c : (*it)->event.getCan()) {
|
||||
auto &m = last_msgs[{.source = c.getSrc(), .address = c.getAddress()}];
|
||||
auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}];
|
||||
if (++m.count == 1) {
|
||||
m.ts = ((*it)->mono_time / 1e9) - route_start_time;
|
||||
m.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size());
|
||||
m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
|
||||
m.last_change_t = QVector<double>(m.dat.size(), m.ts);
|
||||
m.bit_change_counts.resize(m.dat.size());
|
||||
} else {
|
||||
m.freq = m.count / std::max(1.0, m.ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return msgs;
|
||||
};
|
||||
|
||||
// it is thread safe to update data in updateLastMsgsTo.
|
||||
// updateEvent will not be called before replayStream::seekedTo return.
|
||||
void AbstractStream::updateLastMsgsTo(double sec) {
|
||||
uint64_t ts = (sec + routeStartTime()) * 1e9;
|
||||
const uint64_t delta = std::max(std::ceil(sec / std::thread::hardware_concurrency()), 30.0) * 1e9;
|
||||
const auto evs = events();
|
||||
auto first = std::upper_bound(evs->crbegin(), evs->crend(), ts, [](uint64_t ts, auto &e) { return ts > e->mono_time; });
|
||||
QFutureSynchronizer<QHash<MessageId, CanData>> synchronizer;
|
||||
while(first != evs->crend()) {
|
||||
ts = (*first)->mono_time > delta ? (*first)->mono_time - delta : 0;
|
||||
auto last = std::lower_bound(first, evs->crend(), ts, [](auto &e, uint64_t ts) { return e->mono_time > ts; });
|
||||
synchronizer.addFuture(QtConcurrent::run(parseEvents, first, last, routeStartTime()));
|
||||
first = last;
|
||||
}
|
||||
synchronizer.waitForFinished();
|
||||
|
||||
// it is thread safe to update data here.
|
||||
// updateEvent will not be called before replayStream::seekedTo return.
|
||||
new_msgs->clear();
|
||||
change_trackers.clear();
|
||||
counters.clear();
|
||||
can_msgs.clear();
|
||||
for (auto it = last_msgs.cbegin(); it != last_msgs.cend(); ++it) {
|
||||
can_msgs[it.key()] = it.value();
|
||||
counters[it.key()] = it.value().count;
|
||||
counters.clear();
|
||||
for (const auto &f : synchronizer.futures()) {
|
||||
auto msgs = f.result();
|
||||
for (auto it = msgs.cbegin(); it != msgs.cend(); ++it) {
|
||||
counters[it.key()] += it.value().count;
|
||||
auto m = can_msgs.find(it.key());
|
||||
if (m == can_msgs.end()) {
|
||||
m = can_msgs.insert(it.key(), it.value());
|
||||
} else {
|
||||
m.value().count += it.value().count;
|
||||
}
|
||||
m.value().freq = m.value().count / std::max(1.0, m.value().ts);
|
||||
}
|
||||
}
|
||||
emit updated();
|
||||
emit msgsReceived(&can_msgs);
|
||||
|
||||
Reference in New Issue
Block a user