#pragma once #ifndef COLLECTOR_H #define COLLECTOR_H #include "CommBase.h" #include "../net/SocketComm.h" #include #include #include #include #include #include #include #include #include "../buffer/BufferRegistry.h" #include "../net/FrameAssembler.h" // ЭÒé³£Á¿/½á¹¹£¨±ØÐë°üº¬£¬¹©±¾Í·ÎļþʹÓà Proto:: ÀàÐÍ£© #include "../proto/Protocol.h" class Collector : public CommBase { public: Collector(); ~Collector(); // ===== CommBase ½Ó¿Ú ===== void sendSampleData(double sample) override; void sendWindowData(const std::vector& dataFields) override; void connectServer(const std::string& /*ip*/, uint16_t /*port*/) override {} // ²É¼¯¶Ë²»×÷Ϊ¿Í»§¶Ë void createServer(uint16_t port) override; void disconnect() override; void onConnectionEstablished() override; void onConnectionLost() override; // Á¬½Ó/ԭʼÊý¾Ý»Øµ÷£¨UI£© void setConnectionStatusCallback(std::function cb) override { cbStatus = std::move(cb); } void setRawDataCallback(std::function&)> cb) override { cbRaw = std::move(cb); } void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; } // ºǫ́ÂÖѯ£¨²»×èÈû UI£© void startLoop(uint32_t intervalMs = 10); void stopLoop(); void poll(); // ¿Í»§¶Ë½øÈë/¶Ï¿ªÊ¼þ void setClientEventCallback(std::function cb) { cbClientEvent = std::move(cb); } // ¿Í»§¶Ë¿ìÕÕ struct ClientSummary { std::string ip; uint16_t port = 0; bool versionOk = false; SOCKET sock = INVALID_SOCKET; }; std::vector getClientList(); // ===== »º³å/ͨµÀÏà¹Ø ===== void buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name); std::string buffersGetChannelName(uint32_t machineId, uint32_t channelId) const; std::vector buffersListChannelInfos(uint32_t machineId) const; BufferManager& registryAddMachine(uint32_t machineId, const std::string& name, const RetentionPolicy& defPolicy = {}); BufferManager* registryFind(uint32_t machineId); const BufferManager* registryFind(uint32_t machineId) const; std::vector registryListMachines() const; void buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v); std::vector buffersGetSince(uint32_t machineId, uint32_t channelId, int64_t tsExclusive, size_t maxCount = 4096) const; std::vector buffersGetRange(uint32_t machineId, uint32_t channelId, int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const; void buffersStart(uint32_t machineId); // Çå¿Õ¸Ã»úÀúÊ·²¢¿ªÊ¼ void buffersStop(uint32_t machineId); // ÔÝÍ££¨push ºöÂÔ£© void buffersClear(uint32_t machineId); // Çå¿ÕÀúÊ·£¬²»¸Ä running ״̬ bool buffersIsRunning(uint32_t machineId) const; void buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting = true); void buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p); std::vector buffersListChannelStats(uint32_t machineId) const; BufferManager::ChannelStat buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const; // ===== Åú´Î¹ÜÀí£¨ÐÂÔö£© ===== struct BatchRec { Proto::BatchState state = Proto::BatchState::Idle; // Idle / Active std::string activeBatchId; // Idle ʱΪ¿Õ uint64_t activeStartTs = 0; // ms epoch uint64_t expectedEndTs = 0; // 0: δ֪ }; // Уº½ö´«¡°Ô¤¼ÆÊ±³¤ms¡±£¨0=δ֪£©£¬¿ªÊ¼Ê±¼äÄÚ²¿È¡ nowMs() void batchStart(uint32_t machineId, const std::string& batchId, uint64_t expectedDurationMs = 0); // £¨¿ÉÑ¡¼æÈÝ£©¾ÉÇ©Ãû±£Áô£¬µ«±ê¼ÇΪÆúÓãºÄÚ²¿×ªµ÷ÐÂÇ©Ãû void batchStart(uint32_t machineId, const std::string& batchId, uint64_t startTs /*ignored*/, uint64_t expectedEndTs /*treated as duration or absolute?*/); // ½áÊøµ±Ç°Åú´Î£¨±£³ÖÊý¾Ý£¬×´Ì¬×ª Idle£© void batchStop(uint32_t machineId, uint64_t endTs = 0); // ±¸Óà // ²éѯ¸Ã»úµ±Ç°Åú´Î£¨Èô²»´æÔÚ£¬·µ»Ø Idle ¿Õ¼Ç¼£© BatchRec getBatch(uint32_t machineId) const; private: struct ClientInfo { SOCKET sock = INVALID_SOCKET; std::string ip; uint16_t port = 0; std::chrono::steady_clock::time_point tsConnected{}; bool versionOk = false; FrameAssembler fr; // ÿ¸ö¿Í»§¶ËÒ»¸öÖ¡×é×°Æ÷ }; SocketComm socketComm; std::vector clients; std::function cbStatus; std::function&)> cbRaw; bool rawDumpEnabled = true; std::function cbClientEvent; std::thread worker; std::atomic running{ false }; std::mutex mClients; // ±£»¤ clients BufferRegistry registry_; // Ò»¸ö Collector ¹ÜËùÓлų́ // Åú´Î״̬ std::unordered_map batches_; mutable std::mutex mBatches; // ÄÚ²¿º¯Êý void handleClientData(ClientInfo& c, const std::vector& data); bool isVersionRequest(const std::vector& data) const; // ͳһ´íÎ󻨰ü£¨6 ²ÎÊý£© void sendErrorTo(ClientInfo& c, uint32_t dataId, uint16_t refCmd, uint32_t machineId, Proto::ErrCode code, const std::string& message); // ¹¤¾ß£ºµ±Ç° epoch ºÁÃë static uint64_t nowMs(); }; #endif // COLLECTOR_H