1.ProcessStart和ProcessEnd加调上层时加上SlotNo, 状态也关联到SlotNo, 因为多腔可能 并行工作。
2.加入曲线采集服务端到项目中。
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #include <cstdint> |
| | | namespace DAQCfg { |
| | | inline constexpr const char* Version = "1.0.1"; |
| | | inline constexpr uint32_t KickIfNoVersionSec = 5; |
| | | inline constexpr uint32_t RecvLoopIntervalMs = 5; |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | // BufferManager.cpp |
| | | #include "BufferManager.h" |
| | | |
| | | BufferManager::BufferManager(uint32_t id, std::string name, RetentionPolicy defaultPolicy) |
| | | : id_(id), name_(std::move(name)), defPolicy_(defaultPolicy) {} |
| | | |
| | | std::string BufferManager::name() const { |
| | | std::shared_lock lk(mtx_); |
| | | return name_; |
| | | } |
| | | void BufferManager::rename(const std::string& newName) { |
| | | std::unique_lock lk(mtx_); |
| | | name_ = newName; |
| | | } |
| | | |
| | | void BufferManager::start() { |
| | | std::unique_lock lk(mtx_); |
| | | for (auto& kv : map_) kv.second->clear(); |
| | | running_.store(true); |
| | | } |
| | | void BufferManager::stop() { |
| | | running_.store(false); |
| | | } |
| | | void BufferManager::clearAll() { |
| | | std::unique_lock lk(mtx_); |
| | | for (auto& kv : map_) kv.second->clear(); |
| | | } |
| | | |
| | | SampleBuffer& BufferManager::getOrCreate(uint32_t channelId) { |
| | | auto it = map_.find(channelId); |
| | | if (it != map_.end()) return *it->second; |
| | | auto buf = std::make_unique<SampleBuffer>(defPolicy_); |
| | | auto& ref = *buf; |
| | | map_[channelId] = std::move(buf); |
| | | return ref; |
| | | } |
| | | |
| | | void BufferManager::push(uint32_t channelId, int64_t ts_ms, double v) { |
| | | if (!running_.load()) return; // 忢æ¶ä¸¢å¼æ°æ°æ® |
| | | std::unique_lock lk(mtx_); |
| | | getOrCreate(channelId).push(ts_ms, v); |
| | | } |
| | | |
| | | std::vector<Sample> BufferManager::getSince(uint32_t channelId, int64_t tsExclusive, size_t maxCount) const { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = map_.find(channelId); |
| | | if (it == map_.end()) return {}; |
| | | return it->second->getSince(tsExclusive, maxCount); |
| | | } |
| | | std::vector<Sample> BufferManager::getRange(uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount) const { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = map_.find(channelId); |
| | | if (it == map_.end()) return {}; |
| | | return it->second->getRange(from_ts, to_ts, maxCount); |
| | | } |
| | | |
| | | void BufferManager::setDefaultPolicy(const RetentionPolicy& p, bool applyToExisting) { |
| | | std::unique_lock lk(mtx_); |
| | | defPolicy_ = p; |
| | | if (applyToExisting) { |
| | | for (auto& kv : map_) kv.second->setPolicy(p); |
| | | } |
| | | } |
| | | void BufferManager::setPolicy(uint32_t channelId, const RetentionPolicy& p) { |
| | | std::unique_lock lk(mtx_); |
| | | getOrCreate(channelId).setPolicy(p); |
| | | } |
| | | RetentionPolicy BufferManager::getPolicy(uint32_t channelId) const { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = map_.find(channelId); |
| | | if (it == map_.end()) return defPolicy_; |
| | | return it->second->getPolicy(); |
| | | } |
| | | |
| | | std::vector<uint32_t> BufferManager::listChannels() const { |
| | | std::shared_lock lk(mtx_); |
| | | std::vector<uint32_t> ids; ids.reserve(map_.size()); |
| | | for (auto& kv : map_) ids.push_back(kv.first); |
| | | return ids; |
| | | } |
| | | |
| | | void BufferManager::setChannelName(uint32_t channelId, const std::string& name) { |
| | | std::unique_lock lk(mtx_); |
| | | channelNames_[channelId] = name; |
| | | // 强å¶å建对åºç SampleBufferï¼å¯éï¼ |
| | | (void)getOrCreate(channelId); |
| | | } |
| | | |
| | | std::string BufferManager::getChannelName(uint32_t channelId) const { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = channelNames_.find(channelId); |
| | | if (it != channelNames_.end() && !it->second.empty()) return it->second; |
| | | // é»è®¤å |
| | | return "Ch-" + std::to_string(channelId); |
| | | } |
| | | |
| | | std::vector<BufferManager::ChannelInfo> BufferManager::listChannelInfos() const { |
| | | std::shared_lock lk(mtx_); |
| | | std::vector<ChannelInfo> out; |
| | | out.reserve(map_.size()); |
| | | // 以âå·²æç¼å²å¨çééâ为åºåååºï¼å¦éååºææâå½åè¿ä½å°æªäº§çæ°æ®âçééï¼ä¹å¯éå channelNames_ åå¹¶ |
| | | for (auto& kv : map_) { |
| | | const uint32_t ch = kv.first; |
| | | auto it = channelNames_.find(ch); |
| | | out.push_back(ChannelInfo{ ch, (it != channelNames_.end() && !it->second.empty()) |
| | | ? it->second |
| | | : ("Ch-" + std::to_string(ch)) }); |
| | | } |
| | | return out; |
| | | } |
| | | |
| | | BufferManager::ChannelStat BufferManager::getChannelStat(uint32_t channelId) const { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = map_.find(channelId); |
| | | if (it == map_.end()) return ChannelStat{ channelId, 0, 0, 0 }; |
| | | const auto& buf = *it->second; |
| | | return ChannelStat{ channelId, buf.size(), buf.earliestTs(), buf.latestTs() }; |
| | | } |
| | | |
| | | std::vector<BufferManager::ChannelStat> BufferManager::listChannelStats() const { |
| | | std::shared_lock lk(mtx_); |
| | | std::vector<ChannelStat> out; out.reserve(map_.size()); |
| | | for (auto& kv : map_) { |
| | | const auto& buf = *kv.second; |
| | | out.push_back(ChannelStat{ kv.first, buf.size(), buf.earliestTs(), buf.latestTs() }); |
| | | } |
| | | return out; |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #include "SampleBuffer.h" |
| | | #include <unordered_map> |
| | | #include <memory> |
| | | #include <shared_mutex> |
| | | #include <string> |
| | | #include <atomic> |
| | | #include <vector> |
| | | |
| | | class BufferManager { |
| | | public: |
| | | struct ChannelStat { |
| | | uint32_t id; |
| | | size_t size; |
| | | int64_t earliestTs; |
| | | int64_t latestTs; |
| | | }; |
| | | |
| | | public: |
| | | BufferManager(uint32_t id, std::string name, RetentionPolicy defaultPolicy = {}); |
| | | |
| | | // æ è¯ |
| | | uint32_t id() const { return id_; } |
| | | std::string name() const; |
| | | void rename(const std::string& newName); |
| | | |
| | | // ééæ§å¶ |
| | | void start(); |
| | | void stop(); |
| | | bool isRunning() const { return running_.load(); } |
| | | void clearAll(); |
| | | |
| | | // æ°æ®å读 |
| | | void push(uint32_t channelId, int64_t ts_ms, double v); |
| | | std::vector<Sample> getSince(uint32_t channelId, int64_t tsExclusive, size_t maxCount = 4096) const; |
| | | std::vector<Sample> getRange(uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const; |
| | | |
| | | // çç¥ |
| | | void setDefaultPolicy(const RetentionPolicy& p, bool applyToExisting = false); |
| | | void setPolicy(uint32_t channelId, const RetentionPolicy& p); |
| | | RetentionPolicy getPolicy(uint32_t channelId) const; |
| | | |
| | | // éé管ç |
| | | std::vector<uint32_t> listChannels() const; |
| | | |
| | | // ===== æ°å¢ï¼ééï¼æ²çº¿ï¼åç§° ===== |
| | | void setChannelName(uint32_t channelId, const std::string& name); // 设置/æ´æ° |
| | | std::string getChannelName(uint32_t channelId) const; // è¥æªè®¾ç½®ï¼è¿å "Ch-<id>" |
| | | struct ChannelInfo { uint32_t id; std::string name; }; |
| | | std::vector<ChannelInfo> listChannelInfos() const; // æ¹ä¾¿ UI å表 |
| | | |
| | | std::vector<ChannelStat> listChannelStats() const; |
| | | // å个éé |
| | | ChannelStat getChannelStat(uint32_t channelId) const; |
| | | private: |
| | | SampleBuffer& getOrCreate(uint32_t channelId); |
| | | |
| | | mutable std::shared_mutex mtx_; |
| | | std::unordered_map<uint32_t, std::unique_ptr<SampleBuffer>> map_; |
| | | |
| | | // ===== æ°å¢ï¼ééå称表 ===== |
| | | std::unordered_map<uint32_t, std::string> channelNames_; |
| | | |
| | | uint32_t id_; |
| | | std::string name_; |
| | | RetentionPolicy defPolicy_; |
| | | std::atomic<bool> running_{ false }; |
| | | |
| | | |
| | | |
| | | |
| | | }; |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | // BufferRegistry.cpp |
| | | #include "BufferRegistry.h" |
| | | |
| | | BufferManager& BufferRegistry::getOrCreate(uint32_t managerId, const std::string& name, const RetentionPolicy& defPolicy) { |
| | | std::unique_lock lk(mtx_); |
| | | auto it = managers_.find(managerId); |
| | | if (it != managers_.end()) return *it->second; |
| | | auto bm = std::make_unique<BufferManager>(managerId, name, defPolicy); |
| | | auto& ref = *bm; |
| | | managers_[managerId] = std::move(bm); |
| | | return ref; |
| | | } |
| | | |
| | | std::vector<uint32_t> BufferRegistry::listManagers() const { |
| | | std::shared_lock lk(mtx_); |
| | | std::vector<uint32_t> ids; ids.reserve(managers_.size()); |
| | | for (auto& kv : managers_) ids.push_back(kv.first); |
| | | return ids; |
| | | } |
| | | |
| | | BufferManager* BufferRegistry::find(uint32_t managerId) { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = managers_.find(managerId); |
| | | return (it == managers_.end()) ? nullptr : it->second.get(); |
| | | } |
| | | const BufferManager* BufferRegistry::find(uint32_t managerId) const { |
| | | std::shared_lock lk(mtx_); |
| | | auto it = managers_.find(managerId); |
| | | return (it == managers_.end()) ? nullptr : it->second.get(); |
| | | } |
| | | |
| | | void BufferRegistry::remove(uint32_t managerId) { |
| | | std::unique_lock lk(mtx_); |
| | | managers_.erase(managerId); |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | // BufferRegistry.h |
| | | #pragma once |
| | | #include "BufferManager.h" |
| | | #include <unordered_map> |
| | | #include <memory> |
| | | #include <shared_mutex> |
| | | |
| | | class BufferRegistry { |
| | | public: |
| | | // è·å/å建æå°æºå¨ç BufferManager |
| | | BufferManager& getOrCreate(uint32_t managerId, const std::string& name, const RetentionPolicy& defPolicy = {}); |
| | | |
| | | // æ¥è¯¢ |
| | | std::vector<uint32_t> listManagers() const; |
| | | BufferManager* find(uint32_t managerId); |
| | | const BufferManager* find(uint32_t managerId) const; |
| | | |
| | | // ç§»é¤ä¸ä¸ªç®¡çå¨ï¼å¯éï¼ |
| | | void remove(uint32_t managerId); |
| | | |
| | | private: |
| | | mutable std::shared_mutex mtx_; |
| | | std::unordered_map<uint32_t, std::unique_ptr<BufferManager>> managers_; |
| | | }; |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | // SampleBuffer.cpp |
| | | #include "SampleBuffer.h" |
| | | #include <algorithm> |
| | | |
| | | void SampleBuffer::push(int64_t ts_ms, double v) { |
| | | std::unique_lock lk(mtx_); |
| | | if (!data_.empty() && ts_ms < data_.back().ts_ms) { |
| | | ts_ms = data_.back().ts_ms; // ç®åçº æ£ä¸ºééåï¼ä¸¥æ ¼åºæ¯å¯æ¹ä¸ºä¸¢å¼/æå
¥æåº |
| | | } |
| | | data_.push_back({ ts_ms, v }); |
| | | pruneUnlocked(ts_ms); // ç¨å½ååå
¥æ¶é´ä½ä¸ºåèæ¶é´ |
| | | } |
| | | |
| | | std::vector<Sample> SampleBuffer::getSince(int64_t tsExclusive, size_t maxCount) const { |
| | | std::shared_lock lk(mtx_); |
| | | std::vector<Sample> out; |
| | | if (data_.empty()) return out; |
| | | const size_t start = lowerBoundIndex(tsExclusive); |
| | | if (start >= data_.size()) return out; |
| | | const size_t n = std::min(maxCount, data_.size() - start); |
| | | out.reserve(n); |
| | | for (size_t i = 0; i < n; ++i) out.push_back(data_[start + i]); |
| | | return out; |
| | | } |
| | | |
| | | std::vector<Sample> SampleBuffer::getRange(int64_t from_ts, int64_t to_ts, size_t maxCount) const { |
| | | if (to_ts < from_ts) return {}; |
| | | std::shared_lock lk(mtx_); |
| | | std::vector<Sample> out; |
| | | if (data_.empty()) return out; |
| | | const size_t L = lowerBoundInclusive(from_ts); |
| | | const size_t R = upperBoundInclusive(to_ts); |
| | | if (L >= R) return out; |
| | | const size_t n = std::min(maxCount, R - L); |
| | | out.reserve(n); |
| | | for (size_t i = 0; i < n; ++i) out.push_back(data_[L + i]); |
| | | return out; |
| | | } |
| | | |
| | | size_t SampleBuffer::size() const { std::shared_lock lk(mtx_); return data_.size(); } |
| | | bool SampleBuffer::empty() const { std::shared_lock lk(mtx_); return data_.empty(); } |
| | | int64_t SampleBuffer::latestTs() const { |
| | | std::shared_lock lk(mtx_); |
| | | return data_.empty() ? 0 : data_.back().ts_ms; |
| | | } |
| | | void SampleBuffer::clear() { |
| | | std::unique_lock lk(mtx_); |
| | | data_.clear(); |
| | | } |
| | | |
| | | void SampleBuffer::setPolicy(const RetentionPolicy& p) { |
| | | std::unique_lock lk(mtx_); |
| | | policy_ = p; |
| | | // ç«å³ææ°çç¥æ¸
䏿¬¡ï¼ä»¥âå½åææ° tsâ为åèï¼ |
| | | pruneUnlocked(data_.empty() ? 0 : data_.back().ts_ms); |
| | | } |
| | | RetentionPolicy SampleBuffer::getPolicy() const { |
| | | std::shared_lock lk(mtx_); |
| | | return policy_; |
| | | } |
| | | |
| | | void SampleBuffer::pruneUnlocked(int64_t ref_now_ms) { |
| | | switch (policy_.mode) { |
| | | case RetainMode::ByCount: |
| | | if (policy_.maxSamples > 0) { |
| | | while (data_.size() > policy_.maxSamples) data_.pop_front(); |
| | | } |
| | | break; |
| | | case RetainMode::ByRollingAge: { |
| | | if (policy_.maxAge.count() > 0) { |
| | | const int64_t cutoff = ref_now_ms - policy_.maxAge.count(); |
| | | while (!data_.empty() && data_.front().ts_ms < cutoff) data_.pop_front(); |
| | | } |
| | | break; |
| | | } |
| | | case RetainMode::ByAbsoluteRange: { |
| | | const bool valid = (policy_.absTo >= policy_.absFrom); |
| | | if (valid) { |
| | | // 丢å¼çªå£ä¹å¤çæ°æ®ï¼ä¸¤ç«¯é½è£ï¼ |
| | | while (!data_.empty() && data_.front().ts_ms < policy_.absFrom) data_.pop_front(); |
| | | while (!data_.empty() && data_.back().ts_ms > policy_.absTo) data_.pop_back(); |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | size_t SampleBuffer::lowerBoundIndex(int64_t tsExclusive) const { |
| | | size_t L = 0, R = data_.size(); |
| | | while (L < R) { |
| | | size_t m = (L + R) >> 1; |
| | | if (data_[m].ts_ms <= tsExclusive) L = m + 1; else R = m; |
| | | } |
| | | return L; // 第ä¸ä¸ª > tsExclusive |
| | | } |
| | | size_t SampleBuffer::lowerBoundInclusive(int64_t tsInclusive) const { |
| | | size_t L = 0, R = data_.size(); |
| | | while (L < R) { |
| | | size_t m = (L + R) >> 1; |
| | | if (data_[m].ts_ms < tsInclusive) L = m + 1; else R = m; |
| | | } |
| | | return L; // 第ä¸ä¸ª >= tsInclusive |
| | | } |
| | | size_t SampleBuffer::upperBoundInclusive(int64_t tsInclusive) const { |
| | | size_t L = 0, R = data_.size(); |
| | | while (L < R) { |
| | | size_t m = (L + R) >> 1; |
| | | if (data_[m].ts_ms <= tsInclusive) L = m + 1; else R = m; |
| | | } |
| | | return L; // æåä¸ä¸ª <= tsInclusive çä¸ä¸ä¸ªä½ç½® |
| | | } |
| | | |
| | | int64_t SampleBuffer::earliestTs() const { |
| | | std::shared_lock lk(mtx_); |
| | | return data_.empty() ? 0 : data_.front().ts_ms; |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | // SampleBuffer.h |
| | | #pragma once |
| | | #include "../core/DataTypes.h" |
| | | #include <deque> |
| | | #include <vector> |
| | | #include <shared_mutex> |
| | | #include <chrono> |
| | | |
| | | enum class RetainMode { |
| | | ByCount, // ææ ·æ¬æ°ä¸é |
| | | ByRollingAge, // ææ»å¨æ¶é´çªå£ï¼maxAgeï¼ |
| | | ByAbsoluteRange // æç»å¯¹æ¶é´æ®µ [absFrom, absTo] |
| | | }; |
| | | |
| | | struct RetentionPolicy { |
| | | RetainMode mode = RetainMode::ByCount; |
| | | size_t maxSamples = 100000; // ByCount ç¨ |
| | | std::chrono::milliseconds maxAge{ std::chrono::hours(1) }; // ByRollingAge ç¨ |
| | | int64_t absFrom = 0; // ByAbsoluteRange ç¨ |
| | | int64_t absTo = 0; // ByAbsoluteRange ç¨ï¼absTo>=absFrom ææï¼ |
| | | }; |
| | | |
| | | class SampleBuffer { |
| | | public: |
| | | explicit SampleBuffer(RetentionPolicy policy = {}) : policy_(policy) {} |
| | | |
| | | // åï¼ææ¶é´æ³é墿¨å
¥ï¼ä¹±åºä¼è¢«ç®åçº æ£ä¸ºééåï¼ |
| | | void push(int64_t ts_ms, double v); |
| | | |
| | | // è¯»ï¼æâtsExclusive ä¹åâçæ°æ°æ® |
| | | std::vector<Sample> getSince(int64_t tsExclusive, size_t maxCount = 4096) const; |
| | | |
| | | // è¯»ï¼æåºé´ [from, to]ï¼å
å«è¾¹çï¼ |
| | | std::vector<Sample> getRange(int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const; |
| | | |
| | | // æ¥è¯¢ / ç»´æ¤ |
| | | size_t size() const; |
| | | bool empty() const; |
| | | int64_t latestTs() const; |
| | | void clear(); |
| | | |
| | | // é
ç½® |
| | | void setPolicy(const RetentionPolicy& p); |
| | | RetentionPolicy getPolicy() const; |
| | | |
| | | int64_t earliestTs() const; // æ æ°æ®æ¶è¿å 0 |
| | | |
| | | private: |
| | | void pruneUnlocked(int64_t ref_now_ms); // æçç¥æ¸
ç |
| | | size_t lowerBoundIndex(int64_t tsExclusive) const; // äºåï¼ç¬¬ä¸ä¸ª > tsExclusive |
| | | size_t lowerBoundInclusive(int64_t tsInclusive) const; // 第ä¸ä¸ª >= tsInclusive |
| | | size_t upperBoundInclusive(int64_t tsInclusive) const; // æåä¸ä¸ª <= tsInclusive çä¸ä¸ä¸ªä½ç½® |
| | | |
| | | mutable std::shared_mutex mtx_; |
| | | std::deque<Sample> data_; |
| | | RetentionPolicy policy_; |
| | | }; |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #include "Collector.h" |
| | | #include <iostream> |
| | | #include <chrono> |
| | | |
| | | #include "../DAQConfig.h" |
| | | #include "../core/ConnEvents.h" |
| | | using namespace DAQEvt; |
| | | |
| | | // åè®®ç¼è§£ç |
| | | #include "../proto/Protocol.h" |
| | | #include "../proto/ProtocolCodec.h" |
| | | |
| | | using namespace Proto; |
| | | |
| | | Collector::Collector() {} |
| | | |
| | | void Collector::startLoop(uint32_t intervalMs) { |
| | | if (running.load()) return; |
| | | running.store(true); |
| | | worker = std::thread([this, intervalMs]() { |
| | | while (running.load()) { |
| | | this->poll(); |
| | | std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs)); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | void Collector::stopLoop() { |
| | | if (!running.load()) return; |
| | | running.store(false); |
| | | if (worker.joinable()) worker.join(); |
| | | } |
| | | |
| | | Collector::~Collector() { |
| | | stopLoop(); |
| | | disconnect(); |
| | | } |
| | | |
| | | std::vector<Collector::ClientSummary> Collector::getClientList() { |
| | | std::lock_guard<std::mutex> lk(mClients); |
| | | std::vector<ClientSummary> out; |
| | | out.reserve(clients.size()); |
| | | for (const auto& c : clients) { |
| | | out.push_back(ClientSummary{ c.ip, c.port, c.versionOk, c.sock }); |
| | | } |
| | | return out; |
| | | } |
| | | |
| | | void Collector::createServer(uint16_t port) { |
| | | if (socketComm.createServerSocket(port)) { |
| | | if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerListening), |
| | | "Collector server listening on port " + std::to_string(port)); |
| | | onConnectionEstablished(); |
| | | } |
| | | } |
| | | |
| | | void Collector::disconnect() { |
| | | { |
| | | std::lock_guard<std::mutex> lk(mClients); |
| | | for (auto& c : clients) { |
| | | socketComm.closeClient(c.sock); |
| | | if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false); |
| | | } |
| | | clients.clear(); |
| | | } |
| | | socketComm.closeSocket(); |
| | | if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerStopped), "Collector server stopped"); |
| | | onConnectionLost(); |
| | | } |
| | | |
| | | void Collector::poll() { |
| | | // 1) æ¥æ°å®¢æ·ç«¯ |
| | | while (true) { |
| | | SOCKET cs; std::string ip; uint16_t port; |
| | | if (!socketComm.acceptOne(cs, ip, port)) break; |
| | | |
| | | ClientInfo ci; |
| | | ci.sock = cs; ci.ip = ip; ci.port = port; |
| | | ci.tsConnected = std::chrono::steady_clock::now(); |
| | | |
| | | { |
| | | std::lock_guard<std::mutex> lk(mClients); |
| | | clients.push_back(ci); |
| | | } |
| | | |
| | | if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientAccepted), |
| | | "Client connected: " + ip + ":" + std::to_string(port)); |
| | | if (cbClientEvent) cbClientEvent(ip, port, /*connected*/true); |
| | | } |
| | | |
| | | // 2) 轮询å客æ·ç«¯ |
| | | auto now = std::chrono::steady_clock::now(); |
| | | std::lock_guard<std::mutex> lk(mClients); |
| | | for (size_t i = 0; i < clients.size(); ) { |
| | | auto& c = clients[i]; |
| | | |
| | | // 2a) çæ¬æ¡æè¶
æ¶ |
| | | if (!c.versionOk) { |
| | | auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - c.tsConnected).count(); |
| | | if (elapsed >= DAQCfg::KickIfNoVersionSec) { |
| | | if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionTimeoutKick), |
| | | "Kick client (no version within 5s): " + c.ip + ":" + std::to_string(c.port)); |
| | | socketComm.closeClient(c.sock); |
| | | if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false); |
| | | clients.erase(clients.begin() + i); |
| | | continue; |
| | | } |
| | | } |
| | | |
| | | // 2b) æ¶æ°æ® |
| | | std::vector<uint8_t> buf; |
| | | bool peerClosed = false; |
| | | if (socketComm.recvFrom(c.sock, buf, peerClosed)) { |
| | | if (!buf.empty()) { |
| | | if (rawDumpEnabled && cbRaw) cbRaw(buf); |
| | | |
| | | // 帧ç»è£
|
| | | c.fr.push(buf); |
| | | std::vector<uint8_t> frame; |
| | | while (c.fr.nextFrame(frame)) { |
| | | if (rawDumpEnabled && cbRaw) cbRaw(frame); |
| | | handleClientData(c, frame); |
| | | } |
| | | } |
| | | } |
| | | else if (peerClosed) { |
| | | // å¯¹ç«¯ä¸»å¨æå¼ |
| | | if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientDisconnected), |
| | | "Client disconnected: " + c.ip + ":" + std::to_string(c.port)); |
| | | socketComm.closeClient(c.sock); |
| | | if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false); |
| | | clients.erase(clients.begin() + i); |
| | | continue; |
| | | } |
| | | |
| | | ++i; |
| | | } |
| | | } |
| | | |
| | | void Collector::sendSampleData(double sample) { |
| | | (void)sample; |
| | | #ifdef _DEBUG |
| | | if (cbStatus) cbStatus(0, "sendSampleData() no-op."); |
| | | #endif |
| | | } |
| | | |
| | | void Collector::sendWindowData(const std::vector<std::string>& fields) { |
| | | (void)fields; |
| | | #ifdef _DEBUG |
| | | if (cbStatus) cbStatus(0, "sendWindowData() no-op."); |
| | | #endif |
| | | } |
| | | |
| | | void Collector::onConnectionEstablished() { |
| | | std::cout << "[Collector] ready.\n"; |
| | | } |
| | | void Collector::onConnectionLost() { |
| | | std::cout << "[Collector] stopped.\n"; |
| | | } |
| | | |
| | | // å
¼å®¹æ§é¨ç¦å½æ° |
| | | bool Collector::isVersionRequest(const std::vector<uint8_t>& data) const { |
| | | if (data.size() < 4 + 4 + 2 + 2 + 1) return false; |
| | | if (!(data[0] == 0x11 && data[1] == 0x88 && data[2] == 0x11 && data[3] == 0x88)) return false; |
| | | size_t cmdPos = 4 + 4 + 2; |
| | | uint16_t cmd = (uint16_t(data[cmdPos]) << 8) | data[cmdPos + 1]; |
| | | return (cmd == CMD_REQ_VERSION); |
| | | } |
| | | |
| | | // ========== BufferRegistry ֱͨ ========== |
| | | BufferManager& Collector::registryAddMachine(uint32_t machineId, const std::string& name, |
| | | const RetentionPolicy& defPolicy) { |
| | | return registry_.getOrCreate(machineId, name, defPolicy); |
| | | } |
| | | BufferManager* Collector::registryFind(uint32_t machineId) { return registry_.find(machineId); } |
| | | const BufferManager* Collector::registryFind(uint32_t machineId) const { return registry_.find(machineId); } |
| | | std::vector<uint32_t> Collector::registryListMachines() const { return registry_.listManagers(); } |
| | | |
| | | void Collector::buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v) { |
| | | auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); |
| | | bm.push(channelId, ts_ms, v); // æª start() æ¶å°è¢«å
é¨å¿½ç¥ |
| | | } |
| | | |
| | | std::vector<Sample> Collector::buffersGetSince(uint32_t machineId, uint32_t channelId, |
| | | int64_t tsExclusive, size_t maxCount) const { |
| | | auto* bm = registry_.find(machineId); |
| | | if (!bm) return {}; |
| | | return bm->getSince(channelId, tsExclusive, maxCount); |
| | | } |
| | | std::vector<Sample> Collector::buffersGetRange(uint32_t machineId, uint32_t channelId, |
| | | int64_t from_ts, int64_t to_ts, size_t maxCount) const { |
| | | auto* bm = registry_.find(machineId); |
| | | if (!bm) return {}; |
| | | return bm->getRange(channelId, from_ts, to_ts, maxCount); |
| | | } |
| | | |
| | | void Collector::buffersStart(uint32_t machineId) { |
| | | auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); |
| | | bm.start(); // æ¸
空 + å
许å |
| | | } |
| | | void Collector::buffersStop(uint32_t machineId) { |
| | | if (auto* bm = registry_.find(machineId)) bm->stop(); |
| | | } |
| | | void Collector::buffersClear(uint32_t machineId) { |
| | | if (auto* bm = registry_.find(machineId)) bm->clearAll(); |
| | | } |
| | | bool Collector::buffersIsRunning(uint32_t machineId) const { |
| | | auto* bm = registry_.find(machineId); |
| | | return bm ? bm->isRunning() : false; |
| | | } |
| | | |
| | | void Collector::buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting) { |
| | | auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); |
| | | bm.setDefaultPolicy(p, applyExisting); |
| | | } |
| | | void Collector::buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p) { |
| | | auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); |
| | | bm.setPolicy(channelId, p); |
| | | } |
| | | void Collector::buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name) { |
| | | auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); |
| | | bm.setChannelName(channelId, name); |
| | | } |
| | | std::string Collector::buffersGetChannelName(uint32_t machineId, uint32_t channelId) const { |
| | | auto* bm = registry_.find(machineId); |
| | | if (!bm) return {}; |
| | | return bm->getChannelName(channelId); |
| | | } |
| | | std::vector<BufferManager::ChannelInfo> Collector::buffersListChannelInfos(uint32_t machineId) const { |
| | | auto* bm = registry_.find(machineId); |
| | | if (!bm) return {}; |
| | | return bm->listChannelInfos(); |
| | | } |
| | | std::vector<BufferManager::ChannelStat> Collector::buffersListChannelStats(uint32_t machineId) const { |
| | | auto* bm = registry_.find(machineId); |
| | | if (!bm) return {}; |
| | | return bm->listChannelStats(); |
| | | } |
| | | BufferManager::ChannelStat Collector::buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const { |
| | | auto* bm = registry_.find(machineId); |
| | | if (!bm) return BufferManager::ChannelStat{ channelId, 0, 0, 0 }; |
| | | return bm->getChannelStat(channelId); |
| | | } |
| | | |
| | | // ====== æ¹æ¬¡ç®¡ç ====== |
| | | // æ°å®ç°ï¼æ ¹æ® expectedDurationMs è®¡ç® expectedEndTs |
| | | void Collector::batchStart(uint32_t machineId, |
| | | const std::string& batchId, |
| | | uint64_t expectedDurationMs) |
| | | { |
| | | const uint64_t startTs = nowMs(); |
| | | const uint64_t expectedEndTs = (expectedDurationMs > 0) ? (startTs + expectedDurationMs) : 0; |
| | | |
| | | { |
| | | std::lock_guard<std::mutex> lk(mBatches); |
| | | auto& br = batches_[machineId]; |
| | | br.state = BatchState::Active; |
| | | br.activeBatchId = batchId; |
| | | br.activeStartTs = startTs; |
| | | br.expectedEndTs = expectedEndTs; |
| | | } |
| | | |
| | | // æ¹æ¬¡å¼å§ï¼æ¸
空并å¼å§éé |
| | | buffersStart(machineId); |
| | | |
| | | if (cbStatus) cbStatus(0, "batchStart: machine " + std::to_string(machineId) + |
| | | " batch=" + batchId + |
| | | " durMs=" + std::to_string(expectedDurationMs)); |
| | | } |
| | | |
| | | // å
¼å®¹æ§ç¾åï¼å¿½ç¥ startTsï¼æ expectedEndTs å½âæ¶é¿â |
| | | void Collector::batchStart(uint32_t machineId, |
| | | const std::string& batchId, |
| | | uint64_t /*startTs_ignored*/, |
| | | uint64_t expectedEndTs_orDuration) |
| | | { |
| | | // è¥è°ç¨æ¹è¯¯ä¼ äºç»å¯¹çâç»ææ¶é´æ³âï¼è¿éä¼è¢«å½ä½âæ¶é¿âï¼ |
| | | // å¦éä¸¥æ ¼åºåï¼ä½ ä¹å¯ä»¥å ä¸ä¸ªéå¼å¤æï¼æ¯å¦å¤§äº 10^12 认为æ¯ç»å¯¹æ¶é´æ³ï¼ |
| | | // ç¶å转æ¢ï¼duration = max(0, endTs - now) |
| | | |
| | | uint64_t durationMs = expectedEndTs_orDuration; |
| | | |
| | | // å¯éï¼å个ç®åçâåç»å¯¹æ¶é´æ³âç夿并èªå¨è½¬ä¸ºæ¶é¿ |
| | | // ä»¥æ¯«ç§æ¶é´æ³éå¼ä¸¾ä¾ï¼~2001-09-09ï¼ï¼1e12 |
| | | if (expectedEndTs_orDuration > 1000000000000ULL) { |
| | | uint64_t now = nowMs(); |
| | | if (expectedEndTs_orDuration > now) |
| | | durationMs = expectedEndTs_orDuration - now; |
| | | else |
| | | durationMs = 0; // å·²è¿æï¼å½æªç¥å¤ç |
| | | } |
| | | |
| | | batchStart(machineId, batchId, durationMs); |
| | | } |
| | | |
| | | void Collector::batchStop(uint32_t machineId, uint64_t /*endTs*/) { |
| | | { |
| | | std::lock_guard<std::mutex> lk(mBatches); |
| | | auto it = batches_.find(machineId); |
| | | if (it != batches_.end()) { |
| | | it->second.state = BatchState::Idle; |
| | | it->second.activeBatchId.clear(); |
| | | it->second.activeStartTs = 0; |
| | | it->second.expectedEndTs = 0; |
| | | } |
| | | else { |
| | | batches_[machineId] = BatchRec{}; // çæä¸æ¡ Idle |
| | | } |
| | | } |
| | | |
| | | // 忢ééï¼ä½ä¸æ¸
æ°æ® |
| | | buffersStop(machineId); |
| | | |
| | | if (cbStatus) cbStatus(0, "batchStop: machine " + std::to_string(machineId)); |
| | | } |
| | | |
| | | Collector::BatchRec Collector::getBatch(uint32_t machineId) const { |
| | | std::lock_guard<std::mutex> lk(mBatches); |
| | | auto it = batches_.find(machineId); |
| | | if (it != batches_.end()) return it->second; |
| | | return BatchRec{}; // é»è®¤ Idle |
| | | } |
| | | |
| | | // ========== ä¸å¡åå ========== |
| | | void Collector::handleClientData(ClientInfo& c, const std::vector<uint8_t>& data) { |
| | | // çæ¬æ ¡éª |
| | | if (!c.versionOk) { |
| | | ReqVersion vreq{}; |
| | | if (decodeRequestVersion(data, vreq)) { |
| | | c.versionOk = true; |
| | | if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionOk), |
| | | "Client " + c.ip + ":" + std::to_string(c.port) + " version OK"); |
| | | |
| | | RspVersion vrst; |
| | | vrst.dataId = vreq.dataId; // åæ¾ |
| | | vrst.version = DAQCfg::Version; // éä¸é
ç½® |
| | | auto rsp = encodeResponseVersion(vrst); |
| | | socketComm.sendTo(c.sock, rsp); |
| | | } |
| | | return; // æªéè¿çæ¬æ ¡éªï¼å
¶å®å¿½ç¥ |
| | | } |
| | | |
| | | // éè¿çæ¬æ ¡éªåï¼æ cmd åå |
| | | const uint16_t cmd = peek_cmd(data); |
| | | |
| | | // 0x0101 ââ åééå¢éæå |
| | | if (cmd == CMD_REQ_SINCE) { |
| | | ReqSince req; |
| | | if (!decodeRequestSince(data, req)) return; |
| | | |
| | | auto vec = buffersGetSince(req.machineId, req.channelId, |
| | | static_cast<int64_t>(req.sinceTsExclusive), req.maxCount); |
| | | |
| | | uint64_t lastTsSent = req.sinceTsExclusive; |
| | | if (!vec.empty()) lastTsSent = static_cast<uint64_t>(vec.back().ts_ms); |
| | | |
| | | auto stat = buffersGetChannelStat(req.machineId, req.channelId); |
| | | uint8_t more = (stat.latestTs > static_cast<int64_t>(lastTsSent)) ? 1 : 0; |
| | | |
| | | RspSince rsp; |
| | | rsp.dataId = req.dataId; // åæ¾ |
| | | rsp.machineId = req.machineId; |
| | | rsp.channelId = req.channelId; |
| | | rsp.lastTsSent = lastTsSent; |
| | | rsp.more = more; |
| | | rsp.samples.reserve(vec.size()); |
| | | for (auto& s : vec) rsp.samples.push_back({ static_cast<uint64_t>(s.ts_ms), s.value }); |
| | | |
| | | auto pkt = encodeResponseSince(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | return; |
| | | } |
| | | |
| | | // 0x0103 ââ ç»è®¡/ééå表 |
| | | if (cmd == CMD_REQ_STATS) { |
| | | ReqStats req; |
| | | if (!decodeRequestStats(data, req)) return; |
| | | |
| | | auto* bm = registry_.find(req.machineId); |
| | | if (!bm) { |
| | | // 示ä¾ï¼å¯åé误帧 |
| | | // sendErrorTo(c, req.dataId, CMD_REQ_STATS, req.machineId, ErrCode::NoActiveBatch, "machine not found"); |
| | | return; |
| | | } |
| | | |
| | | RspStats rsp; |
| | | rsp.dataId = req.dataId; |
| | | rsp.machineId = req.machineId; |
| | | |
| | | auto stats = bm->listChannelStats(); |
| | | rsp.channels.reserve(stats.size()); |
| | | for (auto& s : stats) { |
| | | ChannelStatInfo ci; |
| | | ci.channelId = s.id; |
| | | ci.earliestTs = static_cast<uint64_t>(s.earliestTs); |
| | | ci.latestTs = static_cast<uint64_t>(s.latestTs); |
| | | ci.size = static_cast<uint32_t>(s.size); |
| | | ci.name = bm->getChannelName(s.id); // UTF-8 |
| | | rsp.channels.push_back(std::move(ci)); |
| | | } |
| | | |
| | | auto pkt = encodeResponseStats(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | return; |
| | | } |
| | | |
| | | // 0x0104 ââ æºå°å表 |
| | | if (cmd == CMD_REQ_MACHINES) { |
| | | ReqMachines req; |
| | | if (!decodeRequestMachines(data, req)) return; |
| | | |
| | | RspMachines rsp; |
| | | rsp.dataId = req.dataId; |
| | | |
| | | auto mids = registry_.listManagers(); |
| | | rsp.machines.reserve(mids.size()); |
| | | for (auto id : mids) { |
| | | auto* bm = registry_.find(id); |
| | | if (!bm) continue; |
| | | rsp.machines.push_back(MachineInfo{ id, bm->name() }); |
| | | } |
| | | |
| | | auto pkt = encodeResponseMachines(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | return; |
| | | } |
| | | |
| | | // 0x0105 ââ æ´æºå¤ééå¢éæå |
| | | if (cmd == CMD_REQ_SINCE_ALL) { |
| | | ReqSinceAll req; |
| | | if (!decodeRequestSinceAll(data, req)) return; |
| | | |
| | | RspSinceAll rsp; |
| | | rsp.dataId = req.dataId; // åæ¾ |
| | | rsp.machineId = req.machineId; |
| | | rsp.moreAny = 0; |
| | | |
| | | auto* bm = registry_.find(req.machineId); |
| | | if (!bm) { |
| | | auto pkt = encodeResponseSinceAll(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | return; |
| | | } |
| | | |
| | | // é¿å
è¶
è¿ 2 åèé¿åº¦ï¼æ£æ ~60KB |
| | | constexpr size_t kMaxBody = 60 * 1024; |
| | | |
| | | rsp.blocks.clear(); |
| | | auto stats = bm->listChannelStats(); |
| | | for (auto& s : stats) { |
| | | auto vec = bm->getSince(s.id, static_cast<int64_t>(req.sinceTsExclusive), req.maxPerChannel); |
| | | if (vec.empty()) continue; |
| | | |
| | | ChannelBlock blk; |
| | | blk.channelId = s.id; |
| | | blk.lastTsSent = static_cast<uint64_t>(vec.back().ts_ms); |
| | | |
| | | auto st = bm->getChannelStat(s.id); |
| | | blk.more = (st.latestTs > static_cast<int64_t>(blk.lastTsSent)) ? 1 : 0; |
| | | |
| | | blk.samples.reserve(vec.size()); |
| | | for (auto& sm : vec) { |
| | | blk.samples.push_back({ static_cast<uint64_t>(sm.ts_ms), sm.value }); |
| | | } |
| | | |
| | | rsp.blocks.push_back(std::move(blk)); |
| | | auto tentative = encodeResponseSinceAll(rsp); |
| | | if (tentative.size() > (4 + 4 + 2) + kMaxBody + 1) { |
| | | // è¶
ä½ç§¯ï¼ææåä¸åæ¿åºæ¥ï¼å
ååé¢ç |
| | | auto last = std::move(rsp.blocks.back()); |
| | | rsp.blocks.pop_back(); |
| | | |
| | | auto pkt = encodeResponseSinceAll(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | |
| | | rsp.blocks.clear(); |
| | | rsp.blocks.push_back(std::move(last)); |
| | | } |
| | | } |
| | | |
| | | rsp.moreAny = 0; |
| | | for (const auto& b : rsp.blocks) { |
| | | if (b.more) { rsp.moreAny = 1; break; } |
| | | } |
| | | |
| | | auto pkt = encodeResponseSinceAll(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | return; |
| | | } |
| | | |
| | | // 0x0120 ââ æ¹æ¬¡ä¿¡æ¯ |
| | | if (cmd == CMD_REQ_BATCH_INFO) { |
| | | ReqBatchInfo req; |
| | | if (!decodeRequestBatchInfo(data, req)) return; |
| | | |
| | | RspBatchInfo rsp; |
| | | rsp.dataId = req.dataId; |
| | | rsp.machineId = req.machineId; |
| | | |
| | | const auto br = getBatch(req.machineId); |
| | | rsp.state = br.state; |
| | | rsp.activeBatchId = br.activeBatchId; |
| | | rsp.activeStartTs = br.activeStartTs; |
| | | rsp.expectedEndTs = br.expectedEndTs; |
| | | |
| | | auto pkt = encodeResponseBatchInfo(rsp); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | return; |
| | | } |
| | | |
| | | // å
¶å®æä»¤ï¼æéæ©å± |
| | | } |
| | | |
| | | // ç»ä¸é误åå
|
| | | void Collector::sendErrorTo(ClientInfo& c, |
| | | uint32_t dataId, |
| | | uint16_t refCmd, |
| | | uint32_t machineId, |
| | | ErrCode code, |
| | | const std::string& message) |
| | | { |
| | | RspError er; |
| | | er.dataId = dataId; |
| | | er.refCmd = refCmd; |
| | | er.machineId = machineId; |
| | | er.code = code; |
| | | er.message = message; |
| | | |
| | | auto pkt = encodeResponseError(er); |
| | | socketComm.sendTo(c.sock, pkt); |
| | | } |
| | | |
| | | // å·¥å
·ï¼å½å epoch æ¯«ç§ |
| | | uint64_t Collector::nowMs() { |
| | | using namespace std::chrono; |
| | | return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #ifndef COLLECTOR_H |
| | | #define COLLECTOR_H |
| | | |
| | | #include "CommBase.h" |
| | | #include "../net/SocketComm.h" |
| | | |
| | | #include <functional> |
| | | #include <chrono> |
| | | #include <thread> |
| | | #include <atomic> |
| | | #include <mutex> |
| | | #include <vector> |
| | | #include <string> |
| | | #include <unordered_map> |
| | | |
| | | #include "../buffer/BufferRegistry.h" |
| | | #include "../net/FrameAssembler.h" |
| | | |
| | | // å议常é/ç»æï¼å¿
é¡»å
å«ï¼ä¾æ¬å¤´æä»¶ä½¿ç¨ Proto:: ç±»åï¼ |
| | | #include "../proto/Protocol.h" |
| | | |
| | | class Collector : public CommBase { |
| | | public: |
| | | Collector(); |
| | | ~Collector(); |
| | | |
| | | // ===== CommBase æ¥å£ ===== |
| | | void sendSampleData(double sample) override; |
| | | void sendWindowData(const std::vector<std::string>& dataFields) override; |
| | | |
| | | void connectServer(const std::string& /*ip*/, uint16_t /*port*/) override {} // éé端ä¸ä½ä¸ºå®¢æ·ç«¯ |
| | | void createServer(uint16_t port) override; |
| | | void disconnect() override; |
| | | |
| | | void onConnectionEstablished() override; |
| | | void onConnectionLost() override; |
| | | |
| | | // è¿æ¥/åå§æ°æ®åè°ï¼UIï¼ |
| | | void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); } |
| | | void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); } |
| | | void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; } |
| | | |
| | | // åå°è½®è¯¢ï¼ä¸é»å¡ UIï¼ |
| | | void startLoop(uint32_t intervalMs = 10); |
| | | void stopLoop(); |
| | | void poll(); |
| | | |
| | | // 客æ·ç«¯è¿å
¥/æå¼äºä»¶ |
| | | void setClientEventCallback(std::function<void(const std::string& ip, uint16_t port, bool connected)> cb) { |
| | | cbClientEvent = std::move(cb); |
| | | } |
| | | |
| | | // 客æ·ç«¯å¿«ç
§ |
| | | struct ClientSummary { |
| | | std::string ip; |
| | | uint16_t port = 0; |
| | | bool versionOk = false; |
| | | SOCKET sock = INVALID_SOCKET; |
| | | }; |
| | | std::vector<ClientSummary> getClientList(); |
| | | |
| | | // ===== ç¼å²/ééç¸å
³ ===== |
| | | void buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name); |
| | | std::string buffersGetChannelName(uint32_t machineId, uint32_t channelId) const; |
| | | std::vector<BufferManager::ChannelInfo> buffersListChannelInfos(uint32_t machineId) const; |
| | | |
| | | BufferManager& registryAddMachine(uint32_t machineId, const std::string& name, |
| | | const RetentionPolicy& defPolicy = {}); |
| | | BufferManager* registryFind(uint32_t machineId); |
| | | const BufferManager* registryFind(uint32_t machineId) const; |
| | | std::vector<uint32_t> registryListMachines() const; |
| | | |
| | | void buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v); |
| | | |
| | | std::vector<Sample> buffersGetSince(uint32_t machineId, uint32_t channelId, |
| | | int64_t tsExclusive, size_t maxCount = 4096) const; |
| | | std::vector<Sample> buffersGetRange(uint32_t machineId, uint32_t channelId, |
| | | int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const; |
| | | |
| | | void buffersStart(uint32_t machineId); // æ¸
空该æºåå²å¹¶å¼å§ |
| | | void buffersStop(uint32_t machineId); // æåï¼push 忽ç¥ï¼ |
| | | void buffersClear(uint32_t machineId); // æ¸
空åå²ï¼ä¸æ¹ running ç¶æ |
| | | bool buffersIsRunning(uint32_t machineId) const; |
| | | |
| | | void buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting = true); |
| | | void buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p); |
| | | |
| | | std::vector<BufferManager::ChannelStat> buffersListChannelStats(uint32_t machineId) const; |
| | | BufferManager::ChannelStat buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const; |
| | | |
| | | // ===== æ¹æ¬¡ç®¡çï¼æ°å¢ï¼ ===== |
| | | struct BatchRec { |
| | | Proto::BatchState state = Proto::BatchState::Idle; // Idle / Active |
| | | std::string activeBatchId; // Idle æ¶ä¸ºç©º |
| | | uint64_t activeStartTs = 0; // ms epoch |
| | | uint64_t expectedEndTs = 0; // 0: δ֪ |
| | | }; |
| | | |
| | | // æ°ï¼ä»
ä¼ âé¢è®¡æ¶é¿msâï¼0=æªç¥ï¼ï¼å¼å§æ¶é´å
é¨å nowMs() |
| | | void batchStart(uint32_t machineId, |
| | | const std::string& batchId, |
| | | uint64_t expectedDurationMs = 0); |
| | | |
| | | // ï¼å¯éå
¼å®¹ï¼æ§ç¾åä¿çï¼ä½æ 记为å¼ç¨ï¼å
é¨è½¬è°æ°ç¾å |
| | | void batchStart(uint32_t machineId, |
| | | const std::string& batchId, |
| | | uint64_t startTs /*ignored*/, |
| | | uint64_t expectedEndTs /*treated as duration or absolute?*/); |
| | | |
| | | // ç»æå½åæ¹æ¬¡ï¼ä¿ææ°æ®ï¼ç¶æè½¬ Idleï¼ |
| | | void batchStop(uint32_t machineId, |
| | | uint64_t endTs = 0); // å¤ç¨ |
| | | |
| | | // æ¥è¯¢è¯¥æºå½åæ¹æ¬¡ï¼è¥ä¸åå¨ï¼è¿å Idle 空记å½ï¼ |
| | | BatchRec getBatch(uint32_t machineId) const; |
| | | |
| | | private: |
| | | struct ClientInfo { |
| | | SOCKET sock = INVALID_SOCKET; |
| | | std::string ip; |
| | | uint16_t port = 0; |
| | | std::chrono::steady_clock::time_point tsConnected{}; |
| | | bool versionOk = false; |
| | | FrameAssembler fr; // æ¯ä¸ªå®¢æ·ç«¯ä¸ä¸ªå¸§ç»è£
å¨ |
| | | }; |
| | | |
| | | SocketComm socketComm; |
| | | std::vector<ClientInfo> clients; |
| | | |
| | | std::function<void(int, std::string)> cbStatus; |
| | | std::function<void(const std::vector<uint8_t>&)> cbRaw; |
| | | bool rawDumpEnabled = true; |
| | | |
| | | std::function<void(const std::string&, uint16_t, bool)> cbClientEvent; |
| | | |
| | | std::thread worker; |
| | | std::atomic<bool> running{ false }; |
| | | std::mutex mClients; // ä¿æ¤ clients |
| | | |
| | | BufferRegistry registry_; // ä¸ä¸ª Collector ç®¡æææºå° |
| | | |
| | | // æ¹æ¬¡ç¶æ |
| | | std::unordered_map<uint32_t, BatchRec> batches_; |
| | | mutable std::mutex mBatches; |
| | | |
| | | // å
é¨å½æ° |
| | | void handleClientData(ClientInfo& c, const std::vector<uint8_t>& data); |
| | | bool isVersionRequest(const std::vector<uint8_t>& data) const; |
| | | |
| | | // ç»ä¸é误åå
ï¼6 åæ°ï¼ |
| | | void sendErrorTo(ClientInfo& c, |
| | | uint32_t dataId, |
| | | uint16_t refCmd, |
| | | uint32_t machineId, |
| | | Proto::ErrCode code, |
| | | const std::string& message); |
| | | |
| | | // å·¥å
·ï¼å½å epoch æ¯«ç§ |
| | | static uint64_t nowMs(); |
| | | }; |
| | | |
| | | #endif // COLLECTOR_H |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #ifndef COMM_BASE_H |
| | | #define COMM_BASE_H |
| | | |
| | | #include <string> |
| | | #include <vector> |
| | | #include <functional> |
| | | |
| | | class CommBase { |
| | | public: |
| | | virtual void sendSampleData(double sample) = 0; |
| | | virtual void sendWindowData(const std::vector<std::string>& dataFields) = 0; |
| | | virtual void connectServer(const std::string& ip, uint16_t port) = 0; |
| | | virtual void createServer(uint16_t port) = 0; |
| | | virtual void disconnect() = 0; |
| | | |
| | | // è¿æ¥ç¶æåè° |
| | | virtual void onConnectionEstablished() = 0; |
| | | virtual void onConnectionLost() = 0; |
| | | virtual void setConnectionStatusCallback(std::function<void(int, std::string)> callback) = 0; |
| | | |
| | | // æ°å¢ï¼åå§æ°æ®ä¸æï¼æ¶å°çâåèæµâç´æ¥åè°ç»åºç¨å±ï¼ |
| | | virtual void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> callback) = 0; |
| | | |
| | | // æ°å¢ï¼å¼å
³ï¼é»è®¤ trueï¼ |
| | | virtual void setRawDumpEnabled(bool enabled) = 0; |
| | | }; |
| | | |
| | | #endif // COMM_BASE_H |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #include <string> |
| | | |
| | | namespace DAQEvt { |
| | | |
| | | // ç¨æä¸¾å®ä¹ï¼å¤é¨åè°ä»ä»¥ int ä¼ é |
| | | enum class ConnCode : int { |
| | | // Collectorï¼æå¡ç«¯ï¼ |
| | | ServerListening = 100, // createServer æå |
| | | ServerStopped = 101, // disconnect/close |
| | | ClientAccepted = 110, // æ°å®¢æ·ç«¯æ¥å
¥ |
| | | ClientDisconnected = 111, // 客æ·ç«¯ä¸»å¨æå¼ |
| | | VersionOk = 120, // 客æ·ç«¯çæ¬æ ¡éªéè¿ |
| | | VersionTimeoutKick = 121, // 5 ç§æªæ¡æè¢«è¸¢ |
| | | |
| | | // Displayï¼å®¢æ·ç«¯ï¼ |
| | | DisplayConnected = 200, // connectServer æåï¼å·²å»ºè¿ï¼ |
| | | DisplayDisconnected = 201, // disconnect/close |
| | | |
| | | // éç¨/é误 |
| | | SocketError = 900 |
| | | }; |
| | | |
| | | // å¯éï¼æ code 转æé»è®¤å符串ï¼å¤ç¨ï¼ |
| | | inline const char* ToString(ConnCode c) { |
| | | switch (c) { |
| | | case ConnCode::ServerListening: return "Server listening"; |
| | | case ConnCode::ServerStopped: return "Server stopped"; |
| | | case ConnCode::ClientAccepted: return "Client accepted"; |
| | | case ConnCode::ClientDisconnected: return "Client disconnected"; |
| | | case ConnCode::VersionOk: return "Version OK"; |
| | | case ConnCode::VersionTimeoutKick: return "Version timeout kick"; |
| | | case ConnCode::DisplayConnected: return "Display connected"; |
| | | case ConnCode::DisplayDisconnected: return "Display disconnected"; |
| | | case ConnCode::SocketError: return "Socket error"; |
| | | default: return "Unknown"; |
| | | } |
| | | } |
| | | |
| | | } // namespace DAQEvt |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | // DataTypes.h |
| | | #pragma once |
| | | #include <cstdint> |
| | | |
| | | struct Sample { |
| | | int64_t ts_ms = 0; // æ¶é´æ³ï¼æ¯«ç§ |
| | | double value = 0.0; |
| | | }; |
| | | #pragma once |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #include "Display.h" |
| | | #include <iostream> |
| | | #include "../DAQConfig.h" |
| | | #include "../core/ConnEvents.h" |
| | | using namespace DAQEvt; |
| | | |
| | | // å
Œ
±å议头 |
| | | #include "../proto/Protocol.h" |
| | | #include "../proto/ProtocolCodec.h" |
| | | #include "../net/FrameAssembler.h" |
| | | |
| | | using namespace Proto; |
| | | |
| | | Display::Display() {} |
| | | Display::~Display() { |
| | | stopRecvLoop(); |
| | | disconnect(); |
| | | } |
| | | |
| | | void Display::connectServer(const std::string& ip, uint16_t port) { |
| | | if (socketComm.createClientSocket(ip, port)) { |
| | | if (cbStatus) cbStatus(static_cast<int>(ConnCode::DisplayConnected), "Display connected to server"); |
| | | onConnectionEstablished(); |
| | | |
| | | // è¿æ¥åç«å»åéçæ¬è¯·æ±ï¼0x0001ï¼ï¼å¸¦ dataId åæ¾ |
| | | ReqVersion vreq; |
| | | vreq.dataId = m_nextDataId++; |
| | | auto pkt = encodeRequestVersion(vreq); |
| | | socketComm.sendDataSingle(pkt); |
| | | |
| | | startRecvLoop(DAQCfg::RecvLoopIntervalMs); |
| | | } |
| | | } |
| | | |
| | | void Display::disconnect() { |
| | | stopRecvLoop(); |
| | | socketComm.closeSocket(); |
| | | if (cbStatus) cbStatus(static_cast<int>(ConnCode::DisplayDisconnected), "Display disconnected"); |
| | | onConnectionLost(); |
| | | } |
| | | |
| | | void Display::startRecvLoop(uint32_t intervalMs) { |
| | | if (recvRunning.load()) return; |
| | | recvRunning.store(true); |
| | | recvThread = std::thread([this, intervalMs]() { |
| | | std::vector<uint8_t> chunk; |
| | | std::vector<uint8_t> frame; |
| | | FrameAssembler fr; |
| | | |
| | | while (recvRunning.load()) { |
| | | chunk.clear(); |
| | | if (socketComm.recvSingle(chunk) && !chunk.empty()) { |
| | | if (rawDumpEnabled && cbRaw) cbRaw(chunk); |
| | | fr.push(chunk); |
| | | while (fr.nextFrame(frame)) { |
| | | if (rawDumpEnabled && cbRaw) cbRaw(frame); |
| | | handleRawData(frame); |
| | | } |
| | | } |
| | | std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs)); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | void Display::stopRecvLoop() { |
| | | if (!recvRunning.load()) return; |
| | | recvRunning.store(false); |
| | | if (recvThread.joinable()) recvThread.join(); |
| | | } |
| | | |
| | | void Display::sendSampleData(double) { /* 客æ·ç«¯è¿è¾¹æä¸å */ } |
| | | void Display::sendWindowData(const std::vector<std::string>&) { /* åä¸ */ } |
| | | |
| | | void Display::onConnectionEstablished() { std::cout << "[Display] connected\n"; } |
| | | void Display::onConnectionLost() { std::cout << "[Display] disconnected\n"; } |
| | | |
| | | // ââ ä½å±æ¥å£ï¼æ¾å¼ dataId ââ |
| | | |
| | | // åé 0x0101ï¼å¢éæåï¼â åçï¼ä¸å¸¦ batchIdï¼ |
| | | void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, uint16_t maxCount) { |
| | | // è°å°å¸¦ batchId çéè½½ï¼ä¼ 空串 => ä¸éå batchId |
| | | requestSince(dataId, machineId, channelId, sinceTsExclusive, std::string(), maxCount); |
| | | } |
| | | |
| | | // åé 0x0101ï¼å¢éæåï¼â â
æ°ï¼å¸¦ batchId |
| | | void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount) { |
| | | ReqSince req; |
| | | req.dataId = dataId; |
| | | req.machineId = machineId; |
| | | req.channelId = channelId; |
| | | req.sinceTsExclusive = sinceTsExclusive; |
| | | req.maxCount = maxCount; |
| | | if (!batchId.empty()) { |
| | | req.flags = SINCE_FLAG_HAS_BATCH; |
| | | req.batchId = batchId; |
| | | } |
| | | else { |
| | | req.flags = 0; |
| | | req.batchId.clear(); |
| | | } |
| | | auto pkt = encodeRequestSince(req); |
| | | socketComm.sendDataSingle(pkt); |
| | | } |
| | | |
| | | void Display::requestMachines(uint32_t dataId) { |
| | | ReqMachines req; req.dataId = dataId; |
| | | auto pkt = encodeRequestMachines(req); |
| | | socketComm.sendDataSingle(pkt); |
| | | } |
| | | |
| | | void Display::requestStats(uint32_t dataId, uint32_t machineId) { |
| | | ReqStats req; req.dataId = dataId; req.machineId = machineId; req.flags = 0; |
| | | auto pkt = encodeRequestStats(req); |
| | | socketComm.sendDataSingle(pkt); |
| | | } |
| | | |
| | | // æ´æºå¤ééå¢é â åçï¼ä¸å¸¦ batchIdï¼ |
| | | void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive, |
| | | uint16_t maxPerChannel) { |
| | | // è°å°å¸¦ batchId çéè½½ï¼ä¼ 空串 |
| | | requestSinceAll(dataId, machineId, sinceTsExclusive, std::string(), maxPerChannel); |
| | | } |
| | | |
| | | // æ´æºå¤ééå¢é â â
æ°ï¼å¸¦ batchId |
| | | void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive, |
| | | const std::string& batchId, uint16_t maxPerChannel) { |
| | | ReqSinceAll req; |
| | | req.dataId = dataId; |
| | | req.machineId = machineId; |
| | | req.sinceTsExclusive = sinceTsExclusive; |
| | | req.maxPerChannel = maxPerChannel; |
| | | if (!batchId.empty()) { |
| | | req.flags = SINCE_FLAG_HAS_BATCH; |
| | | req.batchId = batchId; |
| | | } |
| | | else { |
| | | req.flags = 0; |
| | | req.batchId.clear(); |
| | | } |
| | | socketComm.sendDataSingle(encodeRequestSinceAll(req)); |
| | | } |
| | | |
| | | // ââ æ°å¢ï¼æ¹æ¬¡ä¿¡æ¯æå ââ |
| | | // æ¾å¼ dataId |
| | | void Display::requestBatchInfo(uint32_t dataId, uint32_t machineId) { |
| | | ReqBatchInfo req; req.dataId = dataId; req.machineId = machineId; |
| | | socketComm.sendDataSingle(encodeRequestBatchInfo(req)); |
| | | } |
| | | // 便æ·ï¼èªå¨ dataId |
| | | void Display::requestBatchInfo(uint32_t machineId) { |
| | | requestBatchInfo(m_nextDataId++, machineId); |
| | | } |
| | | |
| | | // ââ ä¾¿æ·æ¥å£ï¼èªå¨åé
dataId ââ |
| | | // ä¸ç»åªæ¯ç®åå°å¨å
é¨è°ç¨ä¸è¿°æ¾å¼ç |
| | | |
| | | void Display::requestMachines() { |
| | | requestMachines(m_nextDataId++); |
| | | } |
| | | |
| | | void Display::requestStats(uint32_t machineId) { |
| | | requestStats(m_nextDataId++, machineId); |
| | | } |
| | | |
| | | void Display::requestSince(uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, uint16_t maxCount) { |
| | | requestSince(m_nextDataId++, machineId, channelId, sinceTsExclusive, maxCount); |
| | | } |
| | | |
| | | void Display::requestSince(uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount) { |
| | | requestSince(m_nextDataId++, machineId, channelId, sinceTsExclusive, batchId, maxCount); |
| | | } |
| | | |
| | | void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive, uint16_t maxPerChannel) { |
| | | requestSinceAll(m_nextDataId++, machineId, sinceTsExclusive, maxPerChannel); |
| | | } |
| | | |
| | | void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive, |
| | | const std::string& batchId, uint16_t maxPerChannel) { |
| | | requestSinceAll(m_nextDataId++, machineId, sinceTsExclusive, batchId, maxPerChannel); |
| | | } |
| | | |
| | | // æ¶å
ååï¼å¨æ¥æ¶çº¿ç¨é被è°ç¨ï¼ |
| | | void Display::handleRawData(const std::vector<uint8_t>& rawData) { |
| | | // F001 çæ¬ååº |
| | | { |
| | | RspVersion vrsp; |
| | | if (decodeResponseVersion(rawData, vrsp)) { |
| | | if (cbStatus) cbStatus(static_cast<int>(ConnCode::VersionOk), |
| | | std::string("Server version: ") + vrsp.version); |
| | | // m_versionOk = true; |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // F101 ââ since æåååº |
| | | { |
| | | RspSince rsp; |
| | | if (decodeResponseSince(rawData, rsp)) { |
| | | if (cbSamples) cbSamples(rsp.machineId, rsp.channelId, rsp.lastTsSent, rsp.more, rsp.samples); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // F103 ââ ç»è®¡/éé |
| | | { |
| | | RspStats st; |
| | | if (decodeResponseStats(rawData, st)) { |
| | | if (cbStats) cbStats(st.machineId, st.channels); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // F104 ââ æºå°å表 |
| | | { |
| | | RspMachines ms; |
| | | if (decodeResponseMachines(rawData, ms)) { |
| | | if (cbMachines) cbMachines(ms.machines); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // F105 ââ å¤ééå¢é |
| | | { |
| | | RspSinceAll ra; |
| | | if (decodeResponseSinceAll(rawData, ra)) { |
| | | if (cbSamplesMulti) cbSamplesMulti(ra.machineId, ra.moreAny, ra.blocks); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // â
æ°å¢ï¼F120 ââ æ¹æ¬¡ä¿¡æ¯ |
| | | { |
| | | RspBatchInfo bi; |
| | | if (decodeResponseBatchInfo(rawData, bi)) { |
| | | if (cbBatchInfo) cbBatchInfo(bi); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // â
æ°å¢ï¼E100 ââ ç»ä¸é误ï¼èªæï¼ |
| | | { |
| | | RspError er; |
| | | if (decodeResponseError(rawData, er)) { |
| | | if (cbStatus) { |
| | | std::string s = "ERR ref=0x" + [](uint16_t x) { |
| | | char buf[8]; std::snprintf(buf, sizeof(buf), "%04X", (unsigned)x); return std::string(buf); |
| | | }(er.refCmd) + |
| | | " mid=" + std::to_string(er.machineId) + |
| | | " code=" + std::to_string((unsigned)er.code) + |
| | | " msg=" + er.message; |
| | | cbStatus(static_cast<int>(ConnCode::SocketError), s); |
| | | } |
| | | // éè¯¯èªæï¼NoActive / Mismatch â æä¸æ¬¡ BatchInfo |
| | | if (er.code == ErrCode::NoActiveBatch || er.code == ErrCode::BatchMismatch) { |
| | | requestBatchInfo(er.machineId); |
| | | } |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // å
¶å®ç±»åï¼å°æ¥æ©å±ï¼â¦â¦ |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #ifndef DISPLAY_H |
| | | #define DISPLAY_H |
| | | |
| | | #include "CommBase.h" |
| | | #include "../net/SocketComm.h" |
| | | #include <functional> |
| | | #include <thread> |
| | | #include <atomic> |
| | | #include "../proto/Protocol.h" |
| | | #include "../proto/ProtocolCodec.h" |
| | | |
| | | // åè°ï¼æ¶å°æ ·æ¬ç䏿ï¼UI ç¨å®ç»å¾/忬å°ï¼ |
| | | using SamplesCallback = std::function<void(uint32_t machineId, uint32_t channelId, |
| | | uint64_t lastTsSent, uint8_t more, |
| | | const std::vector<Proto::SamplePair>& samples)>; |
| | | using SamplesMultiCallback = std::function<void( |
| | | uint32_t machineId, uint8_t moreAny, |
| | | const std::vector<Proto::ChannelBlock>& blocks)>; |
| | | using MachinesCallback = std::function<void(const std::vector<Proto::MachineInfo>& machines)>; |
| | | using StatsCallback = std::function<void(uint32_t machineId, const std::vector<Proto::ChannelStatInfo>& channels)>; |
| | | |
| | | // â
æ°å¢ï¼æ¹æ¬¡ä¿¡æ¯åè° |
| | | using BatchInfoCallback = std::function<void(const Proto::RspBatchInfo&)>; |
| | | |
| | | class Display : public CommBase { |
| | | public: |
| | | Display(); |
| | | ~Display(); |
| | | |
| | | void sendSampleData(double sample) override; |
| | | void sendWindowData(const std::vector<std::string>& dataFields) override; |
| | | |
| | | void connectServer(const std::string& ip, uint16_t port) override; |
| | | void createServer(uint16_t /*port*/) override {} |
| | | void disconnect() override; |
| | | |
| | | void onConnectionEstablished() override; |
| | | void onConnectionLost() override; |
| | | |
| | | void handleRawData(const std::vector<uint8_t>& rawData); |
| | | |
| | | void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); } |
| | | void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); } |
| | | void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; } |
| | | |
| | | void startRecvLoop(uint32_t intervalMs = 10); |
| | | void stopRecvLoop(); |
| | | |
| | | void setSamplesCallback(SamplesCallback cb) { cbSamples = std::move(cb); } |
| | | void setSamplesMultiCallback(SamplesMultiCallback cb) { cbSamplesMulti = std::move(cb); } |
| | | void setMachinesCallback(MachinesCallback cb) { cbMachines = std::move(cb); } |
| | | void setStatsCallback(StatsCallback cb) { cbStats = std::move(cb); } |
| | | void setBatchInfoCallback(BatchInfoCallback cb) { cbBatchInfo = std::move(cb); } // â
æ°å¢ |
| | | |
| | | // ââ åæâéæ¾å¼ dataIdâçä½å±æ¥å£ï¼ä¿ç以å
¼å®¹ï¼ââ |
| | | void requestMachines(uint32_t dataId); |
| | | void requestStats(uint32_t dataId, uint32_t machineId); |
| | | void requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, uint16_t maxCount = 1024); |
| | | // â
æ°å¢ï¼å¸¦ batchId çæ¾å¼ dataId ç |
| | | void requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount = 1024); |
| | | |
| | | // 便æ·ï¼æ´æºå¤ééå¢éï¼æ¾å¼ dataIdï¼ |
| | | void requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive, |
| | | uint16_t maxPerChannel = 1024); |
| | | // â
æ°å¢ï¼å¸¦ batchId çæ¾å¼ dataId ç |
| | | void requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive, |
| | | const std::string& batchId, uint16_t maxPerChannel = 1024); |
| | | |
| | | // ââ æ°å¢ï¼æ¹æ¬¡ä¿¡æ¯æåï¼æ¾å¼/便æ·ï¼ââ |
| | | void requestBatchInfo(uint32_t dataId, uint32_t machineId); // æ¾å¼ dataId |
| | | void requestBatchInfo(uint32_t machineId); // 便æ·ï¼èªå¨ m_nextDataId++ |
| | | |
| | | // ââ æ°å¢ï¼ä¾¿æ·é«å±æ¥å£ï¼èªå¨åé
dataIdï¼ââ |
| | | void requestMachines(); // èªå¨ m_nextDataId++ |
| | | void requestStats(uint32_t machineId); |
| | | void requestSince(uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, uint16_t maxCount = 1024); |
| | | // â
æ°å¢ï¼ä¾¿æ·å¸¦ batchId |
| | | void requestSince(uint32_t machineId, uint32_t channelId, |
| | | uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount = 1024); |
| | | |
| | | // 便æ·ï¼æ´æºå¤ééå¢é |
| | | void requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive, |
| | | uint16_t maxPerChannel = 1024); |
| | | // â
æ°å¢ï¼ä¾¿æ·å¸¦ batchId |
| | | void requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive, |
| | | const std::string& batchId, uint16_t maxPerChannel = 1024); |
| | | |
| | | private: |
| | | SocketComm socketComm; |
| | | std::function<void(int, std::string)> cbStatus; |
| | | |
| | | // åå§æ°æ®åè° & å¼å
³ |
| | | std::function<void(const std::vector<uint8_t>&)> cbRaw; |
| | | bool rawDumpEnabled = true; |
| | | |
| | | std::thread recvThread; |
| | | std::atomic<bool> recvRunning{ false }; |
| | | SamplesCallback cbSamples; |
| | | SamplesMultiCallback cbSamplesMulti; |
| | | MachinesCallback cbMachines; |
| | | StatsCallback cbStats; |
| | | BatchInfoCallback cbBatchInfo; // â
æ°å¢ |
| | | |
| | | // dataId éå¢ï¼ç¨äºé
对请æ±/ååº |
| | | uint32_t m_nextDataId = 1; |
| | | bool m_versionOk = false; |
| | | }; |
| | | |
| | | #endif // DISPLAY_H |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #include <vector> |
| | | #include <cstdint> |
| | | #include <algorithm> |
| | | |
| | | class FrameAssembler { |
| | | public: |
| | | // 追å ä¸ååå§åè |
| | | void push(const std::vector<uint8_t>& chunk) { |
| | | buf_.insert(buf_.end(), chunk.begin(), chunk.end()); |
| | | } |
| | | void push(const uint8_t* p, size_t n) { |
| | | buf_.insert(buf_.end(), p, p + n); |
| | | } |
| | | |
| | | // æåä¸ä¸å¸§ï¼è¿å true 表示 out æ¿å°äºä¸å¸§å®æ´æ°æ® |
| | | bool nextFrame(std::vector<uint8_t>& out) { |
| | | const uint8_t HEAD[4] = { 0x11,0x88,0x11,0x88 }; |
| | | const uint8_t TAIL = 0x88; |
| | | for (;;) { |
| | | // éè¦è³å° 4B 头 + 4B dataId + 2B len + 1B å°¾ = 11B |
| | | if (buf_.size() < 11) return false; |
| | | |
| | | // æ¾å¤´åæ¥ |
| | | size_t i = 0; |
| | | while (i + 4 <= buf_.size() && !std::equal(HEAD, HEAD + 4, buf_.begin() + i)) ++i; |
| | | if (i + 4 > buf_.size()) { // 没æ¾å°å¤´ï¼æ¸
空 |
| | | buf_.clear(); |
| | | return false; |
| | | } |
| | | if (i > 0) buf_.erase(buf_.begin(), buf_.begin() + i); // 丢å¼å¤´ååªå£° |
| | | |
| | | if (buf_.size() < 11) return false; // è¿ä¸å¤æå°å¸§ |
| | | |
| | | // è¯»åæ£æé¿åº¦ï¼å¤§ç«¯ï¼ |
| | | uint16_t len = (uint16_t(buf_[8]) << 8) | buf_[9]; |
| | | size_t total = 4 + 4 + 2 + size_t(len) + 1; // æ´å¸§é¿åº¦ |
| | | if (buf_.size() < total) return false; // å帧ï¼ç䏿¬¡ |
| | | |
| | | // æ ¡éªå°¾ |
| | | if (buf_[total - 1] != TAIL) { |
| | | // å°¾ä¸å¯¹ï¼ä¸¢å¼ä¸ä¸ªåèï¼éæ°æ¾å¤´ï¼é¿å
æ»éï¼ |
| | | buf_.erase(buf_.begin()); |
| | | continue; |
| | | } |
| | | |
| | | // ååºå®æ´å¸§ |
| | | out.assign(buf_.begin(), buf_.begin() + total); |
| | | buf_.erase(buf_.begin(), buf_.begin() + total); |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | void clear() { buf_.clear(); } |
| | | |
| | | private: |
| | | std::vector<uint8_t> buf_; |
| | | }; |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #include "SocketComm.h" |
| | | |
| | | SocketComm::SocketComm() { |
| | | if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { |
| | | std::cerr << "WSAStartup failed\n"; |
| | | } |
| | | } |
| | | SocketComm::~SocketComm() { |
| | | closeSocket(); |
| | | WSACleanup(); |
| | | } |
| | | |
| | | bool SocketComm::setNonBlocking(SOCKET s, bool nb) { |
| | | u_long mode = nb ? 1UL : 0UL; |
| | | return ioctlsocket(s, FIONBIO, &mode) == 0; |
| | | } |
| | | |
| | | bool SocketComm::createClientSocket(const std::string& serverIP, uint16_t serverPort) { |
| | | sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |
| | | if (sock == INVALID_SOCKET) return false; |
| | | |
| | | sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(serverPort); |
| | | if (InetPton(AF_INET, serverIP.c_str(), &addr.sin_addr) <= 0) return false; |
| | | |
| | | if (connect(sock, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) { |
| | | std::cerr << "connect failed: " << WSAGetLastError() << "\n"; |
| | | closesocket(sock); sock = INVALID_SOCKET; return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | bool SocketComm::createServerSocket(uint16_t port) { |
| | | listenSock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |
| | | if (listenSock == INVALID_SOCKET) return false; |
| | | |
| | | // å¤ç¨å°å |
| | | BOOL yes = 1; setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(yes)); |
| | | |
| | | sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = htons(port); |
| | | if (bind(listenSock, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) return false; |
| | | if (listen(listenSock, SOMAXCONN) == SOCKET_ERROR) return false; |
| | | |
| | | // éé»å¡çå¬ |
| | | setNonBlocking(listenSock, true); |
| | | std::cout << "Server listening on port " << port << "\n"; |
| | | return true; |
| | | } |
| | | |
| | | bool SocketComm::acceptOne(SOCKET& outClient, std::string& outIp, uint16_t& outPort) { |
| | | sockaddr_in caddr{}; int len = sizeof(caddr); |
| | | SOCKET cs = accept(listenSock, (sockaddr*)&caddr, &len); |
| | | if (cs == INVALID_SOCKET) { |
| | | int e = WSAGetLastError(); |
| | | if (e == WSAEWOULDBLOCK) return false; // å½åæ²¡ææ°è¿æ¥ |
| | | std::cerr << "accept error: " << e << "\n"; return false; |
| | | } |
| | | setNonBlocking(cs, true); |
| | | char ipbuf[INET_ADDRSTRLEN]{}; |
| | | InetNtop(AF_INET, &caddr.sin_addr, ipbuf, INET_ADDRSTRLEN); |
| | | outIp = ipbuf; outPort = ntohs(caddr.sin_port); |
| | | outClient = cs; |
| | | return true; |
| | | } |
| | | |
| | | bool SocketComm::recvFrom(SOCKET s, std::vector<uint8_t>& buffer, bool& peerClosed) { |
| | | peerClosed = false; |
| | | char tmp[4096]; |
| | | int r = recv(s, tmp, sizeof(tmp), 0); |
| | | if (r > 0) { |
| | | buffer.assign(tmp, tmp + r); |
| | | return true; |
| | | } |
| | | if (r == 0) { // 对端æ£å¸¸å
³é |
| | | peerClosed = true; |
| | | return false; |
| | | } |
| | | int e = WSAGetLastError(); |
| | | if (e == WSAEWOULDBLOCK) return false; // ææ æ°æ® |
| | | // å
¶å®é误ä¹è®¤ä¸ºè¿æ¥ä¸å¯ç¨äº |
| | | peerClosed = true; |
| | | return false; |
| | | } |
| | | |
| | | bool SocketComm::sendTo(SOCKET s, const std::vector<uint8_t>& data) { |
| | | int sent = send(s, reinterpret_cast<const char*>(data.data()), (int)data.size(), 0); |
| | | return sent == (int)data.size(); |
| | | } |
| | | |
| | | void SocketComm::closeClient(SOCKET s) { |
| | | if (s != INVALID_SOCKET) closesocket(s); |
| | | } |
| | | |
| | | void SocketComm::closeSocket() { |
| | | if (listenSock != INVALID_SOCKET) { closesocket(listenSock); listenSock = INVALID_SOCKET; } |
| | | if (sock != INVALID_SOCKET) { closesocket(sock); sock = INVALID_SOCKET; } |
| | | } |
| | | |
| | | bool SocketComm::sendDataSingle(const std::vector<uint8_t>& data) { |
| | | if (sock == INVALID_SOCKET) return false; |
| | | return sendTo(sock, data); |
| | | } |
| | | bool SocketComm::recvSingle(std::vector<uint8_t>& buffer) { |
| | | if (sock == INVALID_SOCKET) return false; |
| | | return recvFrom(sock, buffer); |
| | | } |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #ifndef SOCKET_COMM_H |
| | | #define SOCKET_COMM_H |
| | | |
| | | #include <winsock2.h> |
| | | #include <ws2tcpip.h> |
| | | #include <string> |
| | | #include <vector> |
| | | #include <iostream> |
| | | |
| | | #pragma comment(lib, "ws2_32.lib") |
| | | |
| | | class SocketComm { |
| | | public: |
| | | SocketComm(); |
| | | ~SocketComm(); |
| | | |
| | | bool createClientSocket(const std::string& serverIP, uint16_t serverPort); // 客æ·ç«¯ |
| | | bool createServerSocket(uint16_t port); // æå¡ç«¯ï¼çå¬ï¼ |
| | | |
| | | // æ°å¢ï¼éé»å¡ accept/æ¶å/å
³éæå®å®¢æ·ç«¯ |
| | | bool acceptOne(SOCKET& outClient, std::string& outIp, uint16_t& outPort); |
| | | bool recvFrom(SOCKET s, std::vector<uint8_t>& buffer, bool& peerClosed); |
| | | |
| | | // å¯éï¼ä¸ºäºå
¼å®¹æ§è°ç¨ï¼ä¿çä¸ä¸ªå
èå
è£
ï¼å¦æå
¶å®å°æ¹è¿ç¨å°äºæ§ç¾åï¼ |
| | | inline bool recvFrom(SOCKET s, std::vector<uint8_t>& buffer) { |
| | | bool closed = false; |
| | | return recvFrom(s, buffer, closed); |
| | | } |
| | | bool sendTo(SOCKET s, const std::vector<uint8_t>& data); |
| | | void closeClient(SOCKET s); |
| | | |
| | | void closeSocket(); // å
³éç嬿åè¿æ¥ |
| | | |
| | | // ä¾ä¸å±å¤ææ¬å¯¹è±¡å½åæ¯å¦æ¯âç嬿¨¡å¼â |
| | | bool isListening() const { return listenSock != INVALID_SOCKET; } |
| | | bool sendDataSingle(const std::vector<uint8_t>& data); // 客æ·ç«¯åè¿æ¥åé |
| | | bool recvSingle(std::vector<uint8_t>& buffer); // 客æ·ç«¯åè¿æ¥æ¥æ¶ |
| | | |
| | | private: |
| | | SOCKET listenSock = INVALID_SOCKET; // çå¬ socketï¼æå¡ç«¯ï¼ |
| | | SOCKET sock = INVALID_SOCKET; // åè¿æ¥æ¨¡å¼ï¼å®¢æ·ç«¯æ¶ç¨ï¼ |
| | | WSADATA wsaData{}; |
| | | |
| | | bool setNonBlocking(SOCKET s, bool nb); |
| | | }; |
| | | |
| | | #endif // SOCKET_COMM_H |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #include <cstdint> |
| | | #include <string> |
| | | #include <vector> |
| | | |
| | | namespace Proto { |
| | | |
| | | // åºå®å¸§ |
| | | inline constexpr uint8_t kHead[4] = { 0x11, 0x88, 0x11, 0x88 }; |
| | | inline constexpr uint8_t kTail = 0x88; |
| | | |
| | | // æä»¤ç |
| | | enum : uint16_t { |
| | | // æ¡æ |
| | | CMD_REQ_VERSION = 0x0001, |
| | | CMD_RSP_VERSION = 0xF001, |
| | | |
| | | // åééå¢éæå |
| | | CMD_REQ_SINCE = 0x0101, |
| | | CMD_RSP_SINCE = 0xF101, |
| | | |
| | | // ç»è®¡/ééå表 |
| | | CMD_REQ_STATS = 0x0103, |
| | | CMD_RSP_STATS = 0xF103, |
| | | |
| | | // æºå°å表 |
| | | CMD_REQ_MACHINES = 0x0104, |
| | | CMD_RSP_MACHINES = 0xF104, |
| | | |
| | | // æ´æºå¤ééå¢éæå |
| | | CMD_REQ_SINCE_ALL = 0x0105, |
| | | CMD_RSP_SINCE_ALL = 0xF105, |
| | | |
| | | // æ¹æ¬¡ä¿¡æ¯ |
| | | CMD_REQ_BATCH_INFO = 0x0120, |
| | | CMD_RSP_BATCH_INFO = 0xF120, |
| | | |
| | | // é误 |
| | | CMD_RSP_ERROR = 0xE100, |
| | | }; |
| | | |
| | | // === since* 请æ±ééå batchId çæ å¿ä½ === |
| | | inline constexpr uint16_t SINCE_FLAG_HAS_BATCH = 0x0001; |
| | | |
| | | // ===== æ°æ®ç»æï¼ä¸¤ç«¯å
±ç¨ï¼ ===== |
| | | |
| | | // 0x0001 / 0xF001 |
| | | struct ReqVersion { |
| | | uint32_t dataId = 0; |
| | | }; |
| | | struct RspVersion { |
| | | uint32_t dataId = 0; |
| | | std::string version; // UTF-8ï¼ä¾å¦ "1.0.1" |
| | | }; |
| | | |
| | | // 0x0104 / 0xF104 |
| | | struct MachineInfo { |
| | | uint32_t id = 0; |
| | | std::string name; |
| | | }; |
| | | struct ReqMachines { |
| | | uint32_t dataId = 0; |
| | | }; |
| | | struct RspMachines { |
| | | uint32_t dataId = 0; |
| | | std::vector<MachineInfo> machines; |
| | | }; |
| | | |
| | | // 0x0103 / 0xF103 |
| | | struct ReqStats { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | uint16_t flags = 0; |
| | | }; |
| | | struct ChannelStatInfo { |
| | | uint32_t channelId = 0; |
| | | uint64_t earliestTs = 0; |
| | | uint64_t latestTs = 0; |
| | | uint32_t size = 0; |
| | | std::string name; // UTF-8 |
| | | }; |
| | | struct RspStats { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | std::vector<ChannelStatInfo> channels; |
| | | }; |
| | | |
| | | // 0x0101 / 0xF101ï¼åééå¢éï¼ |
| | | struct ReqSince { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | uint32_t channelId = 0; |
| | | uint64_t sinceTsExclusive = 0; // ms |
| | | uint16_t maxCount = 1024; |
| | | uint16_t flags = 0; // æä½ï¼è§ SINCE_FLAG_HAS_BATCH |
| | | std::string batchId; // flags & SINCE_FLAG_HAS_BATCH æ¶ææ |
| | | }; |
| | | struct SamplePair { |
| | | uint64_t ts_ms = 0; |
| | | double value = 0.0; |
| | | }; |
| | | struct RspSince { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | uint32_t channelId = 0; |
| | | uint64_t lastTsSent = 0; |
| | | uint8_t more = 0; // 该é鿝å¦è¿ææªå |
| | | std::vector<SamplePair> samples; |
| | | }; |
| | | |
| | | // 0x0105 / 0xF105ï¼æ´æºå¤ééå¢éï¼ |
| | | struct ChannelBlock { |
| | | uint32_t channelId = 0; |
| | | uint64_t lastTsSent = 0; // 该ééæ¬æ¬¡æåæ ·æ¬æ¶é´æ³ |
| | | uint8_t more = 0; // 该é鿝å¦è¿ææªå |
| | | std::vector<SamplePair> samples; |
| | | }; |
| | | struct ReqSinceAll { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | uint64_t sinceTsExclusive = 0; // 对ææééç»ä¸ since |
| | | uint16_t maxPerChannel = 1024; // æ¯æ¡æ²çº¿ä¸é |
| | | uint16_t flags = 0; // æä½ï¼è§ SINCE_FLAG_HAS_BATCH |
| | | std::string batchId; // flags & SINCE_FLAG_HAS_BATCH æ¶ææ |
| | | }; |
| | | struct RspSinceAll { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | uint8_t moreAny = 0; // æ¯å¦è¿æä»»æééæå©ä½ |
| | | std::vector<ChannelBlock> blocks; // å¤ä¸ªééçå¢é |
| | | }; |
| | | |
| | | // === æ¹æ¬¡ç¶æ === |
| | | enum class BatchState : uint8_t { |
| | | Idle = 0, // æ æ´»å¨æ¹æ¬¡ï¼activeBatchId="", activeStartTs=0, expectedEndTs=0 |
| | | Active = 1, |
| | | }; |
| | | |
| | | // === é误ç === |
| | | enum class ErrCode : uint16_t { |
| | | NoActiveBatch = 0x0001, // å½åæ æ´»å¨æ¹æ¬¡ |
| | | BatchMismatch = 0x0002, // è¯·æ±æºå¸¦ç batchId ä¸å½åæ´»å¨æ¹æ¬¡ä¸ä¸è´ |
| | | }; |
| | | |
| | | // 0x0120 / 0xF120ï¼æ¹æ¬¡ä¿¡æ¯ï¼ |
| | | struct ReqBatchInfo { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | }; |
| | | struct RspBatchInfo { |
| | | uint32_t dataId = 0; |
| | | uint32_t machineId = 0; |
| | | BatchState state = BatchState::Idle; |
| | | std::string activeBatchId; // state=Idle æ¶åºä¸ºç©º |
| | | uint64_t activeStartTs = 0; |
| | | uint64_t expectedEndTs = 0; // =0 表示æªç¥/Idle |
| | | }; |
| | | |
| | | // 0xE100 é误帧 |
| | | struct RspError { |
| | | uint32_t dataId = 0; |
| | | uint16_t refCmd = 0; // åºé请æ±çæä»¤ï¼å¦ 0x0101/0x0105/0x0120... |
| | | uint32_t machineId = 0; |
| | | ErrCode code = ErrCode::NoActiveBatch; |
| | | std::string message; // ç®çææ¬ |
| | | }; |
| | | |
| | | } // namespace Proto |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #include "ProtocolCodec.h" |
| | | #include <cstring> // std::memcpy |
| | | #include <cstdio> // std::snprintf (å¦éæ¥å¿) |
| | | #include <vector> |
| | | |
| | | namespace Proto { |
| | | |
| | | // --------------------------- |
| | | // Big-endian åºç¡ç¼è§£ç å·¥å
· |
| | | // --------------------------- |
| | | void put_u16(std::vector<uint8_t>& v, uint16_t x) { |
| | | v.push_back(static_cast<uint8_t>((x >> 8) & 0xFF)); |
| | | v.push_back(static_cast<uint8_t>(x & 0xFF)); |
| | | } |
| | | void put_u32(std::vector<uint8_t>& v, uint32_t x) { |
| | | v.push_back(static_cast<uint8_t>((x >> 24) & 0xFF)); |
| | | v.push_back(static_cast<uint8_t>((x >> 16) & 0xFF)); |
| | | v.push_back(static_cast<uint8_t>((x >> 8) & 0xFF)); |
| | | v.push_back(static_cast<uint8_t>(x & 0xFF)); |
| | | } |
| | | void put_u64(std::vector<uint8_t>& v, uint64_t x) { |
| | | for (int i = 7; i >= 0; --i) v.push_back(static_cast<uint8_t>((x >> (i * 8)) & 0xFF)); |
| | | } |
| | | uint16_t get_u16(const uint8_t* p) { |
| | | return static_cast<uint16_t>((uint16_t(p[0]) << 8) | uint16_t(p[1])); |
| | | } |
| | | uint32_t get_u32(const uint8_t* p) { |
| | | return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) | (uint32_t(p[2]) << 8) | uint32_t(p[3]); |
| | | } |
| | | uint64_t get_u64(const uint8_t* p) { |
| | | uint64_t x = 0; |
| | | for (int i = 0; i < 8; ++i) { x = (x << 8) | p[i]; } |
| | | return x; |
| | | } |
| | | void put_f64_be(std::vector<uint8_t>& v, double d) { |
| | | static_assert(sizeof(double) == 8, "double must be 8 bytes"); |
| | | uint64_t u; |
| | | std::memcpy(&u, &d, 8); |
| | | for (int i = 7; i >= 0; --i) v.push_back(static_cast<uint8_t>((u >> (i * 8)) & 0xFF)); |
| | | } |
| | | double get_f64_be(const uint8_t* p) { |
| | | uint64_t u = 0; |
| | | for (int i = 0; i < 8; ++i) { u = (u << 8) | p[i]; } |
| | | double d; |
| | | std::memcpy(&d, &u, 8); |
| | | return d; |
| | | } |
| | | |
| | | // å¿«éçª¥æ¢æ£æ cmdï¼ä¸å宿´é¿åº¦æ ¸å¯¹ï¼ä¿çä½ åæ¬é£æ ¼ï¼ |
| | | uint16_t peek_cmd(const std::vector<uint8_t>& f) { |
| | | if (f.size() < 12) return 0; |
| | | if (!(f[0] == kHead[0] && f[1] == kHead[1] && f[2] == kHead[2] && f[3] == kHead[3])) return 0; |
| | | return (uint16_t(f[10]) << 8) | f[11]; |
| | | } |
| | | |
| | | // å
è£
帧ï¼4B头 + 4B dataId + 2B bodyLen + body + 1Bå°¾ |
| | | static inline void frame_wrap(uint32_t dataId, const std::vector<uint8_t>& body, std::vector<uint8_t>& out) { |
| | | out.clear(); |
| | | out.reserve(4 + 4 + 2 + body.size() + 1); |
| | | out.push_back(kHead[0]); out.push_back(kHead[1]); out.push_back(kHead[2]); out.push_back(kHead[3]); |
| | | put_u32(out, dataId); |
| | | put_u16(out, static_cast<uint16_t>(body.size())); |
| | | out.insert(out.end(), body.begin(), body.end()); |
| | | out.push_back(kTail); |
| | | } |
| | | |
| | | // --------------------------- |
| | | // 0x0104 / 0xF104 Machines |
| | | // --------------------------- |
| | | std::vector<uint8_t> encodeRequestMachines(const ReqMachines& req) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_REQ_MACHINES); |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(req.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | std::vector<uint8_t> encodeResponseMachines(const RspMachines& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_MACHINES); |
| | | put_u16(body, static_cast<uint16_t>(rsp.machines.size())); |
| | | for (auto& m : rsp.machines) { |
| | | put_u32(body, m.id); |
| | | const uint16_t n = static_cast<uint16_t>(m.name.size()); |
| | | put_u16(body, n); |
| | | body.insert(body.end(), m.name.begin(), m.name.end()); |
| | | } |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeRequestMachines(const std::vector<uint8_t>& f, ReqMachines& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | return (get_u16(b) == CMD_REQ_MACHINES); |
| | | } |
| | | |
| | | bool decodeResponseMachines(const std::vector<uint8_t>& f, RspMachines& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_MACHINES) return false; |
| | | b += 2; |
| | | const uint16_t cnt = get_u16(b); b += 2; |
| | | |
| | | out.machines.clear(); |
| | | out.machines.reserve(cnt); |
| | | for (uint16_t i = 0; i < cnt; ++i) { |
| | | if (b + 4 + 2 > e) return false; |
| | | const uint32_t id = get_u32(b); b += 4; |
| | | const uint16_t n = get_u16(b); b += 2; |
| | | if (b + n > e) return false; |
| | | out.machines.push_back({ id, std::string(reinterpret_cast<const char*>(b), n) }); |
| | | b += n; |
| | | } |
| | | return (b == e); |
| | | } |
| | | |
| | | // --------------------------- |
| | | // 0x0001 / 0xF001 Version |
| | | // --------------------------- |
| | | std::vector<uint8_t> encodeRequestVersion(const ReqVersion& req) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_REQ_VERSION); |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(req.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | std::vector<uint8_t> encodeResponseVersion(const RspVersion& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_VERSION); |
| | | const uint16_t n = static_cast<uint16_t>(rsp.version.size()); |
| | | put_u16(body, n); |
| | | body.insert(body.end(), rsp.version.begin(), rsp.version.end()); // UTF-8 |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeRequestVersion(const std::vector<uint8_t>& f, ReqVersion& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | const uint8_t* b = p + 10; |
| | | return (get_u16(b) == CMD_REQ_VERSION); |
| | | } |
| | | |
| | | bool decodeResponseVersion(const std::vector<uint8_t>& f, RspVersion& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_VERSION) return false; |
| | | b += 2; |
| | | |
| | | const uint16_t n = get_u16(b); b += 2; |
| | | if (b + n > e) return false; |
| | | out.version.assign(reinterpret_cast<const char*>(b), n); |
| | | b += n; |
| | | |
| | | return (b == e); |
| | | } |
| | | |
| | | // --------------------------- |
| | | // 0x0103 / 0xF103 Stats |
| | | // --------------------------- |
| | | std::vector<uint8_t> encodeRequestStats(const ReqStats& req) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_REQ_STATS); |
| | | put_u32(body, req.machineId); |
| | | put_u16(body, req.flags); |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(req.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | std::vector<uint8_t> encodeResponseStats(const RspStats& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_STATS); |
| | | put_u32(body, rsp.machineId); |
| | | put_u16(body, static_cast<uint16_t>(rsp.channels.size())); |
| | | |
| | | for (auto& c : rsp.channels) { |
| | | put_u32(body, c.channelId); |
| | | put_u64(body, c.earliestTs); |
| | | put_u64(body, c.latestTs); |
| | | put_u32(body, c.size); |
| | | const uint16_t n = static_cast<uint16_t>(c.name.size()); |
| | | put_u16(body, n); |
| | | body.insert(body.end(), c.name.begin(), c.name.end()); |
| | | } |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeRequestStats(const std::vector<uint8_t>& f, ReqStats& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | if (b + 2 + 4 + 2 > e) return false; |
| | | if (get_u16(b) != CMD_REQ_STATS) return false; |
| | | b += 2; |
| | | |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.flags = get_u16(b); b += 2; |
| | | |
| | | return (b == e); |
| | | } |
| | | |
| | | bool decodeResponseStats(const std::vector<uint8_t>& f, RspStats& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 + 4 + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_STATS) return false; |
| | | b += 2; |
| | | |
| | | out.machineId = get_u32(b); b += 4; |
| | | const uint16_t cnt = get_u16(b); b += 2; |
| | | |
| | | out.channels.clear(); |
| | | out.channels.reserve(cnt); |
| | | for (uint16_t i = 0; i < cnt; ++i) { |
| | | if (b + 4 + 8 + 8 + 4 + 2 > e) return false; |
| | | ChannelStatInfo ci{}; |
| | | ci.channelId = get_u32(b); b += 4; |
| | | ci.earliestTs = get_u64(b); b += 8; |
| | | ci.latestTs = get_u64(b); b += 8; |
| | | ci.size = get_u32(b); b += 4; |
| | | |
| | | const uint16_t n = get_u16(b); b += 2; |
| | | if (b + n > e) return false; |
| | | ci.name.assign(reinterpret_cast<const char*>(b), n); |
| | | b += n; |
| | | |
| | | out.channels.push_back(std::move(ci)); |
| | | } |
| | | return (b == e); |
| | | } |
| | | |
| | | // --------------------------------------- |
| | | // 0x0101 / 0xF101 Sinceï¼æ¯æå¯é batchIdï¼ |
| | | // --------------------------------------- |
| | | std::vector<uint8_t> encodeRequestSince(const ReqSince& req) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_REQ_SINCE); |
| | | put_u32(body, req.machineId); |
| | | put_u32(body, req.channelId); |
| | | put_u64(body, req.sinceTsExclusive); |
| | | put_u16(body, req.maxCount); |
| | | put_u16(body, req.flags); |
| | | if (req.flags & SINCE_FLAG_HAS_BATCH) { |
| | | const uint16_t L = static_cast<uint16_t>(req.batchId.size()); |
| | | put_u16(body, L); |
| | | body.insert(body.end(), req.batchId.begin(), req.batchId.end()); |
| | | } |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(req.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | std::vector<uint8_t> encodeResponseSince(const RspSince& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_SINCE); |
| | | put_u32(body, rsp.machineId); |
| | | put_u32(body, rsp.channelId); |
| | | put_u64(body, rsp.lastTsSent); |
| | | body.push_back(rsp.more ? 1u : 0u); |
| | | put_u16(body, static_cast<uint16_t>(rsp.samples.size())); |
| | | for (auto& s : rsp.samples) { |
| | | put_u64(body, s.ts_ms); |
| | | put_f64_be(body, s.value); |
| | | } |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeRequestSince(const std::vector<uint8_t>& f, ReqSince& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 > e) return false; |
| | | if (get_u16(b) != CMD_REQ_SINCE) return false; |
| | | b += 2; |
| | | |
| | | if (b + 4 + 4 + 8 + 2 + 2 > e) return false; |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.channelId = get_u32(b); b += 4; |
| | | out.sinceTsExclusive = get_u64(b); b += 8; |
| | | out.maxCount = get_u16(b); b += 2; |
| | | out.flags = get_u16(b); b += 2; |
| | | |
| | | out.batchId.clear(); |
| | | if (out.flags & SINCE_FLAG_HAS_BATCH) { |
| | | if (b + 2 > e) return false; |
| | | const uint16_t L = get_u16(b); b += 2; |
| | | if (b + L > e) return false; |
| | | out.batchId.assign(reinterpret_cast<const char*>(b), L); |
| | | b += L; |
| | | } |
| | | |
| | | return (b == e); |
| | | } |
| | | |
| | | bool decodeResponseSince(const std::vector<uint8_t>& f, RspSince& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_SINCE) return false; |
| | | b += 2; |
| | | |
| | | if (b + 4 + 4 + 8 + 1 + 2 > e) return false; |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.channelId = get_u32(b); b += 4; |
| | | out.lastTsSent = get_u64(b); b += 8; |
| | | out.more = *b++; // u8 |
| | | const uint16_t cnt = get_u16(b); b += 2; |
| | | |
| | | out.samples.clear(); |
| | | out.samples.reserve(cnt); |
| | | for (uint16_t i = 0; i < cnt; ++i) { |
| | | if (b + 8 + 8 > e) return false; |
| | | const uint64_t ts = get_u64(b); b += 8; |
| | | const double v = get_f64_be(b); b += 8; |
| | | out.samples.push_back({ ts, v }); |
| | | } |
| | | return (b == e); |
| | | } |
| | | |
| | | // --------------------------------------- |
| | | // 0x0105 / 0xF105 SinceAllï¼æ´æºå¤ééå¢éï¼ |
| | | // --------------------------------------- |
| | | std::vector<uint8_t> encodeRequestSinceAll(const ReqSinceAll& req) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_REQ_SINCE_ALL); |
| | | put_u32(body, req.machineId); |
| | | put_u64(body, req.sinceTsExclusive); |
| | | put_u16(body, req.maxPerChannel); |
| | | put_u16(body, req.flags); |
| | | if (req.flags & SINCE_FLAG_HAS_BATCH) { |
| | | const uint16_t L = static_cast<uint16_t>(req.batchId.size()); |
| | | put_u16(body, L); |
| | | body.insert(body.end(), req.batchId.begin(), req.batchId.end()); |
| | | } |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(req.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | std::vector<uint8_t> encodeResponseSinceAll(const RspSinceAll& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_SINCE_ALL); |
| | | put_u32(body, rsp.machineId); |
| | | body.push_back(rsp.moreAny ? 1u : 0u); |
| | | put_u16(body, static_cast<uint16_t>(rsp.blocks.size())); |
| | | |
| | | for (const auto& b : rsp.blocks) { |
| | | put_u32(body, b.channelId); |
| | | put_u64(body, b.lastTsSent); |
| | | body.push_back(b.more ? 1u : 0u); |
| | | put_u16(body, static_cast<uint16_t>(b.samples.size())); |
| | | for (const auto& sp : b.samples) { |
| | | put_u64(body, sp.ts_ms); |
| | | put_f64_be(body, sp.value); |
| | | } |
| | | } |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeRequestSinceAll(const std::vector<uint8_t>& f, ReqSinceAll& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 > e) return false; |
| | | if (get_u16(b) != CMD_REQ_SINCE_ALL) return false; |
| | | b += 2; |
| | | |
| | | if (b + 4 + 8 + 2 + 2 > e) return false; |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.sinceTsExclusive = get_u64(b); b += 8; |
| | | out.maxPerChannel = get_u16(b); b += 2; |
| | | out.flags = get_u16(b); b += 2; |
| | | |
| | | out.batchId.clear(); |
| | | if (out.flags & SINCE_FLAG_HAS_BATCH) { |
| | | if (b + 2 > e) return false; |
| | | const uint16_t L = get_u16(b); b += 2; |
| | | if (b + L > e) return false; |
| | | out.batchId.assign(reinterpret_cast<const char*>(b), L); |
| | | b += L; |
| | | } |
| | | return (b == e); |
| | | } |
| | | |
| | | bool decodeResponseSinceAll(const std::vector<uint8_t>& f, RspSinceAll& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_SINCE_ALL) return false; |
| | | b += 2; |
| | | |
| | | if (b + 4 + 1 + 2 > e) return false; |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.moreAny = *b++; // u8 |
| | | const uint16_t N = get_u16(b); b += 2; |
| | | |
| | | out.blocks.clear(); |
| | | out.blocks.reserve(N); |
| | | for (uint16_t i = 0; i < N; ++i) { |
| | | if (b + 4 + 8 + 1 + 2 > e) return false; |
| | | ChannelBlock blk; |
| | | blk.channelId = get_u32(b); b += 4; |
| | | blk.lastTsSent = get_u64(b); b += 8; |
| | | blk.more = *b++; // u8 |
| | | const uint16_t M = get_u16(b); b += 2; |
| | | |
| | | blk.samples.clear(); |
| | | blk.samples.reserve(M); |
| | | for (uint16_t j = 0; j < M; ++j) { |
| | | if (b + 8 + 8 > e) return false; |
| | | SamplePair sp; |
| | | sp.ts_ms = get_u64(b); b += 8; |
| | | sp.value = get_f64_be(b); b += 8; |
| | | blk.samples.push_back(sp); |
| | | } |
| | | out.blocks.push_back(std::move(blk)); |
| | | } |
| | | return (b == e); |
| | | } |
| | | |
| | | // --------------------------- |
| | | // 0x0120 / 0xF120 BatchInfo |
| | | // --------------------------- |
| | | std::vector<uint8_t> encodeRequestBatchInfo(const ReqBatchInfo& req) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_REQ_BATCH_INFO); |
| | | put_u32(body, req.machineId); |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(req.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeRequestBatchInfo(const std::vector<uint8_t>& f, ReqBatchInfo& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 + 4 > e) return false; |
| | | if (get_u16(b) != CMD_REQ_BATCH_INFO) return false; |
| | | b += 2; |
| | | |
| | | out.machineId = get_u32(b); b += 4; |
| | | return (b == e); |
| | | } |
| | | |
| | | std::vector<uint8_t> encodeResponseBatchInfo(const RspBatchInfo& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_BATCH_INFO); |
| | | put_u32(body, rsp.machineId); |
| | | body.push_back(static_cast<uint8_t>(rsp.state)); |
| | | |
| | | const uint16_t L = static_cast<uint16_t>(rsp.activeBatchId.size()); |
| | | put_u16(body, L); |
| | | body.insert(body.end(), rsp.activeBatchId.begin(), rsp.activeBatchId.end()); |
| | | |
| | | put_u64(body, rsp.activeStartTs); |
| | | put_u64(body, rsp.expectedEndTs); |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeResponseBatchInfo(const std::vector<uint8_t>& f, RspBatchInfo& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 + 4 + 1 + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_BATCH_INFO) return false; |
| | | b += 2; |
| | | |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.state = static_cast<BatchState>(*b++); |
| | | |
| | | const uint16_t L = get_u16(b); b += 2; |
| | | if (b + L > e) return false; |
| | | out.activeBatchId.assign(reinterpret_cast<const char*>(b), L); b += L; |
| | | |
| | | if (b + 8 + 8 > e) return false; |
| | | out.activeStartTs = get_u64(b); b += 8; |
| | | out.expectedEndTs = get_u64(b); b += 8; |
| | | |
| | | return (b == e); |
| | | } |
| | | |
| | | // --------------------------- |
| | | // 0xE100 Error |
| | | // --------------------------- |
| | | std::vector<uint8_t> encodeResponseError(const RspError& rsp) { |
| | | std::vector<uint8_t> body; |
| | | put_u16(body, CMD_RSP_ERROR); |
| | | put_u16(body, rsp.refCmd); |
| | | put_u32(body, rsp.machineId); |
| | | put_u16(body, static_cast<uint16_t>(rsp.code)); |
| | | const uint16_t L = static_cast<uint16_t>(rsp.message.size()); |
| | | put_u16(body, L); |
| | | body.insert(body.end(), rsp.message.begin(), rsp.message.end()); |
| | | |
| | | std::vector<uint8_t> out; |
| | | frame_wrap(rsp.dataId, body, out); |
| | | return out; |
| | | } |
| | | |
| | | bool decodeResponseError(const std::vector<uint8_t>& f, RspError& out) { |
| | | if (f.size() < 11) return false; |
| | | const uint8_t* p = f.data(); |
| | | if (p[0] != kHead[0] || p[1] != kHead[1] || p[2] != kHead[2] || p[3] != kHead[3]) return false; |
| | | if (f.back() != kTail) return false; |
| | | |
| | | out.dataId = get_u32(p + 4); |
| | | const uint16_t bodyLen = get_u16(p + 8); |
| | | if (10u + bodyLen + 1u != f.size()) return false; |
| | | |
| | | const uint8_t* b = p + 10; |
| | | const uint8_t* e = b + bodyLen; |
| | | |
| | | if (b + 2 + 2 + 4 + 2 + 2 > e) return false; |
| | | if (get_u16(b) != CMD_RSP_ERROR) return false; |
| | | b += 2; |
| | | |
| | | out.refCmd = get_u16(b); b += 2; |
| | | out.machineId = get_u32(b); b += 4; |
| | | out.code = static_cast<ErrCode>(get_u16(b)); b += 2; |
| | | |
| | | const uint16_t L = get_u16(b); b += 2; |
| | | if (b + L > e) return false; |
| | | out.message.assign(reinterpret_cast<const char*>(b), L); b += L; |
| | | |
| | | return (b == e); |
| | | } |
| | | |
| | | } // namespace Proto |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #pragma once |
| | | #include "Protocol.h" |
| | | |
| | | namespace Proto { |
| | | |
| | | // 大端工å
· |
| | | void put_u16(std::vector<uint8_t>& v, uint16_t x); |
| | | void put_u32(std::vector<uint8_t>& v, uint32_t x); |
| | | void put_u64(std::vector<uint8_t>& v, uint64_t x); |
| | | uint16_t get_u16(const uint8_t* p); |
| | | uint32_t get_u32(const uint8_t* p); |
| | | uint64_t get_u64(const uint8_t* p); |
| | | void put_f64_be(std::vector<uint8_t>& v, double d); |
| | | double get_f64_be(const uint8_t* p); |
| | | |
| | | // 帮å©ï¼peek cmd |
| | | uint16_t peek_cmd(const std::vector<uint8_t>& frame); |
| | | |
| | | // ç¼ç ï¼æå¸§ï¼å¤´ + dataId + len + body + å°¾ï¼ |
| | | std::vector<uint8_t> encodeRequestVersion(const ReqVersion& req); |
| | | std::vector<uint8_t> encodeResponseVersion(const RspVersion& rsp); |
| | | std::vector<uint8_t> encodeRequestMachines(const ReqMachines& req); |
| | | std::vector<uint8_t> encodeResponseMachines(const RspMachines& rsp); |
| | | std::vector<uint8_t> encodeRequestStats(const ReqStats& req); |
| | | std::vector<uint8_t> encodeResponseStats(const RspStats& rsp); |
| | | std::vector<uint8_t> encodeRequestSince(const ReqSince& req); |
| | | std::vector<uint8_t> encodeResponseSince(const RspSince& rsp); |
| | | std::vector<uint8_t> encodeRequestSinceAll(const ReqSinceAll& req); |
| | | std::vector<uint8_t> encodeResponseSinceAll(const RspSinceAll& rsp); |
| | | std::vector<uint8_t> encodeRequestBatchInfo(const ReqBatchInfo& req); |
| | | std::vector<uint8_t> encodeResponseBatchInfo(const RspBatchInfo& rsp); |
| | | |
| | | // è§£ç |
| | | bool decodeRequestVersion(const std::vector<uint8_t>& frame, ReqVersion& out); |
| | | bool decodeResponseVersion(const std::vector<uint8_t>& frame, RspVersion& out); |
| | | bool decodeRequestMachines(const std::vector<uint8_t>& frame, ReqMachines& out); |
| | | bool decodeResponseMachines(const std::vector<uint8_t>& frame, RspMachines& out); |
| | | bool decodeRequestStats(const std::vector<uint8_t>& frame, ReqStats& out); |
| | | bool decodeResponseStats(const std::vector<uint8_t>& frame, RspStats& out); |
| | | bool decodeRequestSince(const std::vector<uint8_t>& frame, ReqSince& out); |
| | | bool decodeResponseSince(const std::vector<uint8_t>& frame, RspSince& out); |
| | | bool decodeRequestSinceAll(const std::vector<uint8_t>& frame, ReqSinceAll& out); |
| | | bool decodeResponseSinceAll(const std::vector<uint8_t>& frame, RspSinceAll& out); |
| | | bool decodeRequestBatchInfo(const std::vector<uint8_t>& frame, ReqBatchInfo& out); |
| | | bool decodeResponseBatchInfo(const std::vector<uint8_t>& frame, RspBatchInfo& out); |
| | | std::vector<uint8_t> encodeResponseError(const RspError& rsp); |
| | | bool decodeResponseError(const std::vector<uint8_t>& frame, RspError& out); |
| | | |
| | | } // namespace Proto |
| | |
| | | return 0; |
| | | } |
| | | |
| | | int CBonder::onProcessStateChanged(PROCESS_STATE state) |
| | | int CBonder::onProcessStateChanged(int slotNo, PROCESS_STATE state) |
| | | { |
| | | CEquipment::onProcessStateChanged(state); |
| | | CEquipment::onProcessStateChanged(slotNo, state); |
| | | |
| | | if (state == PROCESS_STATE::Complete) { |
| | | // æ£æ¥æ°æ®ï¼å½å两çç»çï¼ä¸ç为G1, ä¸ç为G2, ä¸pProcessDataä¸çidè½å¹é
G1æG2 |
| | |
| | | virtual void getAttributeVector(CAttributeVector& attrubutes); |
| | | virtual int recvIntent(CPin* pPin, CIntent* pIntent); |
| | | virtual int onProcessData(CProcessData* pProcessData); |
| | | virtual int onProcessStateChanged(PROCESS_STATE state); |
| | | virtual int onProcessStateChanged(int slotNo, PROCESS_STATE state); |
| | | virtual int getIndexerOperationModeBaseValue(); |
| | | virtual int parsingParams(const char* pszData, size_t size, std::vector<CParam>& parsms); |
| | | virtual int parsingProcessData(const char* pszData, size_t size, std::vector<CParam>& parsms); |
| | |
| | | m_pCclink = nullptr; |
| | | m_nBaseAlarmId = 0; |
| | | m_pArm = nullptr; |
| | | m_processState = PROCESS_STATE::Ready; |
| | | m_blockReadBit = { 0 }; |
| | | m_nTestFlag = 0; |
| | | InitializeCriticalSection(&m_criticalSection); |
| | |
| | | return 0; |
| | | } |
| | | |
| | | void CEquipment::setProcessState(PROCESS_STATE state) |
| | | void CEquipment::setProcessState(int nSlotNo, PROCESS_STATE state) |
| | | { |
| | | m_processState = state; |
| | | onProcessStateChanged(m_processState); |
| | | if (nSlotNo <= 0 || nSlotNo > 8) return; |
| | | |
| | | m_processState[nSlotNo - 1] = state; |
| | | onProcessStateChanged(nSlotNo, m_processState[nSlotNo - 1]); |
| | | |
| | | if (m_listener.onProcessStateChanged != nullptr) { |
| | | m_listener.onProcessStateChanged(this, m_processState); |
| | | m_listener.onProcessStateChanged(this, nSlotNo, m_processState[nSlotNo - 1]); |
| | | } |
| | | } |
| | | |
| | |
| | | Unlock(); |
| | | |
| | | |
| | | if (m_processState != PROCESS_STATE::Ready) { |
| | | setProcessState(PROCESS_STATE::Ready); |
| | | if (m_processState[port] != PROCESS_STATE::Ready) { |
| | | setProcessState(port, PROCESS_STATE::Ready); |
| | | } |
| | | |
| | | if (m_listener.onDataChanged != nullptr) { |
| | |
| | | pGlass->release(); // tempFetchOutéè¦è°ç¨ä¸æ¬¡release |
| | | Unlock(); |
| | | |
| | | /* |
| | | if (m_processState != PROCESS_STATE::Processing) { |
| | | setProcessState(PROCESS_STATE::Processing); |
| | | } |
| | | */ |
| | | |
| | | if (m_listener.onDataChanged != nullptr) { |
| | | m_listener.onDataChanged(this, EDCC_STORED_JOB); |
| | |
| | | year, month, day, hour, minute, second |
| | | ); |
| | | |
| | | CGlass* pGlass = getGlassFromSlot(slotNo); |
| | | if (pGlass == nullptr) { |
| | | LOGE("<CEquipment-%s>decodeJobProcessStartReport, æ¾ä¸å°å¯¹åºglass", getName().c_str()); |
| | | } |
| | | if (slotNo <= 0 || slotNo > 8) return -1; |
| | | |
| | | if (m_processState != PROCESS_STATE::Processing) { |
| | | if (m_processState[slotNo -1] != PROCESS_STATE::Processing) { |
| | | Lock(); |
| | | m_svDatas.clear(); |
| | | Unlock(); |
| | | setProcessState(PROCESS_STATE::Processing); |
| | | setProcessState(slotNo, PROCESS_STATE::Processing); |
| | | } |
| | | |
| | | |
| | |
| | | ); |
| | | |
| | | |
| | | if (m_processState != PROCESS_STATE::Complete) { |
| | | setProcessState(PROCESS_STATE::Complete); |
| | | CGlass* pGlass = getGlassFromSlot(slotNo); |
| | | if (m_processState[slotNo - 1] != PROCESS_STATE::Complete) { |
| | | setProcessState(slotNo, PROCESS_STATE::Complete); |
| | | } |
| | | |
| | | CGlass* pGlass = getGlassFromSlot(slotNo); |
| | | if (pGlass == nullptr) { |
| | | LOGE("<CEquipment-%s>decodeJobProcessEndReport, æ¾ä¸å°å¯¹åºglass", getName().c_str()); |
| | | } |
| | |
| | | if (pJs->getCassetteSequenceNo() == cassetteNo |
| | | && pJs->getJobSequenceNo() == jobSequenceNo) { |
| | | pGlass->processEnd(m_nID, getSlotUnit(slotNo)); |
| | | if (m_processState != PROCESS_STATE::Complete) { |
| | | setProcessState(PROCESS_STATE::Complete); |
| | | } |
| | | } |
| | | else { |
| | | LOGE("<CEquipment-%s>decodeJobProcessEndReport, jobSequenceNoæjobSequenceNoä¸å¹é
", |
| | |
| | | return 0; |
| | | } |
| | | |
| | | int CEquipment::onProcessStateChanged(PROCESS_STATE state) |
| | | int CEquipment::onProcessStateChanged(int nSlotNo, PROCESS_STATE state) |
| | | { |
| | | return 0; |
| | | } |
| | |
| | | typedef std::function<void(void* pEiuipment, void* pReport)> ONVCREVENTREPORT; |
| | | typedef std::function<BOOL(void* pEiuipment, int port, CJobDataB* pJobDataB)> ONPREFETCHEDOUTJOB; |
| | | typedef std::function<BOOL(void* pEiuipment, int port, CJobDataB* pJobDataB, short& putSlot)> ONPRESTOREDJOB; |
| | | typedef std::function<void(void* pEiuipment, PROCESS_STATE state)> ONPROCESSSTATE; |
| | | typedef std::function<void(void* pEiuipment, int nSlotNo, PROCESS_STATE state)> ONPROCESSSTATE; |
| | | typedef std::function<void(void* pEiuipment, short scanMap, short downMap)> ONMAPMISMATCH; |
| | | typedef std::function<void(void* pEiuipment, short status, __int64 data)> ONPORTSTATUSCHANGED; |
| | | |
| | |
| | | virtual int onProcessData(CProcessData* pProcessData); |
| | | virtual int onSendAble(int port); |
| | | virtual int onReceiveAble(int port); |
| | | virtual int onProcessStateChanged(PROCESS_STATE state); |
| | | virtual int onProcessStateChanged(int nSlotNo, PROCESS_STATE state); |
| | | virtual int getIndexerOperationModeBaseValue(); |
| | | virtual bool isSlotProcessed(int slot) { return true; }; |
| | | bool isAlarmStep(SERVO::CStep* pStep); |
| | |
| | | int decodeJobProcessStartReport(CStep* pStep, const char* pszData, size_t size); |
| | | int decodeJobProcessEndReport(CStep* pStep, const char* pszData, size_t size); |
| | | BOOL compareJobData(CJobDataB* pJobDataB, CJobDataS* pJobDataS); |
| | | void setProcessState(PROCESS_STATE state); |
| | | void setProcessState(int nSlotNo, PROCESS_STATE state); |
| | | float toFloat(const char* pszAddr); |
| | | |
| | | protected: |
| | |
| | | int m_nBaseAlarmId; |
| | | CRecipesManager m_recipesManager; |
| | | CSlot m_slot[SLOT_MAX]; |
| | | PROCESS_STATE m_processState; |
| | | PROCESS_STATE m_processState[SLOT_MAX] = { PROCESS_STATE::Ready }; |
| | | std::vector<SERVO::CSVData> m_svDatas; |
| | | |
| | | private: |
| | |
| | | |
| | | |
| | | namespace SERVO { |
| | | static inline int64_t now_ms_epoch() { |
| | | using namespace std::chrono; |
| | | return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
| | | } |
| | | |
| | | CMaster* g_pMaster = NULL; |
| | | |
| | | unsigned __stdcall DispatchThreadFunction(LPVOID lpParam) |
| | |
| | | } |
| | | m_listEquipment.clear(); |
| | | |
| | | |
| | | if (m_pCollector != nullptr) { |
| | | m_pCollector->stopLoop(); |
| | | delete m_pCollector; |
| | | m_pCollector = nullptr; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | |
| | | unlock(); |
| | | } |
| | | }; |
| | | listener.onProcessStateChanged = [&](void* pEquipment, PROCESS_STATE state) -> void { |
| | | listener.onProcessStateChanged = [&](void* pEquipment, int slotNo, PROCESS_STATE state) -> void { |
| | | ASSERT(1 <= slotNo && slotNo <= 8); |
| | | int eqid = ((CEquipment*)pEquipment)->getID(); |
| | | CGlass* pGlass = ((CEquipment*)pEquipment)->getGlassFromSlot(slotNo); |
| | | LOGI("<Master>onProcessStateChanged<%d>", (int)state); |
| | | if (state == PROCESS_STATE::Processing) { |
| | | if (pGlass != nullptr) { |
| | | m_pCollector->batchStart(eqid, |
| | | pGlass->getID().c_str(), 10 * 60 * 1000ULL); |
| | | } |
| | | } |
| | | else if (state == PROCESS_STATE::Complete) { |
| | | m_pCollector->batchStop(eqid); |
| | | } |
| | | }; |
| | | listener.onMapMismatch = [&](void* pEquipment, short scanMap, short downMap) { |
| | | LOGE("<Master-%s>Port InUse, map(%d!=%d)ä¸ä¸è´ï¼è¯·æ£æ¥ã", |
| | |
| | | std::vector<CParam> params; |
| | | ((CEquipment*)pEquipment)->parsingSVData((const char*)rawData.data(), rawData.size(), params); |
| | | |
| | | |
| | | // 以ä¸å å
¥å°æ²çº¿æ°æ®ä¸ |
| | | const int64_t ts = now_ms_epoch(); |
| | | int eqid = ((CEquipment*)pEquipment)->getID(); |
| | | if (eqid == EQ_ID_Bonder1 || eqid == EQ_ID_Bonder2) { |
| | | m_pCollector->buffersPush(eqid, 1, ts, params.at(1).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 2, ts, params.at(2).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 3, ts, params.at(3).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 4, ts, params.at(4).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 5, ts, params.at(5).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 6, ts, params.at(6).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 7, ts, params.at(7).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 8, ts, params.at(8).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 9, ts, params.at(9).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 10, ts, params.at(10).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 11, ts, params.at(11).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 12, ts, params.at(12).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 13, ts, params.at(13).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 14, ts, params.at(14).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 15, ts, params.at(15).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 16, ts, params.at(16).getDoubleValue()); |
| | | } |
| | | else if (eqid == EQ_ID_VACUUMBAKE) { |
| | | m_pCollector->buffersPush(eqid, 1, ts, params.at(1).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 2, ts, params.at(2).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 3, ts, params.at(3).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 4, ts, params.at(4).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 5, ts, params.at(5).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 6, ts, params.at(6).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 7, ts, params.at(7).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 8, ts, params.at(10).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 9, ts, params.at(11).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 10, ts, params.at(12).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 11, ts, params.at(13).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 12, ts, params.at(14).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 13, ts, params.at(15).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 14, ts, params.at(16).getDoubleValue()); |
| | | } |
| | | else if (eqid == EQ_ID_BAKE_COOLING) { |
| | | m_pCollector->buffersPush(eqid, 1, ts, params.at(1).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 2, ts, params.at(2).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 3, ts, params.at(3).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 4, ts, params.at(4).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 5, ts, params.at(5).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 6, ts, params.at(6).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 7, ts, params.at(11).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 8, ts, params.at(12).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 9, ts, params.at(13).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 10, ts, params.at(14).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 11, ts, params.at(15).getDoubleValue()); |
| | | m_pCollector->buffersPush(eqid, 12, ts, params.at(16).getDoubleValue()); |
| | | } |
| | | |
| | | |
| | | // 以䏿¯è¾åºæµè¯ |
| | | std::string strOut; |
| | | char szBuffer[256]; |
| | | for (auto p : params) { |
| | |
| | | |
| | | return nullptr; |
| | | } |
| | | |
| | | void CMaster::CreateDAQBridgeServer() |
| | | { |
| | | auto connectionStatusCallback = [&](int code, const std::string& status) { |
| | | LOGI("<DAQBridge>status:", status.c_str()); |
| | | }; |
| | | auto rawDataCallback = [](const std::vector<uint8_t>& bytes) { |
| | | |
| | | }; |
| | | |
| | | // äºä»¶ï¼æäººè¿å
¥/æå¼å°±ä¸æ¥å¿ |
| | | auto clieintEventCallback = [](const std::string& ip, uint16_t port, bool connected) { |
| | | LOGI("<DAQBridge>[Client %s] %s:%u", connected ? _T("JOIN") : _T("LEAVE"), ip.c_str(), port); |
| | | }; |
| | | |
| | | if (m_pCollector == nullptr) { |
| | | m_pCollector = new Collector(); |
| | | m_pCollector->setConnectionStatusCallback(connectionStatusCallback); |
| | | m_pCollector->setRawDataCallback(rawDataCallback); |
| | | m_pCollector->setClientEventCallback(clieintEventCallback); |
| | | m_pCollector->createServer(8081); |
| | | m_pCollector->startLoop(10); |
| | | |
| | | // 1) æ³¨åæºå°ï¼æ¨èï¼å
注å id + æºå¨åç§°ï¼ |
| | | RetentionPolicy defP; defP.mode = RetainMode::ByCount; defP.maxSamples = 200; |
| | | m_pCollector->registryAddMachine(EQ_ID_Bonder1, "Bonder1", defP); |
| | | m_pCollector->registryAddMachine(EQ_ID_Bonder2, "Bonder2", defP); |
| | | m_pCollector->registryAddMachine(EQ_ID_VACUUMBAKE, "åçç¤", defP); |
| | | m_pCollector->registryAddMachine(EQ_ID_BAKE_COOLING, "çç¤å·å´", defP); |
| | | |
| | | |
| | | // 2) 为ééè®¾ç½®âæ²çº¿åç§°â |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 1, "æ°ååå"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 2, "ä¸è
åå"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 3, "管éç空è§å¼"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 4, "è
ä½ç空è§å¼"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 5, "ä¸è
温度1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 6, "ä¸è
温度2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 7, "ä¸è
温度3"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 8, "ä¸è
温度4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 9, "ä¸è
温度5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 10, "ä¸è
温度6"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 11, "ä¸è
温度1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 12, "ä¸è
温度2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 13, "ä¸è
温度3"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 14, "ä¸è
温度4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 15, "ä¸è
温度5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder1, 16, "ä¸è
温度6"); |
| | | |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 1, "æ°ååå"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 2, "ä¸è
åå"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 3, "管éç空è§å¼"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 4, "è
ä½ç空è§å¼"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 5, "ä¸è
温度1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 6, "ä¸è
温度2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 7, "ä¸è
温度3"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 8, "ä¸è
温度4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 9, "ä¸è
温度5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 10, "ä¸è
温度6"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 11, "ä¸è
温度1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 12, "ä¸è
温度2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 13, "ä¸è
温度3"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 14, "ä¸è
温度4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 15, "ä¸è
温度5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_Bonder2, 16, "ä¸è
温度6"); |
| | | |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 1, "Aè
ç空è§å¼"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 2, "Aè
温æ§1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 3, "Aè
温æ§2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 4, "Aè
温æ§4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 5, "Aè
温æ§5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 6, "Aè
温æ§6"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 7, "Aè
温æ§7"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 8, "Bè
ç空è§å¼"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 9, "Bè
温æ§1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 10, "Bè
温æ§2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 11, "Bè
温æ§4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 12, "Bè
温æ§5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 13, "Bè
温æ§6"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_VACUUMBAKE, 14, "Bè
温æ§7"); |
| | | |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 1, "Açç¤æ¸©æ§1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 2, "Açç¤æ¸©æ§2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 3, "Açç¤æ¸©æ§4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 4, "Açç¤æ¸©æ§5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 5, "Açç¤æ¸©æ§6"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 6, "Açç¤æ¸©æ§7"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 7, "Bçç¤æ¸©æ§1"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 8, "Bçç¤æ¸©æ§2"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 9, "Bçç¤æ¸©æ§4"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 10, "Bçç¤æ¸©æ§5"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 11, "Bçç¤æ¸©æ§6"); |
| | | m_pCollector->buffersSetChannelName(EQ_ID_BAKE_COOLING, 12, "Bçç¤æ¸©æ§7"); |
| | | } |
| | | } |
| | | } |
| | |
| | | #include "CRobotTask.h" |
| | | #include "ProcessJob.h" |
| | | #include "CControlJob.h" |
| | | #include "../DAQBridge/core/Collector.h" |
| | | |
| | | |
| | | #define CTStep_Unknow 0 |
| | |
| | | |
| | | int m_nTestFlag; |
| | | std::list<CGlass*> m_bufGlass; |
| | | |
| | | private: |
| | | Collector* m_pCollector = nullptr; |
| | | void CreateDAQBridgeServer(); |
| | | }; |
| | | } |
| | | |
| | |
| | | |
| | | double CParam::getDoubleValue() |
| | | { |
| | | if(m_nValueType == PVT_DOUBLE) |
| | | return m_fValue; |
| | | if (m_nValueType == PVT_INT) |
| | | return (double)m_nValue; |
| | | |
| | | return 0.0; |
| | | } |
| | | |
| | | void CParam::setDoubleValue(double value) |
| | |
| | | <Text Include="ReadMe.txt" /> |
| | | </ItemGroup> |
| | | <ItemGroup> |
| | | <ClInclude Include="..\DAQBridge\buffer\BufferManager.h" /> |
| | | <ClInclude Include="..\DAQBridge\buffer\BufferRegistry.h" /> |
| | | <ClInclude Include="..\DAQBridge\buffer\SampleBuffer.h" /> |
| | | <ClInclude Include="..\DAQBridge\core\Collector.h" /> |
| | | <ClInclude Include="..\DAQBridge\core\CommBase.h" /> |
| | | <ClInclude Include="..\DAQBridge\core\ConnEvents.h" /> |
| | | <ClInclude Include="..\DAQBridge\core\DataTypes.h" /> |
| | | <ClInclude Include="..\DAQBridge\core\Display.h" /> |
| | | <ClInclude Include="..\DAQBridge\DAQConfig.h" /> |
| | | <ClInclude Include="..\DAQBridge\net\FrameAssembler.h" /> |
| | | <ClInclude Include="..\DAQBridge\net\SocketComm.h" /> |
| | | <ClInclude Include="..\DAQBridge\proto\Protocol.h" /> |
| | | <ClInclude Include="..\DAQBridge\proto\ProtocolCodec.h" /> |
| | | <ClInclude Include="..\jsoncpp\include\json\autolink.h" /> |
| | | <ClInclude Include="..\jsoncpp\include\json\config.h" /> |
| | | <ClInclude Include="..\jsoncpp\include\json\features.h" /> |
| | |
| | | <ClInclude Include="VerticalLine.h" /> |
| | | </ItemGroup> |
| | | <ItemGroup> |
| | | <ClCompile Include="..\DAQBridge\buffer\BufferManager.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\buffer\BufferRegistry.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\buffer\SampleBuffer.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\core\Collector.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\core\Display.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\net\SocketComm.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\proto\ProtocolCodec.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | | </ClCompile> |
| | | <ClCompile Include="..\jsoncpp\lib_json\json_reader.cpp"> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader> |
| | | <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader> |
| | |
| | | <ClCompile Include="CCjPageBase.cpp" /> |
| | | <ClCompile Include="CCarrierSlotSelector.cpp" /> |
| | | <ClCompile Include="CCarrierSlotGrid.cpp" /> |
| | | <ClCompile Include="..\DAQBridge\buffer\BufferManager.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\buffer\BufferRegistry.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\buffer\SampleBuffer.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\core\Collector.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\core\Display.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\net\SocketComm.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | <ClCompile Include="..\DAQBridge\proto\ProtocolCodec.cpp"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClCompile> |
| | | </ItemGroup> |
| | | <ItemGroup> |
| | | <ClInclude Include="AlarmManager.h" /> |
| | |
| | | <ClInclude Include="CCjPageBase.h" /> |
| | | <ClInclude Include="CCarrierSlotSelector.h" /> |
| | | <ClInclude Include="CCarrierSlotGrid.h" /> |
| | | <ClInclude Include="..\DAQBridge\buffer\BufferManager.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\buffer\BufferRegistry.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\buffer\SampleBuffer.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\core\Collector.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\core\CommBase.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\core\ConnEvents.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\core\DataTypes.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\core\Display.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\net\FrameAssembler.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\net\SocketComm.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\proto\Protocol.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\proto\ProtocolCodec.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | <ClInclude Include="..\DAQBridge\DAQConfig.h"> |
| | | <Filter>DAQBridge</Filter> |
| | | </ClInclude> |
| | | </ItemGroup> |
| | | <ItemGroup> |
| | | <ResourceCompile Include="Servo.rc" /> |
| | |
| | | <Filter Include="JsonCpp"> |
| | | <UniqueIdentifier>{a877af37-2f78-484f-b1bb-276edbbcd694}</UniqueIdentifier> |
| | | </Filter> |
| | | <Filter Include="DAQBridge"> |
| | | <UniqueIdentifier>{885738f6-3122-4bb9-8308-46b7f692fb13}</UniqueIdentifier> |
| | | </Filter> |
| | | </ItemGroup> |
| | | </Project> |