chenluhua1980
6 天以前 d400f022161ff47f02cd0ea95a5076d0187ecd4d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// BufferManager.cpp
#include "BufferManager.h"
 
BufferManager::BufferManager(uint32_t id, std::string name, RetentionPolicy defaultPolicy)
    : id_(id), name_(std::move(name)), defPolicy_(defaultPolicy) {}
 
std::string BufferManager::name() const {
    std::shared_lock lk(mtx_);
    return name_;
}
void BufferManager::rename(const std::string& newName) {
    std::unique_lock lk(mtx_);
    name_ = newName;
}
 
void BufferManager::start() {
    std::unique_lock lk(mtx_);
    for (auto& kv : map_) kv.second->clear();
    running_.store(true);
}
void BufferManager::stop() {
    running_.store(false);
}
void BufferManager::clearAll() {
    std::unique_lock lk(mtx_);
    for (auto& kv : map_) kv.second->clear();
}
 
SampleBuffer& BufferManager::getOrCreate(uint32_t channelId) {
    auto it = map_.find(channelId);
    if (it != map_.end()) return *it->second;
    auto buf = std::make_unique<SampleBuffer>(defPolicy_);
    auto& ref = *buf;
    map_[channelId] = std::move(buf);
    return ref;
}
 
void BufferManager::push(uint32_t channelId, int64_t ts_ms, double v) {
    if (!running_.load()) return; // Í£Ö¹Ê±¶ªÆúÐÂÊý¾Ý
    std::unique_lock lk(mtx_);
    getOrCreate(channelId).push(ts_ms, v);
}
 
std::vector<Sample> BufferManager::getSince(uint32_t channelId, int64_t tsExclusive, size_t maxCount) const {
    std::shared_lock lk(mtx_);
    auto it = map_.find(channelId);
    if (it == map_.end()) return {};
    return it->second->getSince(tsExclusive, maxCount);
}
std::vector<Sample> BufferManager::getRange(uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount) const {
    std::shared_lock lk(mtx_);
    auto it = map_.find(channelId);
    if (it == map_.end()) return {};
    return it->second->getRange(from_ts, to_ts, maxCount);
}
 
void BufferManager::setDefaultPolicy(const RetentionPolicy& p, bool applyToExisting) {
    std::unique_lock lk(mtx_);
    defPolicy_ = p;
    if (applyToExisting) {
        for (auto& kv : map_) kv.second->setPolicy(p);
    }
}
void BufferManager::setPolicy(uint32_t channelId, const RetentionPolicy& p) {
    std::unique_lock lk(mtx_);
    getOrCreate(channelId).setPolicy(p);
}
RetentionPolicy BufferManager::getPolicy(uint32_t channelId) const {
    std::shared_lock lk(mtx_);
    auto it = map_.find(channelId);
    if (it == map_.end()) return defPolicy_;
    return it->second->getPolicy();
}
 
std::vector<uint32_t> BufferManager::listChannels() const {
    std::shared_lock lk(mtx_);
    std::vector<uint32_t> ids; ids.reserve(map_.size());
    for (auto& kv : map_) ids.push_back(kv.first);
    return ids;
}
 
void BufferManager::setChannelName(uint32_t channelId, const std::string& name) {
    std::unique_lock lk(mtx_);
    channelNames_[channelId] = name;
    // Ç¿ÖÆ´´½¨¶ÔÓ¦µÄ SampleBuffer£¨¿ÉÑ¡£©
    (void)getOrCreate(channelId);
}
 
std::string BufferManager::getChannelName(uint32_t channelId) const {
    std::shared_lock lk(mtx_);
    auto it = channelNames_.find(channelId);
    if (it != channelNames_.end() && !it->second.empty()) return it->second;
    // Ä¬ÈÏÃû
    return "Ch-" + std::to_string(channelId);
}
 
std::vector<BufferManager::ChannelInfo> BufferManager::listChannelInfos() const {
    std::shared_lock lk(mtx_);
    std::vector<ChannelInfo> out;
    out.reserve(map_.size());
    // ÒÔ¡°ÒÑÓлº³åÆ÷µÄͨµÀ¡±Îª»ù×¼Áгö£»ÈçÐèÁгöËùÓС°ÃüÃû¹ýµ«ÉÐδ²úÉúÊý¾Ý¡±µÄͨµÀ£¬Ò²¿É±éÀú channelNames_ ºÏ²¢
    for (auto& kv : map_) {
        const uint32_t ch = kv.first;
        auto it = channelNames_.find(ch);
        out.push_back(ChannelInfo{ ch, (it != channelNames_.end() && !it->second.empty())
                                        ? it->second
                                        : ("Ch-" + std::to_string(ch)) });
    }
    return out;
}
 
BufferManager::ChannelStat BufferManager::getChannelStat(uint32_t channelId) const {
    std::shared_lock lk(mtx_);
    auto it = map_.find(channelId);
    if (it == map_.end()) return ChannelStat{ channelId, 0, 0, 0 };
    const auto& buf = *it->second;
    return ChannelStat{ channelId, buf.size(), buf.earliestTs(), buf.latestTs() };
}
 
std::vector<BufferManager::ChannelStat> BufferManager::listChannelStats() const {
    std::shared_lock lk(mtx_);
    std::vector<ChannelStat> out; out.reserve(map_.size());
    for (auto& kv : map_) {
        const auto& buf = *kv.second;
        out.push_back(ChannelStat{ kv.first, buf.size(), buf.earliestTs(), buf.latestTs() });
    }
    return out;
}