// Display.cpp -- client-side "display" endpoint of the DAQ link.
//
// Connects to the server, runs a background receive loop that reassembles
// frames, decodes each frame and dispatches it to the user callbacks
// (cbStatus / cbSamples / cbStats / cbMachines / cbSamplesMulti /
// cbBatchInfo / cbRaw), and offers request helpers for each protocol command.
//
// NOTE(review): the copy of this file that was reviewed had every `<...>` span
// stripped (template arguments and one system include). The reconstructed
// spots are marked below with NOTE(review) -- confirm them against Display.h.

#include "Display.h"

// NOTE(review): the original had a single bare `#include` here, presumably
// <iostream> (std::cout is used below). The remaining standard headers are
// listed explicitly so this file includes what it uses.
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include "../DAQConfig.h"
#include "../core/ConnEvents.h"
using namespace DAQEvt;

// Shared protocol headers.
#include "../proto/Protocol.h"
#include "../proto/ProtocolCodec.h"
#include "../net/FrameAssembler.h"
using namespace Proto;

Display::Display() {}

// Stops the receive thread and closes the socket. disconnect() also reports
// the "disconnected" status through cbStatus, so the callback may run during
// destruction.
Display::~Display() {
    stopRecvLoop();
    disconnect();
}

// Connects to the server at ip:port.
//
// On success: reports ConnCode::DisplayConnected, immediately sends a version
// request (0x0001) carrying a fresh dataId that the server echoes back, and
// starts the receive loop.
// On failure: reports ConnCode::SocketError through cbStatus (previously the
// failure was dropped silently and the caller had no way to notice it).
void Display::connectServer(const std::string& ip, uint16_t port) {
    if (!socketComm.createClientSocket(ip, port)) {
        // NOTE(review): static_cast target type reconstructed as int here and
        // in every other cbStatus call in this file.
        if (cbStatus)
            cbStatus(static_cast<int>(ConnCode::SocketError), "Display failed to connect to server");
        return;
    }

    if (cbStatus)
        cbStatus(static_cast<int>(ConnCode::DisplayConnected), "Display connected to server");
    onConnectionEstablished();

    // Right after connecting, ask for the server version (0x0001).
    ReqVersion versionReq;
    versionReq.dataId = m_nextDataId++;
    auto packet = encodeRequestVersion(versionReq);
    socketComm.sendDataSingle(packet);

    startRecvLoop(DAQCfg::RecvLoopIntervalMs);
}

// Stops the receive loop, closes the socket and reports
// ConnCode::DisplayDisconnected. The status is reported on every call,
// whether or not a connection was ever established.
void Display::disconnect() {
    stopRecvLoop();
    socketComm.closeSocket();
    if (cbStatus)
        cbStatus(static_cast<int>(ConnCode::DisplayDisconnected), "Display disconnected");
    onConnectionLost();
}

// Starts the background receive thread; does nothing if it is already running.
//
// intervalMs: pause between two polls of the socket, in milliseconds.
//
// The thread reads a chunk, feeds it to a FrameAssembler and passes every
// complete frame to handleRawData(). When rawDumpEnabled is set, both the raw
// chunk and each reassembled frame are also forwarded to cbRaw.
void Display::startRecvLoop(uint32_t intervalMs) {
    // Atomic test-and-set: with a separate load()/store() two concurrent
    // callers could both get past the check and both assign recvThread, and
    // assigning to a joinable std::thread terminates the process.
    if (recvRunning.exchange(true)) return;

    recvThread = std::thread([this, intervalMs]() {
        // NOTE(review): element type reconstructed as uint8_t -- must match
        // the buffer type used by socketComm / FrameAssembler / handleRawData.
        std::vector<uint8_t> chunk;
        std::vector<uint8_t> frame;
        FrameAssembler assembler;

        while (recvRunning.load()) {
            chunk.clear();
            if (socketComm.recvSingle(chunk) && !chunk.empty()) {
                if (rawDumpEnabled && cbRaw) cbRaw(chunk);
                assembler.push(chunk);
                while (assembler.nextFrame(frame)) {
                    if (rawDumpEnabled && cbRaw) cbRaw(frame);
                    handleRawData(frame);
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
        }
    });
}

// Asks the receive thread to exit and waits for it; does nothing if the loop
// is not running.
//
// NOTE(review): the user callbacks run on the receive thread. Calling
// disconnect()/stopRecvLoop() from inside one of them would make the thread
// join itself -- confirm that no callback does this.
void Display::stopRecvLoop() {
    // exchange() guarantees that only one caller takes the "was running" path
    // and therefore only one caller joins the thread.
    if (!recvRunning.exchange(false)) return;
    if (recvThread.joinable()) recvThread.join();
}

// The client side does not send samples for now.
void Display::sendSampleData(double) {}

// Same as above: not sent from the client side.
// NOTE(review): element type reconstructed as double -- confirm.
void Display::sendWindowData(const std::vector<double>&) {}

void Display::onConnectionEstablished() { std::cout << "[Display] connected\n"; }

void Display::onConnectionLost() { std::cout << "[Display] disconnected\n"; }

// ---- Low-level interface: explicit dataId ----

// Sends 0x0101 (incremental pull) -- original form without a batchId.
// Forwards to the batchId overload with an empty string, which means
// "attach no batchId".
void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
                           uint64_t sinceTsExclusive, uint16_t maxCount) {
    requestSince(dataId, machineId, channelId, sinceTsExclusive, std::string(), maxCount);
}

// Sends 0x0101 (incremental pull) -- newer form with a batchId.
// A non-empty batchId sets SINCE_FLAG_HAS_BATCH and is attached to the
// request; an empty one sends flags = 0 and no batchId.
void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
                           uint64_t sinceTsExclusive, const std::string& batchId,
                           uint16_t maxCount) {
    ReqSince req;
    req.dataId = dataId;
    req.machineId = machineId;
    req.channelId = channelId;
    req.sinceTsExclusive = sinceTsExclusive;
    req.maxCount = maxCount;

    if (batchId.empty()) {
        req.flags = 0;
        req.batchId.clear();
    } else {
        req.flags = SINCE_FLAG_HAS_BATCH;
        req.batchId = batchId;
    }

    auto packet = encodeRequestSince(req);
    socketComm.sendDataSingle(packet);
}

// Requests the machine list.
void Display::requestMachines(uint32_t dataId) {
    ReqMachines req;
    req.dataId = dataId;
    auto packet = encodeRequestMachines(req);
    socketComm.sendDataSingle(packet);
}

// Requests statistics / channel information for one machine.
void Display::requestStats(uint32_t dataId, uint32_t machineId) {
    ReqStats req;
    req.dataId = dataId;
    req.machineId = machineId;
    req.flags = 0;
    auto packet = encodeRequestStats(req);
    socketComm.sendDataSingle(packet);
}

// Whole-machine multi-channel incremental pull -- original form without a
// batchId. Forwards to the batchId overload with an empty string.
void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
                              uint16_t maxPerChannel) {
    requestSinceAll(dataId, machineId, sinceTsExclusive, std::string(), maxPerChannel);
}

// Whole-machine multi-channel incremental pull -- newer form with a batchId.
// Same flag/batchId rule as requestSince().
void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
                              const std::string& batchId, uint16_t maxPerChannel) {
    ReqSinceAll req;
    req.dataId = dataId;
    req.machineId = machineId;
    req.sinceTsExclusive = sinceTsExclusive;
    req.maxPerChannel = maxPerChannel;

    if (batchId.empty()) {
        req.flags = 0;
        req.batchId.clear();
    } else {
        req.flags = SINCE_FLAG_HAS_BATCH;
        req.batchId = batchId;
    }

    socketComm.sendDataSingle(encodeRequestSinceAll(req));
}

// ---- Added: batch information pull ----

// Explicit dataId.
void Display::requestBatchInfo(uint32_t dataId, uint32_t machineId) {
    ReqBatchInfo req;
    req.dataId = dataId;
    req.machineId = machineId;
    socketComm.sendDataSingle(encodeRequestBatchInfo(req));
}

// Convenience: automatic dataId.
void Display::requestBatchInfo(uint32_t machineId) {
    requestBatchInfo(m_nextDataId++, machineId);
}

// ---- Convenience interface: dataId assigned automatically ----
// These simply call the explicit-dataId versions above with m_nextDataId++.
//
// NOTE(review): m_nextDataId is also incremented from the receive thread (the
// error self-heal path in handleRawData calls requestBatchInfo) -- confirm it
// is atomic or otherwise synchronized in Display.h.

void Display::requestMachines() {
    requestMachines(m_nextDataId++);
}

void Display::requestStats(uint32_t machineId) {
    requestStats(m_nextDataId++, machineId);
}

void Display::requestSince(uint32_t machineId, uint32_t channelId, uint64_t sinceTsExclusive,
                           uint16_t maxCount) {
    requestSince(m_nextDataId++, machineId, channelId, sinceTsExclusive, maxCount);
}

void Display::requestSince(uint32_t machineId, uint32_t channelId, uint64_t sinceTsExclusive,
                           const std::string& batchId, uint16_t maxCount) {
    requestSince(m_nextDataId++, machineId, channelId, sinceTsExclusive, batchId, maxCount);
}

void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive,
                              uint16_t maxPerChannel) {
    requestSinceAll(m_nextDataId++, machineId, sinceTsExclusive, maxPerChannel);
}

void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive,
                              const std::string& batchId, uint16_t maxPerChannel) {
    requestSinceAll(m_nextDataId++, machineId, sinceTsExclusive, batchId, maxPerChannel);
}

// Dispatches one received frame (called on the receive thread).
//
// The decoders are tried in a fixed order; the first one that accepts the
// frame wins, its callback (if set) is invoked and the function returns.
// A frame that no decoder accepts is ignored.
// NOTE(review): parameter element type reconstructed as uint8_t -- confirm.
void Display::handleRawData(const std::vector<uint8_t>& rawData) {
    // F001 -- version response
    {
        RspVersion versionRsp;
        if (decodeResponseVersion(rawData, versionRsp)) {
            if (cbStatus)
                cbStatus(static_cast<int>(ConnCode::VersionOk),
                         std::string("Server version: ") + versionRsp.version);
            // m_versionOk = true;
            return;
        }
    }

    // F101 -- response to a "since" pull
    {
        RspSince sinceRsp;
        if (decodeResponseSince(rawData, sinceRsp)) {
            if (cbSamples)
                cbSamples(sinceRsp.machineId, sinceRsp.channelId, sinceRsp.lastTsSent,
                          sinceRsp.more, sinceRsp.samples);
            return;
        }
    }

    // F103 -- statistics / channels
    {
        RspStats statsRsp;
        if (decodeResponseStats(rawData, statsRsp)) {
            if (cbStats) cbStats(statsRsp.machineId, statsRsp.channels);
            return;
        }
    }

    // F104 -- machine list
    {
        RspMachines machinesRsp;
        if (decodeResponseMachines(rawData, machinesRsp)) {
            if (cbMachines) cbMachines(machinesRsp.machines);
            return;
        }
    }

    // F105 -- multi-channel incremental data
    {
        RspSinceAll sinceAllRsp;
        if (decodeResponseSinceAll(rawData, sinceAllRsp)) {
            if (cbSamplesMulti)
                cbSamplesMulti(sinceAllRsp.machineId, sinceAllRsp.moreAny, sinceAllRsp.blocks);
            return;
        }
    }

    // F120 -- batch information (added)
    {
        RspBatchInfo batchRsp;
        if (decodeResponseBatchInfo(rawData, batchRsp)) {
            if (cbBatchInfo) cbBatchInfo(batchRsp);
            return;
        }
    }

    // E100 -- unified error response, with self-healing (added)
    {
        RspError errRsp;
        if (decodeResponseError(rawData, errRsp)) {
            if (cbStatus) {
                // Formats a 16-bit command id as four upper-case hex digits.
                const auto hex4 = [](uint16_t value) {
                    char buf[8];
                    std::snprintf(buf, sizeof(buf), "%04X", static_cast<unsigned>(value));
                    return std::string(buf);
                };

                std::string text = "ERR ref=0x" + hex4(errRsp.refCmd)
                                 + " mid=" + std::to_string(errRsp.machineId)
                                 + " code=" + std::to_string(static_cast<unsigned>(errRsp.code))
                                 + " msg=" + errRsp.message;
                cbStatus(static_cast<int>(ConnCode::SocketError), text);
            }

            // Self-heal: on NoActiveBatch / BatchMismatch pull BatchInfo once.
            if (errRsp.code == ErrCode::NoActiveBatch || errRsp.code == ErrCode::BatchMismatch) {
                requestBatchInfo(errRsp.machineId);
            }
            return;
        }
    }

    // Other frame types (future extensions) are ignored.
}