chenluhua1980
7 天以前 ceb64b6612309fe384e096dcdc8b5a5e0dfe6cce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// SampleBuffer.cpp
#include "SampleBuffer.h"
#include <algorithm>
 
void SampleBuffer::push(int64_t ts_ms, double v) {
    std::unique_lock lk(mtx_);
    if (!data_.empty() && ts_ms < data_.back().ts_ms) {
        ts_ms = data_.back().ts_ms; // ¼òµ¥¾ÀÕýΪ·ÇµÝ¼õ£»Ñϸñ³¡¾°¿É¸ÄΪ¶ªÆú/²åÈëÅÅÐò
    }
    data_.push_back({ ts_ms, v });
    pruneUnlocked(ts_ms); // Óõ±Ç°Ð´Èëʱ¼ä×÷Ϊ²Î¿¼Ê±¼ä
}
 
std::vector<Sample> SampleBuffer::getSince(int64_t tsExclusive, size_t maxCount) const {
    std::shared_lock lk(mtx_);
    std::vector<Sample> out;
    if (data_.empty()) return out;
    const size_t start = lowerBoundIndex(tsExclusive);
    if (start >= data_.size()) return out;
    const size_t n = std::min(maxCount, data_.size() - start);
    out.reserve(n);
    for (size_t i = 0; i < n; ++i) out.push_back(data_[start + i]);
    return out;
}
 
std::vector<Sample> SampleBuffer::getRange(int64_t from_ts, int64_t to_ts, size_t maxCount) const {
    if (to_ts < from_ts) return {};
    std::shared_lock lk(mtx_);
    std::vector<Sample> out;
    if (data_.empty()) return out;
    const size_t L = lowerBoundInclusive(from_ts);
    const size_t R = upperBoundInclusive(to_ts);
    if (L >= R) return out;
    const size_t n = std::min(maxCount, R - L);
    out.reserve(n);
    for (size_t i = 0; i < n; ++i) out.push_back(data_[L + i]);
    return out;
}
 
size_t SampleBuffer::size() const { std::shared_lock lk(mtx_); return data_.size(); }
bool   SampleBuffer::empty() const { std::shared_lock lk(mtx_); return data_.empty(); }
int64_t SampleBuffer::latestTs() const {
    std::shared_lock lk(mtx_);
    return data_.empty() ? 0 : data_.back().ts_ms;
}
void SampleBuffer::clear() {
    std::unique_lock lk(mtx_);
    data_.clear();
}
 
void SampleBuffer::setPolicy(const RetentionPolicy& p) {
    std::unique_lock lk(mtx_);
    policy_ = p;
    // Á¢¼´°´Ð²ßÂÔÇåÒ»´Î£¨ÒÔ¡°µ±Ç°×îРts¡±Îª²Î¿¼£©
    pruneUnlocked(data_.empty() ? 0 : data_.back().ts_ms);
}
RetentionPolicy SampleBuffer::getPolicy() const {
    std::shared_lock lk(mtx_);
    return policy_;
}
 
void SampleBuffer::pruneUnlocked(int64_t ref_now_ms) {
    switch (policy_.mode) {
    case RetainMode::ByCount:
        if (policy_.maxSamples > 0) {
            while (data_.size() > policy_.maxSamples) data_.pop_front();
        }
        break;
    case RetainMode::ByRollingAge: {
        if (policy_.maxAge.count() > 0) {
            const int64_t cutoff = ref_now_ms - policy_.maxAge.count();
            while (!data_.empty() && data_.front().ts_ms < cutoff) data_.pop_front();
        }
        break;
    }
    case RetainMode::ByAbsoluteRange: {
        const bool valid = (policy_.absTo >= policy_.absFrom);
        if (valid) {
            // ¶ªÆú´°¿ÚÖ®ÍâµÄÊý¾Ý£¨Á½¶Ë¶¼²Ã£©
            while (!data_.empty() && data_.front().ts_ms < policy_.absFrom) data_.pop_front();
            while (!data_.empty() && data_.back().ts_ms > policy_.absTo)   data_.pop_back();
        }
        break;
    }
    }
}
 
size_t SampleBuffer::lowerBoundIndex(int64_t tsExclusive) const {
    size_t L = 0, R = data_.size();
    while (L < R) {
        size_t m = (L + R) >> 1;
        if (data_[m].ts_ms <= tsExclusive) L = m + 1; else R = m;
    }
    return L; // µÚÒ»¸ö > tsExclusive
}
size_t SampleBuffer::lowerBoundInclusive(int64_t tsInclusive) const {
    size_t L = 0, R = data_.size();
    while (L < R) {
        size_t m = (L + R) >> 1;
        if (data_[m].ts_ms < tsInclusive) L = m + 1; else R = m;
    }
    return L; // µÚÒ»¸ö >= tsInclusive
}
size_t SampleBuffer::upperBoundInclusive(int64_t tsInclusive) const {
    size_t L = 0, R = data_.size();
    while (L < R) {
        size_t m = (L + R) >> 1;
        if (data_[m].ts_ms <= tsInclusive) L = m + 1; else R = m;
    }
    return L; // ×îºóÒ»¸ö <= tsInclusive µÄÏÂÒ»¸öλÖÃ
}
 
int64_t SampleBuffer::earliestTs() const {
    std::shared_lock lk(mtx_);
    return data_.empty() ? 0 : data_.front().ts_ms;
}