major refactor: separated reading into a library

This commit is contained in:
2023-04-16 00:17:04 +02:00
parent 016640d392
commit 5fc5ba6032
11 changed files with 204 additions and 116 deletions
+7
View File
@@ -0,0 +1,7 @@
project(RdaReader)
set(CMAKE_CXX_STANDARD 20)
add_library(RdaReader src/RdaReader src/SharedIndex.cpp src/BucketedZstdData.cpp)
target_include_directories(RdaReader PUBLIC include)
target_link_libraries(RdaReader zstd)
+44
View File
@@ -0,0 +1,44 @@
#ifndef RDAREADER_HPP
#define RDAREADER_HPP
#include <optional>
#include <iostream>
#include <vector>
#include <mutex>
#include <functional>
class RdaReader {
public:
RdaReader();
RdaReader(std::function<void(const std::string &)> logger);
size_t readDataset(
const std::vector<char> &datasetName,
const std::vector<char> &sharedIndex,
const std::vector<std::istream *> &rdas,
std::ostream &output
);
size_t readDataset(
const std::vector<char> &datasetName,
const std::vector<char> &sharedIndex,
const std::vector<std::istream *> &rdas,
std::ostream &output,
std::mutex &outputMutex
);
size_t readRda(
std::istream &rda,
uint64_t id,
std::ostream &output
);
size_t readRda(
std::istream &rda,
uint64_t id,
std::ostream &output,
std::mutex &outputMutex
);
private:
const std::function<void(const std::string &)> logger;
void log(const std::string &string);
};
#endif
+60
View File
@@ -0,0 +1,60 @@
#include <iostream>
#include "BucketedZstdData.hpp"
#include <zstd.h>
#include <cstring>
constexpr int headerSize = sizeof(uint32_t);
constexpr int indexEntrySize = sizeof(uint64_t)*2;
BucketedZstdData::BucketedZstdData(std::istream &input) : input(input) {}
std::optional<std::vector<char>> BucketedZstdData::getDatasetWithId(std::uint32_t id) {
input.seekg(0, std::ios::beg);
uint32_t indexSize;
input.read((char *)&indexSize, sizeof(indexSize));
if(indexSize < id) return {};
//seek to index entry
input.seekg(sizeof(uint32_t) + indexEntrySize*id, std::ios::beg);
uint64_t offset, length;
input.read((char *)&offset, sizeof(offset));
input.read((char *)&length, sizeof(length));
if(length == 0) return {};
input.seekg(offset + headerSize + indexSize * indexEntrySize, std::ios::beg);
std::vector<char> inBuf(length);
input.read(inBuf.data(), inBuf.size());
std::vector<char> output(ZSTD_getFrameContentSize(inBuf.data(), inBuf.size()));
if(!ZSTD_isError(ZSTD_decompress(output.data(), output.size(), inBuf.data(), inBuf.size()))) {
return output;
}
return {};
}
std::optional<std::vector<std::vector<char>>> BucketedZstdData::getEntriesByID(std::uint32_t id) {
std::optional<std::vector<char>> rawData = getDatasetWithId(id);
if(!rawData.has_value()) return {};
const char *fileIndex = rawData.value().data();
std::vector<std::vector<char>> output;
uint32_t readId;
while(fileIndex < rawData.value().data() + rawData.value().size()) {
const uint32_t *readId = (uint32_t *)fileIndex;
fileIndex += sizeof(uint32_t);
if(*readId == id) {
std::vector<char> &object = output.emplace_back(std::vector<char>(*(uint32_t *)fileIndex));
memcpy(object.data(), fileIndex + sizeof(uint32_t), object.size());
}
fileIndex += *(uint32_t *)fileIndex + sizeof(uint32_t);
}
return output;
}
+18
View File
@@ -0,0 +1,18 @@
#ifndef BUCKETEDZSTDDATA_H
#define BUCKETEDZSTDDATA_H
#include <cstdint>
#include <vector>
#include <optional>
#include <fstream>
class BucketedZstdData {
public:
BucketedZstdData(std::istream &input);
std::optional<std::vector<char>> getDatasetWithId(std::uint32_t id);
std::optional<std::vector<std::vector<char>>> getEntriesByID(std::uint32_t id);
private:
std::istream &input;
};
#endif
+62
View File
@@ -0,0 +1,62 @@
#include "RdaReader.hpp"
#include "SharedIndex.hpp"
#include "BucketedZstdData.hpp"
#include <mutex>
#include <execution>
#include <algorithm>
#include <atomic>
void RdaReader::log(const std::string &string) {
if(logger) logger(string);
}
RdaReader::RdaReader() : logger(nullptr) {}
RdaReader::RdaReader(std::function<void(const std::string &)> logger) : logger(logger) {}
size_t RdaReader::readRda(std::istream &rda, uint64_t id, std::ostream &output, std::mutex &outputMutex) {
log("Reading an rda\n");
BucketedZstdData bucket(rda);
if(std::optional<std::vector<std::vector<char>>> data = bucket.getEntriesByID(id)) {
const std::lock_guard lock(outputMutex);
log("Writing " + std::to_string(data.value().size()) + " entries to output\n");
for(const auto &entry : data.value()) {
output.write(entry.data(), entry.size()) << '\n';
}
log("Done writing entries\n");
return data.value().size();
}
log("No entries found\n");
return 0;
}
size_t RdaReader::readRda(std::istream &rda, uint64_t id, std::ostream &output) {
std::mutex dummyMutex;
return readRda(rda, id, output, dummyMutex);
}
size_t RdaReader::readDataset(const std::vector<char> &datasetName, const std::vector<char> &sharedIndex, const std::vector<std::istream *> &rdas, std::ostream &output, std::mutex &outputMutex) {
log("Reading shared index... ");
SharedIndex sharedIndexReader(sharedIndex);
if(std::optional<std::uint64_t> id = sharedIndexReader.getID(datasetName)) {
log("Found ID: " + std::to_string(id.value()) + '\n');
std::atomic_size_t totalEntries(0);
std::for_each(
std::execution::par,
rdas.begin(),
rdas.end(),
[this, &totalEntries, &id, &output, &outputMutex](std::istream * const &rda) {totalEntries += readRda(*rda, id.value(), output, outputMutex);}
);
return totalEntries;
}
log("No entries found\n");
return 0;
}
size_t RdaReader::readDataset(const std::vector<char> &datasetName, const std::vector<char> &sharedIndex, const std::vector<std::istream *> &rdas, std::ostream &output) {
std::mutex dummyMutex;
return readDataset(datasetName, sharedIndex, rdas, output, dummyMutex);
}
+21
View File
@@ -0,0 +1,21 @@
#include "SharedIndex.hpp"
#include <cstring>
#include <iostream>
SharedIndex::SharedIndex(const std::vector<char> &data) : data(data) {}
std::optional<std::uint64_t> SharedIndex::getID(const std::vector<char> &datasetName) {
const char *fileIndex = data.data();
fileIndex += sizeof(uint32_t);
for(uint64_t entryN = 0;; ++entryN ) {
if(fileIndex >= data.data() + data.size()) return {};
if(*fileIndex == datasetName.size()) {
if(!std::memcmp(fileIndex + 1, datasetName.data(), datasetName.size())) return entryN;
}
fileIndex += *fileIndex + 1;
}
}
+16
View File
@@ -0,0 +1,16 @@
#ifndef SHAREDINDEX_HPP
#define SHAREDINDEX_HPP
#include <cstdint>
#include <vector>
#include <optional>
class SharedIndex {
public:
SharedIndex(const std::vector<char> &data);
std::optional<std::uint64_t> getID(const std::vector<char> &datasetName);
private:
const std::vector<char> &data;
};
#endif