// SampleBuffer.cpp
|
#include "SampleBuffer.h"
|
#include <algorithm>
|
|
void SampleBuffer::push(int64_t ts_ms, double v) {
|
std::unique_lock lk(mtx_);
|
if (!data_.empty() && ts_ms < data_.back().ts_ms) {
|
ts_ms = data_.back().ts_ms; // ¼òµ¥¾ÀÕýΪ·ÇµÝ¼õ£»Ñϸñ³¡¾°¿É¸ÄΪ¶ªÆú/²åÈëÅÅÐò
|
}
|
data_.push_back({ ts_ms, v });
|
pruneUnlocked(ts_ms); // Óõ±Ç°Ð´Èëʱ¼ä×÷Ϊ²Î¿¼Ê±¼ä
|
}
|
|
std::vector<Sample> SampleBuffer::getSince(int64_t tsExclusive, size_t maxCount) const {
|
std::shared_lock lk(mtx_);
|
std::vector<Sample> out;
|
if (data_.empty()) return out;
|
const size_t start = lowerBoundIndex(tsExclusive);
|
if (start >= data_.size()) return out;
|
const size_t n = std::min(maxCount, data_.size() - start);
|
out.reserve(n);
|
for (size_t i = 0; i < n; ++i) out.push_back(data_[start + i]);
|
return out;
|
}
|
|
std::vector<Sample> SampleBuffer::getRange(int64_t from_ts, int64_t to_ts, size_t maxCount) const {
|
if (to_ts < from_ts) return {};
|
std::shared_lock lk(mtx_);
|
std::vector<Sample> out;
|
if (data_.empty()) return out;
|
const size_t L = lowerBoundInclusive(from_ts);
|
const size_t R = upperBoundInclusive(to_ts);
|
if (L >= R) return out;
|
const size_t n = std::min(maxCount, R - L);
|
out.reserve(n);
|
for (size_t i = 0; i < n; ++i) out.push_back(data_[L + i]);
|
return out;
|
}
|
|
// Number of samples currently stored, read under the shared lock on mtx_.
size_t SampleBuffer::size() const {
    std::shared_lock guard(mtx_);
    return data_.size();
}
|
// True when no samples are stored, read under the shared lock on mtx_.
bool SampleBuffer::empty() const {
    std::shared_lock guard(mtx_);
    return data_.empty();
}
|
int64_t SampleBuffer::latestTs() const {
|
std::shared_lock lk(mtx_);
|
return data_.empty() ? 0 : data_.back().ts_ms;
|
}
|
void SampleBuffer::clear() {
|
std::unique_lock lk(mtx_);
|
data_.clear();
|
}
|
|
void SampleBuffer::setPolicy(const RetentionPolicy& p) {
|
std::unique_lock lk(mtx_);
|
policy_ = p;
|
// Á¢¼´°´Ð²ßÂÔÇåÒ»´Î£¨ÒÔ¡°µ±Ç°×îРts¡±Îª²Î¿¼£©
|
pruneUnlocked(data_.empty() ? 0 : data_.back().ts_ms);
|
}
|
// Returns a copy of the current retention policy, read under the shared
// lock on mtx_.
RetentionPolicy SampleBuffer::getPolicy() const {
    std::shared_lock guard(mtx_);
    RetentionPolicy snapshot = policy_;
    return snapshot;
}
|
|
void SampleBuffer::pruneUnlocked(int64_t ref_now_ms) {
|
switch (policy_.mode) {
|
case RetainMode::ByCount:
|
if (policy_.maxSamples > 0) {
|
while (data_.size() > policy_.maxSamples) data_.pop_front();
|
}
|
break;
|
case RetainMode::ByRollingAge: {
|
if (policy_.maxAge.count() > 0) {
|
const int64_t cutoff = ref_now_ms - policy_.maxAge.count();
|
while (!data_.empty() && data_.front().ts_ms < cutoff) data_.pop_front();
|
}
|
break;
|
}
|
case RetainMode::ByAbsoluteRange: {
|
const bool valid = (policy_.absTo >= policy_.absFrom);
|
if (valid) {
|
// ¶ªÆú´°¿ÚÖ®ÍâµÄÊý¾Ý£¨Á½¶Ë¶¼²Ã£©
|
while (!data_.empty() && data_.front().ts_ms < policy_.absFrom) data_.pop_front();
|
while (!data_.empty() && data_.back().ts_ms > policy_.absTo) data_.pop_back();
|
}
|
break;
|
}
|
}
|
}
|
|
size_t SampleBuffer::lowerBoundIndex(int64_t tsExclusive) const {
|
size_t L = 0, R = data_.size();
|
while (L < R) {
|
size_t m = (L + R) >> 1;
|
if (data_[m].ts_ms <= tsExclusive) L = m + 1; else R = m;
|
}
|
return L; // µÚÒ»¸ö > tsExclusive
|
}
|
size_t SampleBuffer::lowerBoundInclusive(int64_t tsInclusive) const {
|
size_t L = 0, R = data_.size();
|
while (L < R) {
|
size_t m = (L + R) >> 1;
|
if (data_[m].ts_ms < tsInclusive) L = m + 1; else R = m;
|
}
|
return L; // µÚÒ»¸ö >= tsInclusive
|
}
|
size_t SampleBuffer::upperBoundInclusive(int64_t tsInclusive) const {
|
size_t L = 0, R = data_.size();
|
while (L < R) {
|
size_t m = (L + R) >> 1;
|
if (data_[m].ts_ms <= tsInclusive) L = m + 1; else R = m;
|
}
|
return L; // ×îºóÒ»¸ö <= tsInclusive µÄÏÂÒ»¸öλÖÃ
|
}
|
|
int64_t SampleBuffer::earliestTs() const {
|
std::shared_lock lk(mtx_);
|
return data_.empty() ? 0 : data_.front().ts_ms;
|
}
|