#include "Display.h"
|
#include <iostream>
|
#include "../DAQConfig.h"
|
#include "../core/ConnEvents.h"
|
using namespace DAQEvt;
|
|
// Common protocol headers
|
#include "../proto/Protocol.h"
|
#include "../proto/ProtocolCodec.h"
|
#include "../net/FrameAssembler.h"
|
|
using namespace Proto;
|
|
/// Default constructor: performs no work beyond default member initialization.
Display::Display() = default;
|
/// Destructor: stops the receive loop, then runs the regular disconnect sequence.
Display::~Display()
{
    // disconnect() also calls stopRecvLoop(); the second call returns early once the
    // running flag has been cleared.
    this->stopRecvLoop();
    this->disconnect();
}
|
|
void Display::connectServer(const std::string& ip, uint16_t port) {
|
if (socketComm.createClientSocket(ip, port)) {
|
if (cbStatus) cbStatus(static_cast<int>(ConnCode::DisplayConnected), "Display connected to server");
|
onConnectionEstablished();
|
|
// Á¬½ÓºóÁ¢¿Ì·¢ËͰ汾ÇëÇó£¨0x0001£©£¬´ø dataId »ØÏÔ
|
ReqVersion vreq;
|
vreq.dataId = m_nextDataId++;
|
auto pkt = encodeRequestVersion(vreq);
|
socketComm.sendDataSingle(pkt);
|
|
startRecvLoop(DAQCfg::RecvLoopIntervalMs);
|
}
|
}
|
|
void Display::disconnect() {
|
stopRecvLoop();
|
socketComm.closeSocket();
|
if (cbStatus) cbStatus(static_cast<int>(ConnCode::DisplayDisconnected), "Display disconnected");
|
onConnectionLost();
|
}
|
|
void Display::startRecvLoop(uint32_t intervalMs) {
|
if (recvRunning.load()) return;
|
recvRunning.store(true);
|
recvThread = std::thread([this, intervalMs]() {
|
std::vector<uint8_t> chunk;
|
std::vector<uint8_t> frame;
|
FrameAssembler fr;
|
|
while (recvRunning.load()) {
|
chunk.clear();
|
if (socketComm.recvSingle(chunk) && !chunk.empty()) {
|
if (rawDumpEnabled && cbRaw) cbRaw(chunk);
|
fr.push(chunk);
|
while (fr.nextFrame(frame)) {
|
if (rawDumpEnabled && cbRaw) cbRaw(frame);
|
handleRawData(frame);
|
}
|
}
|
std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
|
}
|
});
|
}
|
|
void Display::stopRecvLoop() {
|
if (!recvRunning.load()) return;
|
recvRunning.store(false);
|
if (recvThread.joinable()) recvThread.join();
|
}
|
|
/// Intentionally empty: the client side does not send sample data for now.
void Display::sendSampleData(double)
{
}
|
/// Intentionally empty: the client side does not send window data for now.
void Display::sendWindowData(const std::vector<std::string>&)
{
}
|
|
/// Connection hook: writes a one-line "connected" notice to standard output.
void Display::onConnectionEstablished()
{
    std::cout << "[Display] connected\n";
}
|
/// Disconnection hook: writes a one-line "disconnected" notice to standard output.
void Display::onConnectionLost()
{
    std::cout << "[Display] disconnected\n";
}
|
|
// ---- Low-level interface: explicit dataId ----
|
|
// Send 0x0101 (incremental pull) - original version (without batchId)
|
void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
|
uint64_t sinceTsExclusive, uint16_t maxCount) {
|
// µ÷µ½´ø batchId µÄÖØÔØ£¬´«¿Õ´® => ²»¸½¼Ó batchId
|
requestSince(dataId, machineId, channelId, sinceTsExclusive, std::string(), maxCount);
|
}
|
|
// Send 0x0101 (incremental pull) - new: with batchId
|
void Display::requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
|
uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount) {
|
ReqSince req;
|
req.dataId = dataId;
|
req.machineId = machineId;
|
req.channelId = channelId;
|
req.sinceTsExclusive = sinceTsExclusive;
|
req.maxCount = maxCount;
|
if (!batchId.empty()) {
|
req.flags = SINCE_FLAG_HAS_BATCH;
|
req.batchId = batchId;
|
}
|
else {
|
req.flags = 0;
|
req.batchId.clear();
|
}
|
auto pkt = encodeRequestSince(req);
|
socketComm.sendDataSingle(pkt);
|
}
|
|
void Display::requestMachines(uint32_t dataId) {
|
ReqMachines req; req.dataId = dataId;
|
auto pkt = encodeRequestMachines(req);
|
socketComm.sendDataSingle(pkt);
|
}
|
|
void Display::requestStats(uint32_t dataId, uint32_t machineId) {
|
ReqStats req; req.dataId = dataId; req.machineId = machineId; req.flags = 0;
|
auto pkt = encodeRequestStats(req);
|
socketComm.sendDataSingle(pkt);
|
}
|
|
// Whole-machine multi-channel incremental pull - original version (without batchId)
|
void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
|
uint16_t maxPerChannel) {
|
// µ÷µ½´ø batchId µÄÖØÔØ£¬´«¿Õ´®
|
requestSinceAll(dataId, machineId, sinceTsExclusive, std::string(), maxPerChannel);
|
}
|
|
// Whole-machine multi-channel incremental pull - new: with batchId
|
void Display::requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
|
const std::string& batchId, uint16_t maxPerChannel) {
|
ReqSinceAll req;
|
req.dataId = dataId;
|
req.machineId = machineId;
|
req.sinceTsExclusive = sinceTsExclusive;
|
req.maxPerChannel = maxPerChannel;
|
if (!batchId.empty()) {
|
req.flags = SINCE_FLAG_HAS_BATCH;
|
req.batchId = batchId;
|
}
|
else {
|
req.flags = 0;
|
req.batchId.clear();
|
}
|
socketComm.sendDataSingle(encodeRequestSinceAll(req));
|
}
|
|
// ---- Added: batch info pull ----
|
// Explicit dataId
|
void Display::requestBatchInfo(uint32_t dataId, uint32_t machineId) {
|
ReqBatchInfo req; req.dataId = dataId; req.machineId = machineId;
|
socketComm.sendDataSingle(encodeRequestBatchInfo(req));
|
}
|
// Convenience: automatic dataId
|
/// Convenience overload: takes the next value of m_nextDataId as dataId (post-increment)
/// and forwards to the explicit-dataId overload.
void Display::requestBatchInfo(uint32_t machineId)
{
    const auto dataId = m_nextDataId++;
    requestBatchInfo(dataId, machineId);
}
|
|
// ---- Convenience interface: automatically assigned dataId ----
|
// These three groups simply call the explicit-dataId versions above internally
|
|
/// Convenience overload: takes the next value of m_nextDataId as dataId (post-increment)
/// and forwards to requestMachines(dataId).
void Display::requestMachines()
{
    const auto dataId = m_nextDataId++;
    requestMachines(dataId);
}
|
|
/// Convenience overload: takes the next value of m_nextDataId as dataId (post-increment)
/// and forwards to requestStats(dataId, machineId).
void Display::requestStats(uint32_t machineId)
{
    const auto dataId = m_nextDataId++;
    requestStats(dataId, machineId);
}
|
|
/// Convenience overload without batch id: takes the next value of m_nextDataId as dataId
/// (post-increment) and forwards to the explicit-dataId overload.
void Display::requestSince(uint32_t machineId, uint32_t channelId,
                           uint64_t sinceTsExclusive, uint16_t maxCount)
{
    const auto dataId = m_nextDataId++;
    requestSince(dataId, machineId, channelId, sinceTsExclusive, maxCount);
}
|
|
/// Convenience overload with batch id: takes the next value of m_nextDataId as dataId
/// (post-increment) and forwards to the explicit-dataId overload.
void Display::requestSince(uint32_t machineId, uint32_t channelId,
                           uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount)
{
    const auto dataId = m_nextDataId++;
    requestSince(dataId, machineId, channelId, sinceTsExclusive, batchId, maxCount);
}
|
|
/// Convenience overload without batch id: takes the next value of m_nextDataId as dataId
/// (post-increment) and forwards to the explicit-dataId overload.
void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive, uint16_t maxPerChannel)
{
    const auto dataId = m_nextDataId++;
    requestSinceAll(dataId, machineId, sinceTsExclusive, maxPerChannel);
}
|
|
/// Convenience overload with batch id: takes the next value of m_nextDataId as dataId
/// (post-increment) and forwards to the explicit-dataId overload.
void Display::requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive,
                              const std::string& batchId, uint16_t maxPerChannel)
{
    const auto dataId = m_nextDataId++;
    requestSinceAll(dataId, machineId, sinceTsExclusive, batchId, maxPerChannel);
}
|
|
// Received-packet dispatch (called on the receive thread)
|
void Display::handleRawData(const std::vector<uint8_t>& rawData) {
|
// F001 °æ±¾ÏìÓ¦
|
{
|
RspVersion vrsp;
|
if (decodeResponseVersion(rawData, vrsp)) {
|
if (cbStatus) cbStatus(static_cast<int>(ConnCode::VersionOk),
|
std::string("Server version: ") + vrsp.version);
|
// m_versionOk = true;
|
return;
|
}
|
}
|
|
// F101 ¡ª¡ª since ÀÈ¡ÏìÓ¦
|
{
|
RspSince rsp;
|
if (decodeResponseSince(rawData, rsp)) {
|
if (cbSamples) cbSamples(rsp.machineId, rsp.channelId, rsp.lastTsSent, rsp.more, rsp.samples);
|
return;
|
}
|
}
|
|
// F103 ¡ª¡ª ͳ¼Æ/ͨµÀ
|
{
|
RspStats st;
|
if (decodeResponseStats(rawData, st)) {
|
if (cbStats) cbStats(st.machineId, st.channels);
|
return;
|
}
|
}
|
|
// F104 ¡ª¡ª »ų́Áбí
|
{
|
RspMachines ms;
|
if (decodeResponseMachines(rawData, ms)) {
|
if (cbMachines) cbMachines(ms.machines);
|
return;
|
}
|
}
|
|
// F105 ¡ª¡ª ¶àͨµÀÔöÁ¿
|
{
|
RspSinceAll ra;
|
if (decodeResponseSinceAll(rawData, ra)) {
|
if (cbSamplesMulti) cbSamplesMulti(ra.machineId, ra.moreAny, ra.blocks);
|
return;
|
}
|
}
|
|
// ¡ï ÐÂÔö£ºF120 ¡ª¡ª Åú´ÎÐÅÏ¢
|
{
|
RspBatchInfo bi;
|
if (decodeResponseBatchInfo(rawData, bi)) {
|
if (cbBatchInfo) cbBatchInfo(bi);
|
return;
|
}
|
}
|
|
// ¡ï ÐÂÔö£ºE100 ¡ª¡ª ͳһ´íÎó£¨×ÔÓú£©
|
{
|
RspError er;
|
if (decodeResponseError(rawData, er)) {
|
if (cbStatus) {
|
std::string s = "ERR ref=0x" + [](uint16_t x) {
|
char buf[8]; std::snprintf(buf, sizeof(buf), "%04X", (unsigned)x); return std::string(buf);
|
}(er.refCmd) +
|
" mid=" + std::to_string(er.machineId) +
|
" code=" + std::to_string((unsigned)er.code) +
|
" msg=" + er.message;
|
cbStatus(static_cast<int>(ConnCode::SocketError), s);
|
}
|
// ´íÎó×ÔÓú£ºNoActive / Mismatch ¡ú ÀÒ»´Î BatchInfo
|
if (er.code == ErrCode::NoActiveBatch || er.code == ErrCode::BatchMismatch) {
|
requestBatchInfo(er.machineId);
|
}
|
return;
|
}
|
}
|
|
// ÆäËüÀàÐÍ£¨½«À´À©Õ¹£©¡¡
|
}
|