From e8a27bb203fe2aff70390a5eca002d7438da9b0f Mon Sep 17 00:00:00 2001
From: mrDarker <mr.darker@163.com>
Date: 星期三, 22 十月 2025 14:24:34 +0800
Subject: [PATCH] Merge branch 'clh' into liuyang

---
 SourceCode/Bond/DAQBridge/core/Collector.h |  163 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 163 insertions(+), 0 deletions(-)

diff --git a/SourceCode/Bond/DAQBridge/core/Collector.h b/SourceCode/Bond/DAQBridge/core/Collector.h
new file mode 100644
index 0000000..b595905
--- /dev/null
+++ b/SourceCode/Bond/DAQBridge/core/Collector.h
@@ -0,0 +1,163 @@
+#pragma once
+#ifndef COLLECTOR_H
+#define COLLECTOR_H
+
+#include "CommBase.h"
+#include "../net/SocketComm.h"
+
+#include <functional>
+#include <chrono>
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <vector>
+#include <string>
+#include <unordered_map>
+
+#include "../buffer/BufferRegistry.h"
+#include "../net/FrameAssembler.h"
+
+// 协议常量/结构(必须包含,供本头文件使用 Proto:: 类型)
+#include "../proto/Protocol.h"
+
+class Collector : public CommBase {
+public:
+    Collector();
+    ~Collector();
+
+    // ===== CommBase 接口 =====
+    void sendSampleData(double sample) override;
+    void sendWindowData(const std::vector<std::string>& dataFields) override;
+
+    void connectServer(const std::string& /*ip*/, uint16_t /*port*/) override {} // 采集端不作为客户端
+    void createServer(uint16_t port) override;
+    void disconnect() override;
+
+    void onConnectionEstablished() override;
+    void onConnectionLost() override;
+
+    // 连接/原始数据回调(UI)
+    void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); }
+    void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); }
+    void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; }
+
+    // 后台轮询(不阻塞 UI)
+    void startLoop(uint32_t intervalMs = 10);
+    void stopLoop();
+    void poll();
+
+    // 客户端进入/断开事件
+    void setClientEventCallback(std::function<void(const std::string& ip, uint16_t port, bool connected)> cb) {
+        cbClientEvent = std::move(cb);
+    }
+
+    // 客户端快照
+    struct ClientSummary {
+        std::string ip;
+        uint16_t    port = 0;
+        bool        versionOk = false;
+        SOCKET      sock = INVALID_SOCKET;
+    };
+    std::vector<ClientSummary> getClientList();
+
+    // ===== 缓冲/通道相关 =====
+    void        buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name);
+    std::string buffersGetChannelName(uint32_t machineId, uint32_t channelId) const;
+    std::vector<BufferManager::ChannelInfo> buffersListChannelInfos(uint32_t machineId) const;
+
+    BufferManager& registryAddMachine(uint32_t machineId, const std::string& name,
+        const RetentionPolicy& defPolicy = {});
+    BufferManager* registryFind(uint32_t machineId);
+    const BufferManager* registryFind(uint32_t machineId) const;
+    std::vector<uint32_t> registryListMachines() const;
+
+    void buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v);
+
+    std::vector<Sample> buffersGetSince(uint32_t machineId, uint32_t channelId,
+        int64_t tsExclusive, size_t maxCount = 4096) const;
+    std::vector<Sample> buffersGetRange(uint32_t machineId, uint32_t channelId,
+        int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const;
+
+    void buffersStart(uint32_t machineId);   // 清空该机历史并开始
+    void buffersStop(uint32_t machineId);    // 暂停(push 忽略)
+    void buffersClear(uint32_t machineId);   // 清空历史,不改 running 状态
+    bool buffersIsRunning(uint32_t machineId) const;
+
+    void buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting = true);
+    void buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p);
+
+    std::vector<BufferManager::ChannelStat> buffersListChannelStats(uint32_t machineId) const;
+    BufferManager::ChannelStat             buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const;
+
+    // ===== 批次管理(新增) =====
+    struct BatchRec {
+        Proto::BatchState state = Proto::BatchState::Idle; // Idle / Active
+        std::string       activeBatchId;                   // Idle 时为空
+        uint64_t          activeStartTs = 0;               // ms epoch
+        uint64_t          expectedEndTs = 0;               // 0: 未知
+    };
+
+    // 新:仅传“预计时长ms”(0=未知),开始时间内部取 nowMs()
+    void batchStart(uint32_t machineId,
+        const std::string& batchId,
+        uint64_t expectedDurationMs = 0);
+
+    // (可选兼容)旧签名保留,但标记为弃用:内部转调新签名
+    void batchStart(uint32_t machineId,
+        const std::string& batchId,
+        uint64_t startTs /*ignored*/,
+        uint64_t expectedEndTs /*treated as duration or absolute?*/);
+
+// 结束当前批次(保持数据,状态转 Idle)
+    void batchStop(uint32_t machineId,
+        uint64_t endTs = 0);                    // 备用
+
+// 查询该机当前批次(若不存在,返回 Idle 空记录)
+    BatchRec getBatch(uint32_t machineId) const;
+
+private:
+    struct ClientInfo {
+        SOCKET sock = INVALID_SOCKET;
+        std::string ip;
+        uint16_t port = 0;
+        std::chrono::steady_clock::time_point tsConnected{};
+        bool versionOk = false;
+        FrameAssembler fr; // 每个客户端一个帧组装器
+    };
+
+    SocketComm socketComm;
+    std::vector<ClientInfo> clients;
+
+    std::function<void(int, std::string)> cbStatus;
+    std::function<void(const std::vector<uint8_t>&)> cbRaw;
+    bool rawDumpEnabled = true;
+
+    std::function<void(const std::string&, uint16_t, bool)> cbClientEvent;
+
+    std::thread       worker;
+    std::atomic<bool> running{ false };
+    std::mutex        mClients;   // 保护 clients
+
+    BufferRegistry registry_;     // 一个 Collector 管所有机台
+
+    // 批次状态
+    std::unordered_map<uint32_t, BatchRec> batches_;
+    mutable std::mutex mBatches;
+
+    // 内部函数
+    void handleClientData(ClientInfo& c, const std::vector<uint8_t>& data);
+    bool isVersionRequest(const std::vector<uint8_t>& data) const;
+
+    // 统一错误回包(6 参数)
+    void sendErrorTo(ClientInfo& c,
+        uint32_t dataId,
+        uint16_t refCmd,
+        uint32_t machineId,
+        Proto::ErrCode code,
+        const std::string& message);
+
+    // 工具:当前 epoch 毫秒
+    static uint64_t nowMs();
+};
+
+#endif // COLLECTOR_H

--
Gitblit v1.9.3