From 5fc5ba6032e7fde0bc140f31b79fb0820f84f711 Mon Sep 17 00:00:00 2001 From: Finn Dane Date: Sun, 16 Apr 2023 00:17:04 +0200 Subject: [PATCH] major refactor: separated reading into a library --- CMakeLists.txt | 7 +- src/RdaReader/CMakeLists.txt | 7 ++ src/RdaReader/include/RdaReader.hpp | 44 +++++++ src/{ => RdaReader/src}/BucketedZstdData.cpp | 0 src/{ => RdaReader/src}/BucketedZstdData.hpp | 0 src/RdaReader/src/RdaReader.cpp | 62 ++++++++++ src/{ => RdaReader/src}/SharedIndex.cpp | 0 src/{ => RdaReader/src}/SharedIndex.hpp | 0 src/rdaExtractor/CMakeLists.txt | 6 + src/rdaExtractor/src/rdaExtractor.cpp | 80 +++++++++++++ src/spices.cpp | 114 ------------------- 11 files changed, 204 insertions(+), 116 deletions(-) create mode 100644 src/RdaReader/CMakeLists.txt create mode 100644 src/RdaReader/include/RdaReader.hpp rename src/{ => RdaReader/src}/BucketedZstdData.cpp (100%) rename src/{ => RdaReader/src}/BucketedZstdData.hpp (100%) create mode 100644 src/RdaReader/src/RdaReader.cpp rename src/{ => RdaReader/src}/SharedIndex.cpp (100%) rename src/{ => RdaReader/src}/SharedIndex.hpp (100%) create mode 100644 src/rdaExtractor/CMakeLists.txt create mode 100644 src/rdaExtractor/src/rdaExtractor.cpp delete mode 100644 src/spices.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 88c2165..07f636e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,9 @@ project(spices2) +cmake_minimum_required(VERSION 3.18) + set(CMAKE_CXX_STANDARD 20) -add_executable(spices src/spices.cpp src/SharedIndex.cpp src/BucketedZstdData.cpp) -target_link_libraries(spices zstd) \ No newline at end of file +add_subdirectory(src/RdaReader) + +add_subdirectory(src/rdaExtractor) diff --git a/src/RdaReader/CMakeLists.txt b/src/RdaReader/CMakeLists.txt new file mode 100644 index 0000000..7d8743a --- /dev/null +++ b/src/RdaReader/CMakeLists.txt @@ -0,0 +1,7 @@ +project(RdaReader) + +set(CMAKE_CXX_STANDARD 20) + +add_library(RdaReader src/RdaReader src/SharedIndex.cpp src/BucketedZstdData.cpp) +target_include_directories(RdaReader PUBLIC include) +target_link_libraries(RdaReader zstd) \ No newline at end of file diff --git a/src/RdaReader/include/RdaReader.hpp b/src/RdaReader/include/RdaReader.hpp new file mode 100644 index 0000000..4904f75 --- /dev/null +++ b/src/RdaReader/include/RdaReader.hpp @@ -0,0 +1,44 @@ +#ifndef RDAREADER_HPP +#define RDAREADER_HPP + +#include +#include +#include +#include +#include + +class RdaReader { + public: + RdaReader(); + RdaReader(std::function logger); + size_t readDataset( + const std::vector &datasetName, + const std::vector &sharedIndex, + const std::vector &rdas, + std::ostream &output + ); + size_t readDataset( + const std::vector &datasetName, + const std::vector &sharedIndex, + const std::vector &rdas, + std::ostream &output, + std::mutex &outputMutex + ); + + size_t readRda( + std::istream &rda, + uint64_t id, + std::ostream &output + ); + size_t readRda( + std::istream &rda, + uint64_t id, + std::ostream &output, + std::mutex &outputMutex + ); + private: + const std::function logger; + void log(const std::string &string); +}; + +#endif \ No newline at end of file diff --git a/src/BucketedZstdData.cpp b/src/RdaReader/src/BucketedZstdData.cpp similarity index 100% rename from src/BucketedZstdData.cpp rename to src/RdaReader/src/BucketedZstdData.cpp diff --git a/src/BucketedZstdData.hpp b/src/RdaReader/src/BucketedZstdData.hpp similarity index 100% rename from src/BucketedZstdData.hpp rename to src/RdaReader/src/BucketedZstdData.hpp diff --git a/src/RdaReader/src/RdaReader.cpp b/src/RdaReader/src/RdaReader.cpp new file mode 100644 index 0000000..d342329 --- /dev/null +++ b/src/RdaReader/src/RdaReader.cpp @@ -0,0 +1,62 @@ +#include "RdaReader.hpp" +#include "SharedIndex.hpp" +#include "BucketedZstdData.hpp" + +#include +#include +#include +#include + +void RdaReader::log(const std::string &string) { + if(logger) logger(string); +} + +RdaReader::RdaReader() : logger(nullptr) {} + +RdaReader::RdaReader(std::function logger) : logger(logger) {} + +size_t RdaReader::readRda(std::istream &rda, uint64_t id, std::ostream &output, std::mutex &outputMutex) { + log("Reading an rda\n"); + BucketedZstdData bucket(rda); + + if(std::optional>> data = bucket.getEntriesByID(id)) { + const std::lock_guard lock(outputMutex); + log("Writing " + std::to_string(data.value().size()) + " entries to output\n"); + for(const auto &entry : data.value()) { + output.write(entry.data(), entry.size()) << '\n'; + } + log("Done writing entries\n"); + return data.value().size(); + } + log("No entries found\n"); + return 0; +} + +size_t RdaReader::readRda(std::istream &rda, uint64_t id, std::ostream &output) { + std::mutex dummyMutex; + return readRda(rda, id, output, dummyMutex); +} + +size_t RdaReader::readDataset(const std::vector &datasetName, const std::vector &sharedIndex, const std::vector &rdas, std::ostream &output, std::mutex &outputMutex) { + log("Reading shared index... "); + SharedIndex sharedIndexReader(sharedIndex); + + if(std::optional id = sharedIndexReader.getID(datasetName)) { + log("Found ID: " + std::to_string(id.value()) + '\n'); + std::atomic_size_t totalEntries(0); + std::for_each( + std::execution::par, + rdas.begin(), + rdas.end(), + [this, &totalEntries, &id, &output, &outputMutex](std::istream * const &rda) {totalEntries += readRda(*rda, id.value(), output, outputMutex);} + ); + return totalEntries; + } + log("No entries found\n"); + return 0; +} + +size_t RdaReader::readDataset(const std::vector &datasetName, const std::vector &sharedIndex, const std::vector &rdas, std::ostream &output) { + std::mutex dummyMutex; + return readDataset(datasetName, sharedIndex, rdas, output, dummyMutex); +} \ No newline at end of file diff --git a/src/SharedIndex.cpp b/src/RdaReader/src/SharedIndex.cpp similarity index 100% rename from src/SharedIndex.cpp rename to src/RdaReader/src/SharedIndex.cpp diff --git a/src/SharedIndex.hpp b/src/RdaReader/src/SharedIndex.hpp similarity index 100% rename from src/SharedIndex.hpp rename to src/RdaReader/src/SharedIndex.hpp diff --git a/src/rdaExtractor/CMakeLists.txt b/src/rdaExtractor/CMakeLists.txt new file mode 100644 index 0000000..3e9c66f --- /dev/null +++ b/src/rdaExtractor/CMakeLists.txt @@ -0,0 +1,6 @@ +project(rdaExtractor) + +set(CMAKE_CXX_STANDARD 20) + +add_executable(RdaReaderExec src/rdaExtractor.cpp) +target_link_libraries(RdaReaderExec RdaReader) \ No newline at end of file diff --git a/src/rdaExtractor/src/rdaExtractor.cpp b/src/rdaExtractor/src/rdaExtractor.cpp new file mode 100644 index 0000000..508ecfa --- /dev/null +++ b/src/rdaExtractor/src/rdaExtractor.cpp @@ -0,0 +1,80 @@ +#include + +#include +#include +#include + +#ifdef _WIN32 +#include +#define IS_REDIRECTED !(_isatty(_fileno(stdout))) + +#elif __unix__ +#include +#define IS_REDIRECTED !(isatty(fileno(stdout))) + +#else +#warning "Redirection cannot be checked, will always asume to be redirected" +#define IS_REDIRECTED true + +#endif + +std::mutex cerrMutex; + +void threadedLog(const std::string &input) { + std::lock_guard lock(cerrMutex); + std::cerr << input << std::flush; +} + +std::optional> readSharedIndex(const std::filesystem::path &filePath) { + std::ifstream sharedIndexFile(filePath, std::ios::binary | std::ios::ate); + if(!sharedIndexFile.good()) { + return {}; + } + std::vector sharedIndexData(sharedIndexFile.tellg()); + sharedIndexFile.seekg(0, std::ios::beg); + + sharedIndexFile.read(sharedIndexData.data(), sharedIndexData.size()); + return sharedIndexData; +} + +int main(int argc, char **argv) { + if(argc != 3 && argc != 4) { + std::cerr << "usage: datesetname rootdirectory [force write to terminal(true | false)]" << std::endl; + return 1; + } + + if(!IS_REDIRECTED && (argc != 4 || std::string(argv[3]) != "true")) { + std::cerr << "output is not redirected, specify you want to write to the terminal" << std::endl; + return 1; + } + + std::string datesetString = argv[1]; + std::vector datasetName(datesetString.begin(), datesetString.end()); + + const std::filesystem::path rootDirectory(argv[2]); + const std::filesystem::path sharedIndexPath(rootDirectory / "sharedindex.shi"); + + std::vector sharedIndexData; + if(auto data = readSharedIndex(sharedIndexPath)) { + sharedIndexData = data.value(); + } else { + std::cerr << "cannot find '" << sharedIndexPath << "'" << std::endl; + return 2; + } + + std::vector rdasIfstreams; + for(const std::filesystem::directory_entry &file : std::filesystem::directory_iterator(rootDirectory)) { + if(file.path().extension() == ".rda") rdasIfstreams.emplace_back(std::ifstream(file.path(), std::ios::binary)); + } + + std::vector rdaRefs; + for(std::ifstream &stream : rdasIfstreams) { + rdaRefs.push_back(&stream); + } + RdaReader rdaReader(threadedLog); + if(size_t totalRead = rdaReader.readDataset(datasetName, sharedIndexData, rdaRefs, std::cout)) { + std::cerr << "Found a total of " << totalRead << " entries" << std::endl; + } else { + std::cerr << "Cannot find '" << argv[1] << "' in the shared index" << std::endl; + } +} \ No newline at end of file diff --git a/src/spices.cpp b/src/spices.cpp deleted file mode 100644 index afed355..0000000 --- a/src/spices.cpp +++ /dev/null @@ -1,114 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#include "SharedIndex.hpp" -#include "BucketedZstdData.hpp" - -#ifdef _WIN32 -#include -#define IS_REDIRECTED !(_isatty(_fileno(stdout))) - -#elif __unix__ -#include -#define IS_REDIRECTED !(isatty(fileno(stdout))) - -#else -#warning "Redirection cannot be checked, will always asume to be redirected" -#define IS_REDIRECTED true - -#endif - -std::mutex cerrMutex; - -#define CERRLOG(...) \ - { \ - std::lock_guard lock(cerrMutex); \ - fprintf(stderr, __VA_ARGS__); \ - } - - -std::optional> readSharedIndex(const std::filesystem::path &filePath) { - std::ifstream sharedIndexFile(filePath, std::ios::binary | std::ios::ate); - if(!sharedIndexFile.good()) { - return {}; - } - std::vector sharedIndexData(sharedIndexFile.tellg()); - sharedIndexFile.seekg(0, std::ios::beg); - - sharedIndexFile.read(sharedIndexData.data(), sharedIndexData.size()); - return sharedIndexData; -} - -void processRDA(const std::filesystem::directory_entry &file, std::atomic_size_t &totalEntries, uint64_t id, std::mutex &outputMutex) { - if(file.path().extension() == ".rda") { - std::string fileName(file.path().filename()); - std::ifstream fileStream(file.path(), std::ios::binary); - CERRLOG("Reading %s\n", fileName.c_str()); - BucketedZstdData bucket(fileStream); - - if(std::optional>> data = bucket.getEntriesByID(id)) { - totalEntries += data.value().size(); - const std::lock_guard lock(outputMutex); - CERRLOG("Writing %s\n", fileName.c_str()); - for(const auto &entry : data.value()) { - std::cout.write(entry.data(), entry.size()) << '\n'; - } - } else { - CERRLOG("No entries found in %s\n", fileName.c_str()); - } - } -} - -int main(int argc, char **argv) { - if(argc != 3 && argc != 4) { - std::cerr << "usage: datesetname rootdirectory [force write to terminal(true | false)]" << std::endl; - return 1; - } - - if(!IS_REDIRECTED && (argc != 4 || std::string(argv[3]) != "true")) { - std::cerr << "output is not redirected, specify you want to write to the terminal" << std::endl; - return 1; - } - - const std::filesystem::path rootDirectory(argv[2]); - const std::filesystem::path sharedIndexPath(rootDirectory / "sharedindex.shi"); - - std::cerr << "Loading shared index..." << std::flush; - std::vector sharedIndexData; - if(auto data = readSharedIndex(sharedIndexPath)) { - sharedIndexData = data.value(); - } else { - std::cerr << "cannot find '" << sharedIndexPath << "'" << std::endl; - return 2; - } - std::cerr << "Loaded shared index" << std::endl; - - std::string datesetString = argv[1]; - std::vector datasetName(datesetString.begin(), datesetString.end()); - - SharedIndex sharedIndex(sharedIndexData); - - std::cerr << "Fetching ID from shared index... " << std::flush; - std::atomic_size_t totalEntries(0); - if(std::optional id = sharedIndex.getID(datasetName)) { - std::cerr << "Found ID: " << id.value() << std::endl; - - std::mutex outputMutex; - std::for_each( - std::execution::par, - std::filesystem::begin(std::filesystem::directory_iterator(rootDirectory)), - std::filesystem::end(std::filesystem::directory_iterator()), - [&totalEntries, &id, &outputMutex](const auto& file) {processRDA(file, totalEntries, id.value(), outputMutex);} - ); - - std::cerr << "Found a total of " << totalEntries << " entries" << std::endl; - } else { - std::cerr << "Cannot find '" << argv[1] << "' in the shared index" << std::endl; - return 2; - } -} \ No newline at end of file