chenluhua1980
2025-12-11 33f080ddc32f3545b685b2e0a7a5df3c35894270
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#pragma once
#ifndef COLLECTOR_H
#define COLLECTOR_H
 
#include "CommBase.h"
#include "../net/SocketComm.h"
 
#include <functional>
#include <chrono>
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <string>
#include <unordered_map>
 
#include "../buffer/BufferRegistry.h"
#include "../net/FrameAssembler.h"
 
// Ð­Òé³£Á¿/½á¹¹£¨±ØÐë°üº¬£¬¹©±¾Í·ÎļþʹÓàProto:: ÀàÐÍ£©
#include "../proto/Protocol.h"
 
class Collector : public CommBase {
public:
    Collector();
    ~Collector();
 
    // ===== CommBase ½Ó¿Ú =====
    void sendSampleData(double sample) override;
    void sendWindowData(const std::vector<std::string>& dataFields) override;
 
    void connectServer(const std::string& /*ip*/, uint16_t /*port*/) override {} // ²É¼¯¶Ë²»×÷Ϊ¿Í»§¶Ë
    void createServer(uint16_t port) override;
    void disconnect() override;
 
    void onConnectionEstablished() override;
    void onConnectionLost() override;
 
    // Á¬½Ó/ԭʼÊý¾Ý»Øµ÷£¨UI£©
    void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); }
    void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); }
    void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; }
 
    // ºǫ́ÂÖѯ£¨²»×èÈû UI£©
    void startLoop(uint32_t intervalMs = 10);
    void stopLoop();
    void poll();
 
    // ¿Í»§¶Ë½øÈë/¶Ï¿ªÊ¼þ
    void setClientEventCallback(std::function<void(const std::string& ip, uint16_t port, bool connected)> cb) {
        cbClientEvent = std::move(cb);
    }
 
    // ¿Í»§¶Ë¿ìÕÕ
    struct ClientSummary {
        std::string ip;
        uint16_t    port = 0;
        bool        versionOk = false;
        SOCKET      sock = INVALID_SOCKET;
    };
    std::vector<ClientSummary> getClientList();
 
    // ===== »º³å/ͨµÀÏà¹Ø =====
    void        buffersSetChannelName(uint32_t machineId, uint32_t channelId, const std::string& name);
    std::string buffersGetChannelName(uint32_t machineId, uint32_t channelId) const;
    std::vector<BufferManager::ChannelInfo> buffersListChannelInfos(uint32_t machineId) const;
 
    BufferManager& registryAddMachine(uint32_t machineId, const std::string& name,
        const RetentionPolicy& defPolicy = {});
    BufferManager* registryFind(uint32_t machineId);
    const BufferManager* registryFind(uint32_t machineId) const;
    std::vector<uint32_t> registryListMachines() const;
 
    void buffersPush(uint32_t machineId, uint32_t channelId, int64_t ts_ms, double v);
 
    std::vector<Sample> buffersGetSince(uint32_t machineId, uint32_t channelId,
        int64_t tsExclusive, size_t maxCount = 4096) const;
    std::vector<Sample> buffersGetRange(uint32_t machineId, uint32_t channelId,
        int64_t from_ts, int64_t to_ts, size_t maxCount = 4096) const;
 
    void buffersStart(uint32_t machineId);   // Çå¿Õ¸Ã»úÀúÊ·²¢¿ªÊ¼
    void buffersStop(uint32_t machineId);    // ÔÝÍ££¨push ºöÂÔ£©
    void buffersClear(uint32_t machineId);   // Çå¿ÕÀúÊ·£¬²»¸Ä running ×´Ì¬
    bool buffersIsRunning(uint32_t machineId) const;
 
    void buffersSetDefaultPolicy(uint32_t machineId, const RetentionPolicy& p, bool applyExisting = true);
    void buffersSetPolicy(uint32_t machineId, uint32_t channelId, const RetentionPolicy& p);
 
    std::vector<BufferManager::ChannelStat> buffersListChannelStats(uint32_t machineId) const;
    BufferManager::ChannelStat             buffersGetChannelStat(uint32_t machineId, uint32_t channelId) const;
 
    // ===== Åú´Î¹ÜÀí£¨ÐÂÔö£© =====
    struct BatchRec {
        Proto::BatchState state = Proto::BatchState::Idle; // Idle / Active
        std::string       activeBatchId;                   // Idle Ê±Îª¿Õ
        uint64_t          activeStartTs = 0;               // ms epoch
        uint64_t          expectedEndTs = 0;               // 0: Î´Öª
    };
 
    // Ð£º½ö´«¡°Ô¤¼ÆÊ±³¤ms¡±£¨0=δ֪£©£¬¿ªÊ¼Ê±¼äÄÚ²¿È¡ nowMs()
    void batchStart(uint32_t machineId,
        const std::string& batchId,
        uint64_t expectedDurationMs = 0);
 
    // £¨¿ÉÑ¡¼æÈÝ£©¾ÉÇ©Ãû±£Áô£¬µ«±ê¼ÇΪÆúÓãºÄÚ²¿×ªµ÷ÐÂÇ©Ãû
    void batchStart(uint32_t machineId,
        const std::string& batchId,
        uint64_t startTs /*ignored*/,
        uint64_t expectedEndTs /*treated as duration or absolute?*/);
 
// ½áÊøµ±Ç°Åú´Î£¨±£³ÖÊý¾Ý£¬×´Ì¬×ª Idle£©
    void batchStop(uint32_t machineId,
        uint64_t endTs = 0);                    // ±¸ÓÃ
 
// ²éѯ¸Ã»úµ±Ç°Åú´Î£¨Èô²»´æÔÚ£¬·µ»Ø Idle ¿Õ¼Ç¼£©
    BatchRec getBatch(uint32_t machineId) const;
 
private:
    struct ClientInfo {
        SOCKET sock = INVALID_SOCKET;
        std::string ip;
        uint16_t port = 0;
        std::chrono::steady_clock::time_point tsConnected{};
        bool versionOk = false;
        FrameAssembler fr; // Ã¿¸ö¿Í»§¶ËÒ»¸öÖ¡×é×°Æ÷
    };
 
    SocketComm socketComm;
    std::vector<ClientInfo> clients;
 
    std::function<void(int, std::string)> cbStatus;
    std::function<void(const std::vector<uint8_t>&)> cbRaw;
    bool rawDumpEnabled = true;
 
    std::function<void(const std::string&, uint16_t, bool)> cbClientEvent;
 
    std::thread       worker;
    std::atomic<bool> running{ false };
    std::mutex        mClients;   // ±£»¤ clients
 
    BufferRegistry registry_;     // Ò»¸ö Collector ¹ÜËùÓлų́
 
    // Åú´Î״̬
    std::unordered_map<uint32_t, BatchRec> batches_;
    mutable std::mutex mBatches;
 
    // ÄÚ²¿º¯Êý
    void handleClientData(ClientInfo& c, const std::vector<uint8_t>& data);
    bool isVersionRequest(const std::vector<uint8_t>& data) const;
 
    // Í³Ò»´íÎ󻨰ü£¨6 ²ÎÊý£©
    void sendErrorTo(ClientInfo& c,
        uint32_t dataId,
        uint16_t refCmd,
        uint32_t machineId,
        Proto::ErrCode code,
        const std::string& message);
 
    // ¹¤¾ß£ºµ±Ç° epoch ºÁÃë
    static uint64_t nowMs();
};
 
#endif // COLLECTOR_H