From e8a27bb203fe2aff70390a5eca002d7438da9b0f Mon Sep 17 00:00:00 2001
From: mrDarker <mr.darker@163.com>
Date: 星期三, 22 十月 2025 14:24:34 +0800
Subject: [PATCH] Merge branch 'clh' into liuyang

---
 SourceCode/Bond/DAQBridge/core/Collector.cpp |  537 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 537 insertions(+), 0 deletions(-)

diff --git a/SourceCode/Bond/DAQBridge/core/Collector.cpp b/SourceCode/Bond/DAQBridge/core/Collector.cpp
new file mode 100644
index 0000000..7cb2d3d
--- /dev/null
+++ b/SourceCode/Bond/DAQBridge/core/Collector.cpp
@@ -0,0 +1,537 @@
+#include "Collector.h"
+#include <iostream>
+#include <chrono>
+
+#include "../DAQConfig.h"
+#include "../core/ConnEvents.h"
+using namespace DAQEvt;
+
+// 协议编解码
+#include "../proto/Protocol.h"
+#include "../proto/ProtocolCodec.h"
+
+using namespace Proto;
+
+Collector::Collector() {}
+
+void Collector::startLoop(uint32_t intervalMs) {
+    if (running.load()) return;
+    running.store(true);
+    worker = std::thread([this, intervalMs]() {
+        while (running.load()) {
+            this->poll();
+            std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
+        }
+        });
+}
+
+void Collector::stopLoop() {
+    if (!running.load()) return;
+    running.store(false);
+    if (worker.joinable()) worker.join();
+}
+
+Collector::~Collector() {
+    stopLoop();
+    disconnect();
+}
+
+std::vector<Collector::ClientSummary> Collector::getClientList() {
+    std::lock_guard<std::mutex> lk(mClients);
+    std::vector<ClientSummary> out;
+    out.reserve(clients.size());
+    for (const auto& c : clients) {
+        out.push_back(ClientSummary{ c.ip, c.port, c.versionOk, c.sock });
+    }
+    return out;
+}
+
+void Collector::createServer(uint16_t port) {
+    if (socketComm.createServerSocket(port)) {
+        if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerListening),
+            "Collector server listening on port " + std::to_string(port));
+        onConnectionEstablished();
+    }
+}
+
+void Collector::disconnect() {
+    {
+        std::lock_guard<std::mutex> lk(mClients);
+        for (auto& c : clients) {
+            socketComm.closeClient(c.sock);
+            if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
+        }
+        clients.clear();
+    }
+    socketComm.closeSocket();
+    if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerStopped), "Collector server stopped");
+    onConnectionLost();
+}
+
+void Collector::poll() {
+    // 1) 接新客户端
+    while (true) {
+        SOCKET cs; std::string ip; uint16_t port;
+        if (!socketComm.acceptOne(cs, ip, port)) break;
+
+        ClientInfo ci;
+        ci.sock = cs; ci.ip = ip; ci.port = port;
+        ci.tsConnected = std::chrono::steady_clock::now();
+
+        {
+            std::lock_guard<std::mutex> lk(mClients);
+            clients.push_back(ci);
+        }
+
+        if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientAccepted),
+            "Client connected: " + ip + ":" + std::to_string(port));
+        if (cbClientEvent) cbClientEvent(ip, port, /*connected*/true);
+    }
+
+    // 2) 轮询各客户端
+    auto now = std::chrono::steady_clock::now();
+    std::lock_guard<std::mutex> lk(mClients);
+    for (size_t i = 0; i < clients.size(); ) {
+        auto& c = clients[i];
+
+        // 2a) 版本握手超时
+        if (!c.versionOk) {
+            auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - c.tsConnected).count();
+            if (elapsed >= DAQCfg::KickIfNoVersionSec) {
+                if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionTimeoutKick),
+                    "Kick client (no version within 5s): " + c.ip + ":" + std::to_string(c.port));
+                socketComm.closeClient(c.sock);
+                if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
+                clients.erase(clients.begin() + i);
+                continue;
+            }
+        }
+
+        // 2b) 收数据
+        std::vector<uint8_t> buf;
+        bool peerClosed = false;
+        if (socketComm.recvFrom(c.sock, buf, peerClosed)) {
+            if (!buf.empty()) {
+                if (rawDumpEnabled && cbRaw) cbRaw(buf);
+
+                // 帧组装
+                c.fr.push(buf);
+                std::vector<uint8_t> frame;
+                while (c.fr.nextFrame(frame)) {
+                    if (rawDumpEnabled && cbRaw) cbRaw(frame);
+                    handleClientData(c, frame);
+                }
+            }
+        }
+        else if (peerClosed) {
+            // 对端主动断开
+            if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientDisconnected),
+                "Client disconnected: " + c.ip + ":" + std::to_string(c.port));
+            socketComm.closeClient(c.sock);
+            if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
+            clients.erase(clients.begin() + i);
+            continue;
+        }
+
+        ++i;
+    }
+}
+
+void Collector::sendSampleData(double sample) {
+    (void)sample;
+#ifdef _DEBUG
+    if (cbStatus) cbStatus(0, "sendSampleData() no-op.");
+#endif
+}
+
+void Collector::sendWindowData(const std::vector<std::string>& fields) {
+    (void)fields;
+#ifdef _DEBUG
+    if (cbStatus) cbStatus(0, "sendWindowData() no-op.");
+#endif
+}
+
+void Collector::onConnectionEstablished() {
+    std::cout << "[Collector] ready.\n";
+}
+void Collector::onConnectionLost() {
+    std::cout << "[Collector] stopped.\n";
+}
+
+// 兼容旧门禁函数
+bool Collector::isVersionRequest(const std::vector<uint8_t>& data) const {
+    if (data.size() < 4 + 4 + 2 + 2 + 1) return false;
+    if (!(data[0] == 0x11 && data[1] == 0x88 && data[2] == 0x11 && data[3] == 0x88)) return false;
+    size_t cmdPos = 4 + 4 + 2;
+    uint16_t cmd = (uint16_t(data[cmdPos]) << 8) | data[cmdPos + 1];
+    return (cmd == CMD_REQ_VERSION);
+}
+
+// ========== BufferRegistry 直通 ==========
+BufferManager& Collector::registryAddMachine(uint32_t machineId, const std::string& name,
+    const RetentionPolicy& defPolicy) {
+    return registry_.getOrCreate(machineId, name, defPolicy);
+}
+BufferManager* Collector::registryFind(uint32_t machineId) { return registry_.find(machineId); }
+const BufferManager* Collector::registryFind(uint32_t machineId) const { return registry_.find(machineId); }
+std::vector<uint32_t> Collector::registryListMachines() const { return registry_.listManagers(); }
+
+void Collector::buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v) {
+    auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+    bm.push(channelId, ts_ms, v); // 未 start() 时将被内部忽略
+}
+
+std::vector<Sample> Collector::buffersGetSince(uint32_t machineId, uint32_t channelId,
+    int64_t tsExclusive, size_t maxCount) const {
+    auto* bm = registry_.find(machineId);
+    if (!bm) return {};
+    return bm->getSince(channelId, tsExclusive, maxCount);
+}
+std::vector<Sample> Collector::buffersGetRange(uint32_t machineId, uint32_t channelId,
+    int64_t from_ts, int64_t to_ts, size_t maxCount) const {
+    auto* bm = registry_.find(machineId);
+    if (!bm) return {};
+    return bm->getRange(channelId, from_ts, to_ts, maxCount);
+}
+
+void Collector::buffersStart(uint32_t machineId) {
+    auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+    bm.start(); // 清空 + 允许写
+}
+void Collector::buffersStop(uint32_t machineId) {
+    if (auto* bm = registry_.find(machineId)) bm->stop();
+}
+void Collector::buffersClear(uint32_t machineId) {
+    if (auto* bm = registry_.find(machineId)) bm->clearAll();
+}
+bool Collector::buffersIsRunning(uint32_t machineId) const {
+    auto* bm = registry_.find(machineId);
+    return bm ? bm->isRunning() : false;
+}
+
+void Collector::buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting) {
+    auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+    bm.setDefaultPolicy(p, applyExisting);
+}
+void Collector::buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p) {
+    auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+    bm.setPolicy(channelId, p);
+}
+void Collector::buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name) {
+    auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
+    bm.setChannelName(channelId, name);
+}
+std::string Collector::buffersGetChannelName(uint32_t machineId, uint32_t channelId) const {
+    auto* bm = registry_.find(machineId);
+    if (!bm) return {};
+    return bm->getChannelName(channelId);
+}
+std::vector<BufferManager::ChannelInfo> Collector::buffersListChannelInfos(uint32_t machineId) const {
+    auto* bm = registry_.find(machineId);
+    if (!bm) return {};
+    return bm->listChannelInfos();
+}
+std::vector<BufferManager::ChannelStat> Collector::buffersListChannelStats(uint32_t machineId) const {
+    auto* bm = registry_.find(machineId);
+    if (!bm) return {};
+    return bm->listChannelStats();
+}
+BufferManager::ChannelStat Collector::buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const {
+    auto* bm = registry_.find(machineId);
+    if (!bm) return BufferManager::ChannelStat{ channelId, 0, 0, 0 };
+    return bm->getChannelStat(channelId);
+}
+
+// ====== 批次管理 ======
+// 新实现:根据 expectedDurationMs 计算 expectedEndTs
+void Collector::batchStart(uint32_t machineId,
+    const std::string& batchId,
+    uint64_t expectedDurationMs)
+{
+    const uint64_t startTs = nowMs();
+    const uint64_t expectedEndTs = (expectedDurationMs > 0) ? (startTs + expectedDurationMs) : 0;
+
+    {
+        std::lock_guard<std::mutex> lk(mBatches);
+        auto& br = batches_[machineId];
+        br.state = BatchState::Active;
+        br.activeBatchId = batchId;
+        br.activeStartTs = startTs;
+        br.expectedEndTs = expectedEndTs;
+    }
+
+    // 批次开始:清空并开始采集
+    buffersStart(machineId);
+
+    if (cbStatus) cbStatus(0, "batchStart: machine " + std::to_string(machineId) +
+        " batch=" + batchId +
+        " durMs=" + std::to_string(expectedDurationMs));
+}
+
+// 兼容旧签名:忽略 startTs;把 expectedEndTs 当“时长”
+void Collector::batchStart(uint32_t machineId,
+    const std::string& batchId,
+    uint64_t /*startTs_ignored*/,
+    uint64_t expectedEndTs_orDuration)
+{
+    // 若调用方误传了绝对的“结束时间戳”,这里会被当作“时长”;
+    // 如需严格区分,你也可以加一个阈值判断(比如大于 10^12 认为是绝对时间戳)
+    // 然后转换:duration = max(0, endTs - now)
+
+    uint64_t durationMs = expectedEndTs_orDuration;
+
+    // 可选:做个简单的“像绝对时间戳”的判断并自动转为时长
+    // 以毫秒时间戳阈值举例(~2001-09-09):1e12
+    if (expectedEndTs_orDuration > 1000000000000ULL) {
+        uint64_t now = nowMs();
+        if (expectedEndTs_orDuration > now)
+            durationMs = expectedEndTs_orDuration - now;
+        else
+            durationMs = 0; // 已过期,当未知处理
+    }
+
+    batchStart(machineId, batchId, durationMs);
+}
+
+void Collector::batchStop(uint32_t machineId, uint64_t /*endTs*/) {
+    {
+        std::lock_guard<std::mutex> lk(mBatches);
+        auto it = batches_.find(machineId);
+        if (it != batches_.end()) {
+            it->second.state = BatchState::Idle;
+            it->second.activeBatchId.clear();
+            it->second.activeStartTs = 0;
+            it->second.expectedEndTs = 0;
+        }
+        else {
+            batches_[machineId] = BatchRec{}; // 生成一条 Idle
+        }
+    }
+
+    // 停止采集,但不清数据
+    buffersStop(machineId);
+
+    if (cbStatus) cbStatus(0, "batchStop: machine " + std::to_string(machineId));
+}
+
+Collector::BatchRec Collector::getBatch(uint32_t machineId) const {
+    std::lock_guard<std::mutex> lk(mBatches);
+    auto it = batches_.find(machineId);
+    if (it != batches_.end()) return it->second;
+    return BatchRec{}; // 默认 Idle
+}
+
+// ========== 业务分发 ==========
+void Collector::handleClientData(ClientInfo& c, const std::vector<uint8_t>& data) {
+    // 版本校验
+    if (!c.versionOk) {
+        ReqVersion vreq{};
+        if (decodeRequestVersion(data, vreq)) {
+            c.versionOk = true;
+            if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionOk),
+                "Client " + c.ip + ":" + std::to_string(c.port) + " version OK");
+
+            RspVersion vrst;
+            vrst.dataId = vreq.dataId;         // 回显
+            vrst.version = DAQCfg::Version;    // 集中配置
+            auto rsp = encodeResponseVersion(vrst);
+            socketComm.sendTo(c.sock, rsp);
+        }
+        return; // 未通过版本校验,其它忽略
+    }
+
+    // 通过版本校验后:按 cmd 分发
+    const uint16_t cmd = peek_cmd(data);
+
+    // 0x0101 —— 单通道增量拉取
+    if (cmd == CMD_REQ_SINCE) {
+        ReqSince req;
+        if (!decodeRequestSince(data, req)) return;
+
+        auto vec = buffersGetSince(req.machineId, req.channelId,
+            static_cast<int64_t>(req.sinceTsExclusive), req.maxCount);
+
+        uint64_t lastTsSent = req.sinceTsExclusive;
+        if (!vec.empty()) lastTsSent = static_cast<uint64_t>(vec.back().ts_ms);
+
+        auto stat = buffersGetChannelStat(req.machineId, req.channelId);
+        uint8_t more = (stat.latestTs > static_cast<int64_t>(lastTsSent)) ? 1 : 0;
+
+        RspSince rsp;
+        rsp.dataId = req.dataId; // 回显
+        rsp.machineId = req.machineId;
+        rsp.channelId = req.channelId;
+        rsp.lastTsSent = lastTsSent;
+        rsp.more = more;
+        rsp.samples.reserve(vec.size());
+        for (auto& s : vec) rsp.samples.push_back({ static_cast<uint64_t>(s.ts_ms), s.value });
+
+        auto pkt = encodeResponseSince(rsp);
+        socketComm.sendTo(c.sock, pkt);
+        return;
+    }
+
+    // 0x0103 —— 统计/通道列表
+    if (cmd == CMD_REQ_STATS) {
+        ReqStats req;
+        if (!decodeRequestStats(data, req)) return;
+
+        auto* bm = registry_.find(req.machineId);
+        if (!bm) {
+            // 示例:可回错误帧
+            // sendErrorTo(c, req.dataId, CMD_REQ_STATS, req.machineId, ErrCode::NoActiveBatch, "machine not found");
+            return;
+        }
+
+        RspStats rsp;
+        rsp.dataId = req.dataId;
+        rsp.machineId = req.machineId;
+
+        auto stats = bm->listChannelStats();
+        rsp.channels.reserve(stats.size());
+        for (auto& s : stats) {
+            ChannelStatInfo ci;
+            ci.channelId = s.id;
+            ci.earliestTs = static_cast<uint64_t>(s.earliestTs);
+            ci.latestTs = static_cast<uint64_t>(s.latestTs);
+            ci.size = static_cast<uint32_t>(s.size);
+            ci.name = bm->getChannelName(s.id); // UTF-8
+            rsp.channels.push_back(std::move(ci));
+        }
+
+        auto pkt = encodeResponseStats(rsp);
+        socketComm.sendTo(c.sock, pkt);
+        return;
+    }
+
+    // 0x0104 —— 机台列表
+    if (cmd == CMD_REQ_MACHINES) {
+        ReqMachines req;
+        if (!decodeRequestMachines(data, req)) return;
+
+        RspMachines rsp;
+        rsp.dataId = req.dataId;
+
+        auto mids = registry_.listManagers();
+        rsp.machines.reserve(mids.size());
+        for (auto id : mids) {
+            auto* bm = registry_.find(id);
+            if (!bm) continue;
+            rsp.machines.push_back(MachineInfo{ id, bm->name() });
+        }
+
+        auto pkt = encodeResponseMachines(rsp);
+        socketComm.sendTo(c.sock, pkt);
+        return;
+    }
+
+    // 0x0105 —— 整机多通道增量拉取
+    if (cmd == CMD_REQ_SINCE_ALL) {
+        ReqSinceAll req;
+        if (!decodeRequestSinceAll(data, req)) return;
+
+        RspSinceAll rsp;
+        rsp.dataId = req.dataId;      // 回显
+        rsp.machineId = req.machineId;
+        rsp.moreAny = 0;
+
+        auto* bm = registry_.find(req.machineId);
+        if (!bm) {
+            auto pkt = encodeResponseSinceAll(rsp);
+            socketComm.sendTo(c.sock, pkt);
+            return;
+        }
+
+        // 避免超过 2 字节长度:正文 ~60KB
+        constexpr size_t kMaxBody = 60 * 1024;
+
+        rsp.blocks.clear();
+        auto stats = bm->listChannelStats();
+        for (auto& s : stats) {
+            auto vec = bm->getSince(s.id, static_cast<int64_t>(req.sinceTsExclusive), req.maxPerChannel);
+            if (vec.empty()) continue;
+
+            ChannelBlock blk;
+            blk.channelId = s.id;
+            blk.lastTsSent = static_cast<uint64_t>(vec.back().ts_ms);
+
+            auto st = bm->getChannelStat(s.id);
+            blk.more = (st.latestTs > static_cast<int64_t>(blk.lastTsSent)) ? 1 : 0;
+
+            blk.samples.reserve(vec.size());
+            for (auto& sm : vec) {
+                blk.samples.push_back({ static_cast<uint64_t>(sm.ts_ms), sm.value });
+            }
+
+            rsp.blocks.push_back(std::move(blk));
+            auto tentative = encodeResponseSinceAll(rsp);
+            if (tentative.size() > (4 + 4 + 2) + kMaxBody + 1) {
+                // 超体积:把最后一块拿出来,先发前面的
+                auto last = std::move(rsp.blocks.back());
+                rsp.blocks.pop_back();
+
+                auto pkt = encodeResponseSinceAll(rsp);
+                socketComm.sendTo(c.sock, pkt);
+
+                rsp.blocks.clear();
+                rsp.blocks.push_back(std::move(last));
+            }
+        }
+
+        rsp.moreAny = 0;
+        for (const auto& b : rsp.blocks) {
+            if (b.more) { rsp.moreAny = 1; break; }
+        }
+
+        auto pkt = encodeResponseSinceAll(rsp);
+        socketComm.sendTo(c.sock, pkt);
+        return;
+    }
+
+    // 0x0120 —— 批次信息
+    if (cmd == CMD_REQ_BATCH_INFO) {
+        ReqBatchInfo req;
+        if (!decodeRequestBatchInfo(data, req)) return;
+
+        RspBatchInfo rsp;
+        rsp.dataId = req.dataId;
+        rsp.machineId = req.machineId;
+
+        const auto br = getBatch(req.machineId);
+        rsp.state = br.state;
+        rsp.activeBatchId = br.activeBatchId;
+        rsp.activeStartTs = br.activeStartTs;
+        rsp.expectedEndTs = br.expectedEndTs;
+
+        auto pkt = encodeResponseBatchInfo(rsp);
+        socketComm.sendTo(c.sock, pkt);
+        return;
+    }
+
+    // 其它指令:按需扩展
+}
+
+// 统一错误回包
+void Collector::sendErrorTo(ClientInfo& c,
+    uint32_t dataId,
+    uint16_t refCmd,
+    uint32_t machineId,
+    ErrCode code,
+    const std::string& message)
+{
+    RspError er;
+    er.dataId = dataId;
+    er.refCmd = refCmd;
+    er.machineId = machineId;
+    er.code = code;
+    er.message = message;
+
+    auto pkt = encodeResponseError(er);
+    socketComm.sendTo(c.sock, pkt);
+}
+
+// 工具:当前 epoch 毫秒
+uint64_t Collector::nowMs() {
+    using namespace std::chrono;
+    return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+}

--
Gitblit v1.9.3