code cleanup & adds error message on not found in sharedindex

This commit is contained in:
Finn Dane 2023-04-15 15:22:37 +02:00
parent 2b8eede409
commit 12f70eb783

View File

@ -4,6 +4,10 @@
#include <execution> #include <execution>
#include <algorithm> #include <algorithm>
#include <mutex> #include <mutex>
#include <atomic>
#include "SharedIndex.hpp"
#include "BucketedZstdData.hpp"
#ifdef _WIN32 #ifdef _WIN32
#include <io.h> #include <io.h>
@ -27,8 +31,6 @@ std::mutex cerrMutex;
fprintf(stderr, __VA_ARGS__); \ fprintf(stderr, __VA_ARGS__); \
} }
#include "SharedIndex.hpp"
#include "BucketedZstdData.hpp"
std::optional<std::vector<char>> readSharedIndex(const std::filesystem::path &filePath) { std::optional<std::vector<char>> readSharedIndex(const std::filesystem::path &filePath) {
std::ifstream sharedIndexFile(filePath, std::ios::binary | std::ios::ate); std::ifstream sharedIndexFile(filePath, std::ios::binary | std::ios::ate);
@ -42,6 +44,23 @@ std::optional<std::vector<char>> readSharedIndex(const std::filesystem::path &fi
return sharedIndexData; return sharedIndexData;
} }
void processRDA(const std::filesystem::directory_entry &file, std::atomic_size_t &totalEntries, uint64_t id, std::mutex &outputMutex) {
if(file.path().extension() == ".rda") {
std::ifstream fileStream(file.path(), std::ios::binary);
CERRLOG("Reading %s\n", std::string(file.path().filename()).c_str());
BucketedZstdData bucket(fileStream);
if(std::optional<std::vector<std::vector<char>>> data = bucket.getEntriesByID(id)) {
totalEntries += data.value().size();
const std::lock_guard lock(outputMutex);
CERRLOG("Writing %s\n", std::string(file.path().filename()).c_str());
for(const auto &entry : data.value()) {
std::cout.write(entry.data(), entry.size()) << '\n';
}
}
}
}
int main(int argc, char **argv) { int main(int argc, char **argv) {
if(argc != 3 && argc != 4) { if(argc != 3 && argc != 4) {
std::cerr << "usage: subreddit rootdirectory [force write to terminal(true | false)]" << std::endl; std::cerr << "usage: subreddit rootdirectory [force write to terminal(true | false)]" << std::endl;
@ -58,13 +77,13 @@ int main(int argc, char **argv) {
std::cerr << "Loading shared index..." << std::flush; std::cerr << "Loading shared index..." << std::flush;
std::vector<char> sharedIndexData; std::vector<char> sharedIndexData;
if(auto data = readSharedIndex(sharedIndexPath); data.has_value()) { if(auto data = readSharedIndex(sharedIndexPath)) {
sharedIndexData.swap(data.value()); sharedIndexData = data.value();
} else { } else {
std::cerr << "cannot find '" << sharedIndexPath << "'" << std::endl; std::cerr << "cannot find '" << sharedIndexPath << "'" << std::endl;
return 1; return 2;
} }
std::cerr << "Loaded shared index" << std::endl; std::cerr << "Loaded shared index" << std::endl;
std::string datesetString = argv[1]; std::string datesetString = argv[1];
std::vector<char> datasetName(datesetString.begin(), datesetString.end()); std::vector<char> datasetName(datesetString.begin(), datesetString.end());
@ -72,34 +91,21 @@ int main(int argc, char **argv) {
SharedIndex sharedIndex(sharedIndexData); SharedIndex sharedIndex(sharedIndexData);
std::cerr << "Fetching ID from shared index... " << std::flush; std::cerr << "Fetching ID from shared index... " << std::flush;
size_t totalEntries = 0; std::atomic_size_t totalEntries(0);
if(auto id = sharedIndex.getID(datasetName)) { if(std::optional<std::uint64_t> id = sharedIndex.getID(datasetName)) {
std::cerr << "Found ID: " << id.value() << std::endl; std::cerr << "Found ID: " << id.value() << std::endl;
std::mutex outputMutex; std::mutex outputMutex;
std::for_each( std::for_each(
std::execution::par, std::execution::par,
std::filesystem::begin(std::filesystem::directory_iterator(rootDirectory)), std::filesystem::begin(std::filesystem::directory_iterator(rootDirectory)),
std::filesystem::end(std::filesystem::directory_iterator()), std::filesystem::end(std::filesystem::directory_iterator()),
[&totalEntries, &id, &outputMutex](const auto& file) [&totalEntries, &id, &outputMutex](const auto& file) {processRDA(file, totalEntries, id.value(), outputMutex);}
{ );
if(file.path().extension() == ".rda") {
std::ifstream fileStream(file.path(), std::ios::binary);
CERRLOG("Reading %s\n", std::string(file.path().filename()).c_str());
BucketedZstdData bucket(fileStream);
if(auto data = bucket.getEntriesByID(id.value()); data.has_value()) {
totalEntries += data.value().size();
const std::lock_guard lock(outputMutex);
CERRLOG("Writing %s\n", std::string(file.path().filename()).c_str());
for(const auto &entry : data.value()) {
std::cout.write(entry.data(), entry.size()) << '\n';
}
}
}
});
std::cerr << "Found a total of " << totalEntries << " entries" << std::endl; std::cerr << "Found a total of " << totalEntries << " entries" << std::endl;
} else {
std::cerr << "Cannot find '" << argv[1] << "' in the shared index" << std::endl;
return 2;
} }
} }