// BufferManager.cpp
|
#include "BufferManager.h"
|
|
// Constructs a manager from an identifier, a name and a default retention
// policy.
//   id            - stored in id_.
//   name          - taken by value and moved into name_.
//   defaultPolicy - stored in defPolicy_; getOrCreate() passes it to every
//                   SampleBuffer it constructs.
BufferManager::BufferManager(uint32_t id, std::string name, RetentionPolicy defaultPolicy)
    : id_(id),
      name_(std::move(name)),
      defPolicy_(defaultPolicy)
{
}
|
|
std::string BufferManager::name() const {
|
std::shared_lock lk(mtx_);
|
return name_;
|
}
|
void BufferManager::rename(const std::string& newName) {
|
std::unique_lock lk(mtx_);
|
name_ = newName;
|
}
|
|
void BufferManager::start() {
|
std::unique_lock lk(mtx_);
|
for (auto& kv : map_) kv.second->clear();
|
running_.store(true);
|
}
|
// Stops accepting samples by setting running_ to false. No lock is taken and
// the buffers are left as they are; push() checks this flag and returns
// without storing anything while it is false.
void BufferManager::stop()
{
    running_.store(false);
}
|
void BufferManager::clearAll() {
|
std::unique_lock lk(mtx_);
|
for (auto& kv : map_) kv.second->clear();
|
}
|
|
// Returns the buffer registered for channelId, creating one with the current
// default policy (defPolicy_) when the channel has no buffer yet.
//
// This function does not lock mtx_ itself. The callers in this file (push,
// setPolicy, setChannelName) each take an exclusive lock before calling it.
SampleBuffer& BufferManager::getOrCreate(uint32_t channelId) {
    const auto found = map_.find(channelId);
    if (found == map_.end()) {
        // Channel not seen before: build a fresh buffer and register it.
        auto inserted = map_.emplace(channelId, std::make_unique<SampleBuffer>(defPolicy_));
        return *inserted.first->second;
    }
    return *found->second;
}
|
|
void BufferManager::push(uint32_t channelId, int64_t ts_ms, double v) {
|
if (!running_.load()) return; // ֹͣʱ¶ªÆúÐÂÊý¾Ý
|
std::unique_lock lk(mtx_);
|
getOrCreate(channelId).push(ts_ms, v);
|
}
|
|
std::vector<Sample> BufferManager::getSince(uint32_t channelId, int64_t tsExclusive, size_t maxCount) const {
|
std::shared_lock lk(mtx_);
|
auto it = map_.find(channelId);
|
if (it == map_.end()) return {};
|
return it->second->getSince(tsExclusive, maxCount);
|
}
|
std::vector<Sample> BufferManager::getRange(uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount) const {
|
std::shared_lock lk(mtx_);
|
auto it = map_.find(channelId);
|
if (it == map_.end()) return {};
|
return it->second->getRange(from_ts, to_ts, maxCount);
|
}
|
|
void BufferManager::setDefaultPolicy(const RetentionPolicy& p, bool applyToExisting) {
|
std::unique_lock lk(mtx_);
|
defPolicy_ = p;
|
if (applyToExisting) {
|
for (auto& kv : map_) kv.second->setPolicy(p);
|
}
|
}
|
void BufferManager::setPolicy(uint32_t channelId, const RetentionPolicy& p) {
|
std::unique_lock lk(mtx_);
|
getOrCreate(channelId).setPolicy(p);
|
}
|
// Returns the retention policy reported by the buffer of channelId, or the
// current default policy (defPolicy_) when the channel has no buffer. Runs
// under a shared lock on mtx_ and never creates a buffer.
RetentionPolicy BufferManager::getPolicy(uint32_t channelId) const {
    std::shared_lock guard{mtx_};
    const auto pos = map_.find(channelId);
    if (pos != map_.end()) {
        return pos->second->getPolicy();
    }
    return defPolicy_;
}
|
|
std::vector<uint32_t> BufferManager::listChannels() const {
|
std::shared_lock lk(mtx_);
|
std::vector<uint32_t> ids; ids.reserve(map_.size());
|
for (auto& kv : map_) ids.push_back(kv.first);
|
return ids;
|
}
|
|
void BufferManager::setChannelName(uint32_t channelId, const std::string& name) {
|
std::unique_lock lk(mtx_);
|
channelNames_[channelId] = name;
|
// Ç¿ÖÆ´´½¨¶ÔÓ¦µÄ SampleBuffer£¨¿ÉÑ¡£©
|
(void)getOrCreate(channelId);
|
}
|
|
std::string BufferManager::getChannelName(uint32_t channelId) const {
|
std::shared_lock lk(mtx_);
|
auto it = channelNames_.find(channelId);
|
if (it != channelNames_.end() && !it->second.empty()) return it->second;
|
// ĬÈÏÃû
|
return "Ch-" + std::to_string(channelId);
|
}
|
|
std::vector<BufferManager::ChannelInfo> BufferManager::listChannelInfos() const {
|
std::shared_lock lk(mtx_);
|
std::vector<ChannelInfo> out;
|
out.reserve(map_.size());
|
// ÒÔ¡°ÒÑÓлº³åÆ÷µÄͨµÀ¡±Îª»ù×¼Áгö£»ÈçÐèÁгöËùÓС°ÃüÃû¹ýµ«ÉÐδ²úÉúÊý¾Ý¡±µÄͨµÀ£¬Ò²¿É±éÀú channelNames_ ºÏ²¢
|
for (auto& kv : map_) {
|
const uint32_t ch = kv.first;
|
auto it = channelNames_.find(ch);
|
out.push_back(ChannelInfo{ ch, (it != channelNames_.end() && !it->second.empty())
|
? it->second
|
: ("Ch-" + std::to_string(ch)) });
|
}
|
return out;
|
}
|
|
// Returns a ChannelStat for channelId built from the buffer's size(),
// earliestTs() and latestTs(), in that order after the channel id. A channel
// without a buffer yields { channelId, 0, 0, 0 }. Runs under a shared lock on
// mtx_ and never creates a buffer.
BufferManager::ChannelStat BufferManager::getChannelStat(uint32_t channelId) const {
    std::shared_lock guard{mtx_};

    const auto pos = map_.find(channelId);
    if (pos != map_.end()) {
        const auto& buffer = *pos->second;
        return ChannelStat{ channelId, buffer.size(), buffer.earliestTs(), buffer.latestTs() };
    }
    return ChannelStat{ channelId, 0, 0, 0 };
}
|
|
std::vector<BufferManager::ChannelStat> BufferManager::listChannelStats() const {
|
std::shared_lock lk(mtx_);
|
std::vector<ChannelStat> out; out.reserve(map_.size());
|
for (auto& kv : map_) {
|
const auto& buf = *kv.second;
|
out.push_back(ChannelStat{ kv.first, buf.size(), buf.earliestTs(), buf.latestTs() });
|
}
|
return out;
|
}
|