From e8a27bb203fe2aff70390a5eca002d7438da9b0f Mon Sep 17 00:00:00 2001
From: mrDarker <mr.darker@163.com>
Date: 星期三, 22 十月 2025 14:24:34 +0800
Subject: [PATCH] Merge branch 'clh' into liuyang
---
SourceCode/Bond/DAQBridge/core/Display.cpp | 266 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 266 insertions(+), 0 deletions(-)
diff --git a/SourceCode/Bond/DAQBridge/core/Display.cpp b/SourceCode/Bond/DAQBridge/core/Display.cpp
new file mode 100644
index 0000000..9b3b71b
--- /dev/null
+++ b/SourceCode/Bond/DAQBridge/core/Display.cpp
@@ -0,0 +1,266 @@
+#include "Display.h"
+#include <iostream>
+#include "../DAQConfig.h"
+#include "../core/ConnEvents.h"
+using namespace DAQEvt;
+
+// 公共协议头
+#include "../proto/Protocol.h"
+#include "../proto/ProtocolCodec.h"
+#include "../net/FrameAssembler.h"
+
+using namespace Proto;
+
+Display::Display() {}
+Display::~Display() {
+ stopRecvLoop();
+ disconnect();
+}
+
+void Display::connectServer(const std::string& ip, uint16_t port) {
+ if (socketComm.createClientSocket(ip, port)) {
+ if (cbStatus) cbStatus(static_cast<int>(ConnCode::DisplayConnected), "Display connected to server");
+ onConnectionEstablished();
+
+ // 连接后立刻发送版本请求(0x0001),带 dataId 回显
+ ReqVersion vreq;
+ vreq.dataId = m_nextDataId++;
+ auto pkt = encodeRequestVersion(vreq);
+ socketComm.sendDataSingle(pkt);
+
+ startRecvLoop(DAQCfg::RecvLoopIntervalMs);
+ }
+}
+
+void Display::disconnect() {
+ stopRecvLoop();
+ socketComm.closeSocket();
+ if (cbStatus) cbStatus(static_cast<int>(ConnCode::DisplayDisconnected), "Display disconnected");
+ onConnectionLost();
+}
+
+void Display::startRecvLoop(uint32_t intervalMs) {
+ if (recvRunning.load()) return;
+ recvRunning.store(true);
+ recvThread = std::thread([this, intervalMs]() {
+ std::vector<uint8_t> chunk;
+ std::vector<uint8_t> frame;
+ FrameAssembler fr;
+
+ while (recvRunning.load()) {
+ chunk.clear();
+ if (socketComm.recvSingle(chunk) && !chunk.empty()) {
+ if (rawDumpEnabled && cbRaw) cbRaw(chunk);
+ fr.push(chunk);
+ while (fr.nextFrame(frame)) {
+ if (rawDumpEnabled && cbRaw) cbRaw(frame);
+ handleRawData(frame);
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
+ }
+ });
+}
+
+void Display::stopRecvLoop() {
+ if (!recvRunning.load()) return;
+ recvRunning.store(false);
+ if (recvThread.joinable()) recvThread.join();
+}
+
+void Display::sendSampleData(double) { /* 客户端这边暂不发 */ }
+void Display::sendWindowData(const std::vector<std::string>&) { /* 同上 */ }
+
+void Display::onConnectionEstablished() { std::cout << "[Display] connected\n"; }
+void Display::onConnectionLost() { std::cout << "[Display] disconnected\n"; }
+
+// —— 低层接口:显式 dataId ——
+
+// 发送 0x0101(增量拉取)— 原版(不带 batchId)
+void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
+ uint64_t sinceTsExclusive, uint16_t maxCount) {
+ // 调到带 batchId 的重载,传空串 => 不附加 batchId
+ requestSince(dataId, machineId, channelId, sinceTsExclusive, std::string(), maxCount);
+}
+
+// 发送 0x0101(增量拉取)— ★ 新:带 batchId
+void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
+ uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount) {
+ ReqSince req;
+ req.dataId = dataId;
+ req.machineId = machineId;
+ req.channelId = channelId;
+ req.sinceTsExclusive = sinceTsExclusive;
+ req.maxCount = maxCount;
+ if (!batchId.empty()) {
+ req.flags = SINCE_FLAG_HAS_BATCH;
+ req.batchId = batchId;
+ }
+ else {
+ req.flags = 0;
+ req.batchId.clear();
+ }
+ auto pkt = encodeRequestSince(req);
+ socketComm.sendDataSingle(pkt);
+}
+
+void Display::requestMachines(uint32_t dataId) {
+ ReqMachines req; req.dataId = dataId;
+ auto pkt = encodeRequestMachines(req);
+ socketComm.sendDataSingle(pkt);
+}
+
+void Display::requestStats(uint32_t dataId, uint32_t machineId) {
+ ReqStats req; req.dataId = dataId; req.machineId = machineId; req.flags = 0;
+ auto pkt = encodeRequestStats(req);
+ socketComm.sendDataSingle(pkt);
+}
+
+// 整机多通道增量 — 原版(不带 batchId)
+void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
+ uint16_t maxPerChannel) {
+ // 调到带 batchId 的重载,传空串
+ requestSinceAll(dataId, machineId, sinceTsExclusive, std::string(), maxPerChannel);
+}
+
+// 整机多通道增量 — ★ 新:带 batchId
+void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
+ const std::string& batchId, uint16_t maxPerChannel) {
+ ReqSinceAll req;
+ req.dataId = dataId;
+ req.machineId = machineId;
+ req.sinceTsExclusive = sinceTsExclusive;
+ req.maxPerChannel = maxPerChannel;
+ if (!batchId.empty()) {
+ req.flags = SINCE_FLAG_HAS_BATCH;
+ req.batchId = batchId;
+ }
+ else {
+ req.flags = 0;
+ req.batchId.clear();
+ }
+ socketComm.sendDataSingle(encodeRequestSinceAll(req));
+}
+
+// —— 新增:批次信息拉取 ——
+// 显式 dataId
+void Display::requestBatchInfo(uint32_t dataId, uint32_t machineId) {
+ ReqBatchInfo req; req.dataId = dataId; req.machineId = machineId;
+ socketComm.sendDataSingle(encodeRequestBatchInfo(req));
+}
+// 便捷:自动 dataId
+void Display::requestBatchInfo(uint32_t machineId) {
+ requestBatchInfo(m_nextDataId++, machineId);
+}
+
+// —— 便捷接口:自动分配 dataId ——
+// 三组只是简单地在内部调用上述显式版
+
+void Display::requestMachines() {
+ requestMachines(m_nextDataId++);
+}
+
+void Display::requestStats(uint32_t machineId) {
+ requestStats(m_nextDataId++, machineId);
+}
+
+void Display::requestSince(uint32_t machineId, uint32_t channelId,
+ uint64_t sinceTsExclusive, uint16_t maxCount) {
+ requestSince(m_nextDataId++, machineId, channelId, sinceTsExclusive, maxCount);
+}
+
+void Display::requestSince(uint32_t machineId, uint32_t channelId,
+ uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount) {
+ requestSince(m_nextDataId++, machineId, channelId, sinceTsExclusive, batchId, maxCount);
+}
+
+void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive, uint16_t maxPerChannel) {
+ requestSinceAll(m_nextDataId++, machineId, sinceTsExclusive, maxPerChannel);
+}
+
+void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive,
+ const std::string& batchId, uint16_t maxPerChannel) {
+ requestSinceAll(m_nextDataId++, machineId, sinceTsExclusive, batchId, maxPerChannel);
+}
+
+// 收包分发(在接收线程里被调用)
+void Display::handleRawData(const std::vector<uint8_t>& rawData) {
+ // F001 版本响应
+ {
+ RspVersion vrsp;
+ if (decodeResponseVersion(rawData, vrsp)) {
+ if (cbStatus) cbStatus(static_cast<int>(ConnCode::VersionOk),
+ std::string("Server version: ") + vrsp.version);
+ // m_versionOk = true;
+ return;
+ }
+ }
+
+ // F101 —— since 拉取响应
+ {
+ RspSince rsp;
+ if (decodeResponseSince(rawData, rsp)) {
+ if (cbSamples) cbSamples(rsp.machineId, rsp.channelId, rsp.lastTsSent, rsp.more, rsp.samples);
+ return;
+ }
+ }
+
+ // F103 —— 统计/通道
+ {
+ RspStats st;
+ if (decodeResponseStats(rawData, st)) {
+ if (cbStats) cbStats(st.machineId, st.channels);
+ return;
+ }
+ }
+
+ // F104 —— 机台列表
+ {
+ RspMachines ms;
+ if (decodeResponseMachines(rawData, ms)) {
+ if (cbMachines) cbMachines(ms.machines);
+ return;
+ }
+ }
+
+ // F105 —— 多通道增量
+ {
+ RspSinceAll ra;
+ if (decodeResponseSinceAll(rawData, ra)) {
+ if (cbSamplesMulti) cbSamplesMulti(ra.machineId, ra.moreAny, ra.blocks);
+ return;
+ }
+ }
+
+ // ★ 新增:F120 —— 批次信息
+ {
+ RspBatchInfo bi;
+ if (decodeResponseBatchInfo(rawData, bi)) {
+ if (cbBatchInfo) cbBatchInfo(bi);
+ return;
+ }
+ }
+
+ // ★ 新增:E100 —— 统一错误(自愈)
+ {
+ RspError er;
+ if (decodeResponseError(rawData, er)) {
+ if (cbStatus) {
+ std::string s = "ERR ref=0x" + [](uint16_t x) {
+ char buf[8]; std::snprintf(buf, sizeof(buf), "%04X", (unsigned)x); return std::string(buf);
+ }(er.refCmd) +
+ " mid=" + std::to_string(er.machineId) +
+ " code=" + std::to_string((unsigned)er.code) +
+ " msg=" + er.message;
+ cbStatus(static_cast<int>(ConnCode::SocketError), s);
+ }
+ // 错误自愈:NoActive / Mismatch → 拉一次 BatchInfo
+ if (er.code == ErrCode::NoActiveBatch || er.code == ErrCode::BatchMismatch) {
+ requestBatchInfo(er.machineId);
+ }
+ return;
+ }
+ }
+
+ // 其它类型(将来扩展)……
+}
--
Gitblit v1.9.3