#include "Collector.h" #include #include #include "../DAQConfig.h" #include "../core/ConnEvents.h" using namespace DAQEvt; // ЭÒé±à½âÂë #include "../proto/Protocol.h" #include "../proto/ProtocolCodec.h" using namespace Proto; Collector::Collector() {} void Collector::startLoop(uint32_t intervalMs) { if (running.load()) return; running.store(true); worker = std::thread([this, intervalMs]() { while (running.load()) { this->poll(); std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs)); } }); } void Collector::stopLoop() { if (!running.load()) return; running.store(false); if (worker.joinable()) worker.join(); } Collector::~Collector() { stopLoop(); disconnect(); } std::vector Collector::getClientList() { std::lock_guard lk(mClients); std::vector out; out.reserve(clients.size()); for (const auto& c : clients) { out.push_back(ClientSummary{ c.ip, c.port, c.versionOk, c.sock }); } return out; } void Collector::createServer(uint16_t port) { if (socketComm.createServerSocket(port)) { if (cbStatus) cbStatus(static_cast(DAQEvt::ConnCode::ServerListening), "Collector server listening on port " + std::to_string(port)); onConnectionEstablished(); } } void Collector::disconnect() { { std::lock_guard lk(mClients); for (auto& c : clients) { socketComm.closeClient(c.sock); if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false); } clients.clear(); } socketComm.closeSocket(); if (cbStatus) cbStatus(static_cast(DAQEvt::ConnCode::ServerStopped), "Collector server stopped"); onConnectionLost(); } void Collector::poll() { // 1) ½Óпͻ§¶Ë while (true) { SOCKET cs; std::string ip; uint16_t port; if (!socketComm.acceptOne(cs, ip, port)) break; ClientInfo ci; ci.sock = cs; ci.ip = ip; ci.port = port; ci.tsConnected = std::chrono::steady_clock::now(); { std::lock_guard lk(mClients); clients.push_back(ci); } if (cbStatus) cbStatus(static_cast(DAQEvt::ConnCode::ClientAccepted), "Client connected: " + ip + ":" + std::to_string(port)); if (cbClientEvent) cbClientEvent(ip, port, /*connected*/true); } // 2) ÂÖѯ¸÷¿Í»§¶Ë auto now = std::chrono::steady_clock::now(); std::lock_guard lk(mClients); for (size_t i = 0; i < clients.size(); ) { auto& c = clients[i]; // 2a) °æ±¾ÎÕÊÖ³¬Ê± if (!c.versionOk) { auto elapsed = std::chrono::duration_cast(now - c.tsConnected).count(); if (elapsed >= DAQCfg::KickIfNoVersionSec) { if (cbStatus) cbStatus(static_cast(DAQEvt::ConnCode::VersionTimeoutKick), "Kick client (no version within 5s): " + c.ip + ":" + std::to_string(c.port)); socketComm.closeClient(c.sock); if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false); clients.erase(clients.begin() + i); continue; } } // 2b) ÊÕÊý¾Ý std::vector buf; bool peerClosed = false; if (socketComm.recvFrom(c.sock, buf, peerClosed)) { if (!buf.empty()) { if (rawDumpEnabled && cbRaw) cbRaw(buf); // Ö¡×é×° c.fr.push(buf); std::vector frame; while (c.fr.nextFrame(frame)) { if (rawDumpEnabled && cbRaw) cbRaw(frame); handleClientData(c, frame); } } } else if (peerClosed) { // ¶Ô¶ËÖ÷¶¯¶Ï¿ª if (cbStatus) cbStatus(static_cast(DAQEvt::ConnCode::ClientDisconnected), "Client disconnected: " + c.ip + ":" + std::to_string(c.port)); socketComm.closeClient(c.sock); if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false); clients.erase(clients.begin() + i); continue; } ++i; } } void Collector::sendSampleData(double sample) { (void)sample; #ifdef _DEBUG if (cbStatus) cbStatus(0, "sendSampleData() no-op."); #endif } void Collector::sendWindowData(const std::vector& fields) { (void)fields; #ifdef _DEBUG if (cbStatus) cbStatus(0, "sendWindowData() no-op."); #endif } void Collector::onConnectionEstablished() { std::cout << "[Collector] ready.\n"; } void Collector::onConnectionLost() { std::cout << "[Collector] stopped.\n"; } // ¼æÈݾÉÃŽûº¯Êý bool Collector::isVersionRequest(const std::vector& data) const { if (data.size() < 4 + 4 + 2 + 2 + 1) return false; if (!(data[0] == 0x11 && data[1] == 0x88 && data[2] == 0x11 && data[3] == 0x88)) return false; size_t cmdPos = 4 + 4 + 2; uint16_t cmd = (uint16_t(data[cmdPos]) << 8) | data[cmdPos + 1]; return (cmd == CMD_REQ_VERSION); } // ========== BufferRegistry ֱͨ ========== BufferManager& Collector::registryAddMachine(uint32_t machineId, const std::string& name, const RetentionPolicy& defPolicy) { return registry_.getOrCreate(machineId, name, defPolicy); } BufferManager* Collector::registryFind(uint32_t machineId) { return registry_.find(machineId); } const BufferManager* Collector::registryFind(uint32_t machineId) const { return registry_.find(machineId); } std::vector Collector::registryListMachines() const { return registry_.listManagers(); } void Collector::buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v) { auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); bm.push(channelId, ts_ms, v); // δ start() ʱ½«±»ÄÚ²¿ºöÂÔ } std::vector Collector::buffersGetSince(uint32_t machineId, uint32_t channelId, int64_t tsExclusive, size_t maxCount) const { auto* bm = registry_.find(machineId); if (!bm) return {}; return bm->getSince(channelId, tsExclusive, maxCount); } std::vector Collector::buffersGetRange(uint32_t machineId, uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount) const { auto* bm = registry_.find(machineId); if (!bm) return {}; return bm->getRange(channelId, from_ts, to_ts, maxCount); } void Collector::buffersStart(uint32_t machineId) { auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); bm.start(); // Çå¿Õ + ÔÊÐíд } void Collector::buffersStop(uint32_t machineId) { if (auto* bm = registry_.find(machineId)) bm->stop(); } void Collector::buffersClear(uint32_t machineId) { if (auto* bm = registry_.find(machineId)) bm->clearAll(); } bool Collector::buffersIsRunning(uint32_t machineId) const { auto* bm = registry_.find(machineId); return bm ? bm->isRunning() : false; } void Collector::buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting) { auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); bm.setDefaultPolicy(p, applyExisting); } void Collector::buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p) { auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); bm.setPolicy(channelId, p); } void Collector::buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name) { auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId)); bm.setChannelName(channelId, name); } std::string Collector::buffersGetChannelName(uint32_t machineId, uint32_t channelId) const { auto* bm = registry_.find(machineId); if (!bm) return {}; return bm->getChannelName(channelId); } std::vector Collector::buffersListChannelInfos(uint32_t machineId) const { auto* bm = registry_.find(machineId); if (!bm) return {}; return bm->listChannelInfos(); } std::vector Collector::buffersListChannelStats(uint32_t machineId) const { auto* bm = registry_.find(machineId); if (!bm) return {}; return bm->listChannelStats(); } BufferManager::ChannelStat Collector::buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const { auto* bm = registry_.find(machineId); if (!bm) return BufferManager::ChannelStat{ channelId, 0, 0, 0 }; return bm->getChannelStat(channelId); } // ====== Åú´Î¹ÜÀí ====== // ÐÂʵÏÖ£º¸ù¾Ý expectedDurationMs ¼ÆËã expectedEndTs void Collector::batchStart(uint32_t machineId, const std::string& batchId, uint64_t expectedDurationMs) { const uint64_t startTs = nowMs(); const uint64_t expectedEndTs = (expectedDurationMs > 0) ? (startTs + expectedDurationMs) : 0; { std::lock_guard lk(mBatches); auto& br = batches_[machineId]; br.state = BatchState::Active; br.activeBatchId = batchId; br.activeStartTs = startTs; br.expectedEndTs = expectedEndTs; } // Åú´Î¿ªÊ¼£ºÇå¿Õ²¢¿ªÊ¼²É¼¯ buffersStart(machineId); if (cbStatus) cbStatus(0, "batchStart: machine " + std::to_string(machineId) + " batch=" + batchId + " durMs=" + std::to_string(expectedDurationMs)); } // ¼æÈݾÉÇ©Ãû£ººöÂÔ startTs£»°Ñ expectedEndTs µ±¡°Ê±³¤¡± void Collector::batchStart(uint32_t machineId, const std::string& batchId, uint64_t /*startTs_ignored*/, uint64_t expectedEndTs_orDuration) { // Èôµ÷Ó÷½Îó´«Á˾ø¶ÔµÄ¡°½áÊøÊ±¼ä´Á¡±£¬ÕâÀï»á±»µ±×÷¡°Ê±³¤¡±£» // ÈçÐèÑϸñÇø·Ö£¬ÄãÒ²¿ÉÒÔ¼ÓÒ»¸öãÐÖµÅжϣ¨±ÈÈç´óÓÚ 10^12 ÈÏΪÊǾø¶Ôʱ¼ä´Á£© // È»ºóת»»£ºduration = max(0, endTs - now) uint64_t durationMs = expectedEndTs_orDuration; // ¿ÉÑ¡£º×ö¸ö¼òµ¥µÄ¡°Ïñ¾ø¶Ôʱ¼ä´Á¡±µÄÅжϲ¢×Ô¶¯×ªÎªÊ±³¤ // ÒÔºÁÃëʱ¼ä´ÁãÐÖµ¾ÙÀý£¨~2001-09-09£©£º1e12 if (expectedEndTs_orDuration > 1000000000000ULL) { uint64_t now = nowMs(); if (expectedEndTs_orDuration > now) durationMs = expectedEndTs_orDuration - now; else durationMs = 0; // ÒѹýÆÚ£¬µ±Î´Öª´¦Àí } batchStart(machineId, batchId, durationMs); } void Collector::batchStop(uint32_t machineId, uint64_t /*endTs*/) { { std::lock_guard lk(mBatches); auto it = batches_.find(machineId); if (it != batches_.end()) { it->second.state = BatchState::Idle; it->second.activeBatchId.clear(); it->second.activeStartTs = 0; it->second.expectedEndTs = 0; } else { batches_[machineId] = BatchRec{}; // Éú³ÉÒ»Ìõ Idle } } // Í£Ö¹²É¼¯£¬µ«²»ÇåÊý¾Ý buffersStop(machineId); if (cbStatus) cbStatus(0, "batchStop: machine " + std::to_string(machineId)); } Collector::BatchRec Collector::getBatch(uint32_t machineId) const { std::lock_guard lk(mBatches); auto it = batches_.find(machineId); if (it != batches_.end()) return it->second; return BatchRec{}; // ĬÈÏ Idle } // ========== ÒµÎñ·Ö·¢ ========== void Collector::handleClientData(ClientInfo& c, const std::vector& data) { // °æ±¾Ð£Ñé if (!c.versionOk) { ReqVersion vreq{}; if (decodeRequestVersion(data, vreq)) { c.versionOk = true; if (cbStatus) cbStatus(static_cast(DAQEvt::ConnCode::VersionOk), "Client " + c.ip + ":" + std::to_string(c.port) + " version OK"); RspVersion vrst; vrst.dataId = vreq.dataId; // »ØÏÔ vrst.version = DAQCfg::Version; // ¼¯ÖÐÅäÖà auto rsp = encodeResponseVersion(vrst); socketComm.sendTo(c.sock, rsp); } return; // δͨ¹ý°æ±¾Ð£Ñ飬ÆäËüºöÂÔ } // ͨ¹ý°æ±¾Ð£Ñéºó£º°´ cmd ·Ö·¢ const uint16_t cmd = peek_cmd(data); // 0x0101 ¡ª¡ª µ¥Í¨µÀÔöÁ¿À­È¡ if (cmd == CMD_REQ_SINCE) { ReqSince req; if (!decodeRequestSince(data, req)) return; auto vec = buffersGetSince(req.machineId, req.channelId, static_cast(req.sinceTsExclusive), req.maxCount); uint64_t lastTsSent = req.sinceTsExclusive; if (!vec.empty()) lastTsSent = static_cast(vec.back().ts_ms); auto stat = buffersGetChannelStat(req.machineId, req.channelId); uint8_t more = (stat.latestTs > static_cast(lastTsSent)) ? 1 : 0; RspSince rsp; rsp.dataId = req.dataId; // »ØÏÔ rsp.machineId = req.machineId; rsp.channelId = req.channelId; rsp.lastTsSent = lastTsSent; rsp.more = more; rsp.samples.reserve(vec.size()); for (auto& s : vec) rsp.samples.push_back({ static_cast(s.ts_ms), s.value }); auto pkt = encodeResponseSince(rsp); socketComm.sendTo(c.sock, pkt); return; } // 0x0103 ¡ª¡ª ͳ¼Æ/ͨµÀÁбí if (cmd == CMD_REQ_STATS) { ReqStats req; if (!decodeRequestStats(data, req)) return; auto* bm = registry_.find(req.machineId); if (!bm) { // ʾÀý£º¿É»Ø´íÎóÖ¡ // sendErrorTo(c, req.dataId, CMD_REQ_STATS, req.machineId, ErrCode::NoActiveBatch, "machine not found"); return; } RspStats rsp; rsp.dataId = req.dataId; rsp.machineId = req.machineId; auto stats = bm->listChannelStats(); rsp.channels.reserve(stats.size()); for (auto& s : stats) { ChannelStatInfo ci; ci.channelId = s.id; ci.earliestTs = static_cast(s.earliestTs); ci.latestTs = static_cast(s.latestTs); ci.size = static_cast(s.size); ci.name = bm->getChannelName(s.id); // UTF-8 rsp.channels.push_back(std::move(ci)); } auto pkt = encodeResponseStats(rsp); socketComm.sendTo(c.sock, pkt); return; } // 0x0104 ¡ª¡ª »ų́Áбí if (cmd == CMD_REQ_MACHINES) { ReqMachines req; if (!decodeRequestMachines(data, req)) return; RspMachines rsp; rsp.dataId = req.dataId; auto mids = registry_.listManagers(); rsp.machines.reserve(mids.size()); for (auto id : mids) { auto* bm = registry_.find(id); if (!bm) continue; rsp.machines.push_back(MachineInfo{ id, bm->name() }); } auto pkt = encodeResponseMachines(rsp); socketComm.sendTo(c.sock, pkt); return; } // 0x0105 ¡ª¡ª Õû»ú¶àͨµÀÔöÁ¿À­È¡ if (cmd == CMD_REQ_SINCE_ALL) { ReqSinceAll req; if (!decodeRequestSinceAll(data, req)) return; RspSinceAll rsp; rsp.dataId = req.dataId; // »ØÏÔ rsp.machineId = req.machineId; rsp.moreAny = 0; auto* bm = registry_.find(req.machineId); if (!bm) { auto pkt = encodeResponseSinceAll(rsp); socketComm.sendTo(c.sock, pkt); return; } // ±ÜÃⳬ¹ý 2 ×Ö½Ú³¤¶È£ºÕýÎÄ ~60KB constexpr size_t kMaxBody = 60 * 1024; rsp.blocks.clear(); auto stats = bm->listChannelStats(); for (auto& s : stats) { auto vec = bm->getSince(s.id, static_cast(req.sinceTsExclusive), req.maxPerChannel); if (vec.empty()) continue; ChannelBlock blk; blk.channelId = s.id; blk.lastTsSent = static_cast(vec.back().ts_ms); auto st = bm->getChannelStat(s.id); blk.more = (st.latestTs > static_cast(blk.lastTsSent)) ? 1 : 0; blk.samples.reserve(vec.size()); for (auto& sm : vec) { blk.samples.push_back({ static_cast(sm.ts_ms), sm.value }); } rsp.blocks.push_back(std::move(blk)); auto tentative = encodeResponseSinceAll(rsp); if (tentative.size() > (4 + 4 + 2) + kMaxBody + 1) { // ³¬Ìå»ý£º°Ñ×îºóÒ»¿éÄóöÀ´£¬ÏÈ·¢Ç°ÃæµÄ auto last = std::move(rsp.blocks.back()); rsp.blocks.pop_back(); auto pkt = encodeResponseSinceAll(rsp); socketComm.sendTo(c.sock, pkt); rsp.blocks.clear(); rsp.blocks.push_back(std::move(last)); } } rsp.moreAny = 0; for (const auto& b : rsp.blocks) { if (b.more) { rsp.moreAny = 1; break; } } auto pkt = encodeResponseSinceAll(rsp); socketComm.sendTo(c.sock, pkt); return; } // 0x0120 ¡ª¡ª Åú´ÎÐÅÏ¢ if (cmd == CMD_REQ_BATCH_INFO) { ReqBatchInfo req; if (!decodeRequestBatchInfo(data, req)) return; RspBatchInfo rsp; rsp.dataId = req.dataId; rsp.machineId = req.machineId; const auto br = getBatch(req.machineId); rsp.state = br.state; rsp.activeBatchId = br.activeBatchId; rsp.activeStartTs = br.activeStartTs; rsp.expectedEndTs = br.expectedEndTs; auto pkt = encodeResponseBatchInfo(rsp); socketComm.sendTo(c.sock, pkt); return; } // ÆäËüÖ¸Á°´ÐèÀ©Õ¹ } // ͳһ´íÎ󻨰ü void Collector::sendErrorTo(ClientInfo& c, uint32_t dataId, uint16_t refCmd, uint32_t machineId, ErrCode code, const std::string& message) { RspError er; er.dataId = dataId; er.refCmd = refCmd; er.machineId = machineId; er.code = code; er.message = message; auto pkt = encodeResponseError(er); socketComm.sendTo(c.sock, pkt); } // ¹¤¾ß£ºµ±Ç° epoch ºÁÃë uint64_t Collector::nowMs() { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()).count(); }