From e8a27bb203fe2aff70390a5eca002d7438da9b0f Mon Sep 17 00:00:00 2001
From: mrDarker <mr.darker@163.com>
Date: 星期三, 22 十月 2025 14:24:34 +0800
Subject: [PATCH] Merge branch 'clh' into liuyang
---
SourceCode/Bond/DAQBridge/core/Collector.cpp | 537 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 537 insertions(+), 0 deletions(-)
diff --git a/SourceCode/Bond/DAQBridge/core/Collector.cpp b/SourceCode/Bond/DAQBridge/core/Collector.cpp
new file mode 100644
index 0000000..7cb2d3d
--- /dev/null
+++ b/SourceCode/Bond/DAQBridge/core/Collector.cpp
@@ -0,0 +1,537 @@
+#include "Collector.h"
+#include <iostream>
+#include <chrono>
+
+#include "../DAQConfig.h"
+#include "../core/ConnEvents.h"
+using namespace DAQEvt;
+
+// 协议编解码
+#include "../proto/Protocol.h"
+#include "../proto/ProtocolCodec.h"
+
+using namespace Proto;
+
+Collector::Collector() {}
+
+void Collector::startLoop(uint32_t intervalMs) {
+ if (running.load()) return;
+ running.store(true);
+ worker = std::thread([this, intervalMs]() {
+ while (running.load()) {
+ this->poll();
+ std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
+ }
+ });
+}
+
+void Collector::stopLoop() {
+ if (!running.load()) return;
+ running.store(false);
+ if (worker.joinable()) worker.join();
+}
+
+Collector::~Collector() {
+ stopLoop();
+ disconnect();
+}
+
+std::vector<Collector::ClientSummary> Collector::getClientList() {
+ std::lock_guard<std::mutex> lk(mClients);
+ std::vector<ClientSummary> out;
+ out.reserve(clients.size());
+ for (const auto& c : clients) {
+ out.push_back(ClientSummary{ c.ip, c.port, c.versionOk, c.sock });
+ }
+ return out;
+}
+
+void Collector::createServer(uint16_t port) {
+ if (socketComm.createServerSocket(port)) {
+ if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerListening),
+ "Collector server listening on port " + std::to_string(port));
+ onConnectionEstablished();
+ }
+}
+
+void Collector::disconnect() {
+ {
+ std::lock_guard<std::mutex> lk(mClients);
+ for (auto& c : clients) {
+ socketComm.closeClient(c.sock);
+ if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
+ }
+ clients.clear();
+ }
+ socketComm.closeSocket();
+ if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerStopped), "Collector server stopped");
+ onConnectionLost();
+}
+
+void Collector::poll() {
+ // 1) 接新客户端
+ while (true) {
+ SOCKET cs; std::string ip; uint16_t port;
+ if (!socketComm.acceptOne(cs, ip, port)) break;
+
+ ClientInfo ci;
+ ci.sock = cs; ci.ip = ip; ci.port = port;
+ ci.tsConnected = std::chrono::steady_clock::now();
+
+ {
+ std::lock_guard<std::mutex> lk(mClients);
+ clients.push_back(ci);
+ }
+
+ if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientAccepted),
+ "Client connected: " + ip + ":" + std::to_string(port));
+ if (cbClientEvent) cbClientEvent(ip, port, /*connected*/true);
+ }
+
+ // 2) 轮询各客户端
+ auto now = std::chrono::steady_clock::now();
+ std::lock_guard<std::mutex> lk(mClients);
+ for (size_t i = 0; i < clients.size(); ) {
+ auto& c = clients[i];
+
+ // 2a) 版本握手超时
+ if (!c.versionOk) {
+ auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - c.tsConnected).count();
+ if (elapsed >= DAQCfg::KickIfNoVersionSec) {
+ if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionTimeoutKick),
+ "Kick client (no version within 5s): " + c.ip + ":" + std::to_string(c.port));
+ socketComm.closeClient(c.sock);
+ if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
+ clients.erase(clients.begin() + i);
+ continue;
+ }
+ }
+
+ // 2b) 收数据
+ std::vector<uint8_t> buf;
+ bool peerClosed = false;
+ if (socketComm.recvFrom(c.sock, buf, peerClosed)) {
+ if (!buf.empty()) {
+ if (rawDumpEnabled && cbRaw) cbRaw(buf);
+
+ // 帧组装
+ c.fr.push(buf);
+ std::vector<uint8_t> frame;
+ while (c.fr.nextFrame(frame)) {
+ if (rawDumpEnabled && cbRaw) cbRaw(frame);
+ handleClientData(c, frame);
+ }
+ }
+ }
+ else if (peerClosed) {
+ // 对端主动断开
+ if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientDisconnected),
+ "Client disconnected: " + c.ip + ":" + std::to_string(c.port));
+ socketComm.closeClient(c.sock);
+ if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
+ clients.erase(clients.begin() + i);
+ continue;
+ }
+
+ ++i;
+ }
+}
+
+void Collector::sendSampleData(double sample) {
+ (void)sample;
+#ifdef _DEBUG
+ if (cbStatus) cbStatus(0, "sendSampleData() no-op.");
+#endif
+}
+
+void Collector::sendWindowData(const std::vector<std::string>& fields) {
+ (void)fields;
+#ifdef _DEBUG
+ if (cbStatus) cbStatus(0, "sendWindowData() no-op.");
+#endif
+}
+
+void Collector::onConnectionEstablished() {
+ std::cout << "[Collector] ready.\n";
+}
+void Collector::onConnectionLost() {
+ std::cout << "[Collector] stopped.\n";
+}
+
+// 兼容旧门禁函数
+bool Collector::isVersionRequest(const std::vector<uint8_t>& data) const {
+ if (data.size() < 4 + 4 + 2 + 2 + 1) return false;
+ if (!(data[0] == 0x11 && data[1] == 0x88 && data[2] == 0x11 && data[3] == 0x88)) return false;
+ size_t cmdPos = 4 + 4 + 2;
+ uint16_t cmd = (uint16_t(data[cmdPos]) << 8) | data[cmdPos + 1];
+ return (cmd == CMD_REQ_VERSION);
+}
+
+// ========== BufferRegistry 直通 ==========
+BufferManager& Collector::registryAddMachine(uint32_t machineId, const std::string& name,
+ const RetentionPolicy& defPolicy) {
+ return registry_.getOrCreate(machineId, name, defPolicy);
+}
+BufferManager* Collector::registryFind(uint32_t machineId) { return registry_.find(machineId); }
+const BufferManager* Collector::registryFind(uint32_t machineId) const { return registry_.find(machineId); }
+std::vector<uint32_t> Collector::registryListMachines() const { return registry_.listManagers(); }
+
+void Collector::buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v) {
+ auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+ bm.push(channelId, ts_ms, v); // 未 start() 时将被内部忽略
+}
+
+std::vector<Sample> Collector::buffersGetSince(uint32_t machineId, uint32_t channelId,
+ int64_t tsExclusive, size_t maxCount) const {
+ auto* bm = registry_.find(machineId);
+ if (!bm) return {};
+ return bm->getSince(channelId, tsExclusive, maxCount);
+}
+std::vector<Sample> Collector::buffersGetRange(uint32_t machineId, uint32_t channelId,
+ int64_t from_ts, int64_t to_ts, size_t maxCount) const {
+ auto* bm = registry_.find(machineId);
+ if (!bm) return {};
+ return bm->getRange(channelId, from_ts, to_ts, maxCount);
+}
+
+void Collector::buffersStart(uint32_t machineId) {
+ auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+ bm.start(); // 清空 + 允许写
+}
+void Collector::buffersStop(uint32_t machineId) {
+ if (auto* bm = registry_.find(machineId)) bm->stop();
+}
+void Collector::buffersClear(uint32_t machineId) {
+ if (auto* bm = registry_.find(machineId)) bm->clearAll();
+}
+bool Collector::buffersIsRunning(uint32_t machineId) const {
+ auto* bm = registry_.find(machineId);
+ return bm ? bm->isRunning() : false;
+}
+
+void Collector::buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting) {
+ auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+ bm.setDefaultPolicy(p, applyExisting);
+}
+void Collector::buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p) {
+ auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+ bm.setPolicy(channelId, p);
+}
+void Collector::buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name) {
+ auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+ bm.setChannelName(channelId, name);
+}
+std::string Collector::buffersGetChannelName(uint32_t machineId, uint32_t channelId) const {
+ auto* bm = registry_.find(machineId);
+ if (!bm) return {};
+ return bm->getChannelName(channelId);
+}
+std::vector<BufferManager::ChannelInfo> Collector::buffersListChannelInfos(uint32_t machineId) const {
+ auto* bm = registry_.find(machineId);
+ if (!bm) return {};
+ return bm->listChannelInfos();
+}
+std::vector<BufferManager::ChannelStat> Collector::buffersListChannelStats(uint32_t machineId) const {
+ auto* bm = registry_.find(machineId);
+ if (!bm) return {};
+ return bm->listChannelStats();
+}
+BufferManager::ChannelStat Collector::buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const {
+ auto* bm = registry_.find(machineId);
+ if (!bm) return BufferManager::ChannelStat{ channelId, 0, 0, 0 };
+ return bm->getChannelStat(channelId);
+}
+
+// ====== 批次管理 ======
+// 新实现:根据 expectedDurationMs 计算 expectedEndTs
+void Collector::batchStart(uint32_t machineId,
+ const std::string& batchId,
+ uint64_t expectedDurationMs)
+{
+ const uint64_t startTs = nowMs();
+ const uint64_t expectedEndTs = (expectedDurationMs > 0) ? (startTs + expectedDurationMs) : 0;
+
+ {
+ std::lock_guard<std::mutex> lk(mBatches);
+ auto& br = batches_[machineId];
+ br.state = BatchState::Active;
+ br.activeBatchId = batchId;
+ br.activeStartTs = startTs;
+ br.expectedEndTs = expectedEndTs;
+ }
+
+ // 批次开始:清空并开始采集
+ buffersStart(machineId);
+
+ if (cbStatus) cbStatus(0, "batchStart: machine " + std::to_string(machineId) +
+ " batch=" + batchId +
+ " durMs=" + std::to_string(expectedDurationMs));
+}
+
+// 兼容旧签名:忽略 startTs;把 expectedEndTs 当“时长”
+void Collector::batchStart(uint32_t machineId,
+ const std::string& batchId,
+ uint64_t /*startTs_ignored*/,
+ uint64_t expectedEndTs_orDuration)
+{
+ // 若调用方误传了绝对的“结束时间戳”,这里会被当作“时长”;
+ // 如需严格区分,你也可以加一个阈值判断(比如大于 10^12 认为是绝对时间戳)
+ // 然后转换:duration = max(0, endTs - now)
+
+ uint64_t durationMs = expectedEndTs_orDuration;
+
+ // 可选:做个简单的“像绝对时间戳”的判断并自动转为时长
+ // 以毫秒时间戳阈值举例(~2001-09-09):1e12
+ if (expectedEndTs_orDuration > 1000000000000ULL) {
+ uint64_t now = nowMs();
+ if (expectedEndTs_orDuration > now)
+ durationMs = expectedEndTs_orDuration - now;
+ else
+ durationMs = 0; // 已过期,当未知处理
+ }
+
+ batchStart(machineId, batchId, durationMs);
+}
+
+void Collector::batchStop(uint32_t machineId, uint64_t /*endTs*/) {
+ {
+ std::lock_guard<std::mutex> lk(mBatches);
+ auto it = batches_.find(machineId);
+ if (it != batches_.end()) {
+ it->second.state = BatchState::Idle;
+ it->second.activeBatchId.clear();
+ it->second.activeStartTs = 0;
+ it->second.expectedEndTs = 0;
+ }
+ else {
+ batches_[machineId] = BatchRec{}; // 生成一条 Idle
+ }
+ }
+
+ // 停止采集,但不清数据
+ buffersStop(machineId);
+
+ if (cbStatus) cbStatus(0, "batchStop: machine " + std::to_string(machineId));
+}
+
+Collector::BatchRec Collector::getBatch(uint32_t machineId) const {
+ std::lock_guard<std::mutex> lk(mBatches);
+ auto it = batches_.find(machineId);
+ if (it != batches_.end()) return it->second;
+ return BatchRec{}; // 默认 Idle
+}
+
+// ========== 业务分发 ==========
+void Collector::handleClientData(ClientInfo& c, const std::vector<uint8_t>& data) {
+ // 版本校验
+ if (!c.versionOk) {
+ ReqVersion vreq{};
+ if (decodeRequestVersion(data, vreq)) {
+ c.versionOk = true;
+ if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionOk),
+ "Client " + c.ip + ":" + std::to_string(c.port) + " version OK");
+
+ RspVersion vrst;
+ vrst.dataId = vreq.dataId; // 回显
+ vrst.version = DAQCfg::Version; // 集中配置
+ auto rsp = encodeResponseVersion(vrst);
+ socketComm.sendTo(c.sock, rsp);
+ }
+ return; // 未通过版本校验,其它忽略
+ }
+
+ // 通过版本校验后:按 cmd 分发
+ const uint16_t cmd = peek_cmd(data);
+
+ // 0x0101 —— 单通道增量拉取
+ if (cmd == CMD_REQ_SINCE) {
+ ReqSince req;
+ if (!decodeRequestSince(data, req)) return;
+
+ auto vec = buffersGetSince(req.machineId, req.channelId,
+ static_cast<int64_t>(req.sinceTsExclusive), req.maxCount);
+
+ uint64_t lastTsSent = req.sinceTsExclusive;
+ if (!vec.empty()) lastTsSent = static_cast<uint64_t>(vec.back().ts_ms);
+
+ auto stat = buffersGetChannelStat(req.machineId, req.channelId);
+ uint8_t more = (stat.latestTs > static_cast<int64_t>(lastTsSent)) ? 1 : 0;
+
+ RspSince rsp;
+ rsp.dataId = req.dataId; // 回显
+ rsp.machineId = req.machineId;
+ rsp.channelId = req.channelId;
+ rsp.lastTsSent = lastTsSent;
+ rsp.more = more;
+ rsp.samples.reserve(vec.size());
+ for (auto& s : vec) rsp.samples.push_back({ static_cast<uint64_t>(s.ts_ms), s.value });
+
+ auto pkt = encodeResponseSince(rsp);
+ socketComm.sendTo(c.sock, pkt);
+ return;
+ }
+
+ // 0x0103 —— 统计/通道列表
+ if (cmd == CMD_REQ_STATS) {
+ ReqStats req;
+ if (!decodeRequestStats(data, req)) return;
+
+ auto* bm = registry_.find(req.machineId);
+ if (!bm) {
+ // 示例:可回错误帧
+ // sendErrorTo(c, req.dataId, CMD_REQ_STATS, req.machineId, ErrCode::NoActiveBatch, "machine not found");
+ return;
+ }
+
+ RspStats rsp;
+ rsp.dataId = req.dataId;
+ rsp.machineId = req.machineId;
+
+ auto stats = bm->listChannelStats();
+ rsp.channels.reserve(stats.size());
+ for (auto& s : stats) {
+ ChannelStatInfo ci;
+ ci.channelId = s.id;
+ ci.earliestTs = static_cast<uint64_t>(s.earliestTs);
+ ci.latestTs = static_cast<uint64_t>(s.latestTs);
+ ci.size = static_cast<uint32_t>(s.size);
+ ci.name = bm->getChannelName(s.id); // UTF-8
+ rsp.channels.push_back(std::move(ci));
+ }
+
+ auto pkt = encodeResponseStats(rsp);
+ socketComm.sendTo(c.sock, pkt);
+ return;
+ }
+
+ // 0x0104 —— 机台列表
+ if (cmd == CMD_REQ_MACHINES) {
+ ReqMachines req;
+ if (!decodeRequestMachines(data, req)) return;
+
+ RspMachines rsp;
+ rsp.dataId = req.dataId;
+
+ auto mids = registry_.listManagers();
+ rsp.machines.reserve(mids.size());
+ for (auto id : mids) {
+ auto* bm = registry_.find(id);
+ if (!bm) continue;
+ rsp.machines.push_back(MachineInfo{ id, bm->name() });
+ }
+
+ auto pkt = encodeResponseMachines(rsp);
+ socketComm.sendTo(c.sock, pkt);
+ return;
+ }
+
+ // 0x0105 —— 整机多通道增量拉取
+ if (cmd == CMD_REQ_SINCE_ALL) {
+ ReqSinceAll req;
+ if (!decodeRequestSinceAll(data, req)) return;
+
+ RspSinceAll rsp;
+ rsp.dataId = req.dataId; // 回显
+ rsp.machineId = req.machineId;
+ rsp.moreAny = 0;
+
+ auto* bm = registry_.find(req.machineId);
+ if (!bm) {
+ auto pkt = encodeResponseSinceAll(rsp);
+ socketComm.sendTo(c.sock, pkt);
+ return;
+ }
+
+ // 避免超过 2 字节长度:正文 ~60KB
+ constexpr size_t kMaxBody = 60 * 1024;
+
+ rsp.blocks.clear();
+ auto stats = bm->listChannelStats();
+ for (auto& s : stats) {
+ auto vec = bm->getSince(s.id, static_cast<int64_t>(req.sinceTsExclusive), req.maxPerChannel);
+ if (vec.empty()) continue;
+
+ ChannelBlock blk;
+ blk.channelId = s.id;
+ blk.lastTsSent = static_cast<uint64_t>(vec.back().ts_ms);
+
+ auto st = bm->getChannelStat(s.id);
+ blk.more = (st.latestTs > static_cast<int64_t>(blk.lastTsSent)) ? 1 : 0;
+
+ blk.samples.reserve(vec.size());
+ for (auto& sm : vec) {
+ blk.samples.push_back({ static_cast<uint64_t>(sm.ts_ms), sm.value });
+ }
+
+ rsp.blocks.push_back(std::move(blk));
+ auto tentative = encodeResponseSinceAll(rsp);
+ if (tentative.size() > (4 + 4 + 2) + kMaxBody + 1) {
+ // 超体积:把最后一块拿出来,先发前面的
+ auto last = std::move(rsp.blocks.back());
+ rsp.blocks.pop_back();
+
+ auto pkt = encodeResponseSinceAll(rsp);
+ socketComm.sendTo(c.sock, pkt);
+
+ rsp.blocks.clear();
+ rsp.blocks.push_back(std::move(last));
+ }
+ }
+
+ rsp.moreAny = 0;
+ for (const auto& b : rsp.blocks) {
+ if (b.more) { rsp.moreAny = 1; break; }
+ }
+
+ auto pkt = encodeResponseSinceAll(rsp);
+ socketComm.sendTo(c.sock, pkt);
+ return;
+ }
+
+ // 0x0120 —— 批次信息
+ if (cmd == CMD_REQ_BATCH_INFO) {
+ ReqBatchInfo req;
+ if (!decodeRequestBatchInfo(data, req)) return;
+
+ RspBatchInfo rsp;
+ rsp.dataId = req.dataId;
+ rsp.machineId = req.machineId;
+
+ const auto br = getBatch(req.machineId);
+ rsp.state = br.state;
+ rsp.activeBatchId = br.activeBatchId;
+ rsp.activeStartTs = br.activeStartTs;
+ rsp.expectedEndTs = br.expectedEndTs;
+
+ auto pkt = encodeResponseBatchInfo(rsp);
+ socketComm.sendTo(c.sock, pkt);
+ return;
+ }
+
+ // 其它指令:按需扩展
+}
+
+// 统一错误回包
+void Collector::sendErrorTo(ClientInfo& c,
+ uint32_t dataId,
+ uint16_t refCmd,
+ uint32_t machineId,
+ ErrCode code,
+ const std::string& message)
+{
+ RspError er;
+ er.dataId = dataId;
+ er.refCmd = refCmd;
+ er.machineId = machineId;
+ er.code = code;
+ er.message = message;
+
+ auto pkt = encodeResponseError(er);
+ socketComm.sendTo(c.sock, pkt);
+}
+
+// 工具:当前 epoch 毫秒
+uint64_t Collector::nowMs() {
+ using namespace std::chrono;
+ return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+}
--
Gitblit v1.9.3