#include "StdAfx.h" #include "WebSocketClientPool.h" WebSocketClientPool::WebSocketClientPool(size_t thread_count) : m_thread_pool(thread_count) {} WebSocketClientPool::~WebSocketClientPool() {} void WebSocketClientPool::init_clients(const std::vector& uris) { m_clients.clear(); for (const auto& uri : uris) { try { if (!uri.empty() && uri.size() > 8) { // "ws://localhost:8080" auto client = std::make_shared(); client->connect(uri); m_clients.push_back(client); } } catch (const std::exception& e) { std::cerr << "Error connecting to " << uri << ": " << e.what() << std::endl; exit_clients(); } } } void WebSocketClientPool::exit_clients() { for (auto& client : m_clients) { if (client->is_connected()) { client->close(); } } // Çå¿Õ֮ǰµÄ¿Í»§¶Ë£¬È·±£ÊǸɾ»µÄ״̬ m_clients.clear(); } void WebSocketClientPool::post_to_all(const std::string& message) { for (auto& client : m_clients) { m_thread_pool.enqueue([client, message]() { if (client->is_connected()) { client->send(message); } }); } } void WebSocketClientPool::post_binary_to_all(const std::vector& binary_data) { for (auto& client : m_clients) { m_thread_pool.enqueue([client, binary_data]() { if (client->is_connected()) { client->send_binary(binary_data); } }); } } bool WebSocketClientPool::post_to_idle_client(const std::string& message) { for (auto& client : m_clients) { if (client->is_connected()) { // Ìá½»·¢ËÍÈÎÎñµ½Ïß³Ì³Ø m_thread_pool.enqueue([client, message]() { client->set_busy(true); // ±ê¼ÇΪæµ client->send(message); // ·¢ËÍÏûÏ¢ client->set_busy(false); // Íê³Éºó±ê¼ÇΪ¿ÕÏÐ }); return true; // ³É¹¦Ìá½»ÈÎÎñ } } return false; // ûÓпÉÓÿͻ§¶Ë } bool WebSocketClientPool::post_binary_to_idle_client(const std::vector& binary_data) { for (auto& client : m_clients) { if (client->is_connected()) { // Ìá½»·¢ËÍÈÎÎñµ½Ïß³Ì³Ø m_thread_pool.enqueue([client, binary_data]() { client->set_busy(true); // ±ê¼ÇΪæµ client->send_binary(binary_data); // ·¢ËͶþ½øÖÆÊý¾Ý client->set_busy(false); // Íê³Éºó±ê¼ÇΪ¿ÕÏÐ }); return true; // ³É¹¦Ìá½»ÈÎÎñ } } return false; // ûÓпÉÓÿͻ§¶Ë } bool WebSocketClientPool::send_to_idle_client(const std::string& message, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // ±ê¼ÇΪæ client->set_busy(true); // ʹÓÃÏ̳߳ط¢ËÍÏûÏ¢ m_thread_pool.enqueue([client, message, timeout_ms]() { // ·¢ËÍÏûÏ¢²¢µÈ´ý·þÎñÆ÷»Ø¸´ client->send_and_wait(message, timeout_ms); // 5Ã볬ʱ client->set_busy(false); // ±ê¼ÇΪ¿ÕÏÐ }); return true; // ³É¹¦·¢ËÍÏûÏ¢£¬·µ»Ø true } } return false; // ûÓпÕÏпͻ§¶Ë } bool WebSocketClientPool::send_binary_to_idle_client(const std::vector& binary_data, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // ±ê¼ÇΪæ client->set_busy(true); // ʹÓÃÏ̳߳ط¢ËÍÏûÏ¢ m_thread_pool.enqueue([client, binary_data, timeout_ms]() { // ·¢ËÍÏûÏ¢²¢µÈ´ý·þÎñÆ÷»Ø¸´ client->send_binary_and_wait(binary_data, timeout_ms); client->set_busy(false); // ±ê¼ÇΪ¿ÕÏÐ }); return true; // ³É¹¦·¢ËÍÏûÏ¢£¬·µ»Ø true } } return false; // ûÓпÕÏпͻ§¶Ë } bool WebSocketClientPool::send_to_idle_client_and_wait(const std::string& message, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { client->set_busy(true); // ±ê¼ÇΪæ try { // Ö±½ÓÔÚµ±Ç°Ï̵߳÷Óà send_and_wait£¬×èÈûÖ±µ½ÈÎÎñÍê³É bool result = client->send_and_wait(message, timeout_ms); client->set_busy(false); // ÉèÖÃΪ¿ÕÏÐ״̬ return result; // ·µ»Ø½á¹û } catch (const std::exception& e) { std::cerr << "Exception during sending: " << e.what() << std::endl; client->set_busy(false); // ¼´Ê¹·¢ÉúÒì³££¬Ò²ÒªÖØÖÃΪδæ״̬ return false; // ·¢ÉúÒì³££¬·µ»ØÊ§°Ü } } } return false; // ûÓпÕÏпͻ§¶Ë } bool WebSocketClientPool::send_binary_to_idle_client_and_wait(const std::vector& binary_data, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { client->set_busy(true); // ±ê¼ÇΪæ try { // Ö±½ÓÔÚµ±Ç°Ï̵߳÷Óà send_and_wait£¬×èÈûÖ±µ½ÈÎÎñÍê³É bool result = client->send_binary_and_wait(binary_data, timeout_ms); client->set_busy(false); // ÉèÖÃΪ¿ÕÏÐ״̬ return result; // ·µ»Ø½á¹û } catch (const std::exception& e) { std::cerr << "Exception during sending: " << e.what() << std::endl; client->set_busy(false); // ¼´Ê¹·¢ÉúÒì³££¬Ò²ÒªÖØÖÃΪδæ״̬ return false; // ·¢ÉúÒì³££¬·µ»ØÊ§°Ü } } } return false; // ûÓпÕÏпͻ§¶Ë } bool WebSocketClientPool::send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms) { auto start_time = std::chrono::steady_clock::now(); while (true) { // ²éÕÒ¿ÕÏпͻ§¶Ë for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // ±ê¼ÇΪæ client->set_busy(true); // ʹÓÃÏ̳߳ط¢ËÍÏûÏ¢ m_thread_pool.enqueue([client, message]() { try { // ·¢ËÍÏûÏ¢²¢µÈ´ý·þÎñÆ÷»Ø¸´ client->send_and_wait(message, 5000); // ÕâÀïµÄ³¬Ê±¿ÉÒÔ¸ù¾ÝÐèÒªµ÷Õû } catch (const std::exception& e) { std::cerr << "Error sending data: " << e.what() << std::endl; } client->set_busy(false); // ±ê¼ÇΪ¿ÕÏÐ }); return true; // ³É¹¦·¢ËÍ } } // ¼ÆËãÒÑʹÓõÄʱ¼ä auto elapsed_time = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count(); if (elapsed_time >= total_timeout_ms) { std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl; return false; // ³¬Ê±£¬ÎÞ·¨·¢ËÍ } // ûÓÐÕÒµ½¿ÕÏпͻ§¶Ë£¬µÈ´ý retry_interval_ms ºÁÃëºóÖØÊÔ std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms)); } } bool WebSocketClientPool::send_binary_to_idle_client_with_retry(std::vector&& binary_data, int total_timeout_ms, int retry_interval_ms) { auto start_time = std::chrono::steady_clock::now(); while (true) { // ²éÕÒ¿ÕÏпͻ§¶Ë for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // ±ê¼ÇΪæ client->set_busy(true); // ʹÓÃÏ̳߳ط¢ËÍÏûÏ¢ m_thread_pool.enqueue([client, binary_data = std::move(binary_data)]() mutable { try { // ·¢ËÍÏûÏ¢²¢µÈ´ý·þÎñÆ÷»Ø¸´ client->send_binary_and_wait(binary_data, 5000); // ÕâÀïµÄ³¬Ê±¿ÉÒÔ¸ù¾ÝÐèÒªµ÷Õû } catch (const std::exception& e) { std::cerr << "Error sending binary data: " << e.what() << std::endl; } client->set_busy(false); // ±ê¼ÇΪ¿ÕÏÐ }); return true; // ³É¹¦·¢ËÍ } } // ¼ÆËãÒÑʹÓõÄʱ¼ä auto elapsed_time = std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count(); if (elapsed_time >= total_timeout_ms) { std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl; return false; // ³¬Ê±£¬ÎÞ·¨·¢ËÍ } // ûÓÐÕÒµ½¿ÕÏпͻ§¶Ë£¬µÈ´ý retry_interval_ms ºÁÃëºóÖØÊÔ std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms)); } } void WebSocketClientPool::set_open_handler(WebSocketClient::OpenHandler handler) { for (auto& client : m_clients) { client->set_open_handler(handler); } } void WebSocketClientPool::set_message_handler(WebSocketClient::MessageHandler handler) { for (auto& client : m_clients) { client->set_message_handler(handler); } } void WebSocketClientPool::set_close_handler(WebSocketClient::CloseHandler handler) { for (auto& client : m_clients) { client->set_close_handler(handler); } } void WebSocketClientPool::set_fail_handler(WebSocketClient::FailHandler handler) { for (auto& client : m_clients) { client->set_fail_handler(handler); } } void WebSocketClientPool::set_pong_handler(WebSocketClient::PongHandler handler) { for (auto& client : m_clients) { client->set_pong_handler(handler); } }