mirror of
https://github.com/dragonpilot/dragonpilot.git
synced 2026-06-27 00:42:05 +08:00
replay/logreader: handle abort in load (#23321)
* handle abort * handle abort in decompressBZ2
This commit is contained in:
@@ -56,15 +56,17 @@ bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool loca
|
||||
}
|
||||
|
||||
bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
|
||||
raw_ = decompressBZ2(data, size);
|
||||
raw_ = decompressBZ2(data, size, abort);
|
||||
if (raw_.empty()) {
|
||||
std::cout << "failed to decompress log" << std::endl;
|
||||
if (!(abort && *abort)) {
|
||||
std::cout << "failed to decompress log" << std::endl;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
|
||||
while (words.size() > 0) {
|
||||
while (words.size() > 0 && !(abort && *abort)) {
|
||||
|
||||
#ifdef HAS_MEMORY_RESOURCE
|
||||
Event *evt = new (mbr_) Event(words);
|
||||
@@ -91,11 +93,14 @@ bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abor
|
||||
}
|
||||
} catch (const kj::Exception &e) {
|
||||
std::cout << "failed to parse log : " << e.getDescription().cStr() << std::endl;
|
||||
if (events.empty()) return false;
|
||||
|
||||
std::cout << "read " << events.size() << " events from corrupt log" << std::endl;
|
||||
if (!events.empty()) {
|
||||
std::cout << "read " << events.size() << " events from corrupt log" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
std::sort(events.begin(), events.end(), Event::lessThan());
|
||||
return true;
|
||||
if (!events.empty() && !(abort && *abort)) {
|
||||
std::sort(events.begin(), events.end(), Event::lessThan());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -192,11 +192,11 @@ bool httpDownload(const std::string &url, const std::string &file, size_t chunk_
|
||||
return httpDownload(url, of, chunk_size, size, abort);
|
||||
}
|
||||
|
||||
std::string decompressBZ2(const std::string &in) {
|
||||
return decompressBZ2((std::byte *)in.data(), in.size());
|
||||
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort) {
|
||||
return decompressBZ2((std::byte *)in.data(), in.size(), abort);
|
||||
}
|
||||
|
||||
std::string decompressBZ2(const std::byte *in, size_t in_size) {
|
||||
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort) {
|
||||
if (in_size == 0) return {};
|
||||
|
||||
bz_stream strm = {};
|
||||
@@ -222,10 +222,10 @@ std::string decompressBZ2(const std::byte *in, size_t in_size) {
|
||||
if (bzerror == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
|
||||
out.resize(out.size() * 2);
|
||||
}
|
||||
} while (bzerror == BZ_OK);
|
||||
} while (bzerror == BZ_OK && !(abort && *abort));
|
||||
|
||||
BZ2_bzDecompressEnd(&strm);
|
||||
if (bzerror == BZ_STREAM_END) {
|
||||
if (bzerror == BZ_STREAM_END && !(abort && *abort)) {
|
||||
out.resize(strm.total_out_lo32);
|
||||
return out;
|
||||
}
|
||||
|
||||
@@ -5,8 +5,8 @@
|
||||
|
||||
std::string sha256(const std::string &str);
|
||||
void precise_nano_sleep(long sleep_ns);
|
||||
std::string decompressBZ2(const std::string &in);
|
||||
std::string decompressBZ2(const std::byte *in, size_t in_size);
|
||||
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
|
||||
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
|
||||
void enableHttpLogging(bool enable);
|
||||
std::string getUrlWithoutQuery(const std::string &url);
|
||||
size_t getRemoteFileSize(const std::string &url);
|
||||
|
||||
Reference in New Issue
Block a user