#include "stdafx.h"
#include "WebSocketClient.h"

#include <chrono>
#include <exception>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Implementation of WebSocketClient, a wrapper around a websocketpp client
// endpoint (m_client) whose event loop runs on a worker thread (m_thread).
//
// NOTE(review): this file relies on C++17 class template argument deduction
// for std::lock_guard / std::unique_lock.
//
// NOTE(review): the element type of the binary payload vector is declared in
// WebSocketClient.h and is not visible from this file. The binary functions
// therefore spell their parameter type as decltype(m_binary_queue)::value_type,
// which is the payload type stored in the binary queue. The payload is passed
// to websocketpp as (data(), size()), i.e. it is presumably a vector of
// byte-sized elements -- confirm against the header.

namespace {

// Upper bound on how long close() waits for the closing handshake before it
// stops the event loop regardless.
constexpr std::chrono::seconds kCloseTimeout{5};

}  // namespace

// Sets up the websocketpp endpoint: disables its logging, initialises the Asio
// transport and forwards every endpoint event to the matching on_*() member.
WebSocketClient::WebSocketClient()
    : m_connected(false),
      m_reconnect(true),
      m_close_done(false),
      m_message_received(false),
      m_heartbeat_interval(5) {
    // m_busy is not part of the initializer list above; give it a defined
    // value in case the header does not provide a default for it.
    m_busy.store(false);

    // Clear any previous client configuration and re-initialise.
    m_client.clear_access_channels(websocketpp::log::alevel::all);
    m_client.clear_error_channels(websocketpp::log::elevel::all);
    m_client.init_asio();

    // Register the event callbacks.
    m_client.set_open_handler(
        [this](websocketpp::connection_hdl hdl) { on_open(hdl); });
    m_client.set_message_handler(
        [this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); });
    m_client.set_close_handler(
        [this](websocketpp::connection_hdl hdl) { on_close(hdl); });
    m_client.set_fail_handler(
        [this](websocketpp::connection_hdl hdl) { on_fail(hdl); });
    m_client.set_pong_handler(
        [this](websocketpp::connection_hdl hdl, const std::string& payload) { on_pong(hdl, payload); });
}

// Closes the connection and reaps the worker thread. Never lets an exception
// escape, because a throwing destructor terminates the program.
WebSocketClient::~WebSocketClient() {
    // Make sure a running heartbeat() loop stops asking for reconnects.
    m_reconnect.store(false);
    try {
        close();
    } catch (const std::exception& e) {
        std::cerr << "Error during close: " << e.what() << std::endl;
    } catch (...) {
        std::cerr << "Error during close: unknown exception" << std::endl;
    }
}

// Opens a connection to `uri` and starts the worker thread that runs the
// endpoint's event loop. Failure to create the connection is logged and the
// function returns without starting a thread.
void WebSocketClient::connect(const std::string& uri) {
    // A previous call may have left a worker behind (finished or still
    // running). Assigning to a joinable std::thread calls std::terminate, and
    // the endpoint must be reset() before run() can be called again.
    if (m_thread.joinable()) {
        m_client.stop();
        m_thread.join();
        m_client.reset();
    }

    m_connected.store(false);
    m_reconnect.store(false);
    m_client.clear_access_channels(websocketpp::log::alevel::all);
    m_client.clear_error_channels(websocketpp::log::elevel::all);

    {
        std::lock_guard lock(m_mutex);  // protects the shared connection handle
        websocketpp::lib::error_code ec;
        auto con = m_client.get_connection(uri, ec);
        if (ec) {
            std::cerr << "Get connection failed: " << ec.message() << std::endl;
            return;
        }
        m_hdl = con->get_handle();
        m_client.connect(con);
    }

    // Start the worker thread.
    m_thread = std::thread(&WebSocketClient::run, this);
}

// Returns the current value of the "a queued send is in progress" flag.
bool WebSocketClient::is_busy() const {
    return m_busy.load();
}

// Sets the "a queued send is in progress" flag.
void WebSocketClient::set_busy(bool busy) {
    m_busy.store(busy);
}

// Sends `message` as a text frame. Logs to std::cerr when the client is not
// connected or when websocketpp reports an error; never throws on those paths.
void WebSocketClient::send(const std::string& message) {
    std::lock_guard lock(m_mutex);
    if (!m_connected.load()) {
        std::cerr << "Not connected, cannot send message." << std::endl;
        return;
    }

    websocketpp::lib::error_code ec;
    m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
    if (ec) {
        std::cerr << "Send failed: " << ec.message() << std::endl;
    }
}

// Sends `binary_data` as a binary frame. Logs to std::cerr when the client is
// not connected or when websocketpp reports an error.
void WebSocketClient::send_binary(const decltype(m_binary_queue)::value_type& binary_data) {
    std::lock_guard lock(m_mutex);
    if (!m_connected.load()) {
        std::cerr << "Not connected, cannot send binary data." << std::endl;
        return;
    }

    websocketpp::lib::error_code ec;
    m_client.send(m_hdl, binary_data.data(), binary_data.size(),
                  websocketpp::frame::opcode::binary, ec);
    if (ec) {
        std::cerr << "Send binary data failed: " << ec.message() << std::endl;
    }
}

// Appends `message` to the text queue and tries to drain the queue.
// NOTE(review): the queue itself is not guarded by a lock; concurrent
// producers would need additional synchronisation.
void WebSocketClient::enqueue_message(const std::string& message) {
    m_message_queue.push(message);
    process_next_message();
}

// Appends `binary_data` to the binary queue and tries to drain the queue.
// NOTE(review): same caveat as enqueue_message() regarding concurrent producers.
void WebSocketClient::enqueue_binary(const decltype(m_binary_queue)::value_type& binary_data) {
    m_binary_queue.push(binary_data);
    process_next_binary();
}

// Sends every queued text message in FIFO order unless another drain is
// already in progress. Implemented as a loop so that stack depth does not
// grow with the number of queued messages.
void WebSocketClient::process_next_message() {
    // Atomically claim the busy flag; if it was already set, someone else is
    // sending and this call has nothing to do.
    if (m_busy.exchange(true)) {
        return;
    }

    while (!m_message_queue.empty()) {
        const std::string message = std::move(m_message_queue.front());
        m_message_queue.pop();
        send(message);  // send the message
    }

    set_busy(false);  // back to idle
}

// Sends every queued binary payload in FIFO order unless another drain is
// already in progress. Loop-based for the same reason as process_next_message().
void WebSocketClient::process_next_binary() {
    if (m_busy.exchange(true)) {
        return;
    }

    while (!m_binary_queue.empty()) {
        const auto binary_data = std::move(m_binary_queue.front());
        m_binary_queue.pop();
        send_binary(binary_data);  // send the binary data
    }

    set_busy(false);  // back to idle
}

// Sends `message` as a text frame and blocks until on_message() reports an
// incoming message or `timeout_ms` milliseconds elapse.
// Returns true only if a message arrived in time; false when not connected,
// when the send failed, or on timeout.
bool WebSocketClient::send_and_wait(const std::string& message, int timeout_ms) {
    std::unique_lock lock(m_mutex);
    m_message_received = false;

    if (!m_connected.load()) {
        std::cerr << "Not connected, cannot send message." << std::endl;
        return false;  // previously fell off the end of a non-void function
    }

    // Send the message.
    websocketpp::lib::error_code ec;
    m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
    if (ec) {
        std::cerr << "Send failed: " << ec.message() << std::endl;
        return false;
    }

    // Wait for a message or for the timeout; the wait releases m_mutex so
    // on_message() can set the flag.
    const bool received = m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                                        [this]() { return m_message_received; });
    if (!received) {
        std::cerr << "Timeout: No response from server." << std::endl;
        return false;
    }

    std::cout << "Server response received." << std::endl;
    return true;
}

// Binary counterpart of send_and_wait(): sends `binary_data` as a binary frame
// and blocks until a message arrives or `timeout_ms` milliseconds elapse.
// Returns true only if a message arrived in time.
bool WebSocketClient::send_binary_and_wait(const decltype(m_binary_queue)::value_type& binary_data,
                                           int timeout_ms) {
    std::unique_lock lock(m_mutex);
    m_message_received = false;

    if (!m_connected.load()) {
        std::cerr << "Not connected, cannot send binary data." << std::endl;
        return false;  // previously fell off the end of a non-void function
    }

    // Send the payload.
    websocketpp::lib::error_code ec;
    m_client.send(m_hdl, binary_data.data(), binary_data.size(),
                  websocketpp::frame::opcode::binary, ec);
    if (ec) {
        std::cerr << "Send failed: " << ec.message() << std::endl;
        return false;
    }

    // Wait for a message or for the timeout.
    const bool received = m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                                        [this]() { return m_message_received; });
    if (!received) {
        std::cerr << "Timeout: No response from server." << std::endl;
        return false;
    }

    std::cout << "Server response received." << std::endl;
    return true;
}

// Shuts the client down: performs the closing handshake when connected, then
// stops the event loop, joins the worker thread and resets the endpoint so
// that connect() can be called again.
//
// Unlike a plain "return if not connected", the worker thread is reaped even
// when the connection is already gone (failed or closed by the peer);
// otherwise a joinable std::thread would survive until destruction and
// terminate the program.
void WebSocketClient::close() {
    if (m_connected.load()) {
        m_reconnect.store(false);  // tells on_close() to signal completion

        websocketpp::connection_hdl hdl;
        {
            std::lock_guard lock(m_mutex);
            hdl = m_hdl;           // on_close() may reset m_hdl concurrently
            m_close_done = false;  // discard any stale completion signal
        }

        // Non-throwing overload: if the close cannot even be initiated,
        // on_close() will never fire and waiting for it would hang.
        websocketpp::lib::error_code ec;
        m_client.close(hdl, websocketpp::close::status::normal, "Closing connection", ec);
        if (ec) {
            std::cerr << "Error during close: " << ec.message() << std::endl;
        } else {
            std::unique_lock lock(m_mutex);
            const bool closed =
                m_cv.wait_for(lock, kCloseTimeout, [this] { return m_close_done; });
            if (!closed) {
                std::cerr << "Timeout: close handshake did not complete." << std::endl;
            }
        }
    }

    {
        std::lock_guard lock(m_mutex);
        m_close_done = false;
    }

    if (m_thread.joinable()) {
        m_client.stop();
        m_thread.join();
        m_client.reset();
    }
}

// Installs the callback invoked from on_open().
void WebSocketClient::set_open_handler(OpenHandler handler) {
    std::lock_guard lock(m_mutex);
    m_open_handler = handler;
}

// Installs the callback invoked from on_message() with the message payload.
void WebSocketClient::set_message_handler(MessageHandler handler) {
    std::lock_guard lock(m_mutex);
    m_message_handler = handler;
}

// Installs the callback invoked from on_close().
void WebSocketClient::set_close_handler(CloseHandler handler) {
    std::lock_guard lock(m_mutex);
    m_close_handler = handler;
}

// Installs the callback invoked from on_fail().
void WebSocketClient::set_fail_handler(FailHandler handler) {
    std::lock_guard lock(m_mutex);
    m_fail_handler = handler;
}

// Installs the callback invoked from on_pong() with the pong payload.
void WebSocketClient::set_pong_handler(PongHandler handler) {
    std::lock_guard lock(m_mutex);
    m_pong_handler = handler;
}

// Sets the sleep interval, in seconds, used by heartbeat().
void WebSocketClient::set_heartbeat_interval(int interval) {
    std::lock_guard lock(m_mutex);
    m_heartbeat_interval = interval;
}

// Returns the URI of the connection referred to by the stored handle.
// Uses the throwing get_con_from_hdl() overload, so an exception propagates
// to the caller when the handle no longer refers to a live connection
// (on_close() resets the handle).
std::string WebSocketClient::get_current_url() {
    std::lock_guard lock(m_mutex);
    const auto con = m_client.get_con_from_hdl(m_hdl);
    return con->get_uri()->str();
}

// Returns whether the connection is currently considered open.
bool WebSocketClient::is_connected() const {
    return m_connected.load();
}

// Worker-thread entry point: runs the endpoint's event loop until it returns,
// then marks the client as disconnected. No exception leaves this function,
// because an exception escaping a thread entry point terminates the program.
void WebSocketClient::run() {
    try {
        m_client.run();
    } catch (const std::exception& e) {
        std::cerr << "Error in run: " << e.what() << std::endl;
    } catch (...) {
        std::cerr << "Error in run: unknown exception" << std::endl;
    }
    m_connected.store(false);
}

// Reconnect loop: while reconnecting is enabled, sleeps for the heartbeat
// interval and calls connect() when the client is found disconnected.
//
// The URL is cached while the connection is alive, because
// get_current_url() throws once on_close() has reset the handle.
//
// NOTE(review): connect() clears m_reconnect and only on_open() sets it
// again, so this loop normally ends after one reconnect attempt -- confirm
// whether that is intended.
void WebSocketClient::heartbeat() {
    std::string last_url;

    while (m_reconnect.load()) {
        if (m_connected.load()) {
            try {
                last_url = get_current_url();
            } catch (const std::exception&) {
                // The handle expired between the check and the lookup; keep
                // the previously cached URL.
            }
        }

        int interval_seconds = 0;
        {
            // set_heartbeat_interval() writes this value under the same lock.
            std::lock_guard lock(m_mutex);
            interval_seconds = m_heartbeat_interval;
        }
        std::this_thread::sleep_for(std::chrono::seconds(interval_seconds));

        // close() may have been called while sleeping; do not reconnect then.
        if (!m_reconnect.load()) {
            break;
        }
        if (m_connected.load()) {
            continue;
        }

        std::cerr << "Attempting to reconnect..." << std::endl;
        if (last_url.empty()) {
            try {
                last_url = get_current_url();
            } catch (const std::exception& e) {
                std::cerr << "Reconnect skipped: " << e.what() << std::endl;
                continue;
            }
        }
        connect(last_url);
    }
}

// Endpoint callback: the connection was opened. Marks the client connected,
// re-enables reconnecting and invokes the user's open handler.
// The handler runs outside m_mutex so that it may call send() and friends,
// which take the same mutex.
void WebSocketClient::on_open(websocketpp::connection_hdl hdl) {
    std::cout << "Connection established" << std::endl;

    OpenHandler handler;
    {
        std::lock_guard lock(m_mutex);
        m_connected.store(true);
        m_reconnect.store(true);
        handler = m_open_handler;
    }

    if (handler) {
        handler();
    }
}

// Endpoint callback: a message arrived. Logs the payload, invokes the user's
// message handler (outside m_mutex), then wakes threads blocked in
// send_and_wait() / send_binary_and_wait().
void WebSocketClient::on_message(connection_hdl hdl, message_ptr msg) {
    std::cout << "Received message: " << msg->get_payload() << std::endl;

    MessageHandler handler;
    {
        std::lock_guard lock(m_mutex);
        handler = m_message_handler;
    }

    if (handler) {
        handler(msg->get_payload());
    }

    {
        std::lock_guard lock(m_mutex);
        m_message_received = true;
    }
    // Wake the waiting thread(s). notify_all because close() waits on the
    // same condition variable with a different predicate.
    m_cv.notify_all();
}

// Endpoint callback: the connection was closed. Clears the handle, marks the
// client disconnected and invokes the user's close handler (outside m_mutex).
// When reconnecting is disabled it signals close() that the handshake is done.
void WebSocketClient::on_close(connection_hdl hdl) {
    std::cout << "Connection closed." << std::endl;

    CloseHandler handler;
    {
        std::lock_guard lock(m_mutex);
        m_hdl.reset();  // clear the connection handle
        m_connected.store(false);
        handler = m_close_handler;
    }

    if (handler) {
        handler();
    }

    if (m_reconnect.load()) {
        // Reconnection is wanted. Nothing is restarted from here: stopping
        // and joining the event loop from its own callback is not possible,
        // and the direct reconnect call that used to sit here is disabled.
        std::cerr << "Connection lost. Attempting to reconnect..." << std::endl;
        return;
    }

    {
        std::lock_guard lock(m_mutex);
        m_close_done = true;  // set the flag before notifying
    }
    m_cv.notify_all();
}

// Endpoint callback: the connection attempt failed. Marks the client
// disconnected and invokes the user's fail handler (outside m_mutex).
void WebSocketClient::on_fail(websocketpp::connection_hdl hdl) {
    std::cerr << "Connection failed" << std::endl;

    FailHandler handler;
    {
        std::lock_guard lock(m_mutex);
        m_connected.store(false);
        handler = m_fail_handler;
    }

    if (handler) {
        handler();
    }
}

// Endpoint callback: a pong frame arrived. Marks the client connected and
// forwards the payload to the user's pong handler (outside m_mutex).
void WebSocketClient::on_pong(websocketpp::connection_hdl hdl, const std::string& payload) {
    std::cout << "Received pong response with payload: " << payload << std::endl;

    PongHandler handler;
    {
        std::lock_guard lock(m_mutex);
        m_connected.store(true);
        handler = m_pong_handler;
    }

    if (handler) {
        handler(payload);
    }
}