mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-23 06:52:07 +08:00
replay: improve segment download and merge (#22654)
* no-cache mode * fix test cases build error * space * don't create cache dir in no-cache mode * fix errors in test cases * no_local_cache_ * set the number of connections by chunk_size * use size_t instead of int64_t * add test case for no-cache mode * rename variables * fix SIGSEGV * cleanup * faster decompressBZ2 * always decompress bz2 * add test cases * prepare for python interface * fix test cases build error * continue * camera_replay: cache remote file * protected inheritance * single option name * TODO * test_case for LogReader&FrameReader * fix wrong require * test case for FileReader * cleanup test * test:fix wrong filename * check cached file's checksum * fix mkdir permissions err cleanup filereader * remove initialize libav network libraries. dd * abort all loading if one failed * cleanup tests * use threadpool to limit concurrent downloads * cache more segments * merge 3 segments for replay * one segment uses about 100M of memory * use segments_need_merge.size() * shutdown * fix stuck if exit replay before keyboard thread started * load one segment at a time * small cleanup * cleanup filereader * space * tiny cleanup * merge master * cleanup test cases * use util:create_directories * cleanup framereader
This commit is contained in:
@@ -21,8 +21,12 @@ else:
|
||||
if USE_FRAME_STREAM:
|
||||
cameras = ['cameras/camera_frame_stream.cc']
|
||||
else:
|
||||
libs += ['avutil', 'avcodec', 'avformat', 'swscale']
|
||||
cameras = ['cameras/camera_replay.cc', env.Object('camera-framereader', '#/selfdrive/ui/replay/framereader.cc')]
|
||||
libs += ['avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'ssl', 'curl', 'crypto']
|
||||
# TODO: import replay_lib from root SConstruct
|
||||
cameras = ['cameras/camera_replay.cc',
|
||||
env.Object('camera-util', '#/selfdrive/ui/replay/util.cc'),
|
||||
env.Object('camera-framereader', '#/selfdrive/ui/replay/framereader.cc'),
|
||||
env.Object('camera-filereader', '#/selfdrive/ui/replay/filereader.cc')]
|
||||
|
||||
if arch == "Darwin":
|
||||
del libs[libs.index('OpenCL')]
|
||||
|
||||
@@ -23,8 +23,7 @@ std::string get_url(std::string route_name, const std::string &camera, int segme
|
||||
}
|
||||
|
||||
void camera_init(VisionIpcServer *v, CameraState *s, int camera_id, unsigned int fps, cl_device_id device_id, cl_context ctx, VisionStreamType rgb_type, VisionStreamType yuv_type, const std::string &url) {
|
||||
// TODO: cache url file
|
||||
s->frame = new FrameReader();
|
||||
s->frame = new FrameReader(true);
|
||||
if (!s->frame->load(url)) {
|
||||
printf("failed to load stream from %s", url.c_str());
|
||||
assert(0);
|
||||
|
||||
@@ -28,4 +28,4 @@ env.Program(src, LIBS=libs)
|
||||
env.Program('bootlog.cc', LIBS=libs)
|
||||
|
||||
if GetOption('test'):
|
||||
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl'])
|
||||
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl', 'crypto'])
|
||||
|
||||
@@ -21,12 +21,8 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
|
||||
REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock"));
|
||||
for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) {
|
||||
const std::string log_file = segment_path + fn;
|
||||
INFO(log_file);
|
||||
|
||||
std::ostringstream stream;
|
||||
bool ret = readBZ2File(log_file, stream);
|
||||
REQUIRE(ret);
|
||||
std::string log = stream.str();
|
||||
std::string log = decompressBZ2(util::read_file(log_file));
|
||||
REQUIRE(!log.empty());
|
||||
int event_cnt = 0, i = 0;
|
||||
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
|
||||
while (words.size() > 0) {
|
||||
|
||||
@@ -112,7 +112,7 @@ if GetOption('extras'):
|
||||
if arch in ['x86_64', 'Darwin'] or GetOption('extras'):
|
||||
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
|
||||
|
||||
replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
|
||||
replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
|
||||
|
||||
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
|
||||
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'swscale'] + qt_libs
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
#include "selfdrive/ui/replay/filereader.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <cmath>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
#include "selfdrive/common/util.h"
|
||||
#include "selfdrive/ui/replay/util.h"
|
||||
|
||||
std::string cacheFilePath(const std::string &url) {
|
||||
static std::string cache_path = [] {
|
||||
const std::string comma_cache = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/");
|
||||
util::create_directories(comma_cache, 0755);
|
||||
return comma_cache.back() == '/' ? comma_cache : comma_cache + "/";
|
||||
}();
|
||||
|
||||
return cache_path + sha256(getUrlWithoutQuery(url));;
|
||||
}
|
||||
|
||||
std::string FileReader::read(const std::string &file, std::atomic<bool> *abort) {
|
||||
const bool is_remote = file.find("https://") == 0;
|
||||
const std::string local_file = is_remote ? cacheFilePath(file) : file;
|
||||
std::string result;
|
||||
|
||||
if ((!is_remote || cache_to_local_) && util::file_exists(local_file)) {
|
||||
result = util::read_file(local_file);
|
||||
} else if (is_remote) {
|
||||
result = download(file, abort);
|
||||
if (cache_to_local_ && !result.empty()) {
|
||||
std::ofstream fs(local_file, fs.binary | fs.out);
|
||||
fs.write(result.data(), result.size());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::string FileReader::download(const std::string &url, std::atomic<bool> *abort) {
|
||||
std::string result;
|
||||
size_t remote_file_size = 0;
|
||||
for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) {
|
||||
if (i > 0) {
|
||||
std::cout << "download failed, retrying" << i << std::endl;
|
||||
}
|
||||
if (remote_file_size <= 0) {
|
||||
remote_file_size = getRemoteFileSize(url);
|
||||
}
|
||||
if (remote_file_size > 0 && !(abort && *abort)) {
|
||||
std::ostringstream oss;
|
||||
result.resize(remote_file_size);
|
||||
oss.rdbuf()->pubsetbuf(result.data(), result.size());
|
||||
int chunks = chunk_size_ > 0 ? std::min(1, (int)std::nearbyint(remote_file_size / (float)chunk_size_)) : 1;
|
||||
if (httpMultiPartDownload(url, oss, chunks, remote_file_size, abort)) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
|
||||
class FileReader {
|
||||
public:
|
||||
FileReader(bool cache_to_local, int chunk_size = -1, int max_retries = 3)
|
||||
: cache_to_local_(cache_to_local), chunk_size_(chunk_size), max_retries_(max_retries) {}
|
||||
virtual ~FileReader() {}
|
||||
std::string read(const std::string &file, std::atomic<bool> *abort = nullptr);
|
||||
|
||||
private:
|
||||
std::string download(const std::string &url, std::atomic<bool> *abort);
|
||||
int chunk_size_;
|
||||
int max_retries_;
|
||||
bool cache_to_local_;
|
||||
};
|
||||
|
||||
std::string cacheFilePath(const std::string &url);
|
||||
@@ -3,8 +3,11 @@
|
||||
#include <unistd.h>
|
||||
#include <cassert>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
|
||||
static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
|
||||
namespace {
|
||||
|
||||
int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
|
||||
std::mutex *mutex = (std::mutex *)*arg;
|
||||
switch (op) {
|
||||
case AV_LOCK_CREATE:
|
||||
@@ -22,38 +25,56 @@ static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct AVInitializer {
|
||||
AVInitializer() {
|
||||
int ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
|
||||
assert(ret >= 0);
|
||||
av_register_all();
|
||||
avformat_network_init();
|
||||
}
|
||||
~AVInitializer() { avformat_network_deinit(); }
|
||||
};
|
||||
int readFunction(void *opaque, uint8_t *buf, int buf_size) {
|
||||
auto &iss = *reinterpret_cast<std::istringstream *>(opaque);
|
||||
iss.read(reinterpret_cast<char *>(buf), buf_size);
|
||||
return iss.gcount() ? iss.gcount() : AVERROR_EOF;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
FrameReader::FrameReader(bool local_cache, int chunk_size, int retries) : FileReader(local_cache, chunk_size, retries) {
|
||||
static std::once_flag once_flag;
|
||||
std::call_once(once_flag, [] {
|
||||
av_lockmgr_register(ffmpeg_lockmgr_cb);
|
||||
av_register_all();
|
||||
});
|
||||
|
||||
pFormatCtx_ = avformat_alloc_context();
|
||||
av_frame_ = av_frame_alloc();
|
||||
rgb_frame_ = av_frame_alloc();
|
||||
yuv_frame_ = av_frame_alloc();;
|
||||
|
||||
FrameReader::FrameReader() {
|
||||
static AVInitializer av_initializer;
|
||||
}
|
||||
|
||||
FrameReader::~FrameReader() {
|
||||
for (auto &f : frames_) {
|
||||
av_free_packet(&f.pkt);
|
||||
}
|
||||
if (pCodecCtx_) {
|
||||
avcodec_close(pCodecCtx_);
|
||||
avcodec_free_context(&pCodecCtx_);
|
||||
}
|
||||
if (pCodecCtx_) avcodec_free_context(&pCodecCtx_);
|
||||
if (pFormatCtx_) avformat_close_input(&pFormatCtx_);
|
||||
if (av_frame_) av_frame_free(&av_frame_);
|
||||
if (rgb_frame_) av_frame_free(&rgb_frame_);
|
||||
if (yuv_frame_) av_frame_free(&yuv_frame_);
|
||||
if (rgb_sws_ctx_) sws_freeContext(rgb_sws_ctx_);
|
||||
if (yuv_sws_ctx_) sws_freeContext(yuv_sws_ctx_);
|
||||
|
||||
if (avio_ctx_) {
|
||||
av_freep(&avio_ctx_->buffer);
|
||||
av_freep(&avio_ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
bool FrameReader::load(const std::string &url) {
|
||||
pFormatCtx_ = avformat_alloc_context();
|
||||
bool FrameReader::load(const std::string &url, std::atomic<bool> *abort) {
|
||||
std::string content = read(url, abort);
|
||||
if (content.empty()) return false;
|
||||
|
||||
std::istringstream iss(content);
|
||||
const int avio_ctx_buffer_size = 64 * 1024;
|
||||
unsigned char *avio_ctx_buffer = (unsigned char *)av_malloc(avio_ctx_buffer_size);
|
||||
avio_ctx_ = avio_alloc_context(avio_ctx_buffer, avio_ctx_buffer_size, 0, &iss, readFunction, nullptr, nullptr);
|
||||
pFormatCtx_->pb = avio_ctx_;
|
||||
|
||||
pFormatCtx_->probesize = 10 * 1024 * 1024; // 10MB
|
||||
if (avformat_open_input(&pFormatCtx_, url.c_str(), NULL, NULL) != 0) {
|
||||
printf("error loading %s\n", url.c_str());
|
||||
@@ -75,10 +96,6 @@ bool FrameReader::load(const std::string &url) {
|
||||
ret = avcodec_open2(pCodecCtx_, pCodec, NULL);
|
||||
if (ret < 0) return false;
|
||||
|
||||
av_frame_ = av_frame_alloc();
|
||||
rgb_frame_ = av_frame_alloc();
|
||||
yuv_frame_ = av_frame_alloc();;
|
||||
|
||||
width = (pCodecCtxOrig->width + 3) & ~3;
|
||||
height = pCodecCtxOrig->height;
|
||||
rgb_sws_ctx_ = sws_getContext(pCodecCtxOrig->width, pCodecCtxOrig->height, AV_PIX_FMT_YUV420P,
|
||||
@@ -92,7 +109,7 @@ bool FrameReader::load(const std::string &url) {
|
||||
if (!yuv_sws_ctx_) return false;
|
||||
|
||||
frames_.reserve(60 * 20); // 20fps, one minute
|
||||
while (true) {
|
||||
while (!(abort && *abort)) {
|
||||
Frame &frame = frames_.emplace_back();
|
||||
int err = av_read_frame(pFormatCtx_, &frame.pkt);
|
||||
if (err < 0) {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "selfdrive/ui/replay/filereader.h"
|
||||
|
||||
extern "C" {
|
||||
#include <libavcodec/avcodec.h>
|
||||
@@ -10,11 +11,11 @@ extern "C" {
|
||||
#include <libavutil/imgutils.h>
|
||||
}
|
||||
|
||||
class FrameReader {
|
||||
class FrameReader : protected FileReader {
|
||||
public:
|
||||
FrameReader();
|
||||
FrameReader(bool local_cache = false, int chunk_size = -1, int retries = 0);
|
||||
~FrameReader();
|
||||
bool load(const std::string &url);
|
||||
bool load(const std::string &url, std::atomic<bool> *abort = nullptr);
|
||||
bool get(int idx, uint8_t *rgb, uint8_t *yuv);
|
||||
int getRGBSize() const { return width * height * 3; }
|
||||
int getYUVSize() const { return width * height * 3 / 2; }
|
||||
@@ -39,4 +40,5 @@ private:
|
||||
AVCodecContext *pCodecCtx_ = nullptr;
|
||||
int key_frames_count_ = 0;
|
||||
bool valid_ = false;
|
||||
AVIOContext *avio_ctx_ = nullptr;
|
||||
};
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#include "selfdrive/ui/replay/logreader.h"
|
||||
|
||||
#include <sstream>
|
||||
#include "selfdrive/common/util.h"
|
||||
#include <algorithm>
|
||||
#include "selfdrive/ui/replay/util.h"
|
||||
|
||||
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
|
||||
@@ -27,7 +26,7 @@ Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(a
|
||||
|
||||
// class LogReader
|
||||
|
||||
LogReader::LogReader(size_t memory_pool_block_size) {
|
||||
LogReader::LogReader(bool local_cache, int chunk_size, int retries, size_t memory_pool_block_size) : FileReader(local_cache, chunk_size, retries) {
|
||||
#ifdef HAS_MEMORY_RESOURCE
|
||||
const size_t buf_size = sizeof(Event) * memory_pool_block_size;
|
||||
pool_buffer_ = ::operator new(buf_size);
|
||||
@@ -47,18 +46,9 @@ LogReader::~LogReader() {
|
||||
#endif
|
||||
}
|
||||
|
||||
bool LogReader::load(const std::string &file) {
|
||||
bool is_bz2 = file.rfind(".bz2") == file.length() - 4;
|
||||
if (is_bz2) {
|
||||
std::ostringstream stream;
|
||||
if (!readBZ2File(file, stream)) {
|
||||
LOGW("bz2 decompress failed");
|
||||
return false;
|
||||
}
|
||||
raw_ = stream.str();
|
||||
} else {
|
||||
raw_ = util::read_file(file);
|
||||
}
|
||||
bool LogReader::load(const std::string &file, std::atomic<bool> *abort) {
|
||||
raw_ = decompressBZ2(read(file, abort));
|
||||
if (raw_.empty()) return false;
|
||||
|
||||
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
|
||||
while (words.size() > 0) {
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
#include "cereal/gen/cpp/log.capnp.h"
|
||||
#include "selfdrive/camerad/cameras/camera_common.h"
|
||||
#include "selfdrive/ui/replay/filereader.h"
|
||||
|
||||
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
|
||||
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
|
||||
@@ -45,11 +46,11 @@ public:
|
||||
bool frame;
|
||||
};
|
||||
|
||||
class LogReader {
|
||||
class LogReader : protected FileReader {
|
||||
public:
|
||||
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
|
||||
LogReader(bool local_cache = false, int chunk_size = -1, int retries = 0, size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
|
||||
~LogReader();
|
||||
bool load(const std::string &file);
|
||||
bool load(const std::string &file, std::atomic<bool> *abort = nullptr);
|
||||
|
||||
std::vector<Event*> events;
|
||||
|
||||
|
||||
@@ -1,20 +1,26 @@
|
||||
#include "selfdrive/ui/replay/replay.h"
|
||||
|
||||
#include <csignal>
|
||||
#include <iostream>
|
||||
#include <termios.h>
|
||||
|
||||
#include <QApplication>
|
||||
#include <QCommandLineParser>
|
||||
#include <QDebug>
|
||||
#include <QThread>
|
||||
#include <csignal>
|
||||
#include <iostream>
|
||||
|
||||
#include "selfdrive/ui/replay/replay.h"
|
||||
|
||||
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
|
||||
struct termios oldt = {};
|
||||
Replay *replay = nullptr;
|
||||
|
||||
void sigHandler(int s) {
|
||||
std::signal(s, SIG_DFL);
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
|
||||
if (oldt.c_lflag) {
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
|
||||
}
|
||||
if (replay) {
|
||||
replay->stop();
|
||||
}
|
||||
qApp->quit();
|
||||
}
|
||||
|
||||
@@ -69,7 +75,7 @@ void keyboardThread(Replay *replay) {
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]){
|
||||
int main(int argc, char *argv[]) {
|
||||
QApplication app(argc, argv);
|
||||
std::signal(SIGINT, sigHandler);
|
||||
std::signal(SIGTERM, sigHandler);
|
||||
@@ -78,6 +84,7 @@ int main(int argc, char *argv[]){
|
||||
{"dcam", REPLAY_FLAG_DCAM, "load driver camera"},
|
||||
{"ecam", REPLAY_FLAG_ECAM, "load wide road camera"},
|
||||
{"no-loop", REPLAY_FLAG_NO_LOOP, "stop at the end of the route"},
|
||||
{"no-cache", REPLAY_FLAG_NO_FILE_CACHE, "turn off local cache"},
|
||||
};
|
||||
|
||||
QCommandLineParser parser;
|
||||
@@ -109,7 +116,7 @@ int main(int argc, char *argv[]){
|
||||
replay_flags |= flag;
|
||||
}
|
||||
}
|
||||
Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
|
||||
replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
|
||||
if (!replay->load()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -5,8 +5,8 @@
|
||||
|
||||
#include <capnp/dynamic.h>
|
||||
#include "cereal/services.h"
|
||||
#include "selfdrive/common/timing.h"
|
||||
#include "selfdrive/common/params.h"
|
||||
#include "selfdrive/common/timing.h"
|
||||
#include "selfdrive/hardware/hw.h"
|
||||
#include "selfdrive/ui/replay/util.h"
|
||||
|
||||
@@ -35,16 +35,21 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
|
||||
}
|
||||
|
||||
Replay::~Replay() {
|
||||
qDebug() << "shutdown: in progress...";
|
||||
exit_ = updating_events_ = true;
|
||||
if (stream_thread_) {
|
||||
stream_cv_.notify_one();
|
||||
stream_thread_->quit();
|
||||
stream_thread_->wait();
|
||||
}
|
||||
|
||||
stop();
|
||||
delete pm;
|
||||
delete events_;
|
||||
}
|
||||
|
||||
void Replay::stop() {
|
||||
if (stream_thread_ == nullptr) return;
|
||||
|
||||
qDebug() << "shutdown: in progress...";
|
||||
exit_ = updating_events_ = true;
|
||||
stream_cv_.notify_one();
|
||||
stream_thread_->quit();
|
||||
stream_thread_->wait();
|
||||
stream_thread_ = nullptr;
|
||||
|
||||
segments_.clear();
|
||||
camera_server_.reset(nullptr);
|
||||
qDebug() << "shutdown: done";
|
||||
@@ -91,6 +96,7 @@ void Replay::doSeek(int seconds, bool relative) {
|
||||
if (relative) {
|
||||
seconds += currentSeconds();
|
||||
}
|
||||
seconds = std::max(0, seconds);
|
||||
int seg = seconds / 60;
|
||||
if (segments_.find(seg) == segments_.end()) {
|
||||
qInfo() << "can't seek to" << seconds << "s, segment" << seg << "is invalid";
|
||||
@@ -134,26 +140,30 @@ void Replay::segmentLoadFinished(bool success) {
|
||||
void Replay::queueSegment() {
|
||||
if (segments_.empty()) return;
|
||||
|
||||
SegmentMap::iterator begin, cur, end;
|
||||
begin = cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
|
||||
// set fwd to 0 to just load the current segment when seeking to a new window.
|
||||
const int fwd = cur->second == nullptr ? 0 : FORWARD_SEGS;
|
||||
for (int i = 0; end != segments_.end() && i <= fwd; ++end, ++i) {
|
||||
auto &[n, seg] = *end;
|
||||
if (!seg) {
|
||||
seg = std::make_unique<Segment>(n, route_->at(n), hasFlag(REPLAY_FLAG_DCAM), hasFlag(REPLAY_FLAG_ECAM));
|
||||
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
|
||||
qInfo() << "loading segment" << n << "...";
|
||||
SegmentMap::iterator cur, end;
|
||||
cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
|
||||
for (int i = 0; end != segments_.end() && i <= FORWARD_SEGS; ++i) {
|
||||
++end;
|
||||
}
|
||||
// load one segment at a time
|
||||
for (auto it = cur; it != end; ++it) {
|
||||
if (!it->second) {
|
||||
if (it == cur || std::prev(it)->second->isLoaded()) {
|
||||
auto &[n, seg] = *it;
|
||||
seg = std::make_unique<Segment>(n, route_->at(n), hasFlag(REPLAY_FLAG_DCAM), hasFlag(REPLAY_FLAG_ECAM), !hasFlag(REPLAY_FLAG_NO_FILE_CACHE));
|
||||
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
|
||||
qInfo() << "loading segment" << n << "...";
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const auto &cur_segment = cur->second;
|
||||
enableHttpLogging(!cur_segment->isLoaded());
|
||||
|
||||
// merge the previous adjacent segment if it's loaded
|
||||
auto prev = segments_.find(cur_segment->seg_num - 1);
|
||||
if (prev != segments_.end() && prev->second && prev->second->isLoaded()) {
|
||||
begin = prev;
|
||||
auto begin = segments_.find(cur_segment->seg_num - 1);
|
||||
if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) {
|
||||
begin = cur;
|
||||
}
|
||||
mergeSegments(begin, end);
|
||||
|
||||
@@ -168,9 +178,9 @@ void Replay::queueSegment() {
|
||||
}
|
||||
|
||||
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
|
||||
// segments must be merged in sequence.
|
||||
// merge 3 segments in sequence.
|
||||
std::vector<int> segments_need_merge;
|
||||
for (auto it = begin; it != end && it->second->isLoaded(); ++it) {
|
||||
for (auto it = begin; it != end && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) {
|
||||
segments_need_merge.push_back(it->first);
|
||||
}
|
||||
|
||||
|
||||
@@ -5,13 +5,15 @@
|
||||
#include "selfdrive/ui/replay/camera.h"
|
||||
#include "selfdrive/ui/replay/route.h"
|
||||
|
||||
constexpr int FORWARD_SEGS = 2;
|
||||
// one segment uses about 100M of memory
|
||||
constexpr int FORWARD_SEGS = 5;
|
||||
|
||||
enum REPLAY_FLAGS {
|
||||
REPLAY_FLAG_NONE = 0x0000,
|
||||
REPLAY_FLAG_DCAM = 0x0002,
|
||||
REPLAY_FLAG_ECAM = 0x0004,
|
||||
REPLAY_FLAG_NO_LOOP = 0x0010,
|
||||
REPLAY_FLAG_NO_FILE_CACHE = 0x0020,
|
||||
};
|
||||
|
||||
class Replay : public QObject {
|
||||
@@ -23,6 +25,7 @@ public:
|
||||
~Replay();
|
||||
bool load();
|
||||
void start(int seconds = 0);
|
||||
void stop();
|
||||
void pause(bool pause);
|
||||
bool isPaused() const { return paused_; }
|
||||
inline bool hasFlag(REPLAY_FLAGS flag) { return flags_ & flag; };
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
#include "selfdrive/ui/replay/route.h"
|
||||
|
||||
#include <QDir>
|
||||
#include <QEventLoop>
|
||||
#include <QJsonArray>
|
||||
#include <QJsonDocument>
|
||||
#include <QRegExp>
|
||||
#include <QtConcurrent>
|
||||
|
||||
#include "selfdrive/hardware/hw.h"
|
||||
#include "selfdrive/ui/qt/api.h"
|
||||
#include "selfdrive/ui/replay/util.h"
|
||||
|
||||
Route::Route(const QString &route, const QString &data_dir) : route_(parseRoute(route)), data_dir_(data_dir) {}
|
||||
Route::Route(const QString &route, const QString &data_dir) : data_dir_(data_dir) {
|
||||
route_ = parseRoute(route);
|
||||
}
|
||||
|
||||
RouteIdentifier Route::parseRoute(const QString &str) {
|
||||
QRegExp rx(R"(^([a-z0-9]{16})([|_/])(\d{4}-\d{2}-\d{2}--\d{2}-\d{2}-\d{2})(?:(--|/)(\d*))?$)");
|
||||
@@ -41,7 +45,7 @@ bool Route::loadFromServer() {
|
||||
|
||||
bool Route::loadFromJson(const QString &json) {
|
||||
QRegExp rx(R"(\/(\d+)\/)");
|
||||
for (const auto &value : QJsonDocument::fromJson(json.trimmed().toUtf8()).object()) {
|
||||
for (const auto &value : QJsonDocument::fromJson(json.trimmed().toUtf8()).object()) {
|
||||
for (const auto &url : value.toArray()) {
|
||||
QString url_str = url.toString();
|
||||
if (rx.indexIn(url_str) != -1) {
|
||||
@@ -86,10 +90,7 @@ void Route::addFileToSegment(int n, const QString &file) {
|
||||
|
||||
// class Segment
|
||||
|
||||
Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam) : seg_num(n) {
|
||||
static std::once_flag once_flag;
|
||||
std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); });
|
||||
|
||||
Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam, bool local_cache) : seg_num(n) {
|
||||
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
|
||||
const QString file_list[] = {
|
||||
files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
|
||||
@@ -100,71 +101,34 @@ Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam
|
||||
for (int i = 0; i < std::size(file_list); i++) {
|
||||
if (!file_list[i].isEmpty()) {
|
||||
loading_++;
|
||||
QThread *t = new QThread();
|
||||
QObject::connect(t, &QThread::started, [=]() { loadFile(i, file_list[i].toStdString()); });
|
||||
loading_threads_.emplace_back(t)->start();
|
||||
synchronizer_.addFuture(QtConcurrent::run([=] { loadFile(i, file_list[i].toStdString(), local_cache); }));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Segment::~Segment() {
|
||||
aborting_ = true;
|
||||
for (QThread *t : loading_threads_) {
|
||||
if (t->isRunning()) {
|
||||
t->quit();
|
||||
t->wait();
|
||||
}
|
||||
delete t;
|
||||
}
|
||||
disconnect();
|
||||
abort_ = true;
|
||||
synchronizer_.setCancelOnWait(true);
|
||||
synchronizer_.waitForFinished();
|
||||
}
|
||||
|
||||
void Segment::loadFile(int id, const std::string file) {
|
||||
const bool is_remote = file.find("https://") == 0;
|
||||
const std::string local_file = is_remote ? cacheFilePath(file) : file;
|
||||
bool file_ready = util::file_exists(local_file);
|
||||
|
||||
if (!file_ready && is_remote) {
|
||||
file_ready = downloadFile(id, file, local_file);
|
||||
void Segment::loadFile(int id, const std::string file, bool local_cache) {
|
||||
bool success = false;
|
||||
if (id < MAX_CAMERAS) {
|
||||
frames[id] = std::make_unique<FrameReader>(local_cache, 20 * 1024 * 1024, 3);
|
||||
success = frames[id]->load(file, &abort_);
|
||||
} else {
|
||||
log = std::make_unique<LogReader>(local_cache, -1, 3);
|
||||
success = log->load(file, &abort_);
|
||||
}
|
||||
|
||||
if (!aborting_ && file_ready) {
|
||||
if (id < MAX_CAMERAS) {
|
||||
frames[id] = std::make_unique<FrameReader>();
|
||||
success_ = success_ && frames[id]->load(local_file);
|
||||
} else {
|
||||
std::string decompressed = cacheFilePath(local_file + ".decompressed");
|
||||
if (!util::file_exists(decompressed)) {
|
||||
std::ofstream ostrm(decompressed, std::ios::binary);
|
||||
readBZ2File(local_file, ostrm);
|
||||
}
|
||||
log = std::make_unique<LogReader>();
|
||||
success_ = success_ && log->load(decompressed);
|
||||
}
|
||||
if (!success) {
|
||||
// abort all loading jobs.
|
||||
abort_ = true;
|
||||
}
|
||||
|
||||
if (!aborting_ && --loading_ == 0) {
|
||||
emit loadFinished(success_);
|
||||
if (--loading_ == 0) {
|
||||
emit loadFinished(!abort_);
|
||||
}
|
||||
}
|
||||
|
||||
bool Segment::downloadFile(int id, const std::string &url, const std::string local_file) {
|
||||
bool ret = false;
|
||||
int retries = 0;
|
||||
while (!aborting_) {
|
||||
ret = httpMultiPartDownload(url, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_);
|
||||
if (ret || aborting_) break;
|
||||
|
||||
if (++retries > max_retries_) {
|
||||
qInfo() << "download failed after retries" << max_retries_;
|
||||
break;
|
||||
}
|
||||
qInfo() << "download failed, retrying" << retries;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string Segment::cacheFilePath(const std::string &file) {
|
||||
QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery);
|
||||
QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex();
|
||||
return CACHE_DIR.filePath(sha256 + "." + QFileInfo(url_no_query).suffix()).toStdString();
|
||||
}
|
||||
|
||||
@@ -1,14 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <QDir>
|
||||
#include <QThread>
|
||||
#include <QFutureSynchronizer>
|
||||
|
||||
#include "selfdrive/common/util.h"
|
||||
#include "selfdrive/ui/replay/framereader.h"
|
||||
#include "selfdrive/ui/replay/logreader.h"
|
||||
|
||||
const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str());
|
||||
|
||||
struct RouteIdentifier {
|
||||
QString dongle_id;
|
||||
QString timestamp;
|
||||
@@ -49,9 +45,9 @@ class Segment : public QObject {
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam);
|
||||
Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam, bool local_cache);
|
||||
~Segment();
|
||||
inline bool isLoaded() const { return !loading_ && success_; }
|
||||
inline bool isLoaded() const { return !loading_ && !abort_; }
|
||||
|
||||
const int seg_num = 0;
|
||||
std::unique_ptr<LogReader> log;
|
||||
@@ -61,12 +57,9 @@ signals:
|
||||
void loadFinished(bool success);
|
||||
|
||||
protected:
|
||||
void loadFile(int id, const std::string file);
|
||||
bool downloadFile(int id, const std::string &url, const std::string local_file);
|
||||
std::string cacheFilePath(const std::string &file);
|
||||
void loadFile(int id, const std::string file, bool local_cache);
|
||||
|
||||
std::atomic<bool> success_ = true, aborting_ = false;
|
||||
std::atomic<bool> abort_ = false;
|
||||
std::atomic<int> loading_ = 0;
|
||||
std::vector<QThread*> loading_threads_;
|
||||
const int max_retries_ = 3;
|
||||
QFutureSynchronizer<void> synchronizer_;
|
||||
};
|
||||
|
||||
@@ -1,31 +1,37 @@
|
||||
#include <QCryptographicHash>
|
||||
#include <QDebug>
|
||||
#include <QEventLoop>
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
|
||||
#include "catch2/catch.hpp"
|
||||
#include "selfdrive/common/util.h"
|
||||
#include "selfdrive/ui/replay/replay.h"
|
||||
#include "selfdrive/ui/replay/util.h"
|
||||
|
||||
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
|
||||
std::string sha_256(const QString &dat) {
|
||||
return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString();
|
||||
}
|
||||
const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
|
||||
const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3";
|
||||
|
||||
TEST_CASE("httpMultiPartDownload") {
|
||||
char filename[] = "/tmp/XXXXXX";
|
||||
close(mkstemp(filename));
|
||||
|
||||
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
|
||||
SECTION("5 connections") {
|
||||
REQUIRE(httpMultiPartDownload(stream_url, filename, 5));
|
||||
std::string content;
|
||||
auto file_size = getRemoteFileSize(TEST_RLOG_URL);
|
||||
REQUIRE(file_size > 0);
|
||||
SECTION("5 connections, download to file") {
|
||||
std::ofstream of(filename, of.binary | of.out);
|
||||
REQUIRE(httpMultiPartDownload(TEST_RLOG_URL, of, 5, file_size));
|
||||
content = util::read_file(filename);
|
||||
}
|
||||
SECTION("1 connection") {
|
||||
REQUIRE(httpMultiPartDownload(stream_url, filename, 1));
|
||||
SECTION("5 connection, download to buffer") {
|
||||
std::ostringstream oss;
|
||||
content.resize(file_size);
|
||||
oss.rdbuf()->pubsetbuf(content.data(), content.size());
|
||||
REQUIRE(httpMultiPartDownload(TEST_RLOG_URL, oss, 5, file_size));
|
||||
}
|
||||
std::string content = util::read_file(filename);
|
||||
REQUIRE(content.size() == 9112651);
|
||||
std::string checksum = sha_256(QString::fromStdString(content));
|
||||
REQUIRE(checksum == "e44edfbb545abdddfd17020ced2b18b6ec36506152267f32b6a8e3341f8126d6");
|
||||
REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM);
|
||||
}
|
||||
|
||||
int random_int(int min, int max) {
|
||||
@@ -35,13 +41,28 @@ int random_int(int min, int max) {
|
||||
return dist(rng);
|
||||
}
|
||||
|
||||
TEST_CASE("FileReader") {
|
||||
auto enable_local_cache = GENERATE(true, false);
|
||||
std::string cache_file = cacheFilePath(TEST_RLOG_URL);
|
||||
system(("rm " + cache_file + " -f").c_str());
|
||||
|
||||
FileReader reader(enable_local_cache);
|
||||
std::string content = reader.read(TEST_RLOG_URL);
|
||||
REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM);
|
||||
if (enable_local_cache) {
|
||||
REQUIRE(sha256(util::read_file(cache_file)) == TEST_RLOG_CHECKSUM);
|
||||
} else {
|
||||
REQUIRE(util::file_exists(cache_file) == false);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("Segment") {
|
||||
Route demo_route(DEMO_ROUTE);
|
||||
REQUIRE(demo_route.load());
|
||||
REQUIRE(demo_route.segments().size() == 11);
|
||||
|
||||
QEventLoop loop;
|
||||
Segment segment(0, demo_route.at(0), false, false);
|
||||
Segment segment(0, demo_route.at(0), false, false, false);
|
||||
QObject::connect(&segment, &Segment::loadFinished, [&]() {
|
||||
REQUIRE(segment.isLoaded() == true);
|
||||
REQUIRE(segment.log != nullptr);
|
||||
@@ -68,8 +89,8 @@ TEST_CASE("Segment") {
|
||||
|
||||
// helper class for unit tests
|
||||
class TestReplay : public Replay {
|
||||
public:
|
||||
TestReplay(const QString &route) : Replay(route, {}, {}) {}
|
||||
public:
|
||||
TestReplay(const QString &route, uint8_t flags = REPLAY_FLAG_NO_FILE_CACHE) : Replay(route, {}, {}, nullptr, flags) {}
|
||||
void test_seek();
|
||||
void testSeekTo(int seek_to);
|
||||
};
|
||||
@@ -113,7 +134,7 @@ void TestReplay::test_seek() {
|
||||
QEventLoop loop;
|
||||
std::thread thread = std::thread([&]() {
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
testSeekTo(random_int(0, 5 * 60));
|
||||
testSeekTo(random_int(0, 3 * 60));
|
||||
}
|
||||
loop.quit();
|
||||
});
|
||||
@@ -122,7 +143,8 @@ void TestReplay::test_seek() {
|
||||
}
|
||||
|
||||
TEST_CASE("Replay") {
|
||||
TestReplay replay(DEMO_ROUTE);
|
||||
auto flag = GENERATE(REPLAY_FLAG_NO_FILE_CACHE, REPLAY_FLAG_NONE);
|
||||
TestReplay replay(DEMO_ROUTE, flag);
|
||||
REQUIRE(replay.load());
|
||||
replay.test_seek();
|
||||
}
|
||||
|
||||
+76
-59
@@ -1,42 +1,60 @@
|
||||
#include "selfdrive/ui/replay/util.h"
|
||||
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
|
||||
#include <bzlib.h>
|
||||
#include <curl/curl.h>
|
||||
#include <openssl/sha.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
#include <sstream>
|
||||
|
||||
#include "selfdrive/common/timing.h"
|
||||
#include "selfdrive/common/util.h"
|
||||
|
||||
namespace {
|
||||
|
||||
static std::atomic<bool> enable_http_logging = false;
|
||||
|
||||
struct CURLGlobalInitializer {
|
||||
CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); }
|
||||
~CURLGlobalInitializer() { curl_global_cleanup(); }
|
||||
};
|
||||
|
||||
struct MultiPartWriter {
|
||||
int64_t offset;
|
||||
int64_t end;
|
||||
int64_t written;
|
||||
FILE *fp;
|
||||
size_t offset;
|
||||
size_t end;
|
||||
size_t written;
|
||||
std::ostream *os;
|
||||
};
|
||||
|
||||
static size_t write_cb(char *data, size_t size, size_t count, void *userp) {
|
||||
size_t write_cb(char *data, size_t size, size_t count, void *userp) {
|
||||
MultiPartWriter *w = (MultiPartWriter *)userp;
|
||||
fseek(w->fp, w->offset, SEEK_SET);
|
||||
fwrite(data, size, count, w->fp);
|
||||
w->os->seekp(w->offset);
|
||||
size_t bytes = size * count;
|
||||
w->os->write(data, bytes);
|
||||
w->offset += bytes;
|
||||
w->written += bytes;
|
||||
return bytes;
|
||||
}
|
||||
|
||||
static size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; }
|
||||
size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; }
|
||||
|
||||
int64_t getRemoteFileSize(const std::string &url) {
|
||||
std::string formattedDataSize(size_t size) {
|
||||
if (size < 1024) {
|
||||
return std::to_string(size) + " B";
|
||||
} else if (size < 1024 * 1024) {
|
||||
return util::string_format("%.2f KB", (float)size / 1024);
|
||||
} else {
|
||||
return util::string_format("%.2f MB", (float)size / (1024 * 1024));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
size_t getRemoteFileSize(const std::string &url) {
|
||||
CURL *curl = curl_easy_init();
|
||||
if (!curl) return -1;
|
||||
|
||||
@@ -52,40 +70,26 @@ int64_t getRemoteFileSize(const std::string &url) {
|
||||
std::cout << "Download failed: error code: " << res << std::endl;
|
||||
}
|
||||
curl_easy_cleanup(curl);
|
||||
return res == CURLE_OK ? (int64_t)content_length : -1;
|
||||
return content_length > 0 ? content_length : 0;
|
||||
}
|
||||
|
||||
std::string formattedDataSize(size_t size) {
|
||||
if (size < 1024) {
|
||||
return std::to_string(size) + " B";
|
||||
} else if (size < 1024 * 1024) {
|
||||
return util::string_format("%.2f KB", (float)size / 1024);
|
||||
} else {
|
||||
return util::string_format("%.2f MB", (float)size / (1024 * 1024));
|
||||
}
|
||||
std::string getUrlWithoutQuery(const std::string &url) {
|
||||
size_t idx = url.find("?");
|
||||
return (idx == std::string::npos ? url : url.substr(0, idx));
|
||||
}
|
||||
|
||||
static std::atomic<bool> enable_http_logging = false;
|
||||
|
||||
void enableHttpLogging(bool enable) {
|
||||
enable_http_logging = enable;
|
||||
}
|
||||
|
||||
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort) {
|
||||
bool httpMultiPartDownload(const std::string &url, std::ostream &os, int parts, size_t content_length, std::atomic<bool> *abort) {
|
||||
static CURLGlobalInitializer curl_initializer;
|
||||
static std::mutex lock;
|
||||
static uint64_t total_written = 0, prev_total_written = 0;
|
||||
static double last_print_ts = 0;
|
||||
|
||||
int64_t content_length = getRemoteFileSize(url);
|
||||
if (content_length <= 0) return false;
|
||||
|
||||
// create a tmp sparse file
|
||||
const std::string tmp_file = target_file + ".tmp";
|
||||
FILE *fp = fopen(tmp_file.c_str(), "wb");
|
||||
assert(fp);
|
||||
fseek(fp, content_length - 1, SEEK_SET);
|
||||
fwrite("\0", 1, 1, fp);
|
||||
os.seekp(content_length - 1);
|
||||
os.write("\0", 1);
|
||||
|
||||
CURLM *cm = curl_multi_init();
|
||||
|
||||
@@ -94,8 +98,8 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
|
||||
for (int i = 0; i < parts; ++i) {
|
||||
CURL *eh = curl_easy_init();
|
||||
writers[eh] = {
|
||||
.fp = fp,
|
||||
.offset = i * part_size,
|
||||
.os = &os,
|
||||
.offset = (size_t)(i * part_size),
|
||||
.end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1,
|
||||
};
|
||||
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb);
|
||||
@@ -126,9 +130,7 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
|
||||
if (enable_http_logging && last_print_ts > 0) {
|
||||
size_t average = (total_written - prev_total_written) / ((ts - last_print_ts) / 1000.);
|
||||
int progress = std::min<int>(100, 100.0 * (double)written / (double)content_length);
|
||||
|
||||
size_t idx = url.find("?");
|
||||
std::cout << "downloading " << (idx == std::string::npos ? url : url.substr(0, idx)) << " - " << progress << "% (" << formattedDataSize(average) << "/s)" << std::endl;
|
||||
std::cout << "downloading " << getUrlWithoutQuery(url) << " - " << progress << "% (" << formattedDataSize(average) << "/s)" << std::endl;
|
||||
}
|
||||
prev_total_written = total_written;
|
||||
last_print_ts = ts;
|
||||
@@ -160,32 +162,34 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
|
||||
}
|
||||
|
||||
curl_multi_cleanup(cm);
|
||||
fclose(fp);
|
||||
|
||||
bool ret = complete == parts;
|
||||
ret = ret && ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
|
||||
return ret;
|
||||
return complete == parts;
|
||||
}
|
||||
|
||||
bool readBZ2File(const std::string_view file, std::ostream &stream) {
|
||||
std::unique_ptr<FILE, decltype(&fclose)> f(fopen(file.data(), "r"), &fclose);
|
||||
if (!f) return false;
|
||||
std::string decompressBZ2(const std::string &in) {
|
||||
if (in.empty()) return {};
|
||||
|
||||
int bzerror = BZ_OK;
|
||||
BZFILE *bz_file = BZ2_bzReadOpen(&bzerror, f.get(), 0, 0, nullptr, 0);
|
||||
if (!bz_file) return false;
|
||||
bz_stream strm = {};
|
||||
int bzerror = BZ2_bzDecompressInit(&strm, 0, 0);
|
||||
assert(bzerror == BZ_OK);
|
||||
|
||||
std::array<char, 64 * 1024> buf;
|
||||
strm.next_in = (char *)in.data();
|
||||
strm.avail_in = in.size();
|
||||
std::string out(in.size() * 5, '\0');
|
||||
do {
|
||||
int size = BZ2_bzRead(&bzerror, bz_file, buf.data(), buf.size());
|
||||
if (bzerror == BZ_OK || bzerror == BZ_STREAM_END) {
|
||||
stream.write(buf.data(), size);
|
||||
strm.next_out = (char *)(&out[strm.total_out_lo32]);
|
||||
strm.avail_out = out.size() - strm.total_out_lo32;
|
||||
bzerror = BZ2_bzDecompress(&strm);
|
||||
if (bzerror == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
|
||||
out.resize(out.size() * 2);
|
||||
}
|
||||
} while (bzerror == BZ_OK);
|
||||
|
||||
bool success = (bzerror == BZ_STREAM_END);
|
||||
BZ2_bzReadClose(&bzerror, bz_file);
|
||||
return success;
|
||||
BZ2_bzDecompressEnd(&strm);
|
||||
if (bzerror == BZ_STREAM_END) {
|
||||
out.resize(strm.total_out_lo32);
|
||||
return out;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
void precise_nano_sleep(long sleep_ns) {
|
||||
@@ -205,3 +209,16 @@ void precise_nano_sleep(long sleep_ns) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string sha256(const std::string &str) {
|
||||
unsigned char hash[SHA256_DIGEST_LENGTH];
|
||||
SHA256_CTX sha256;
|
||||
SHA256_Init(&sha256);
|
||||
SHA256_Update(&sha256, str.c_str(), str.size());
|
||||
SHA256_Final(hash, &sha256);
|
||||
std::stringstream ss;
|
||||
for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) {
|
||||
ss << std::hex << std::setw(2) << std::setfill('0') << (int)hash[i];
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
|
||||
std::string sha256(const std::string &str);
|
||||
void precise_nano_sleep(long sleep_ns);
|
||||
bool readBZ2File(const std::string_view file, std::ostream &stream);
|
||||
std::string decompressBZ2(const std::string &in);
|
||||
void enableHttpLogging(bool enable);
|
||||
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort = nullptr);
|
||||
std::string getUrlWithoutQuery(const std::string &url);
|
||||
size_t getRemoteFileSize(const std::string &url);
|
||||
bool httpMultiPartDownload(const std::string &url, std::ostream &os, int parts, size_t content_length, std::atomic<bool> *abort = nullptr);
|
||||
|
||||
Reference in New Issue
Block a user