mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-21 22:12:05 +08:00
cabana: eliminate deep copy of events in live stream mode (#27588)
This commit is contained in:
@@ -108,14 +108,14 @@ ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) {
|
||||
|
||||
void ChartsWidget::eventsMerged() {
|
||||
{
|
||||
assert(!can->liveStreaming());
|
||||
QFutureSynchronizer<void> future_synchronizer;
|
||||
const auto events = can->events();
|
||||
for (auto c : charts) {
|
||||
future_synchronizer.addFuture(QtConcurrent::run(c, &ChartView::updateSeries, nullptr, events, true));
|
||||
future_synchronizer.addFuture(QtConcurrent::run(c, &ChartView::updateSeries, nullptr));
|
||||
}
|
||||
}
|
||||
updateState();
|
||||
if (can->isPaused()) {
|
||||
updateState();
|
||||
}
|
||||
}
|
||||
|
||||
void ChartsWidget::zoomIn(double min, double max) {
|
||||
@@ -133,20 +133,13 @@ void ChartsWidget::zoomReset() {
|
||||
void ChartsWidget::updateState() {
|
||||
if (charts.isEmpty()) return;
|
||||
|
||||
const auto events = can->events();
|
||||
if (can->liveStreaming()) {
|
||||
// appends incoming events to the end of series
|
||||
for (auto c : charts) {
|
||||
c->updateSeries(nullptr, events, false);
|
||||
}
|
||||
}
|
||||
|
||||
const double cur_sec = can->currentSec();
|
||||
if (!is_zoomed) {
|
||||
double pos = (cur_sec - display_range.first) / std::max(1.0, (display_range.second - display_range.first));
|
||||
if (pos < 0 || pos > 0.8) {
|
||||
display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1);
|
||||
}
|
||||
auto events = can->events();
|
||||
double max_event_sec = events->empty() ? 0 : (events->back()->mono_time / 1e9 - can->routeStartTime());
|
||||
double max_sec = std::min(std::floor(display_range.first + max_chart_range), max_event_sec);
|
||||
display_range.first = std::max(0.0, max_sec - max_chart_range);
|
||||
@@ -502,11 +495,11 @@ void ChartView::updateSeriesPoints() {
|
||||
}
|
||||
}
|
||||
|
||||
void ChartView::updateSeries(const cabana::Signal *sig, const std::vector<Event *> *events, bool clear) {
|
||||
events = events ? events : can->events();
|
||||
void ChartView::updateSeries(const cabana::Signal *sig) {
|
||||
const auto events = can->events();
|
||||
for (auto &s : sigs) {
|
||||
if (!sig || s.sig == sig) {
|
||||
if (clear) {
|
||||
if (!can->liveStreaming()) {
|
||||
s.vals.clear();
|
||||
s.step_vals.clear();
|
||||
s.vals.reserve(settings.max_cached_minutes * 60 * 100); // [n]seconds * 100hz
|
||||
|
||||
@@ -32,7 +32,7 @@ public:
|
||||
ChartView(QWidget *parent = nullptr);
|
||||
void addSeries(const MessageId &msg_id, const cabana::Signal *sig);
|
||||
bool hasSeries(const MessageId &msg_id, const cabana::Signal *sig) const;
|
||||
void updateSeries(const cabana::Signal *sig = nullptr, const std::vector<Event*> *events = nullptr, bool clear = true);
|
||||
void updateSeries(const cabana::Signal *sig = nullptr);
|
||||
void updatePlot(double cur, double min, double max);
|
||||
void setSeriesType(SeriesType type);
|
||||
void updatePlotArea(int left);
|
||||
|
||||
@@ -59,7 +59,7 @@ public:
|
||||
QSet<uint8_t> sources;
|
||||
|
||||
protected:
|
||||
void process(QHash<MessageId, CanData> *);
|
||||
virtual void process(QHash<MessageId, CanData> *);
|
||||
bool updateEvent(const Event *event);
|
||||
void updateLastMsgsTo(double sec);
|
||||
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
#include "tools/cabana/streams/livestream.h"
|
||||
|
||||
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) {
|
||||
timer = new QTimer(this);
|
||||
timer->callOnTimeout(this, &LiveStream::removeExpiredEvents);
|
||||
timer->start(3 * 1000);
|
||||
#include <QTimer>
|
||||
|
||||
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) {
|
||||
stream_thread = new QThread(this);
|
||||
QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); });
|
||||
QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater);
|
||||
@@ -15,8 +13,6 @@ LiveStream::~LiveStream() {
|
||||
stream_thread->requestInterruption();
|
||||
stream_thread->quit();
|
||||
stream_thread->wait();
|
||||
for (Event *e : can_events) ::delete e;
|
||||
for (auto m : messages) delete m;
|
||||
}
|
||||
|
||||
void LiveStream::streamThread() {
|
||||
@@ -35,11 +31,8 @@ void LiveStream::streamThread() {
|
||||
QThread::msleep(50);
|
||||
continue;
|
||||
}
|
||||
AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer());
|
||||
Event *evt = ::new Event(buf->align(msg));
|
||||
delete msg;
|
||||
|
||||
handleEvent(evt);
|
||||
std::lock_guard lk(lock);
|
||||
handleEvent(messages.emplace_back(msg).event);
|
||||
// TODO: write stream to log file to replay it with cabana --data_dir flag.
|
||||
}
|
||||
}
|
||||
@@ -53,11 +46,10 @@ void LiveStream::handleEvent(Event *evt) {
|
||||
emit streamStarted();
|
||||
}
|
||||
|
||||
std::lock_guard lk(lock);
|
||||
can_events.push_back(evt);
|
||||
received.push_back(evt);
|
||||
if (!pause_) {
|
||||
if (speed_ < 1 && last_update_ts > 0) {
|
||||
auto it = std::upper_bound(can_events.cbegin(), can_events.cend(), current_ts, [](uint64_t ts, auto &e) {
|
||||
auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) {
|
||||
return ts < e->mono_time;
|
||||
});
|
||||
if (it != can_events.cend()) {
|
||||
@@ -73,27 +65,20 @@ void LiveStream::handleEvent(Event *evt) {
|
||||
}
|
||||
}
|
||||
|
||||
void LiveStream::removeExpiredEvents() {
|
||||
std::lock_guard lk(lock);
|
||||
if (can_events.size() > 0) {
|
||||
const uint64_t max_ns = settings.max_cached_minutes * 60 * 1e9;
|
||||
const uint64_t last_ns = can_events.back()->mono_time;
|
||||
while (!can_events.empty() && (last_ns - can_events.front()->mono_time) > max_ns) {
|
||||
::delete can_events.front();
|
||||
delete messages.front();
|
||||
can_events.pop_front();
|
||||
messages.pop_front();
|
||||
void LiveStream::process(QHash<MessageId, CanData> *last_messages) {
|
||||
{
|
||||
std::lock_guard lk(lock);
|
||||
uint64_t last_ts = can_events.empty() ? 0 : can_events.back()->mono_time;
|
||||
auto first = std::upper_bound(received.cbegin(), received.cend(), last_ts, [](uint64_t ts, auto &e) {
|
||||
return ts < e->mono_time;
|
||||
});
|
||||
can_events.insert(can_events.end(), first, received.cend());
|
||||
if (speed_ == 1) {
|
||||
received.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const std::vector<Event *> *LiveStream::events() const {
|
||||
std::lock_guard lk(lock);
|
||||
if (events_vector.capacity() <= can_events.size()) {
|
||||
events_vector.reserve(can_events.size() * 2);
|
||||
}
|
||||
events_vector.assign(can_events.begin(), can_events.end());
|
||||
return &events_vector;
|
||||
emit eventsMerged();
|
||||
AbstractStream::process(last_messages);
|
||||
}
|
||||
|
||||
void LiveStream::pause(bool pause) {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <QTimer>
|
||||
#include "tools/cabana/streams/abstractstream.h"
|
||||
|
||||
class LiveStream : public AbstractStream {
|
||||
@@ -17,17 +16,29 @@ public:
|
||||
void setSpeed(float speed) override { speed_ = std::min<float>(1.0, speed); }
|
||||
bool isPaused() const override { return pause_; }
|
||||
void pause(bool pause) override;
|
||||
const std::vector<Event *> *events() const override;
|
||||
const std::vector<Event *> *events() const override { return &can_events; }
|
||||
|
||||
protected:
|
||||
void process(QHash<MessageId, CanData> *) override;
|
||||
virtual void handleEvent(Event *evt);
|
||||
virtual void streamThread();
|
||||
virtual void removeExpiredEvents();
|
||||
|
||||
struct Msg {
|
||||
Msg(Message *m) {
|
||||
event = ::new Event(aligned_buf.align(m));
|
||||
delete m;
|
||||
}
|
||||
~Msg() {
|
||||
::delete event;
|
||||
}
|
||||
Event *event;
|
||||
AlignedBuffer aligned_buf;
|
||||
};
|
||||
|
||||
mutable std::mutex lock;
|
||||
mutable std::vector<Event *> events_vector;
|
||||
std::deque<Event *> can_events;
|
||||
std::deque<AlignedBuffer *> messages;
|
||||
std::vector<Event *> can_events;
|
||||
std::vector<Event *> received;
|
||||
std::deque<Msg> messages;
|
||||
std::atomic<uint64_t> start_ts = 0;
|
||||
std::atomic<uint64_t> current_ts = 0;
|
||||
std::atomic<float> speed_ = 1;
|
||||
@@ -36,5 +47,4 @@ protected:
|
||||
|
||||
const QString zmq_address;
|
||||
QThread *stream_thread;
|
||||
QTimer *timer;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user