#pragma once
|
#ifndef COLLECTOR_H
|
#define COLLECTOR_H
|
|
#include "CommBase.h"
|
#include "../net/SocketComm.h"
|
|
#include <functional>
|
#include <chrono>
|
#include <thread>
|
#include <atomic>
|
#include <mutex>
|
#include <vector>
|
#include <string>
|
#include <unordered_map>
|
|
#include "../buffer/BufferRegistry.h"
|
#include "../net/FrameAssembler.h"
|
|
// Protocol constants/structs (must be included: this header uses Proto:: types)
|
#include "../proto/Protocol.h"
|
|
class Collector : public CommBase {
|
public:
|
Collector();
|
~Collector();
|
|
// ===== CommBase ½Ó¿Ú =====
|
void sendSampleData(double sample) override;
|
void sendWindowData(const std::vector<std::string>& dataFields) override;
|
|
void connectServer(const std::string& /*ip*/, uint16_t /*port*/) override {} // ²É¼¯¶Ë²»×÷Ϊ¿Í»§¶Ë
|
void createServer(uint16_t port) override;
|
void disconnect() override;
|
|
void onConnectionEstablished() override;
|
void onConnectionLost() override;
|
|
// Á¬½Ó/ÔʼÊý¾Ý»Øµ÷£¨UI£©
|
void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); }
|
void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); }
|
void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; }
|
|
// ºǫ́ÂÖѯ£¨²»×èÈû UI£©
|
void startLoop(uint32_t intervalMs = 10);
|
void stopLoop();
|
void poll();
|
|
// ¿Í»§¶Ë½øÈë/¶Ï¿ªÊ¼þ
|
void setClientEventCallback(std::function<void(const std::string& ip, uint16_t port, bool connected)> cb) {
|
cbClientEvent = std::move(cb);
|
}
|
|
// ¿Í»§¶Ë¿ìÕÕ
|
struct ClientSummary {
|
std::string ip;
|
uint16_t port = 0;
|
bool versionOk = false;
|
SOCKET sock = INVALID_SOCKET;
|
};
|
std::vector<ClientSummary> getClientList();
|
|
// ===== »º³å/ͨµÀÏà¹Ø =====
|
void buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name);
|
std::string buffersGetChannelName(uint32_t machineId, uint32_t channelId) const;
|
std::vector<BufferManager::ChannelInfo> buffersListChannelInfos(uint32_t machineId) const;
|
|
BufferManager& registryAddMachine(uint32_t machineId, const std::string& name,
|
const RetentionPolicy& defPolicy = {});
|
BufferManager* registryFind(uint32_t machineId);
|
const BufferManager* registryFind(uint32_t machineId) const;
|
std::vector<uint32_t> registryListMachines() const;
|
|
void buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v);
|
|
std::vector<Sample> buffersGetSince(uint32_t machineId, uint32_t channelId,
|
int64_t tsExclusive, size_t maxCount = 4096) const;
|
std::vector<Sample> buffersGetRange(uint32_t machineId, uint32_t channelId,
|
int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const;
|
|
void buffersStart(uint32_t machineId); // Çå¿Õ¸Ã»úÀúÊ·²¢¿ªÊ¼
|
void buffersStop(uint32_t machineId); // ÔÝÍ££¨push ºöÂÔ£©
|
void buffersClear(uint32_t machineId); // Çå¿ÕÀúÊ·£¬²»¸Ä running ״̬
|
bool buffersIsRunning(uint32_t machineId) const;
|
|
void buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting = true);
|
void buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p);
|
|
std::vector<BufferManager::ChannelStat> buffersListChannelStats(uint32_t machineId) const;
|
BufferManager::ChannelStat buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const;
|
|
// ===== Åú´Î¹ÜÀí£¨ÐÂÔö£© =====
|
struct BatchRec {
|
Proto::BatchState state = Proto::BatchState::Idle; // Idle / Active
|
std::string activeBatchId; // Idle ʱΪ¿Õ
|
uint64_t activeStartTs = 0; // ms epoch
|
uint64_t expectedEndTs = 0; // 0: δ֪
|
};
|
|
// Уº½ö´«¡°Ô¤¼ÆÊ±³¤ms¡±£¨0=δ֪£©£¬¿ªÊ¼Ê±¼äÄÚ²¿È¡ nowMs()
|
void batchStart(uint32_t machineId,
|
const std::string& batchId,
|
uint64_t expectedDurationMs = 0);
|
|
// £¨¿ÉÑ¡¼æÈÝ£©¾ÉÇ©Ãû±£Áô£¬µ«±ê¼ÇΪÆúÓãºÄÚ²¿×ªµ÷ÐÂÇ©Ãû
|
void batchStart(uint32_t machineId,
|
const std::string& batchId,
|
uint64_t startTs /*ignored*/,
|
uint64_t expectedEndTs /*treated as duration or absolute?*/);
|
|
// ½áÊøµ±Ç°Åú´Î£¨±£³ÖÊý¾Ý£¬×´Ì¬×ª Idle£©
|
void batchStop(uint32_t machineId,
|
uint64_t endTs = 0); // ±¸ÓÃ
|
|
// ²éѯ¸Ã»úµ±Ç°Åú´Î£¨Èô²»´æÔÚ£¬·µ»Ø Idle ¿Õ¼Ç¼£©
|
BatchRec getBatch(uint32_t machineId) const;
|
|
private:
|
struct ClientInfo {
|
SOCKET sock = INVALID_SOCKET;
|
std::string ip;
|
uint16_t port = 0;
|
std::chrono::steady_clock::time_point tsConnected{};
|
bool versionOk = false;
|
FrameAssembler fr; // ÿ¸ö¿Í»§¶ËÒ»¸öÖ¡×é×°Æ÷
|
};
|
|
SocketComm socketComm;
|
std::vector<ClientInfo> clients;
|
|
std::function<void(int, std::string)> cbStatus;
|
std::function<void(const std::vector<uint8_t>&)> cbRaw;
|
bool rawDumpEnabled = true;
|
|
std::function<void(const std::string&, uint16_t, bool)> cbClientEvent;
|
|
std::thread worker;
|
std::atomic<bool> running{ false };
|
std::mutex mClients; // ±£»¤ clients
|
|
BufferRegistry registry_; // Ò»¸ö Collector ¹ÜËùÓлų́
|
|
// Åú´Î״̬
|
std::unordered_map<uint32_t, BatchRec> batches_;
|
mutable std::mutex mBatches;
|
|
// ÄÚ²¿º¯Êý
|
void handleClientData(ClientInfo& c, const std::vector<uint8_t>& data);
|
bool isVersionRequest(const std::vector<uint8_t>& data) const;
|
|
// ͳһ´íÎ󻨰ü£¨6 ²ÎÊý£©
|
void sendErrorTo(ClientInfo& c,
|
uint32_t dataId,
|
uint16_t refCmd,
|
uint32_t machineId,
|
Proto::ErrCode code,
|
const std::string& message);
|
|
// ¹¤¾ß£ºµ±Ç° epoch ºÁÃë
|
static uint64_t nowMs();
|
};
|
|
#endif // COLLECTOR_H
|