// SampleBuffer.cpp #include "SampleBuffer.h" #include void SampleBuffer::push(int64_t ts_ms, double v) { std::unique_lock lk(mtx_); if (!data_.empty() && ts_ms < data_.back().ts_ms) { ts_ms = data_.back().ts_ms; // ¼òµ¥¾ÀÕýΪ·ÇµÝ¼õ£»Ñϸñ³¡¾°¿É¸ÄΪ¶ªÆú/²åÈëÅÅÐò } data_.push_back({ ts_ms, v }); pruneUnlocked(ts_ms); // Óõ±Ç°Ð´Èëʱ¼ä×÷Ϊ²Î¿¼Ê±¼ä } std::vector SampleBuffer::getSince(int64_t tsExclusive, size_t maxCount) const { std::shared_lock lk(mtx_); std::vector out; if (data_.empty()) return out; const size_t start = lowerBoundIndex(tsExclusive); if (start >= data_.size()) return out; const size_t n = std::min(maxCount, data_.size() - start); out.reserve(n); for (size_t i = 0; i < n; ++i) out.push_back(data_[start + i]); return out; } std::vector SampleBuffer::getRange(int64_t from_ts, int64_t to_ts, size_t maxCount) const { if (to_ts < from_ts) return {}; std::shared_lock lk(mtx_); std::vector out; if (data_.empty()) return out; const size_t L = lowerBoundInclusive(from_ts); const size_t R = upperBoundInclusive(to_ts); if (L >= R) return out; const size_t n = std::min(maxCount, R - L); out.reserve(n); for (size_t i = 0; i < n; ++i) out.push_back(data_[L + i]); return out; } size_t SampleBuffer::size() const { std::shared_lock lk(mtx_); return data_.size(); } bool SampleBuffer::empty() const { std::shared_lock lk(mtx_); return data_.empty(); } int64_t SampleBuffer::latestTs() const { std::shared_lock lk(mtx_); return data_.empty() ? 0 : data_.back().ts_ms; } void SampleBuffer::clear() { std::unique_lock lk(mtx_); data_.clear(); } void SampleBuffer::setPolicy(const RetentionPolicy& p) { std::unique_lock lk(mtx_); policy_ = p; // Á¢¼´°´Ð²ßÂÔÇåÒ»´Î£¨ÒÔ¡°µ±Ç°×îРts¡±Îª²Î¿¼£© pruneUnlocked(data_.empty() ? 0 : data_.back().ts_ms); } RetentionPolicy SampleBuffer::getPolicy() const { std::shared_lock lk(mtx_); return policy_; } void SampleBuffer::pruneUnlocked(int64_t ref_now_ms) { switch (policy_.mode) { case RetainMode::ByCount: if (policy_.maxSamples > 0) { while (data_.size() > policy_.maxSamples) data_.pop_front(); } break; case RetainMode::ByRollingAge: { if (policy_.maxAge.count() > 0) { const int64_t cutoff = ref_now_ms - policy_.maxAge.count(); while (!data_.empty() && data_.front().ts_ms < cutoff) data_.pop_front(); } break; } case RetainMode::ByAbsoluteRange: { const bool valid = (policy_.absTo >= policy_.absFrom); if (valid) { // ¶ªÆú´°¿ÚÖ®ÍâµÄÊý¾Ý£¨Á½¶Ë¶¼²Ã£© while (!data_.empty() && data_.front().ts_ms < policy_.absFrom) data_.pop_front(); while (!data_.empty() && data_.back().ts_ms > policy_.absTo) data_.pop_back(); } break; } } } size_t SampleBuffer::lowerBoundIndex(int64_t tsExclusive) const { size_t L = 0, R = data_.size(); while (L < R) { size_t m = (L + R) >> 1; if (data_[m].ts_ms <= tsExclusive) L = m + 1; else R = m; } return L; // µÚÒ»¸ö > tsExclusive } size_t SampleBuffer::lowerBoundInclusive(int64_t tsInclusive) const { size_t L = 0, R = data_.size(); while (L < R) { size_t m = (L + R) >> 1; if (data_[m].ts_ms < tsInclusive) L = m + 1; else R = m; } return L; // µÚÒ»¸ö >= tsInclusive } size_t SampleBuffer::upperBoundInclusive(int64_t tsInclusive) const { size_t L = 0, R = data_.size(); while (L < R) { size_t m = (L + R) >> 1; if (data_[m].ts_ms <= tsInclusive) L = m + 1; else R = m; } return L; // ×îºóÒ»¸ö <= tsInclusive µÄÏÂÒ»¸öλÖà } int64_t SampleBuffer::earliestTs() const { std::shared_lock lk(mtx_); return data_.empty() ? 0 : data_.front().ts_ms; }