// ProtocolCodec.cpp
//
// Encoders/decoders for the framed, big-endian wire protocol declared in
// ProtocolCodec.h.
//
// Frame layout (all multi-byte integers are big-endian):
//
//   offset 0   4 bytes  head magic (kHead[0..3])
//   offset 4   4 bytes  dataId
//   offset 8   2 bytes  bodyLen
//   offset 10  bodyLen  body (starts with a 2-byte command id)
//   last       1 byte   tail marker (kTail)
//
// NOTE(review): the include targets and template arguments of this file were
// lost at some point; they are restored here from how the code uses them.
// Confirm the element type (uint8_t) against the declarations in
// ProtocolCodec.h.

#include "ProtocolCodec.h"

#include <cstddef>  // std::size_t
#include <cstdint>  // fixed-width integer types
#include <cstdio>   // std::snprintf (if logging is needed)
#include <cstring>  // std::memcpy
#include <string>
#include <utility>  // std::move
#include <vector>

namespace Proto {

// ---------------------------
// Big-endian basic encode/decode utilities
// ---------------------------

// Appends x to v as 2 big-endian bytes.
void put_u16(std::vector<uint8_t>& v, uint16_t x) {
    v.push_back(static_cast<uint8_t>((x >> 8) & 0xFF));
    v.push_back(static_cast<uint8_t>(x & 0xFF));
}

// Appends x to v as 4 big-endian bytes.
void put_u32(std::vector<uint8_t>& v, uint32_t x) {
    v.push_back(static_cast<uint8_t>((x >> 24) & 0xFF));
    v.push_back(static_cast<uint8_t>((x >> 16) & 0xFF));
    v.push_back(static_cast<uint8_t>((x >> 8) & 0xFF));
    v.push_back(static_cast<uint8_t>(x & 0xFF));
}

// Appends x to v as 8 big-endian bytes.
void put_u64(std::vector<uint8_t>& v, uint64_t x) {
    for (int i = 7; i >= 0; --i) {
        v.push_back(static_cast<uint8_t>((x >> (i * 8)) & 0xFF));
    }
}

// Reads a big-endian u16 from p[0..1]. The caller guarantees 2 readable bytes.
uint16_t get_u16(const uint8_t* p) {
    return static_cast<uint16_t>((uint16_t(p[0]) << 8) | uint16_t(p[1]));
}

// Reads a big-endian u32 from p[0..3]. The caller guarantees 4 readable bytes.
uint32_t get_u32(const uint8_t* p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
           (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// Reads a big-endian u64 from p[0..7]. The caller guarantees 8 readable bytes.
uint64_t get_u64(const uint8_t* p) {
    uint64_t x = 0;
    for (int i = 0; i < 8; ++i) {
        x = (x << 8) | p[i];
    }
    return x;
}

// Appends the 8-byte object representation of d, most significant byte first.
void put_f64_be(std::vector<uint8_t>& v, double d) {
    static_assert(sizeof(double) == 8, "double must be 8 bytes");
    uint64_t u;
    std::memcpy(&u, &d, 8);  // bit copy; avoids aliasing problems
    put_u64(v, u);
}

// Inverse of put_f64_be. The caller guarantees 8 readable bytes at p.
double get_f64_be(const uint8_t* p) {
    const uint64_t u = get_u64(p);
    double d;
    std::memcpy(&d, &u, 8);
    return d;
}

// Quickly peeks at the command id at the start of the body. Only the head
// magic and a minimum size of 12 bytes are checked; bodyLen and the tail are
// deliberately NOT verified. Returns 0 when the buffer is too short or the
// head magic does not match.
uint16_t peek_cmd(const std::vector<uint8_t>& f) {
    if (f.size() < 12) return 0;
    const bool headOk = f[0] == kHead[0] && f[1] == kHead[1] &&
                        f[2] == kHead[2] && f[3] == kHead[3];
    if (!headOk) return 0;
    return get_u16(f.data() + 10);
}

// Wraps a body into a frame: 4B head + 4B dataId + 2B bodyLen + body + 1B tail.
//
// bodyLen is a u16 on the wire, so a body longer than 65535 bytes cannot be
// represented. Such a body used to be emitted with a wrapped length field,
// i.e. a corrupt frame; now `out` is left empty instead so the caller can
// detect the failure. Every u16 element count / string length written by the
// encoders below can only overflow when the body itself exceeds 65535 bytes,
// so this single check covers those cases as well.
static inline void frame_wrap(uint32_t dataId, const std::vector<uint8_t>& body,
                              std::vector<uint8_t>& out) {
    out.clear();
    if (body.size() > 0xFFFFu) return;

    out.reserve(4 + 4 + 2 + body.size() + 1);
    out.push_back(kHead[0]);
    out.push_back(kHead[1]);
    out.push_back(kHead[2]);
    out.push_back(kHead[3]);
    put_u32(out, dataId);
    put_u16(out, static_cast<uint16_t>(body.size()));
    out.insert(out.end(), body.begin(), body.end());
    out.push_back(kTail);
}

// ---------------------------
// File-local helpers shared by the encoders/decoders
// ---------------------------
namespace {

// Number of unread bytes in [b, e). Every caller keeps b <= e, so the
// difference is never negative. Comparing byte counts (instead of computing
// `b + n > e`) avoids forming pointers past the end of the buffer.
inline std::size_t bytes_left(const uint8_t* b, const uint8_t* e) {
    return static_cast<std::size_t>(e - b);
}

// Validates the frame envelope of f (minimum size, head magic, tail marker,
// bodyLen consistent with f.size()).
//
// dataId is written as soon as head and tail have been verified, i.e. even
// when the subsequent bodyLen check fails. On success b/e are set to the
// half-open body range [b, e) and true is returned.
template <class Id>
bool open_frame(const std::vector<uint8_t>& f, Id& dataId,
                const uint8_t*& b, const uint8_t*& e) {
    if (f.size() < 11) return false;
    const uint8_t* p = f.data();
    if (p[0] != kHead[0] || p[1] != kHead[1] ||
        p[2] != kHead[2] || p[3] != kHead[3]) {
        return false;
    }
    if (f.back() != kTail) return false;

    dataId = get_u32(p + 4);
    const uint16_t bodyLen = get_u16(p + 8);
    if (10u + bodyLen + 1u != f.size()) return false;

    b = p + 10;
    e = b + bodyLen;
    return true;
}

// Appends a u16 length prefix followed by the bytes of s.
template <class Str>
void put_str16(std::vector<uint8_t>& v, const Str& s) {
    put_u16(v, static_cast<uint16_t>(s.size()));
    v.insert(v.end(), s.begin(), s.end());
}

// Reads a u16 length prefix plus that many bytes from [b, e) into s and
// advances b past them. Returns false (b may have been advanced past the
// length prefix) when the range is too short.
template <class Str>
bool read_str16(const uint8_t*& b, const uint8_t* e, Str& s) {
    if (bytes_left(b, e) < 2u) return false;
    const uint16_t n = get_u16(b);
    b += 2;
    if (bytes_left(b, e) < n) return false;
    s.assign(reinterpret_cast<const char*>(b), n);
    b += n;
    return true;
}

}  // namespace

// ---------------------------
// 0x0104 / 0xF104 Machines
// ---------------------------

// Body: cmd(u16).
std::vector<uint8_t> encodeRequestMachines(const ReqMachines& req) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_REQ_MACHINES);

    std::vector<uint8_t> out;
    frame_wrap(req.dataId, body, out);
    return out;
}

// Body: cmd(u16), count(u16), then per machine: id(u32), nameLen(u16), name.
std::vector<uint8_t> encodeResponseMachines(const RspMachines& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_MACHINES);
    put_u16(body, static_cast<uint16_t>(rsp.machines.size()));
    for (const auto& m : rsp.machines) {
        put_u32(body, m.id);
        put_str16(body, m.name);
    }

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Returns true when f is a well-formed frame whose body starts with
// CMD_REQ_MACHINES. Bytes following the command id are tolerated (unchanged
// from the previous behavior). A body shorter than the 2-byte command id is
// rejected; previously it caused a read past the end of the buffer.
bool decodeRequestMachines(const std::vector<uint8_t>& f, ReqMachines& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;
    if (bytes_left(b, e) < 2u) return false;
    return get_u16(b) == CMD_REQ_MACHINES;
}

// Decodes a Machines response; the body must be consumed exactly.
bool decodeResponseMachines(const std::vector<uint8_t>& f, RspMachines& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 2u) return false;
    if (get_u16(b) != CMD_RSP_MACHINES) return false;
    b += 2;
    const uint16_t cnt = get_u16(b);
    b += 2;

    out.machines.clear();
    out.machines.reserve(cnt);
    for (uint16_t i = 0; i < cnt; ++i) {
        if (bytes_left(b, e) < 4u + 2u) return false;
        const uint32_t id = get_u32(b);
        b += 4;
        std::string name;
        if (!read_str16(b, e, name)) return false;
        out.machines.push_back({ id, std::move(name) });
    }
    return b == e;
}

// ---------------------------
// 0x0001 / 0xF001 Version
// ---------------------------

// Body: cmd(u16).
std::vector<uint8_t> encodeRequestVersion(const ReqVersion& req) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_REQ_VERSION);

    std::vector<uint8_t> out;
    frame_wrap(req.dataId, body, out);
    return out;
}

// Body: cmd(u16), len(u16), version bytes (UTF-8).
std::vector<uint8_t> encodeResponseVersion(const RspVersion& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_VERSION);
    put_str16(body, rsp.version);  // UTF-8

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Returns true when f is a well-formed frame whose body starts with
// CMD_REQ_VERSION. Bytes following the command id are tolerated (unchanged
// from the previous behavior). A body shorter than the 2-byte command id is
// rejected; previously it caused a read past the end of the buffer.
bool decodeRequestVersion(const std::vector<uint8_t>& f, ReqVersion& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;
    if (bytes_left(b, e) < 2u) return false;
    return get_u16(b) == CMD_REQ_VERSION;
}

// Decodes a Version response; the body must be consumed exactly.
bool decodeResponseVersion(const std::vector<uint8_t>& f, RspVersion& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 2u) return false;
    if (get_u16(b) != CMD_RSP_VERSION) return false;
    b += 2;
    if (!read_str16(b, e, out.version)) return false;
    return b == e;
}

// ---------------------------
// 0x0103 / 0xF103 Stats
// ---------------------------

// Body: cmd(u16), machineId(u32), flags(u16).
std::vector<uint8_t> encodeRequestStats(const ReqStats& req) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_REQ_STATS);
    put_u32(body, req.machineId);
    put_u16(body, req.flags);

    std::vector<uint8_t> out;
    frame_wrap(req.dataId, body, out);
    return out;
}

// Body: cmd(u16), machineId(u32), count(u16), then per channel:
// channelId(u32), earliestTs(u64), latestTs(u64), size(u32), nameLen(u16), name.
std::vector<uint8_t> encodeResponseStats(const RspStats& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_STATS);
    put_u32(body, rsp.machineId);
    put_u16(body, static_cast<uint16_t>(rsp.channels.size()));
    for (const auto& c : rsp.channels) {
        put_u32(body, c.channelId);
        put_u64(body, c.earliestTs);
        put_u64(body, c.latestTs);
        put_u32(body, c.size);
        put_str16(body, c.name);
    }

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Decodes a Stats request; the body must be consumed exactly.
bool decodeRequestStats(const std::vector<uint8_t>& f, ReqStats& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 4u + 2u) return false;
    if (get_u16(b) != CMD_REQ_STATS) return false;
    b += 2;
    out.machineId = get_u32(b);
    b += 4;
    out.flags = get_u16(b);
    b += 2;
    return b == e;
}

// Decodes a Stats response; the body must be consumed exactly.
bool decodeResponseStats(const std::vector<uint8_t>& f, RspStats& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 4u + 2u) return false;
    if (get_u16(b) != CMD_RSP_STATS) return false;
    b += 2;
    out.machineId = get_u32(b);
    b += 4;
    const uint16_t cnt = get_u16(b);
    b += 2;

    out.channels.clear();
    out.channels.reserve(cnt);
    for (uint16_t i = 0; i < cnt; ++i) {
        // Fixed-size part of a record, including the name length prefix.
        if (bytes_left(b, e) < 4u + 8u + 8u + 4u + 2u) return false;
        ChannelStatInfo ci{};
        ci.channelId = get_u32(b);
        b += 4;
        ci.earliestTs = get_u64(b);
        b += 8;
        ci.latestTs = get_u64(b);
        b += 8;
        ci.size = get_u32(b);
        b += 4;
        if (!read_str16(b, e, ci.name)) return false;
        out.channels.push_back(std::move(ci));
    }
    return b == e;
}

// ---------------------------------------
// 0x0101 / 0xF101 Since (supports an optional batchId)
// ---------------------------------------

// Body: cmd(u16), machineId(u32), channelId(u32), sinceTsExclusive(u64),
// maxCount(u16), flags(u16) and, only when SINCE_FLAG_HAS_BATCH is set in
// flags, batchIdLen(u16) + batchId.
std::vector<uint8_t> encodeRequestSince(const ReqSince& req) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_REQ_SINCE);
    put_u32(body, req.machineId);
    put_u32(body, req.channelId);
    put_u64(body, req.sinceTsExclusive);
    put_u16(body, req.maxCount);
    put_u16(body, req.flags);
    if (req.flags & SINCE_FLAG_HAS_BATCH) {
        put_str16(body, req.batchId);
    }

    std::vector<uint8_t> out;
    frame_wrap(req.dataId, body, out);
    return out;
}

// Body: cmd(u16), machineId(u32), channelId(u32), lastTsSent(u64), more(u8),
// count(u16), then per sample: ts_ms(u64), value(f64).
std::vector<uint8_t> encodeResponseSince(const RspSince& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_SINCE);
    put_u32(body, rsp.machineId);
    put_u32(body, rsp.channelId);
    put_u64(body, rsp.lastTsSent);
    body.push_back(static_cast<uint8_t>(rsp.more ? 1u : 0u));
    put_u16(body, static_cast<uint16_t>(rsp.samples.size()));
    for (const auto& s : rsp.samples) {
        put_u64(body, s.ts_ms);
        put_f64_be(body, s.value);
    }

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Decodes a Since request; the body must be consumed exactly. out.batchId is
// left empty when SINCE_FLAG_HAS_BATCH is not set in the decoded flags.
bool decodeRequestSince(const std::vector<uint8_t>& f, ReqSince& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u) return false;
    if (get_u16(b) != CMD_REQ_SINCE) return false;
    b += 2;

    if (bytes_left(b, e) < 4u + 4u + 8u + 2u + 2u) return false;
    out.machineId = get_u32(b);
    b += 4;
    out.channelId = get_u32(b);
    b += 4;
    out.sinceTsExclusive = get_u64(b);
    b += 8;
    out.maxCount = get_u16(b);
    b += 2;
    out.flags = get_u16(b);
    b += 2;

    out.batchId.clear();
    if (out.flags & SINCE_FLAG_HAS_BATCH) {
        if (!read_str16(b, e, out.batchId)) return false;
    }
    return b == e;
}

// Decodes a Since response; the body must be consumed exactly.
bool decodeResponseSince(const std::vector<uint8_t>& f, RspSince& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u) return false;
    if (get_u16(b) != CMD_RSP_SINCE) return false;
    b += 2;

    if (bytes_left(b, e) < 4u + 4u + 8u + 1u + 2u) return false;
    out.machineId = get_u32(b);
    b += 4;
    out.channelId = get_u32(b);
    b += 4;
    out.lastTsSent = get_u64(b);
    b += 8;
    out.more = *b++;  // u8
    const uint16_t cnt = get_u16(b);
    b += 2;

    out.samples.clear();
    out.samples.reserve(cnt);
    for (uint16_t i = 0; i < cnt; ++i) {
        if (bytes_left(b, e) < 8u + 8u) return false;
        const uint64_t ts = get_u64(b);
        b += 8;
        const double v = get_f64_be(b);
        b += 8;
        out.samples.push_back({ ts, v });
    }
    return b == e;
}

// ---------------------------------------
// 0x0105 / 0xF105 SinceAll (whole-machine, multi-channel incremental fetch)
// ---------------------------------------

// Body: cmd(u16), machineId(u32), sinceTsExclusive(u64), maxPerChannel(u16),
// flags(u16) and, only when SINCE_FLAG_HAS_BATCH is set in flags,
// batchIdLen(u16) + batchId.
std::vector<uint8_t> encodeRequestSinceAll(const ReqSinceAll& req) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_REQ_SINCE_ALL);
    put_u32(body, req.machineId);
    put_u64(body, req.sinceTsExclusive);
    put_u16(body, req.maxPerChannel);
    put_u16(body, req.flags);
    if (req.flags & SINCE_FLAG_HAS_BATCH) {
        put_str16(body, req.batchId);
    }

    std::vector<uint8_t> out;
    frame_wrap(req.dataId, body, out);
    return out;
}

// Body: cmd(u16), machineId(u32), moreAny(u8), blockCount(u16), then per
// block: channelId(u32), lastTsSent(u64), more(u8), sampleCount(u16) and per
// sample: ts_ms(u64), value(f64).
std::vector<uint8_t> encodeResponseSinceAll(const RspSinceAll& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_SINCE_ALL);
    put_u32(body, rsp.machineId);
    body.push_back(static_cast<uint8_t>(rsp.moreAny ? 1u : 0u));
    put_u16(body, static_cast<uint16_t>(rsp.blocks.size()));
    for (const auto& blk : rsp.blocks) {
        put_u32(body, blk.channelId);
        put_u64(body, blk.lastTsSent);
        body.push_back(static_cast<uint8_t>(blk.more ? 1u : 0u));
        put_u16(body, static_cast<uint16_t>(blk.samples.size()));
        for (const auto& sp : blk.samples) {
            put_u64(body, sp.ts_ms);
            put_f64_be(body, sp.value);
        }
    }

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Decodes a SinceAll request; the body must be consumed exactly. out.batchId
// is left empty when SINCE_FLAG_HAS_BATCH is not set in the decoded flags.
bool decodeRequestSinceAll(const std::vector<uint8_t>& f, ReqSinceAll& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u) return false;
    if (get_u16(b) != CMD_REQ_SINCE_ALL) return false;
    b += 2;

    if (bytes_left(b, e) < 4u + 8u + 2u + 2u) return false;
    out.machineId = get_u32(b);
    b += 4;
    out.sinceTsExclusive = get_u64(b);
    b += 8;
    out.maxPerChannel = get_u16(b);
    b += 2;
    out.flags = get_u16(b);
    b += 2;

    out.batchId.clear();
    if (out.flags & SINCE_FLAG_HAS_BATCH) {
        if (!read_str16(b, e, out.batchId)) return false;
    }
    return b == e;
}

// Decodes a SinceAll response; the body must be consumed exactly.
bool decodeResponseSinceAll(const std::vector<uint8_t>& f, RspSinceAll& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u) return false;
    if (get_u16(b) != CMD_RSP_SINCE_ALL) return false;
    b += 2;

    if (bytes_left(b, e) < 4u + 1u + 2u) return false;
    out.machineId = get_u32(b);
    b += 4;
    out.moreAny = *b++;  // u8
    const uint16_t blockCount = get_u16(b);
    b += 2;

    out.blocks.clear();
    out.blocks.reserve(blockCount);
    for (uint16_t i = 0; i < blockCount; ++i) {
        if (bytes_left(b, e) < 4u + 8u + 1u + 2u) return false;
        ChannelBlock blk;
        blk.channelId = get_u32(b);
        b += 4;
        blk.lastTsSent = get_u64(b);
        b += 8;
        blk.more = *b++;  // u8
        const uint16_t sampleCount = get_u16(b);
        b += 2;

        blk.samples.clear();
        blk.samples.reserve(sampleCount);
        for (uint16_t j = 0; j < sampleCount; ++j) {
            if (bytes_left(b, e) < 8u + 8u) return false;
            SamplePair sp;
            sp.ts_ms = get_u64(b);
            b += 8;
            sp.value = get_f64_be(b);
            b += 8;
            blk.samples.push_back(sp);
        }
        out.blocks.push_back(std::move(blk));
    }
    return b == e;
}

// ---------------------------
// 0x0120 / 0xF120 BatchInfo
// ---------------------------

// Body: cmd(u16), machineId(u32).
std::vector<uint8_t> encodeRequestBatchInfo(const ReqBatchInfo& req) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_REQ_BATCH_INFO);
    put_u32(body, req.machineId);

    std::vector<uint8_t> out;
    frame_wrap(req.dataId, body, out);
    return out;
}

// Decodes a BatchInfo request; the body must be consumed exactly.
bool decodeRequestBatchInfo(const std::vector<uint8_t>& f, ReqBatchInfo& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 4u) return false;
    if (get_u16(b) != CMD_REQ_BATCH_INFO) return false;
    b += 2;
    out.machineId = get_u32(b);
    b += 4;
    return b == e;
}

// Body: cmd(u16), machineId(u32), state(u8), activeBatchIdLen(u16),
// activeBatchId, activeStartTs(u64), expectedEndTs(u64).
std::vector<uint8_t> encodeResponseBatchInfo(const RspBatchInfo& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_BATCH_INFO);
    put_u32(body, rsp.machineId);
    body.push_back(static_cast<uint8_t>(rsp.state));
    put_str16(body, rsp.activeBatchId);
    put_u64(body, rsp.activeStartTs);
    put_u64(body, rsp.expectedEndTs);

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Decodes a BatchInfo response; the body must be consumed exactly.
bool decodeResponseBatchInfo(const std::vector<uint8_t>& f, RspBatchInfo& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 4u + 1u + 2u) return false;
    if (get_u16(b) != CMD_RSP_BATCH_INFO) return false;
    b += 2;
    out.machineId = get_u32(b);
    b += 4;
    // The field's type is declared in ProtocolCodec.h; decltype keeps this
    // cast in sync with it.
    out.state = static_cast<decltype(out.state)>(*b++);
    if (!read_str16(b, e, out.activeBatchId)) return false;

    if (bytes_left(b, e) < 8u + 8u) return false;
    out.activeStartTs = get_u64(b);
    b += 8;
    out.expectedEndTs = get_u64(b);
    b += 8;
    return b == e;
}

// ---------------------------
// 0xE100 Error
// ---------------------------

// Body: cmd(u16), refCmd(u16), machineId(u32), code(u16), messageLen(u16),
// message.
std::vector<uint8_t> encodeResponseError(const RspError& rsp) {
    std::vector<uint8_t> body;
    put_u16(body, CMD_RSP_ERROR);
    put_u16(body, rsp.refCmd);
    put_u32(body, rsp.machineId);
    put_u16(body, static_cast<uint16_t>(rsp.code));
    put_str16(body, rsp.message);

    std::vector<uint8_t> out;
    frame_wrap(rsp.dataId, body, out);
    return out;
}

// Decodes an Error response; the body must be consumed exactly.
bool decodeResponseError(const std::vector<uint8_t>& f, RspError& out) {
    const uint8_t* b = nullptr;
    const uint8_t* e = nullptr;
    if (!open_frame(f, out.dataId, b, e)) return false;

    if (bytes_left(b, e) < 2u + 2u + 4u + 2u + 2u) return false;
    if (get_u16(b) != CMD_RSP_ERROR) return false;
    b += 2;
    out.refCmd = get_u16(b);
    b += 2;
    out.machineId = get_u32(b);
    b += 4;
    // The field's type is declared in ProtocolCodec.h; decltype keeps this
    // cast in sync with it.
    out.code = static_cast<decltype(out.code)>(get_u16(b));
    b += 2;
    if (!read_str16(b, e, out.message)) return false;
    return b == e;
}

}  // namespace Proto