// BufferManager.cpp #include "BufferManager.h" BufferManager::BufferManager(uint32_t id, std::string name, RetentionPolicy defaultPolicy) : id_(id), name_(std::move(name)), defPolicy_(defaultPolicy) {} std::string BufferManager::name() const { std::shared_lock lk(mtx_); return name_; } void BufferManager::rename(const std::string& newName) { std::unique_lock lk(mtx_); name_ = newName; } void BufferManager::start() { std::unique_lock lk(mtx_); for (auto& kv : map_) kv.second->clear(); running_.store(true); } void BufferManager::stop() { running_.store(false); } void BufferManager::clearAll() { std::unique_lock lk(mtx_); for (auto& kv : map_) kv.second->clear(); } SampleBuffer& BufferManager::getOrCreate(uint32_t channelId) { auto it = map_.find(channelId); if (it != map_.end()) return *it->second; auto buf = std::make_unique(defPolicy_); auto& ref = *buf; map_[channelId] = std::move(buf); return ref; } void BufferManager::push(uint32_t channelId, int64_t ts_ms, double v) { if (!running_.load()) return; // ֹͣʱ¶ªÆúÐÂÊý¾Ý std::unique_lock lk(mtx_); getOrCreate(channelId).push(ts_ms, v); } std::vector BufferManager::getSince(uint32_t channelId, int64_t tsExclusive, size_t maxCount) const { std::shared_lock lk(mtx_); auto it = map_.find(channelId); if (it == map_.end()) return {}; return it->second->getSince(tsExclusive, maxCount); } std::vector BufferManager::getRange(uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount) const { std::shared_lock lk(mtx_); auto it = map_.find(channelId); if (it == map_.end()) return {}; return it->second->getRange(from_ts, to_ts, maxCount); } void BufferManager::setDefaultPolicy(const RetentionPolicy& p, bool applyToExisting) { std::unique_lock lk(mtx_); defPolicy_ = p; if (applyToExisting) { for (auto& kv : map_) kv.second->setPolicy(p); } } void BufferManager::setPolicy(uint32_t channelId, const RetentionPolicy& p) { std::unique_lock lk(mtx_); getOrCreate(channelId).setPolicy(p); } RetentionPolicy BufferManager::getPolicy(uint32_t channelId) const { std::shared_lock lk(mtx_); auto it = map_.find(channelId); if (it == map_.end()) return defPolicy_; return it->second->getPolicy(); } std::vector BufferManager::listChannels() const { std::shared_lock lk(mtx_); std::vector ids; ids.reserve(map_.size()); for (auto& kv : map_) ids.push_back(kv.first); return ids; } void BufferManager::setChannelName(uint32_t channelId, const std::string& name) { std::unique_lock lk(mtx_); channelNames_[channelId] = name; // Ç¿ÖÆ´´½¨¶ÔÓ¦µÄ SampleBuffer£¨¿ÉÑ¡£© (void)getOrCreate(channelId); } std::string BufferManager::getChannelName(uint32_t channelId) const { std::shared_lock lk(mtx_); auto it = channelNames_.find(channelId); if (it != channelNames_.end() && !it->second.empty()) return it->second; // ĬÈÏÃû return "Ch-" + std::to_string(channelId); } std::vector BufferManager::listChannelInfos() const { std::shared_lock lk(mtx_); std::vector out; out.reserve(map_.size()); // ÒÔ¡°ÒÑÓлº³åÆ÷µÄͨµÀ¡±Îª»ù×¼Áгö£»ÈçÐèÁгöËùÓС°ÃüÃû¹ýµ«ÉÐδ²úÉúÊý¾Ý¡±µÄͨµÀ£¬Ò²¿É±éÀú channelNames_ ºÏ²¢ for (auto& kv : map_) { const uint32_t ch = kv.first; auto it = channelNames_.find(ch); out.push_back(ChannelInfo{ ch, (it != channelNames_.end() && !it->second.empty()) ? it->second : ("Ch-" + std::to_string(ch)) }); } return out; } BufferManager::ChannelStat BufferManager::getChannelStat(uint32_t channelId) const { std::shared_lock lk(mtx_); auto it = map_.find(channelId); if (it == map_.end()) return ChannelStat{ channelId, 0, 0, 0 }; const auto& buf = *it->second; return ChannelStat{ channelId, buf.size(), buf.earliestTs(), buf.latestTs() }; } std::vector BufferManager::listChannelStats() const { std::shared_lock lk(mtx_); std::vector out; out.reserve(map_.size()); for (auto& kv : map_) { const auto& buf = *kv.second; out.push_back(ChannelStat{ kv.first, buf.size(), buf.earliestTs(), buf.latestTs() }); } return out; }