#include "Collector.h"
|
#include <iostream>
|
#include <chrono>
|
|
#include "../DAQConfig.h"
|
#include "../core/ConnEvents.h"
|
using namespace DAQEvt;
|
|
// Protocol encoding / decoding
|
#include "../proto/Protocol.h"
|
#include "../proto/ProtocolCodec.h"
|
|
using namespace Proto;
|
|
/// Constructs a Collector; nothing is done beyond default member initialization.
Collector::Collector() = default;
|
|
void Collector::startLoop(uint32_t intervalMs) {
|
if (running.load()) return;
|
running.store(true);
|
worker = std::thread([this, intervalMs]() {
|
while (running.load()) {
|
this->poll();
|
std::this_thread::sleep_for(std::chrono::milliseconds(intervalMs));
|
}
|
});
|
}
|
|
void Collector::stopLoop() {
|
if (!running.load()) return;
|
running.store(false);
|
if (worker.joinable()) worker.join();
|
}
|
|
/// Destructor: shuts the collector down in two steps.
Collector::~Collector() {
    // Stop the polling thread first so it no longer runs poll()...
    stopLoop();

    // ...then close all client sockets and the listening socket.
    disconnect();
}
|
|
std::vector<Collector::ClientSummary> Collector::getClientList() {
|
std::lock_guard<std::mutex> lk(mClients);
|
std::vector<ClientSummary> out;
|
out.reserve(clients.size());
|
for (const auto& c : clients) {
|
out.push_back(ClientSummary{ c.ip, c.port, c.versionOk, c.sock });
|
}
|
return out;
|
}
|
|
void Collector::createServer(uint16_t port) {
|
if (socketComm.createServerSocket(port)) {
|
if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerListening),
|
"Collector server listening on port " + std::to_string(port));
|
onConnectionEstablished();
|
}
|
}
|
|
void Collector::disconnect() {
|
{
|
std::lock_guard<std::mutex> lk(mClients);
|
for (auto& c : clients) {
|
socketComm.closeClient(c.sock);
|
if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
|
}
|
clients.clear();
|
}
|
socketComm.closeSocket();
|
if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ServerStopped), "Collector server stopped");
|
onConnectionLost();
|
}
|
|
void Collector::poll() {
|
// 1) ½Óпͻ§¶Ë
|
while (true) {
|
SOCKET cs; std::string ip; uint16_t port;
|
if (!socketComm.acceptOne(cs, ip, port)) break;
|
|
ClientInfo ci;
|
ci.sock = cs; ci.ip = ip; ci.port = port;
|
ci.tsConnected = std::chrono::steady_clock::now();
|
|
{
|
std::lock_guard<std::mutex> lk(mClients);
|
clients.push_back(ci);
|
}
|
|
if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientAccepted),
|
"Client connected: " + ip + ":" + std::to_string(port));
|
if (cbClientEvent) cbClientEvent(ip, port, /*connected*/true);
|
}
|
|
// 2) ÂÖѯ¸÷¿Í»§¶Ë
|
auto now = std::chrono::steady_clock::now();
|
std::lock_guard<std::mutex> lk(mClients);
|
for (size_t i = 0; i < clients.size(); ) {
|
auto& c = clients[i];
|
|
// 2a) °æ±¾ÎÕÊÖ³¬Ê±
|
if (!c.versionOk) {
|
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - c.tsConnected).count();
|
if (elapsed >= DAQCfg::KickIfNoVersionSec) {
|
if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionTimeoutKick),
|
"Kick client (no version within 5s): " + c.ip + ":" + std::to_string(c.port));
|
socketComm.closeClient(c.sock);
|
if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
|
clients.erase(clients.begin() + i);
|
continue;
|
}
|
}
|
|
// 2b) ÊÕÊý¾Ý
|
std::vector<uint8_t> buf;
|
bool peerClosed = false;
|
if (socketComm.recvFrom(c.sock, buf, peerClosed)) {
|
if (!buf.empty()) {
|
if (rawDumpEnabled && cbRaw) cbRaw(buf);
|
|
// Ö¡×é×°
|
c.fr.push(buf);
|
std::vector<uint8_t> frame;
|
while (c.fr.nextFrame(frame)) {
|
if (rawDumpEnabled && cbRaw) cbRaw(frame);
|
handleClientData(c, frame);
|
}
|
}
|
}
|
else if (peerClosed) {
|
// ¶Ô¶ËÖ÷¶¯¶Ï¿ª
|
if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::ClientDisconnected),
|
"Client disconnected: " + c.ip + ":" + std::to_string(c.port));
|
socketComm.closeClient(c.sock);
|
if (cbClientEvent) cbClientEvent(c.ip, c.port, /*connected*/false);
|
clients.erase(clients.begin() + i);
|
continue;
|
}
|
|
++i;
|
}
|
}
|
|
/// Placeholder: the collector does not send sample data.
///
/// The argument is ignored. In _DEBUG builds a status message is emitted
/// through `cbStatus` (if set).
///
/// @param sample Unused.
void Collector::sendSampleData(double sample) {
    static_cast<void>(sample);
#ifdef _DEBUG
    if (cbStatus) {
        cbStatus(0, "sendSampleData() no-op.");
    }
#endif
}
|
|
/// Placeholder: the collector does not send window data.
///
/// The argument is ignored. In _DEBUG builds a status message is emitted
/// through `cbStatus` (if set).
///
/// @param fields Unused.
void Collector::sendWindowData(const std::vector<std::string>& fields) {
    static_cast<void>(fields);
#ifdef _DEBUG
    if (cbStatus) {
        cbStatus(0, "sendWindowData() no-op.");
    }
#endif
}
|
|
/// Hook invoked by createServer() once the server socket is listening;
/// writes a one-line notice to standard output.
void Collector::onConnectionEstablished() {
    std::cout << "[Collector] ready." << '\n';
}
|
/// Hook invoked at the end of disconnect(); writes a one-line notice to
/// standard output.
void Collector::onConnectionLost() {
    std::cout << "[Collector] stopped." << '\n';
}
|
|
// Legacy gate-check function, kept for compatibility
|
bool Collector::isVersionRequest(const std::vector<uint8_t>& data) const {
|
if (data.size() < 4 + 4 + 2 + 2 + 1) return false;
|
if (!(data[0] == 0x11 && data[1] == 0x88 && data[2] == 0x11 && data[3] == 0x88)) return false;
|
size_t cmdPos = 4 + 4 + 2;
|
uint16_t cmd = (uint16_t(data[cmdPos]) << 8) | data[cmdPos + 1];
|
return (cmd == CMD_REQ_VERSION);
|
}
|
|
// ========== BufferRegistry pass-through ==========
|
/// Forwards to the registry's getOrCreate() for the given machine.
///
/// @param machineId Machine identifier.
/// @param name      Name passed to the registry.
/// @param defPolicy Default retention policy passed to the registry.
/// @return Reference to the manager returned by the registry.
BufferManager& Collector::registryAddMachine(uint32_t machineId, const std::string& name,
                                             const RetentionPolicy& defPolicy) {
    BufferManager& manager = registry_.getOrCreate(machineId, name, defPolicy);
    return manager;
}
|
/// Looks up the buffer manager for @p machineId in the registry.
/// @return The pointer returned by the registry's find().
BufferManager* Collector::registryFind(uint32_t machineId) {
    return registry_.find(machineId);
}
|
/// Const overload: looks up the buffer manager for @p machineId in the registry.
/// @return The pointer returned by the registry's find().
const BufferManager* Collector::registryFind(uint32_t machineId) const {
    return registry_.find(machineId);
}
|
/// Returns the machine ids reported by the registry's listManagers().
std::vector<uint32_t> Collector::registryListMachines() const {
    return registry_.listManagers();
}
|
|
void Collector::buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v) {
|
auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
|
bm.push(channelId, ts_ms, v); // δ start() ʱ½«±»ÄÚ²¿ºöÂÔ
|
}
|
|
std::vector<Sample> Collector::buffersGetSince(uint32_t machineId, uint32_t channelId,
|
int64_t tsExclusive, size_t maxCount) const {
|
auto* bm = registry_.find(machineId);
|
if (!bm) return {};
|
return bm->getSince(channelId, tsExclusive, maxCount);
|
}
|
std::vector<Sample> Collector::buffersGetRange(uint32_t machineId, uint32_t channelId,
|
int64_t from_ts, int64_t to_ts, size_t maxCount) const {
|
auto* bm = registry_.find(machineId);
|
if (!bm) return {};
|
return bm->getRange(channelId, from_ts, to_ts, maxCount);
|
}
|
|
void Collector::buffersStart(uint32_t machineId) {
|
auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
|
bm.start(); // Çå¿Õ + ÔÊÐíд
|
}
|
void Collector::buffersStop(uint32_t machineId) {
|
if (auto* bm = registry_.find(machineId)) bm->stop();
|
}
|
void Collector::buffersClear(uint32_t machineId) {
|
if (auto* bm = registry_.find(machineId)) bm->clearAll();
|
}
|
bool Collector::buffersIsRunning(uint32_t machineId) const {
|
auto* bm = registry_.find(machineId);
|
return bm ? bm->isRunning() : false;
|
}
|
|
void Collector::buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting) {
|
auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
|
bm.setDefaultPolicy(p, applyExisting);
|
}
|
void Collector::buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p) {
|
auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
|
bm.setPolicy(channelId, p);
|
}
|
void Collector::buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name) {
|
auto& bm = registry_.getOrCreate(machineId, "Machine-" + std::to_string(machineId));
|
bm.setChannelName(channelId, name);
|
}
|
std::string Collector::buffersGetChannelName(uint32_t machineId, uint32_t channelId) const {
|
auto* bm = registry_.find(machineId);
|
if (!bm) return {};
|
return bm->getChannelName(channelId);
|
}
|
std::vector<BufferManager::ChannelInfo> Collector::buffersListChannelInfos(uint32_t machineId) const {
|
auto* bm = registry_.find(machineId);
|
if (!bm) return {};
|
return bm->listChannelInfos();
|
}
|
std::vector<BufferManager::ChannelStat> Collector::buffersListChannelStats(uint32_t machineId) const {
|
auto* bm = registry_.find(machineId);
|
if (!bm) return {};
|
return bm->listChannelStats();
|
}
|
/// Returns the statistics of one channel.
///
/// @param machineId Machine identifier.
/// @param channelId Channel identifier.
/// @return The manager's getChannelStat() result; if the machine is unknown,
///         a ChannelStat holding @p channelId followed by three zeros.
BufferManager::ChannelStat Collector::buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const {
    if (auto* manager = registry_.find(machineId)) {
        return manager->getChannelStat(channelId);
    }
    return BufferManager::ChannelStat{ channelId, 0, 0, 0 };
}
|
|
// ====== Batch management ======
|
// New implementation: computes expectedEndTs from expectedDurationMs
|
void Collector::batchStart(uint32_t machineId,
|
const std::string& batchId,
|
uint64_t expectedDurationMs)
|
{
|
const uint64_t startTs = nowMs();
|
const uint64_t expectedEndTs = (expectedDurationMs > 0) ? (startTs + expectedDurationMs) : 0;
|
|
{
|
std::lock_guard<std::mutex> lk(mBatches);
|
auto& br = batches_[machineId];
|
br.state = BatchState::Active;
|
br.activeBatchId = batchId;
|
br.activeStartTs = startTs;
|
br.expectedEndTs = expectedEndTs;
|
}
|
|
// Åú´Î¿ªÊ¼£ºÇå¿Õ²¢¿ªÊ¼²É¼¯
|
buffersStart(machineId);
|
|
if (cbStatus) cbStatus(0, "batchStart: machine " + std::to_string(machineId) +
|
" batch=" + batchId +
|
" durMs=" + std::to_string(expectedDurationMs));
|
}
|
|
// Legacy-signature compatibility: startTs is ignored; expectedEndTs is treated as a duration
|
void Collector::batchStart(uint32_t machineId,
|
const std::string& batchId,
|
uint64_t /*startTs_ignored*/,
|
uint64_t expectedEndTs_orDuration)
|
{
|
// Èôµ÷Ó÷½Îó´«Á˾ø¶ÔµÄ¡°½áÊøÊ±¼ä´Á¡±£¬ÕâÀï»á±»µ±×÷¡°Ê±³¤¡±£»
|
// ÈçÐèÑϸñÇø·Ö£¬ÄãÒ²¿ÉÒÔ¼ÓÒ»¸öãÐÖµÅжϣ¨±ÈÈç´óÓÚ 10^12 ÈÏΪÊǾø¶Ôʱ¼ä´Á£©
|
// È»ºóת»»£ºduration = max(0, endTs - now)
|
|
uint64_t durationMs = expectedEndTs_orDuration;
|
|
// ¿ÉÑ¡£º×ö¸ö¼òµ¥µÄ¡°Ïñ¾ø¶Ôʱ¼ä´Á¡±µÄÅжϲ¢×Ô¶¯×ªÎªÊ±³¤
|
// ÒÔºÁÃëʱ¼ä´ÁãÐÖµ¾ÙÀý£¨~2001-09-09£©£º1e12
|
if (expectedEndTs_orDuration > 1000000000000ULL) {
|
uint64_t now = nowMs();
|
if (expectedEndTs_orDuration > now)
|
durationMs = expectedEndTs_orDuration - now;
|
else
|
durationMs = 0; // ÒѹýÆÚ£¬µ±Î´Öª´¦Àí
|
}
|
|
batchStart(machineId, batchId, durationMs);
|
}
|
|
void Collector::batchStop(uint32_t machineId, uint64_t /*endTs*/) {
|
{
|
std::lock_guard<std::mutex> lk(mBatches);
|
auto it = batches_.find(machineId);
|
if (it != batches_.end()) {
|
it->second.state = BatchState::Idle;
|
it->second.activeBatchId.clear();
|
it->second.activeStartTs = 0;
|
it->second.expectedEndTs = 0;
|
}
|
else {
|
batches_[machineId] = BatchRec{}; // Éú³ÉÒ»Ìõ Idle
|
}
|
}
|
|
// Í£Ö¹²É¼¯£¬µ«²»ÇåÊý¾Ý
|
buffersStop(machineId);
|
|
if (cbStatus) cbStatus(0, "batchStop: machine " + std::to_string(machineId));
|
}
|
|
/// Returns a copy of the batch record of a machine.
///
/// @param machineId Machine identifier.
/// @return The stored record, or a default-constructed BatchRec (Idle by
///         default, per the original author's note) if none exists.
Collector::BatchRec Collector::getBatch(uint32_t machineId) const {
    std::lock_guard<std::mutex> guard(mBatches);
    const auto pos = batches_.find(machineId);
    if (pos == batches_.end()) {
        return BatchRec{};
    }
    return pos->second;
}
|
|
// ========== Request dispatch ==========
|
/// Handles one complete frame received from a client.
///
/// Until the client has passed the version handshake, only a decodable version
/// request is honoured (it is answered and marks the client as verified);
/// every other frame is dropped. Afterwards the frame is dispatched on its
/// command word:
///   - CMD_REQ_SINCE      single-channel incremental pull
///   - CMD_REQ_STATS      per-channel statistics / channel list
///   - CMD_REQ_MACHINES   machine list
///   - CMD_REQ_SINCE_ALL  incremental pull across all channels of a machine
///   - CMD_REQ_BATCH_INFO batch information
/// Frames that fail to decode and unknown commands are ignored silently.
///
/// @param c    Client the frame came from; `versionOk` may be set here.
/// @param data Raw bytes of one frame.
void Collector::handleClientData(ClientInfo& c, const std::vector<uint8_t>& data) {
    // Version check.
    if (!c.versionOk) {
        ReqVersion vreq{};
        if (decodeRequestVersion(data, vreq)) {
            c.versionOk = true;
            if (cbStatus) cbStatus(static_cast<int>(DAQEvt::ConnCode::VersionOk),
                                   "Client " + c.ip + ":" + std::to_string(c.port) + " version OK");

            RspVersion vrst;
            vrst.dataId = vreq.dataId;        // echo the request id
            vrst.version = DAQCfg::Version;   // centrally configured version
            auto rsp = encodeResponseVersion(vrst);
            socketComm.sendTo(c.sock, rsp);
        }
        return;  // not verified yet: everything else is ignored
    }

    // Verified client: dispatch on the command word.
    const uint16_t cmd = peek_cmd(data);

    // 0x0101 -- single-channel incremental pull
    if (cmd == CMD_REQ_SINCE) {
        ReqSince req;
        if (!decodeRequestSince(data, req)) return;

        auto vec = buffersGetSince(req.machineId, req.channelId,
                                   static_cast<int64_t>(req.sinceTsExclusive), req.maxCount);

        // With no samples the cursor stays where the client left it.
        uint64_t lastTsSent = req.sinceTsExclusive;
        if (!vec.empty()) lastTsSent = static_cast<uint64_t>(vec.back().ts_ms);

        auto stat = buffersGetChannelStat(req.machineId, req.channelId);
        uint8_t more = (stat.latestTs > static_cast<int64_t>(lastTsSent)) ? 1 : 0;

        RspSince rsp;
        rsp.dataId = req.dataId;  // echo the request id
        rsp.machineId = req.machineId;
        rsp.channelId = req.channelId;
        rsp.lastTsSent = lastTsSent;
        rsp.more = more;
        rsp.samples.reserve(vec.size());
        for (auto& s : vec) rsp.samples.push_back({ static_cast<uint64_t>(s.ts_ms), s.value });

        auto pkt = encodeResponseSince(rsp);
        socketComm.sendTo(c.sock, pkt);
        return;
    }

    // 0x0103 -- statistics / channel list
    if (cmd == CMD_REQ_STATS) {
        ReqStats req;
        if (!decodeRequestStats(data, req)) return;

        auto* bm = registry_.find(req.machineId);
        if (!bm) {
            // Example: an error frame could be returned here instead:
            // sendErrorTo(c, req.dataId, CMD_REQ_STATS, req.machineId, ErrCode::NoActiveBatch, "machine not found");
            return;
        }

        RspStats rsp;
        rsp.dataId = req.dataId;
        rsp.machineId = req.machineId;

        auto stats = bm->listChannelStats();
        rsp.channels.reserve(stats.size());
        for (auto& s : stats) {
            ChannelStatInfo ci;
            ci.channelId = s.id;
            ci.earliestTs = static_cast<uint64_t>(s.earliestTs);
            ci.latestTs = static_cast<uint64_t>(s.latestTs);
            ci.size = static_cast<uint32_t>(s.size);
            ci.name = bm->getChannelName(s.id);  // UTF-8, per the original author's note
            rsp.channels.push_back(std::move(ci));
        }

        auto pkt = encodeResponseStats(rsp);
        socketComm.sendTo(c.sock, pkt);
        return;
    }

    // 0x0104 -- machine list
    if (cmd == CMD_REQ_MACHINES) {
        ReqMachines req;
        if (!decodeRequestMachines(data, req)) return;

        RspMachines rsp;
        rsp.dataId = req.dataId;

        auto mids = registry_.listManagers();
        rsp.machines.reserve(mids.size());
        for (auto id : mids) {
            auto* bm = registry_.find(id);
            if (!bm) continue;
            rsp.machines.push_back(MachineInfo{ id, bm->name() });
        }

        auto pkt = encodeResponseMachines(rsp);
        socketComm.sendTo(c.sock, pkt);
        return;
    }

    // 0x0105 -- incremental pull across all channels of a machine
    if (cmd == CMD_REQ_SINCE_ALL) {
        ReqSinceAll req;
        if (!decodeRequestSinceAll(data, req)) return;

        RspSinceAll rsp;
        rsp.dataId = req.dataId;  // echo the request id
        rsp.machineId = req.machineId;
        rsp.moreAny = 0;

        auto* bm = registry_.find(req.machineId);
        if (!bm) {
            // Unknown machine: answer with an empty response.
            auto pkt = encodeResponseSinceAll(rsp);
            socketComm.sendTo(c.sock, pkt);
            return;
        }

        // Keep each packet within the 2-byte length field: body of about 60 KB.
        constexpr size_t kMaxBody = 60 * 1024;
        constexpr size_t kMaxPacket = (4 + 4 + 2) + kMaxBody + 1;

        // moreAny is derived from the blocks contained in the packet about to
        // be sent, for intermediate packets as well as for the final one.
        const auto refreshMoreAny = [&rsp]() {
            rsp.moreAny = 0;
            for (const auto& b : rsp.blocks) {
                if (b.more) { rsp.moreAny = 1; break; }
            }
        };

        rsp.blocks.clear();
        auto stats = bm->listChannelStats();
        for (auto& s : stats) {
            auto vec = bm->getSince(s.id, static_cast<int64_t>(req.sinceTsExclusive), req.maxPerChannel);
            if (vec.empty()) continue;

            ChannelBlock blk;
            blk.channelId = s.id;
            blk.lastTsSent = static_cast<uint64_t>(vec.back().ts_ms);

            // Re-read the stat after fetching so `more` reflects the current latest timestamp.
            auto st = bm->getChannelStat(s.id);
            blk.more = (st.latestTs > static_cast<int64_t>(blk.lastTsSent)) ? 1 : 0;

            blk.samples.reserve(vec.size());
            for (auto& sm : vec) {
                blk.samples.push_back({ static_cast<uint64_t>(sm.ts_ms), sm.value });
            }

            rsp.blocks.push_back(std::move(blk));

            // Probe the encoded size with the new block included.
            auto tentative = encodeResponseSinceAll(rsp);

            // Too large: take the last block back out and flush what came
            // before it. Flush only when something does precede it; a single
            // oversized block cannot be split here, and flushing would just
            // emit an empty packet.
            if (rsp.blocks.size() > 1 && tentative.size() > kMaxPacket) {
                auto last = std::move(rsp.blocks.back());
                rsp.blocks.pop_back();

                refreshMoreAny();
                auto pkt = encodeResponseSinceAll(rsp);
                socketComm.sendTo(c.sock, pkt);

                rsp.blocks.clear();
                rsp.blocks.push_back(std::move(last));
            }
        }

        refreshMoreAny();
        auto pkt = encodeResponseSinceAll(rsp);
        socketComm.sendTo(c.sock, pkt);
        return;
    }

    // 0x0120 -- batch information
    if (cmd == CMD_REQ_BATCH_INFO) {
        ReqBatchInfo req;
        if (!decodeRequestBatchInfo(data, req)) return;

        RspBatchInfo rsp;
        rsp.dataId = req.dataId;
        rsp.machineId = req.machineId;

        const auto br = getBatch(req.machineId);
        rsp.state = br.state;
        rsp.activeBatchId = br.activeBatchId;
        rsp.activeStartTs = br.activeStartTs;
        rsp.expectedEndTs = br.expectedEndTs;

        auto pkt = encodeResponseBatchInfo(rsp);
        socketComm.sendTo(c.sock, pkt);
        return;
    }

    // Other commands: extend here as needed.
}
|
|
// Unified error response
|
/// Encodes an error response and sends it to a client.
///
/// @param c         Client to send to.
/// @param dataId    Stored in the response's dataId field.
/// @param refCmd    Stored in the response's refCmd field.
/// @param machineId Stored in the response's machineId field.
/// @param code      Error code.
/// @param message   Error message text.
void Collector::sendErrorTo(ClientInfo& c,
                            uint32_t dataId,
                            uint16_t refCmd,
                            uint32_t machineId,
                            ErrCode code,
                            const std::string& message)
{
    RspError failure;
    failure.dataId = dataId;
    failure.refCmd = refCmd;
    failure.machineId = machineId;
    failure.code = code;
    failure.message = message;

    auto encoded = encodeResponseError(failure);
    socketComm.sendTo(c.sock, encoded);
}
|
|
// Utility: current epoch time in milliseconds
|
/// Returns the current std::chrono::system_clock time as milliseconds since
/// the clock's epoch.
uint64_t Collector::nowMs() {
    const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
    const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch);
    return static_cast<uint64_t>(millis.count());
}
|