mirror of
https://github.com/firestar5683/StarPilot.git
synced 2026-06-29 10:32:10 +08:00
replay: optimize memory usage with MonotonicBuffer (#32278)
Optimize Memory Usage with MonotonicBuffe old-commit-hash: bbd1648f0561b7b3270a2bc7b416841ac10fd9db
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
#include "cereal/messaging/messaging.h"
|
||||
#include "tools/cabana/dbc/dbcmanager.h"
|
||||
#include "tools/cabana/utils/util.h"
|
||||
#include "tools/replay/util.h"
|
||||
|
||||
struct CanData {
|
||||
void compute(const MessageId &msg_id, const uint8_t *dat, const int size, double current_sec,
|
||||
|
||||
@@ -263,26 +263,3 @@ QString signalToolTip(const cabana::Signal *sig) {
|
||||
)").arg(sig->name).arg(sig->start_bit).arg(sig->size).arg(sig->msb).arg(sig->lsb)
|
||||
.arg(sig->is_little_endian ? "Y" : "N").arg(sig->is_signed ? "Y" : "N");
|
||||
}
|
||||
|
||||
// MonotonicBuffer
|
||||
|
||||
void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) {
|
||||
assert(bytes > 0);
|
||||
void *p = std::align(alignment, bytes, current_buf, available);
|
||||
if (p == nullptr) {
|
||||
available = next_buffer_size = std::max(next_buffer_size, bytes);
|
||||
current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size));
|
||||
next_buffer_size *= growth_factor;
|
||||
p = current_buf;
|
||||
}
|
||||
|
||||
current_buf = (char *)current_buf + bytes;
|
||||
available -= bytes;
|
||||
return p;
|
||||
}
|
||||
|
||||
MonotonicBuffer::~MonotonicBuffer() {
|
||||
for (auto buf : buffers) {
|
||||
free(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
#include <array>
|
||||
#include <cmath>
|
||||
#include <deque>
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
|
||||
@@ -160,20 +159,5 @@ private:
|
||||
QSocketNotifier *sn;
|
||||
};
|
||||
|
||||
class MonotonicBuffer {
|
||||
public:
|
||||
MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {}
|
||||
~MonotonicBuffer();
|
||||
void *allocate(size_t bytes, size_t alignment = 16ul);
|
||||
void deallocate(void *p) {}
|
||||
|
||||
private:
|
||||
void *current_buf = nullptr;
|
||||
size_t next_buffer_size = 0;
|
||||
size_t available = 0;
|
||||
std::deque<void *> buffers;
|
||||
static constexpr float growth_factor = 1.5;
|
||||
};
|
||||
|
||||
int num_decimals(double num);
|
||||
QString signalToolTip(const cabana::Signal *sig);
|
||||
|
||||
+18
-10
@@ -1,18 +1,19 @@
|
||||
#include "tools/replay/logreader.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <utility>
|
||||
#include "tools/replay/filereader.h"
|
||||
#include "tools/replay/util.h"
|
||||
|
||||
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
|
||||
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
|
||||
if (raw_.empty()) return false;
|
||||
std::string data = FileReader(local_cache, chunk_size, retries).read(url, abort);
|
||||
if (!data.empty() && url.find(".bz2") != std::string::npos)
|
||||
data = decompressBZ2(data, abort);
|
||||
|
||||
if (url.find(".bz2") != std::string::npos) {
|
||||
raw_ = decompressBZ2(raw_, abort);
|
||||
if (raw_.empty()) return false;
|
||||
}
|
||||
return load(raw_.data(), raw_.size(), abort);
|
||||
bool success = !data.empty() && load(data.data(), data.size(), abort);
|
||||
if (filters_.empty())
|
||||
raw_ = std::move(data);
|
||||
return success;
|
||||
}
|
||||
|
||||
bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
|
||||
@@ -23,9 +24,18 @@ bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
|
||||
capnp::FlatArrayMessageReader reader(words);
|
||||
auto event = reader.getRoot<cereal::Event>();
|
||||
auto which = event.which();
|
||||
uint64_t mono_time = event.getLogMonoTime();
|
||||
auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());
|
||||
words = kj::arrayPtr(reader.getEnd(), words.end());
|
||||
|
||||
if (!filters_.empty()) {
|
||||
if (which >= filters_.size() || !filters_[which])
|
||||
continue;
|
||||
auto buf = buffer_.allocate(event_data.size() * sizeof(capnp::word));
|
||||
memcpy(buf, event_data.begin(), event_data.size() * sizeof(capnp::word));
|
||||
event_data = kj::arrayPtr((const capnp::word *)buf, event_data.size());
|
||||
}
|
||||
|
||||
uint64_t mono_time = event.getLogMonoTime();
|
||||
const Event &evt = events.emplace_back(which, mono_time, event_data);
|
||||
// Add encodeIdx packet again as a frame packet for the video stream
|
||||
if (evt.which == cereal::Event::ROAD_ENCODE_IDX ||
|
||||
@@ -37,8 +47,6 @@ bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
|
||||
}
|
||||
events.emplace_back(which, mono_time, event_data, idx.getSegmentNum());
|
||||
}
|
||||
|
||||
words = kj::arrayPtr(reader.getEnd(), words.end());
|
||||
}
|
||||
} catch (const kj::Exception &e) {
|
||||
rWarning("Failed to parse log : %s.\nRetrieved %zu events from corrupt log", e.getDescription().cStr(), events.size());
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
#include "cereal/gen/cpp/log.capnp.h"
|
||||
#include "system/camerad/cameras/camera_common.h"
|
||||
#include "tools/replay/util.h"
|
||||
|
||||
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
|
||||
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
|
||||
@@ -26,6 +27,7 @@ public:
|
||||
|
||||
class LogReader {
|
||||
public:
|
||||
LogReader(const std::vector<bool> &filters = {}) { filters_ = filters; }
|
||||
bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
|
||||
bool local_cache = false, int chunk_size = -1, int retries = 0);
|
||||
bool load(const char *data, size_t size, std::atomic<bool> *abort = nullptr);
|
||||
@@ -33,4 +35,6 @@ public:
|
||||
|
||||
private:
|
||||
std::string raw_;
|
||||
std::vector<bool> filters_;
|
||||
MonotonicBuffer buffer_{1024 * 1024};
|
||||
};
|
||||
|
||||
@@ -27,6 +27,11 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
|
||||
sockets_[which] = name.c_str();
|
||||
}
|
||||
}
|
||||
if (!allow.isEmpty()) {
|
||||
for (int i = 0; i < sockets_.size(); ++i) {
|
||||
filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<const char *> s;
|
||||
std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s),
|
||||
@@ -259,7 +264,7 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator
|
||||
auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
|
||||
if (it != end && !it->second) {
|
||||
rDebug("loading segment %d...", it->first);
|
||||
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
|
||||
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_);
|
||||
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -135,6 +135,7 @@ protected:
|
||||
SubMaster *sm = nullptr;
|
||||
std::unique_ptr<PubMaster> pm;
|
||||
std::vector<const char*> sockets_;
|
||||
std::vector<bool> filters_;
|
||||
std::unique_ptr<Route> route_;
|
||||
std::unique_ptr<CameraServer> camera_server_;
|
||||
std::atomic<uint32_t> flags_ = REPLAY_FLAG_NONE;
|
||||
|
||||
@@ -131,7 +131,8 @@ void Route::addFileToSegment(int n, const QString &file) {
|
||||
|
||||
// class Segment
|
||||
|
||||
Segment::Segment(int n, const SegmentFile &files, uint32_t flags) : seg_num(n), flags(flags) {
|
||||
Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters)
|
||||
: seg_num(n), flags(flags), filters_(filters) {
|
||||
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
|
||||
const std::array file_list = {
|
||||
(flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
|
||||
@@ -161,7 +162,7 @@ void Segment::loadFile(int id, const std::string file) {
|
||||
frames[id] = std::make_unique<FrameReader>();
|
||||
success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3);
|
||||
} else {
|
||||
log = std::make_unique<LogReader>();
|
||||
log = std::make_unique<LogReader>(filters_);
|
||||
success = log->load(file, &abort_, local_cache, 0, 3);
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <QDateTime>
|
||||
#include <QFutureSynchronizer>
|
||||
@@ -55,7 +56,7 @@ class Segment : public QObject {
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
Segment(int n, const SegmentFile &files, uint32_t flags);
|
||||
Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters = {});
|
||||
~Segment();
|
||||
inline bool isLoaded() const { return !loading_ && !abort_; }
|
||||
|
||||
@@ -73,4 +74,5 @@ protected:
|
||||
std::atomic<int> loading_ = 0;
|
||||
QFutureSynchronizer<void> synchronizer_;
|
||||
uint32_t flags;
|
||||
std::vector<bool> filters_;
|
||||
};
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#include <openssl/sha.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <algorithm>
|
||||
#include <cmath>
|
||||
#include <cstdarg>
|
||||
#include <cstring>
|
||||
@@ -354,3 +355,26 @@ std::string sha256(const std::string &str) {
|
||||
SHA256_Final(hash, &sha256);
|
||||
return util::hexdump(hash, SHA256_DIGEST_LENGTH);
|
||||
}
|
||||
|
||||
// MonotonicBuffer
|
||||
|
||||
void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) {
|
||||
assert(bytes > 0);
|
||||
void *p = std::align(alignment, bytes, current_buf, available);
|
||||
if (p == nullptr) {
|
||||
available = next_buffer_size = std::max(next_buffer_size, bytes);
|
||||
current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size));
|
||||
next_buffer_size *= growth_factor;
|
||||
p = current_buf;
|
||||
}
|
||||
|
||||
current_buf = (char *)current_buf + bytes;
|
||||
available -= bytes;
|
||||
return p;
|
||||
}
|
||||
|
||||
MonotonicBuffer::~MonotonicBuffer() {
|
||||
for (auto buf : buffers) {
|
||||
free(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <string>
|
||||
|
||||
@@ -20,6 +21,21 @@ void logMessage(ReplyMsgType type, const char* fmt, ...);
|
||||
#define rWarning(fmt, ...) ::logMessage(ReplyMsgType::Warning, fmt, ## __VA_ARGS__)
|
||||
#define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__)
|
||||
|
||||
class MonotonicBuffer {
|
||||
public:
|
||||
MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {}
|
||||
~MonotonicBuffer();
|
||||
void *allocate(size_t bytes, size_t alignment = 16ul);
|
||||
void deallocate(void *p) {}
|
||||
|
||||
private:
|
||||
void *current_buf = nullptr;
|
||||
size_t next_buffer_size = 0;
|
||||
size_t available = 0;
|
||||
std::deque<void *> buffers;
|
||||
static constexpr float growth_factor = 1.5;
|
||||
};
|
||||
|
||||
std::string sha256(const std::string &str);
|
||||
void precise_nano_sleep(int64_t nanoseconds);
|
||||
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
|
||||
|
||||
Reference in New Issue
Block a user