1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#pragma once
#ifndef DISPLAY_H
#define DISPLAY_H
 
#include "CommBase.h"
#include "../net/SocketComm.h"
#include <functional>
#include <thread>
#include <atomic>
#include "../proto/Protocol.h"
#include "../proto/ProtocolCodec.h"
 
// »Øµ÷£ºÊÕµ½Ñù±¾µÄÉÏÅ×£¨UI ÓÃËü»­Í¼/´æ±¾µØ£©
using SamplesCallback = std::function<void(uint32_t machineId, uint32_t channelId,
    uint64_t lastTsSent, uint8_t more,
    const std::vector<Proto::SamplePair>& samples)>;
using SamplesMultiCallback = std::function<void(
    uint32_t machineId, uint8_t moreAny,
    const std::vector<Proto::ChannelBlock>& blocks)>;
using MachinesCallback = std::function<void(const std::vector<Proto::MachineInfo>& machines)>;
using StatsCallback = std::function<void(uint32_t machineId, const std::vector<Proto::ChannelStatInfo>& channels)>;
 
// ¡ï ÐÂÔö£ºÅú´ÎÐÅÏ¢»Øµ÷
using BatchInfoCallback = std::function<void(const Proto::RspBatchInfo&)>;
 
class Display : public CommBase {
public:
    Display();
    ~Display();
 
    void sendSampleData(double sample) override;
    void sendWindowData(const std::vector<std::string>& dataFields) override;
 
    void connectServer(const std::string& ip, uint16_t port) override;
    void createServer(uint16_t /*port*/) override {}
    void disconnect() override;
 
    void onConnectionEstablished() override;
    void onConnectionLost() override;
 
    void handleRawData(const std::vector<uint8_t>& rawData);
 
    void setConnectionStatusCallback(std::function<void(int, std::string)> cb) override { cbStatus = std::move(cb); }
    void setRawDataCallback(std::function<void(const std::vector<uint8_t>&)> cb) override { cbRaw = std::move(cb); }
    void setRawDumpEnabled(bool enabled) override { rawDumpEnabled = enabled; }
 
    void startRecvLoop(uint32_t intervalMs = 10);
    void stopRecvLoop();
 
    void setSamplesCallback(SamplesCallback cb) { cbSamples = std::move(cb); }
    void setSamplesMultiCallback(SamplesMultiCallback cb) { cbSamplesMulti = std::move(cb); }
    void setMachinesCallback(MachinesCallback cb) { cbMachines = std::move(cb); }
    void setStatsCallback(StatsCallback cb) { cbStats = std::move(cb); }
    void setBatchInfoCallback(BatchInfoCallback cb) { cbBatchInfo = std::move(cb); } // ¡ï ÐÂÔö
 
    // ¡ª¡ª Ô­ÓС°ÐèÏÔʽ dataId¡±µÄµÍ²ã½Ó¿Ú£¨±£ÁôÒÔ¼æÈÝ£©¡ª¡ª
    void requestMachines(uint32_t dataId);
    void requestStats(uint32_t dataId, uint32_t machineId);
    void requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
        uint64_t sinceTsExclusive, uint16_t maxCount = 1024);
    // ¡ï ÐÂÔö£º´ø batchId µÄÏÔʽ dataId °æ
    void requestSince(uint32_t dataId, uint32_t machineId, uint32_t channelId,
        uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount = 1024);
 
    // ±ã½Ý£ºÕû»ú¶àͨµÀÔöÁ¿£¨ÏÔʽ dataId£©
    void requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
        uint16_t maxPerChannel = 1024);
    // ¡ï ÐÂÔö£º´ø batchId µÄÏÔʽ dataId °æ
    void requestSinceAll(uint32_t dataId, uint32_t machineId, uint64_t sinceTsExclusive,
        const std::string& batchId, uint16_t maxPerChannel = 1024);
 
    // ¡ª¡ª ÐÂÔö£ºÅú´ÎÐÅÏ¢À­È¡£¨ÏÔʽ/±ã½Ý£©¡ª¡ª
    void requestBatchInfo(uint32_t dataId, uint32_t machineId); // ÏÔʽ dataId
    void requestBatchInfo(uint32_t machineId);                  // ±ã½Ý£º×Ô¶¯ m_nextDataId++
 
    // ¡ª¡ª ÐÂÔö£º±ã½Ý¸ß²ã½Ó¿Ú£¨×Ô¶¯·ÖÅä dataId£©¡ª¡ª
    void requestMachines(); // ×Ô¶¯ m_nextDataId++
    void requestStats(uint32_t machineId);
    void requestSince(uint32_t machineId, uint32_t channelId,
        uint64_t sinceTsExclusive, uint16_t maxCount = 1024);
    // ¡ï ÐÂÔö£º±ã½Ý´ø batchId
    void requestSince(uint32_t machineId, uint32_t channelId,
        uint64_t sinceTsExclusive, const std::string& batchId, uint16_t maxCount = 1024);
 
    // ±ã½Ý£ºÕû»ú¶àͨµÀÔöÁ¿
    void requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive,
        uint16_t maxPerChannel = 1024);
    // ¡ï ÐÂÔö£º±ã½Ý´ø batchId
    void requestSinceAll(uint32_t machineId, uint64_t sinceTsExclusive,
        const std::string& batchId, uint16_t maxPerChannel = 1024);
 
private:
    SocketComm socketComm;
    std::function<void(int, std::string)> cbStatus;
 
    // Ô­Ê¼Êý¾Ý»Øµ÷ & ¿ª¹Ø
    std::function<void(const std::vector<uint8_t>&)> cbRaw;
    bool rawDumpEnabled = true;
 
    std::thread recvThread;
    std::atomic<bool> recvRunning{ false };
    SamplesCallback      cbSamples;
    SamplesMultiCallback cbSamplesMulti;
    MachinesCallback     cbMachines;
    StatsCallback        cbStats;
    BatchInfoCallback    cbBatchInfo; // ¡ï ÐÂÔö
 
    // dataId µÝÔö£¬ÓÃÓÚÅä¶ÔÇëÇó/ÏìÓ¦
    uint32_t m_nextDataId = 1;
    bool m_versionOk = false;
};
 
#endif // DISPLAY_H