From e8a27bb203fe2aff70390a5eca002d7438da9b0f Mon Sep 17 00:00:00 2001
From: mrDarker <mr.darker@163.com>
Date: 星期三, 22 十月 2025 14:24:34 +0800
Subject: [PATCH] Merge branch 'clh' into liuyang
---
SourceCode/Bond/DAQBridge/core/Collector.h | 163 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 163 insertions(+), 0 deletions(-)
diff --git a/SourceCode/Bond/DAQBridge/core/Collector.h b/SourceCode/Bond/DAQBridge/core/Collector.h
new file mode 100644
index 0000000..b595905
--- /dev/null
+++ b/SourceCode/Bond/DAQBridge/core/Collector.h
@@ -0,0 +1,163 @@
+#pragma once
+#ifndef COLLECTOR_H
+#define COLLECTOR_H
+
+#include "CommBase.h"
+#include "../net/SocketComm.h"
+
+#include <functional>
+#include <chrono>
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <vector>
+#include <string>
+#include <unordered_map>
+
+#include "../buffer/BufferRegistry.h"
+#include "../net/FrameAssembler.h"
+
+// 协议常量/结构(必须包含,供本头文件使用 Proto:: 类型)
+#include "../proto/Protocol.h"
+
+class Collector : public CommBase {
+public:
+ Collector();
+ ~Collector();
+
+ // ===== CommBase 接口 =====
+ void sendSampleData(double sample) override;
+ void sendWindowData(const std::vector<std::string>& dataFields) override;
+
+ void connectServer(const std::string& /*ip*/, uint16_t /*port*/) override {} // 采集端不作为客户端
+ void createServer(uint16_t port) override;
+ void disconnect() override;
+
+ void onConnectionEstablished() override;
+ void onConnectionLost() override;
+
+ // 连接/原始数据回调(UI)
+ void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); }
+ void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); }
+ void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; }
+
+ // 后台轮询(不阻塞 UI)
+ void startLoop(uint32_t intervalMs = 10);
+ void stopLoop();
+ void poll();
+
+ // 客户端进入/断开事件
+ void setClientEventCallback(std::function<void(const std::string& ip, uint16_t port, bool connected)> cb) {
+ cbClientEvent = std::move(cb);
+ }
+
+ // 客户端快照
+ struct ClientSummary {
+ std::string ip;
+ uint16_t port = 0;
+ bool versionOk = false;
+ SOCKET sock = INVALID_SOCKET;
+ };
+ std::vector<ClientSummary> getClientList();
+
+ // ===== 缓冲/通道相关 =====
+ void buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name);
+ std::string buffersGetChannelName(uint32_t machineId, uint32_t channelId) const;
+ std::vector<BufferManager::ChannelInfo> buffersListChannelInfos(uint32_t machineId) const;
+
+ BufferManager& registryAddMachine(uint32_t machineId, const std::string& name,
+ const RetentionPolicy& defPolicy = {});
+ BufferManager* registryFind(uint32_t machineId);
+ const BufferManager* registryFind(uint32_t machineId) const;
+ std::vector<uint32_t> registryListMachines() const;
+
+ void buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v);
+
+ std::vector<Sample> buffersGetSince(uint32_t machineId, uint32_t channelId,
+ int64_t tsExclusive, size_t maxCount = 4096) const;
+ std::vector<Sample> buffersGetRange(uint32_t machineId, uint32_t channelId,
+ int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const;
+
+ void buffersStart(uint32_t machineId); // 清空该机历史并开始
+ void buffersStop(uint32_t machineId); // 暂停(push 忽略)
+ void buffersClear(uint32_t machineId); // 清空历史,不改 running 状态
+ bool buffersIsRunning(uint32_t machineId) const;
+
+ void buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting = true);
+ void buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p);
+
+ std::vector<BufferManager::ChannelStat> buffersListChannelStats(uint32_t machineId) const;
+ BufferManager::ChannelStat buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const;
+
+ // ===== 批次管理(新增) =====
+ struct BatchRec {
+ Proto::BatchState state = Proto::BatchState::Idle; // Idle / Active
+ std::string activeBatchId; // Idle 时为空
+ uint64_t activeStartTs = 0; // ms epoch
+ uint64_t expectedEndTs = 0; // 0: 未知
+ };
+
+ // 新:仅传“预计时长ms”(0=未知),开始时间内部取 nowMs()
+ void batchStart(uint32_t machineId,
+ const std::string& batchId,
+ uint64_t expectedDurationMs = 0);
+
+ // (可选兼容)旧签名保留,但标记为弃用:内部转调新签名
+ void batchStart(uint32_t machineId,
+ const std::string& batchId,
+ uint64_t startTs /*ignored*/,
+ uint64_t expectedEndTs /*treated as duration or absolute?*/);
+
+// 结束当前批次(保持数据,状态转 Idle)
+ void batchStop(uint32_t machineId,
+ uint64_t endTs = 0); // 备用
+
+// 查询该机当前批次(若不存在,返回 Idle 空记录)
+ BatchRec getBatch(uint32_t machineId) const;
+
+private:
+ struct ClientInfo {
+ SOCKET sock = INVALID_SOCKET;
+ std::string ip;
+ uint16_t port = 0;
+ std::chrono::steady_clock::time_point tsConnected{};
+ bool versionOk = false;
+ FrameAssembler fr; // 每个客户端一个帧组装器
+ };
+
+ SocketComm socketComm;
+ std::vector<ClientInfo> clients;
+
+ std::function<void(int, std::string)> cbStatus;
+ std::function<void(const std::vector<uint8_t>&)> cbRaw;
+ bool rawDumpEnabled = true;
+
+ std::function<void(const std::string&, uint16_t, bool)> cbClientEvent;
+
+ std::thread worker;
+ std::atomic<bool> running{ false };
+ std::mutex mClients; // 保护 clients
+
+ BufferRegistry registry_; // 一个 Collector 管所有机台
+
+ // 批次状态
+ std::unordered_map<uint32_t, BatchRec> batches_;
+ mutable std::mutex mBatches;
+
+ // 内部函数
+ void handleClientData(ClientInfo& c, const std::vector<uint8_t>& data);
+ bool isVersionRequest(const std::vector<uint8_t>& data) const;
+
+ // 统一错误回包(6 参数)
+ void sendErrorTo(ClientInfo& c,
+ uint32_t dataId,
+ uint16_t refCmd,
+ uint32_t machineId,
+ Proto::ErrCode code,
+ const std::string& message);
+
+ // 工具:当前 epoch 毫秒
+ static uint64_t nowMs();
+};
+
+#endif // COLLECTOR_H
--
Gitblit v1.9.3