LWQ
2025-07-14 52d230fd0eb38adc5c6f4c6d6ed3786a3c19354d
EdgeInspector_App/WebSocket/WebSocketClient.cpp
@@ -1,326 +1,326 @@
#include "stdafx.h"
#include "WebSocketClient.h"
WebSocketClient::WebSocketClient() : m_connected(false), m_reconnect(true), m_close_done(false), m_message_received(false), m_heartbeat_interval(5) {
    // 清除之前的客户端配置并重新初始化
    m_client.clear_access_channels(websocketpp::log::alevel::all);
    m_client.clear_error_channels(websocketpp::log::elevel::all);
    m_client.init_asio();
    // 设置事件回调
    m_client.set_open_handler([this](websocketpp::connection_hdl hdl) { on_open(hdl); });
    m_client.set_message_handler([this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); });
    m_client.set_close_handler([this](websocketpp::connection_hdl hdl) { on_close(hdl); });
    m_client.set_fail_handler([this](websocketpp::connection_hdl hdl) { on_fail(hdl); });
   m_client.set_pong_handler([this](websocketpp::connection_hdl hdl, const std::string& payload) { on_pong(hdl, payload); });
}
WebSocketClient::~WebSocketClient() {
    close();
}
void WebSocketClient::connect(const std::string& uri) {
    websocketpp::lib::error_code ec;
    {
        m_connected.store(false);
        m_reconnect.store(false);
        m_client.clear_access_channels(websocketpp::log::alevel::all);
        m_client.clear_error_channels(websocketpp::log::elevel::all);
        std::lock_guard<std::mutex> lock(m_mutex);  // 保护共享资源
        auto con = m_client.get_connection(uri, ec);
        if (ec) {
            std::cerr << "Get connection failed: " << ec.message() << std::endl;
            return;
        }
        m_hdl = con->get_handle();
        m_client.connect(con);
    }
    // 启动工作线程
    m_thread = std::thread(&WebSocketClient::run, this);
}
bool WebSocketClient::is_busy() const {
    return m_busy.load();
}
void WebSocketClient::set_busy(bool busy) {
    m_busy.store(busy);
}
void WebSocketClient::send(const std::string& message) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_connected.load()) {
        websocketpp::lib::error_code ec;
        m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
        if (ec) {
            std::cerr << "Send failed: " << ec.message() << std::endl;
        }
    }
    else {
        std::cerr << "Not connected, cannot send message." << std::endl;
    }
}
void WebSocketClient::send_binary(const std::vector<char>& binary_data) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_connected.load()) {
        websocketpp::lib::error_code ec;
        m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec);
        if (ec) {
            std::cerr << "Send binary data failed: " << ec.message() << std::endl;
        }
    }
    else {
        std::cerr << "Not connected, cannot send binary data." << std::endl;
    }
}
void WebSocketClient::enqueue_message(const std::string& message) {
    m_message_queue.push(message);
    process_next_message();
}
void WebSocketClient::enqueue_binary(const std::vector<char>& binary_data) {
    m_binary_queue.push(binary_data);
    process_next_binary();
}
void WebSocketClient::process_next_message() {
    if (!m_message_queue.empty() && !is_busy()) {
        set_busy(true);
        std::string message = m_message_queue.front();
        m_message_queue.pop();
        send(message); // 发送消息
        set_busy(false); // 设置为空闲状态,并处理下一个
        process_next_message();
    }
}
void WebSocketClient::process_next_binary() {
    if (!m_binary_queue.empty() && !is_busy()) {
        set_busy(true);
        std::vector<char> binary_data = m_binary_queue.front();
        m_binary_queue.pop();
        send_binary(binary_data); // 发送二进制数据
        set_busy(false); // 设置为空闲状态,并处理下一个
        process_next_binary();
    }
}
bool WebSocketClient::send_and_wait(const std::string& message, int timeout_ms) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_message_received = false;
    if (m_connected.load()) {
        // 发送消息
        websocketpp::lib::error_code ec;
        m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
        if (ec) {
            std::cerr << "Send failed: " << ec.message() << std::endl;
            return false;
        }
        // 等待消息或超时
        if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) {
            std::cout << "Server response received." << std::endl;
            return true;
        }
        else {
            std::cerr << "Timeout: No response from server." << std::endl;
            return false;
        }
   }
   else {
      std::cerr << "Not connected, cannot send message." << std::endl;
   }
}
bool WebSocketClient::send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_message_received = false;
    if (m_connected.load()) {
        // 发送消息
        websocketpp::lib::error_code ec;
        m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec);
        if (ec) {
            std::cerr << "Send failed: " << ec.message() << std::endl;
            return false;
        }
        // 等待消息或超时
        if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) {
            std::cout << "Server response received." << std::endl;
            return true;
        }
        else {
            std::cerr << "Timeout: No response from server." << std::endl;
            return false;
        }
    }
    else {
        std::cerr << "Not connected, cannot send binary data." << std::endl;
    }
}
void WebSocketClient::close() {
    if (!m_connected.load()) {
        return;
    }
    try {
        m_reconnect.store(false);
        m_client.close(m_hdl, websocketpp::close::status::normal, "Closing connection");
    }
    catch (const std::exception& e) {
        std::cerr << "Error during close: " << e.what() << std::endl;
    }
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return m_close_done; });
    m_close_done = false;
    m_client.stop();
    if (m_thread.joinable()) {
        m_thread.join();
    }
    m_client.reset();
}
void WebSocketClient::set_open_handler(OpenHandler handler) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_open_handler = handler;
}
void WebSocketClient::set_message_handler(MessageHandler handler) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_message_handler = handler;
}
void WebSocketClient::set_close_handler(CloseHandler handler) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_close_handler = handler;
}
void WebSocketClient::set_fail_handler(FailHandler handler) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_fail_handler = handler;
}
void WebSocketClient::set_pong_handler(PongHandler handler) {
   std::lock_guard<std::mutex> lock(m_mutex);
   m_pong_handler = handler;
}
void WebSocketClient::set_heartbeat_interval(int interval) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_heartbeat_interval = interval;
}
std::string WebSocketClient::get_current_url() {
    std::lock_guard<std::mutex> lock(m_mutex);
    auto con = m_client.get_con_from_hdl(m_hdl);
    return con->get_uri()->str();
}
bool WebSocketClient::is_connected() const {
    return m_connected.load();
}
void WebSocketClient::run() {
    try {
        m_client.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error in run: " << e.what() << std::endl;
    }
    m_connected.store(false);
}
void WebSocketClient::heartbeat() {
    while (m_reconnect.load()) {
        std::this_thread::sleep_for(std::chrono::seconds(m_heartbeat_interval));
        if (!m_connected.load()) {
            std::cerr << "Attempting to reconnect..." << std::endl;
            connect(get_current_url());
        }
    }
}
void WebSocketClient::on_open(websocketpp::connection_hdl hdl) {
    std::cout << "Connection established" << std::endl;
    std::lock_guard<std::mutex> lock(m_mutex);
    m_connected.store(true);
    m_reconnect.store(true);
    if (m_open_handler) {
        m_open_handler();
    }
}
void WebSocketClient::on_message(connection_hdl hdl, message_ptr msg) {
    std::cout << "Received message: " << msg->get_payload() << std::endl;
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_message_handler) {
        m_message_handler(msg->get_payload());
    }
    m_message_received = true;
    m_cv.notify_one();  // 唤醒等待线程
}
void WebSocketClient::on_close(connection_hdl hdl) {
    std::cout << "Connection closed." << std::endl;
    std::lock_guard<std::mutex> lock(m_mutex);
    m_hdl.reset();  // 清空连接句柄
    m_connected.store(false);
    if (m_close_handler) {
        m_close_handler();
    }
    // 如果需要重连
    if (m_reconnect.load()) {
        // 停止当前运行的事件循环
        //m_client.stop();
        //if (m_thread.joinable()) {
        //    m_thread.join();
        //}
        std::cerr << "Connection lost. Attempting to reconnect..." << std::endl;
        //reconnect();  // 启动重连机制
    }
    else {
        m_cv.notify_one();
        m_close_done = true;
    }
}
void WebSocketClient::on_fail(websocketpp::connection_hdl hdl) {
    std::cerr << "Connection failed" << std::endl;
    std::lock_guard<std::mutex> lock(m_mutex);
    m_connected.store(false);
    if (m_fail_handler) {
        m_fail_handler();
    }
}
void WebSocketClient::on_pong(websocketpp::connection_hdl hdl, const std::string& payload) {
    std::cout << "Received pong response with payload: " << payload << std::endl;
    std::lock_guard<std::mutex> lock(m_mutex);
    m_connected.store(true);
    if (m_pong_handler) {
        m_pong_handler(payload);
    }
}
//WebSocketClient::WebSocketClient() : m_connected(false), m_reconnect(true), m_close_done(false), m_message_received(false), m_heartbeat_interval(5) {
//    // 清除之前的客户端配置并重新初始化
//    m_client.clear_access_channels(websocketpp::log::alevel::all);
//    m_client.clear_error_channels(websocketpp::log::elevel::all);
//    m_client.init_asio();
//
//    // 设置事件回调
//    m_client.set_open_handler([this](websocketpp::connection_hdl hdl) { on_open(hdl); });
//    m_client.set_message_handler([this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); });
//    m_client.set_close_handler([this](websocketpp::connection_hdl hdl) { on_close(hdl); });
//    m_client.set_fail_handler([this](websocketpp::connection_hdl hdl) { on_fail(hdl); });
//   m_client.set_pong_handler([this](websocketpp::connection_hdl hdl, const std::string& payload) { on_pong(hdl, payload); });
//}
//
//WebSocketClient::~WebSocketClient() {
//    close();
//}
//
//void WebSocketClient::connect(const std::string& uri) {
//    websocketpp::lib::error_code ec;
//    {
//        m_connected.store(false);
//        m_reconnect.store(false);
//        m_client.clear_access_channels(websocketpp::log::alevel::all);
//        m_client.clear_error_channels(websocketpp::log::elevel::all);
//
//        std::lock_guard<std::mutex> lock(m_mutex);  // 保护共享资源
//        auto con = m_client.get_connection(uri, ec);
//
//        if (ec) {
//            std::cerr << "Get connection failed: " << ec.message() << std::endl;
//            return;
//        }
//
//        m_hdl = con->get_handle();
//        m_client.connect(con);
//    }
//
//    // 启动工作线程
//    m_thread = std::thread(&WebSocketClient::run, this);
//}
//
//bool WebSocketClient::is_busy() const {
//    return m_busy.load();
//}
//
//void WebSocketClient::set_busy(bool busy) {
//    m_busy.store(busy);
//}
//
//void WebSocketClient::send(const std::string& message) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    if (m_connected.load()) {
//        websocketpp::lib::error_code ec;
//        m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
//        if (ec) {
//            std::cerr << "Send failed: " << ec.message() << std::endl;
//        }
//    }
//    else {
//        std::cerr << "Not connected, cannot send message." << std::endl;
//    }
//}
//
//void WebSocketClient::send_binary(const std::vector<char>& binary_data) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    if (m_connected.load()) {
//        websocketpp::lib::error_code ec;
//        m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec);
//        if (ec) {
//            std::cerr << "Send binary data failed: " << ec.message() << std::endl;
//        }
//    }
//    else {
//        std::cerr << "Not connected, cannot send binary data." << std::endl;
//    }
//}
//
//void WebSocketClient::enqueue_message(const std::string& message) {
//    m_message_queue.push(message);
//    process_next_message();
//}
//
//void WebSocketClient::enqueue_binary(const std::vector<char>& binary_data) {
//    m_binary_queue.push(binary_data);
//    process_next_binary();
//}
//
//void WebSocketClient::process_next_message() {
//    if (!m_message_queue.empty() && !is_busy()) {
//        set_busy(true);
//        std::string message = m_message_queue.front();
//        m_message_queue.pop();
//
//        send(message); // 发送消息
//
//        set_busy(false); // 设置为空闲状态,并处理下一个
//        process_next_message();
//    }
//}
//
//void WebSocketClient::process_next_binary() {
//    if (!m_binary_queue.empty() && !is_busy()) {
//        set_busy(true);
//        std::vector<char> binary_data = m_binary_queue.front();
//        m_binary_queue.pop();
//
//        send_binary(binary_data); // 发送二进制数据
//
//        set_busy(false); // 设置为空闲状态,并处理下一个
//        process_next_binary();
//    }
//}
//
//bool WebSocketClient::send_and_wait(const std::string& message, int timeout_ms) {
//    std::unique_lock<std::mutex> lock(m_mutex);
//    m_message_received = false;
//
//    if (m_connected.load()) {
//        // 发送消息
//        websocketpp::lib::error_code ec;
//        m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
//        if (ec) {
//            std::cerr << "Send failed: " << ec.message() << std::endl;
//            return false;
//        }
//
//        // 等待消息或超时
//        if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) {
//            std::cout << "Server response received." << std::endl;
//            return true;
//        }
//        else {
//            std::cerr << "Timeout: No response from server." << std::endl;
//            return false;
//        }
//   }
//   else {
//      std::cerr << "Not connected, cannot send message." << std::endl;
//   }
//}
//
//bool WebSocketClient::send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms) {
//    std::unique_lock<std::mutex> lock(m_mutex);
//    m_message_received = false;
//
//    if (m_connected.load()) {
//        // 发送消息
//        websocketpp::lib::error_code ec;
//        m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec);
//        if (ec) {
//            std::cerr << "Send failed: " << ec.message() << std::endl;
//            return false;
//        }
//
//        // 等待消息或超时
//        if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) {
//            std::cout << "Server response received." << std::endl;
//            return true;
//        }
//        else {
//            std::cerr << "Timeout: No response from server." << std::endl;
//            return false;
//        }
//    }
//    else {
//        std::cerr << "Not connected, cannot send binary data." << std::endl;
//    }
//}
//
//void WebSocketClient::close() {
//    if (!m_connected.load()) {
//        return;
//    }
//
//    try {
//        m_reconnect.store(false);
//        m_client.close(m_hdl, websocketpp::close::status::normal, "Closing connection");
//    }
//    catch (const std::exception& e) {
//        std::cerr << "Error during close: " << e.what() << std::endl;
//    }
//
//    std::unique_lock<std::mutex> lock(m_mutex);
//    m_cv.wait(lock, [this] { return m_close_done; });
//    m_close_done = false;
//
//    m_client.stop();
//    if (m_thread.joinable()) {
//        m_thread.join();
//    }
//
//    m_client.reset();
//}
//
//void WebSocketClient::set_open_handler(OpenHandler handler) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_open_handler = handler;
//}
//
//void WebSocketClient::set_message_handler(MessageHandler handler) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_message_handler = handler;
//}
//
//void WebSocketClient::set_close_handler(CloseHandler handler) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_close_handler = handler;
//}
//
//void WebSocketClient::set_fail_handler(FailHandler handler) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_fail_handler = handler;
//}
//
//void WebSocketClient::set_pong_handler(PongHandler handler) {
//   std::lock_guard<std::mutex> lock(m_mutex);
//   m_pong_handler = handler;
//}
//
//void WebSocketClient::set_heartbeat_interval(int interval) {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_heartbeat_interval = interval;
//}
//
//std::string WebSocketClient::get_current_url() {
//    std::lock_guard<std::mutex> lock(m_mutex);
//    auto con = m_client.get_con_from_hdl(m_hdl);
//    return con->get_uri()->str();
//}
//
//bool WebSocketClient::is_connected() const {
//    return m_connected.load();
//}
//
//void WebSocketClient::run() {
//    try {
//        m_client.run();
//    }
//    catch (const std::exception& e) {
//        std::cerr << "Error in run: " << e.what() << std::endl;
//    }
//
//    m_connected.store(false);
//}
//
//void WebSocketClient::heartbeat() {
//    while (m_reconnect.load()) {
//        std::this_thread::sleep_for(std::chrono::seconds(m_heartbeat_interval));
//        if (!m_connected.load()) {
//            std::cerr << "Attempting to reconnect..." << std::endl;
//            connect(get_current_url());
//        }
//    }
//}
//
//void WebSocketClient::on_open(websocketpp::connection_hdl hdl) {
//    std::cout << "Connection established" << std::endl;
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_connected.store(true);
//    m_reconnect.store(true);
//    if (m_open_handler) {
//        m_open_handler();
//    }
//}
//
//void WebSocketClient::on_message(connection_hdl hdl, message_ptr msg) {
//    std::cout << "Received message: " << msg->get_payload() << std::endl;
//    std::lock_guard<std::mutex> lock(m_mutex);
//    if (m_message_handler) {
//        m_message_handler(msg->get_payload());
//    }
//
//    m_message_received = true;
//    m_cv.notify_one();  // 唤醒等待线程
//}
//
//void WebSocketClient::on_close(connection_hdl hdl) {
//    std::cout << "Connection closed." << std::endl;
//    std::lock_guard<std::mutex> lock(m_mutex);
//
//    m_hdl.reset();  // 清空连接句柄
//    m_connected.store(false);
//    if (m_close_handler) {
//        m_close_handler();
//    }
//
//    // 如果需要重连
//    if (m_reconnect.load()) {
//
//        // 停止当前运行的事件循环
//        //m_client.stop();
//        //if (m_thread.joinable()) {
//        //    m_thread.join();
//        //}
//        std::cerr << "Connection lost. Attempting to reconnect..." << std::endl;
//        //reconnect();  // 启动重连机制
//    }
//    else {
//        m_cv.notify_one();
//        m_close_done = true;
//    }
//}
//
//void WebSocketClient::on_fail(websocketpp::connection_hdl hdl) {
//    std::cerr << "Connection failed" << std::endl;
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_connected.store(false);
//
//    if (m_fail_handler) {
//        m_fail_handler();
//    }
//}
//
//void WebSocketClient::on_pong(websocketpp::connection_hdl hdl, const std::string& payload) {
//    std::cout << "Received pong response with payload: " << payload << std::endl;
//    std::lock_guard<std::mutex> lock(m_mutex);
//    m_connected.store(true);
//
//    if (m_pong_handler) {
//        m_pong_handler(payload);
//    }
//}