#include "stdafx.h"
|
#include "WebSocketClient.h"
|
|
/// Builds an idle client.
///
/// Disables all websocketpp access/error log channels, initialises the Asio
/// transport and forwards every endpoint event to the matching on_* member.
/// No connection is attempted here; call connect() for that.
WebSocketClient::WebSocketClient()
    : m_connected(false),
      m_reconnect(true),
      m_close_done(false),
      m_message_received(false),
      m_heartbeat_interval(5) {
    // m_busy is not part of the initializer list above, yet is_busy() reads it.
    // Give it a defined starting value (redundant if the header already
    // provides a default member initializer).
    m_busy.store(false);

    // Clear any previous client logging configuration and initialise the transport.
    m_client.clear_access_channels(websocketpp::log::alevel::all);
    m_client.clear_error_channels(websocketpp::log::elevel::all);
    m_client.init_asio();

    // Register the event callbacks.
    m_client.set_open_handler(
        [this](websocketpp::connection_hdl hdl) { on_open(hdl); });
    m_client.set_message_handler(
        [this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); });
    m_client.set_close_handler(
        [this](websocketpp::connection_hdl hdl) { on_close(hdl); });
    m_client.set_fail_handler(
        [this](websocketpp::connection_hdl hdl) { on_fail(hdl); });
    m_client.set_pong_handler(
        [this](websocketpp::connection_hdl hdl, const std::string& payload) { on_pong(hdl, payload); });
}
|
|
/// Closes the connection (if any) and makes sure the worker thread is joined.
WebSocketClient::~WebSocketClient() {
    close();

    // close() may return without joining the worker (for example when the
    // client is not connected). Destroying a joinable std::thread calls
    // std::terminate, so stop the event loop and join here as a last resort.
    if (m_thread.joinable()) {
        m_client.stop();
        m_thread.join();
    }
}
|
|
void WebSocketClient::connect(const std::string& uri) {
|
websocketpp::lib::error_code ec;
|
{
|
m_connected.store(false);
|
m_reconnect.store(false);
|
m_client.clear_access_channels(websocketpp::log::alevel::all);
|
m_client.clear_error_channels(websocketpp::log::elevel::all);
|
|
std::lock_guard<std::mutex> lock(m_mutex); // ±£»¤¹²Ïí×ÊÔ´
|
auto con = m_client.get_connection(uri, ec);
|
|
if (ec) {
|
std::cerr << "Get connection failed: " << ec.message() << std::endl;
|
return;
|
}
|
|
m_hdl = con->get_handle();
|
m_client.connect(con);
|
}
|
|
// Æô¶¯¹¤×÷Ïß³Ì
|
m_thread = std::thread(&WebSocketClient::run, this);
|
}
|
|
bool WebSocketClient::is_busy() const {
|
return m_busy.load();
|
}
|
|
/// Sets or clears the busy flag consulted by the queue-processing methods.
/// @param busy new value for m_busy.
void WebSocketClient::set_busy(bool busy) {
    m_busy.store(busy, std::memory_order_seq_cst);
}
|
|
void WebSocketClient::send(const std::string& message) {
|
std::lock_guard<std::mutex> lock(m_mutex);
|
if (m_connected.load()) {
|
websocketpp::lib::error_code ec;
|
m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
|
if (ec) {
|
std::cerr << "Send failed: " << ec.message() << std::endl;
|
}
|
}
|
else {
|
std::cerr << "Not connected, cannot send message." << std::endl;
|
}
|
}
|
|
void WebSocketClient::send_binary(const std::vector<char>& binary_data) {
|
std::lock_guard<std::mutex> lock(m_mutex);
|
if (m_connected.load()) {
|
websocketpp::lib::error_code ec;
|
m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec);
|
if (ec) {
|
std::cerr << "Send binary data failed: " << ec.message() << std::endl;
|
}
|
}
|
else {
|
std::cerr << "Not connected, cannot send binary data." << std::endl;
|
}
|
}
|
|
/// Appends a copy of `message` to the text queue and immediately tries to
/// drain the queue via process_next_message().
/// @param message text payload to queue.
void WebSocketClient::enqueue_message(const std::string& message) {
    m_message_queue.emplace(message);
    process_next_message();
}
|
|
/// Appends a copy of `binary_data` to the binary queue and immediately tries
/// to drain the queue via process_next_binary().
/// @param binary_data bytes to queue.
void WebSocketClient::enqueue_binary(const std::vector<char>& binary_data) {
    m_binary_queue.emplace(binary_data);
    process_next_binary();
}
|
|
void WebSocketClient::process_next_message() {
|
if (!m_message_queue.empty() && !is_busy()) {
|
set_busy(true);
|
std::string message = m_message_queue.front();
|
m_message_queue.pop();
|
|
send(message); // ·¢ËÍÏûÏ¢
|
|
set_busy(false); // ÉèÖÃΪ¿ÕÏÐ״̬£¬²¢´¦ÀíÏÂÒ»¸ö
|
process_next_message();
|
}
|
}
|
|
void WebSocketClient::process_next_binary() {
|
if (!m_binary_queue.empty() && !is_busy()) {
|
set_busy(true);
|
std::vector<char> binary_data = m_binary_queue.front();
|
m_binary_queue.pop();
|
|
send_binary(binary_data); // ·¢ËͶþ½øÖÆÊý¾Ý
|
|
set_busy(false); // ÉèÖÃΪ¿ÕÏÐ״̬£¬²¢´¦ÀíÏÂÒ»¸ö
|
process_next_binary();
|
}
|
}
|
|
bool WebSocketClient::send_and_wait(const std::string& message, int timeout_ms) {
|
std::unique_lock<std::mutex> lock(m_mutex);
|
m_message_received = false;
|
|
if (m_connected.load()) {
|
// ·¢ËÍÏûÏ¢
|
websocketpp::lib::error_code ec;
|
m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec);
|
if (ec) {
|
std::cerr << "Send failed: " << ec.message() << std::endl;
|
return false;
|
}
|
|
// µÈ´ýÏûÏ¢»ò³¬Ê±
|
if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) {
|
std::cout << "Server response received." << std::endl;
|
return true;
|
}
|
else {
|
std::cerr << "Timeout: No response from server." << std::endl;
|
return false;
|
}
|
}
|
else {
|
std::cerr << "Not connected, cannot send message." << std::endl;
|
}
|
}
|
|
bool WebSocketClient::send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms) {
|
std::unique_lock<std::mutex> lock(m_mutex);
|
m_message_received = false;
|
|
if (m_connected.load()) {
|
// ·¢ËÍÏûÏ¢
|
websocketpp::lib::error_code ec;
|
m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec);
|
if (ec) {
|
std::cerr << "Send failed: " << ec.message() << std::endl;
|
return false;
|
}
|
|
// µÈ´ýÏûÏ¢»ò³¬Ê±
|
if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) {
|
std::cout << "Server response received." << std::endl;
|
return true;
|
}
|
else {
|
std::cerr << "Timeout: No response from server." << std::endl;
|
return false;
|
}
|
}
|
else {
|
std::cerr << "Not connected, cannot send binary data." << std::endl;
|
}
|
}
|
|
void WebSocketClient::close() {
|
if (!m_connected.load()) {
|
return;
|
}
|
|
try {
|
m_reconnect.store(false);
|
m_client.close(m_hdl, websocketpp::close::status::normal, "Closing connection");
|
}
|
catch (const std::exception& e) {
|
std::cerr << "Error during close: " << e.what() << std::endl;
|
}
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
m_cv.wait(lock, [this] { return m_close_done; });
|
m_close_done = false;
|
|
m_client.stop();
|
if (m_thread.joinable()) {
|
m_thread.join();
|
}
|
|
m_client.reset();
|
}
|
|
/// Installs the callback that on_open() invokes.
/// @param handler callback to store; replaces any previous one.
void WebSocketClient::set_open_handler(OpenHandler handler) {
    const std::lock_guard<std::mutex> guard(m_mutex);
    m_open_handler = handler;
}
|
|
/// Installs the callback that on_message() invokes with each payload.
/// @param handler callback to store; replaces any previous one.
void WebSocketClient::set_message_handler(MessageHandler handler) {
    const std::lock_guard<std::mutex> guard(m_mutex);
    m_message_handler = handler;
}
|
|
/// Installs the callback that on_close() invokes.
/// @param handler callback to store; replaces any previous one.
void WebSocketClient::set_close_handler(CloseHandler handler) {
    const std::lock_guard<std::mutex> guard(m_mutex);
    m_close_handler = handler;
}
|
|
/// Installs the callback that on_fail() invokes.
/// @param handler callback to store; replaces any previous one.
void WebSocketClient::set_fail_handler(FailHandler handler) {
    const std::lock_guard<std::mutex> guard(m_mutex);
    m_fail_handler = handler;
}
|
|
/// Installs the callback that on_pong() invokes with the pong payload.
/// @param handler callback to store; replaces any previous one.
void WebSocketClient::set_pong_handler(PongHandler handler) {
    const std::lock_guard<std::mutex> guard(m_mutex);
    m_pong_handler = handler;
}
|
|
void WebSocketClient::set_heartbeat_interval(int interval) {
|
std::lock_guard<std::mutex> lock(m_mutex);
|
m_heartbeat_interval = interval;
|
}
|
|
std::string WebSocketClient::get_current_url() {
|
std::lock_guard<std::mutex> lock(m_mutex);
|
auto con = m_client.get_con_from_hdl(m_hdl);
|
return con->get_uri()->str();
|
}
|
|
bool WebSocketClient::is_connected() const {
|
return m_connected.load();
|
}
|
|
void WebSocketClient::run() {
|
try {
|
m_client.run();
|
}
|
catch (const std::exception& e) {
|
std::cerr << "Error in run: " << e.what() << std::endl;
|
}
|
|
m_connected.store(false);
|
}
|
|
void WebSocketClient::heartbeat() {
|
while (m_reconnect.load()) {
|
std::this_thread::sleep_for(std::chrono::seconds(m_heartbeat_interval));
|
if (!m_connected.load()) {
|
std::cerr << "Attempting to reconnect..." << std::endl;
|
connect(get_current_url());
|
}
|
}
|
}
|
|
/// Endpoint callback for a successfully opened connection: marks the client
/// connected, re-enables reconnecting and invokes the user's open handler.
///
/// The handler is copied under the mutex and invoked after the mutex is
/// released, so it may call back into this object (e.g. send()) without
/// deadlocking on m_mutex.
///
/// @param hdl handle of the opened connection (unused).
void WebSocketClient::on_open(websocketpp::connection_hdl hdl) {
    std::cout << "Connection established" << std::endl;

    OpenHandler handler;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_connected.store(true);
        m_reconnect.store(true);
        handler = m_open_handler;
    }

    if (handler) {
        handler();
    }
}
|
|
/// Endpoint callback for an incoming message: logs the payload, passes it to
/// the user's message handler, then wakes any thread blocked in
/// send_and_wait() / send_binary_and_wait().
///
/// The handler is copied under the mutex and invoked after the mutex is
/// released, so it may call back into this object (e.g. send()) without
/// deadlocking on m_mutex.
///
/// @param hdl handle of the connection (unused).
/// @param msg received message.
void WebSocketClient::on_message(connection_hdl hdl, message_ptr msg) {
    std::cout << "Received message: " << msg->get_payload() << std::endl;

    MessageHandler handler;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        handler = m_message_handler;
    }

    if (handler) {
        handler(msg->get_payload());
    }

    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_message_received = true;
    }
    // Wake the waiting threads. m_cv is shared with close(), which waits on a
    // different predicate, so notify_all is required: notify_one could wake
    // only the waiter whose condition is still false.
    m_cv.notify_all();
}
|
|
/// Endpoint callback for a closed connection: clears the stored handle, marks
/// the client disconnected and invokes the user's close handler. If
/// reconnecting is disabled (an explicit close() is in progress), it then
/// signals close() that the shutdown has completed.
///
/// The handler is copied under the mutex and invoked after the mutex is
/// released, so it may call back into this object without deadlocking on
/// m_mutex.
///
/// @param hdl handle of the closed connection (unused).
void WebSocketClient::on_close(connection_hdl hdl) {
    std::cout << "Connection closed." << std::endl;

    CloseHandler handler;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_hdl.reset();  // drop the connection handle
        m_connected.store(false);
        handler = m_close_handler;
    }

    if (handler) {
        handler();
    }

    if (m_reconnect.load()) {
        // Reconnect requested. Only the notice is printed here: the in-place
        // reconnect (stop the event loop, join the worker, reconnect()) is
        // disabled in this handler.
        std::cerr << "Connection lost. Attempting to reconnect..." << std::endl;
    }
    else {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_close_done = true;
        }
        // m_cv is shared with the send_*_and_wait() waiters, so notify_all is
        // required to be sure the thread blocked in close() is woken.
        m_cv.notify_all();
    }
}
|
|
/// Endpoint callback for a failed connection attempt: marks the client
/// disconnected and invokes the user's fail handler.
///
/// The handler is copied under the mutex and invoked after the mutex is
/// released, so it may call back into this object without deadlocking on
/// m_mutex.
///
/// @param hdl handle of the failed connection (unused).
void WebSocketClient::on_fail(websocketpp::connection_hdl hdl) {
    std::cerr << "Connection failed" << std::endl;

    FailHandler handler;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_connected.store(false);
        handler = m_fail_handler;
    }

    if (handler) {
        handler();
    }
}
|
|
/// Endpoint callback for a received pong: logs the payload, marks the client
/// connected and forwards the payload to the user's pong handler.
///
/// The handler is copied under the mutex and invoked after the mutex is
/// released, so it may call back into this object (e.g. send()) without
/// deadlocking on m_mutex.
///
/// @param hdl     handle of the connection (unused).
/// @param payload payload carried by the pong frame.
void WebSocketClient::on_pong(websocketpp::connection_hdl hdl, const std::string& payload) {
    std::cout << "Received pong response with payload: " << payload << std::endl;

    PongHandler handler;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_connected.store(true);
        handler = m_pong_handler;
    }

    if (handler) {
        handler(payload);
    }
}
|