mrDarker
2025-08-22 04d89bc9216553adcd0df87985a7665882924093
EdgeInspector_App/WebSocket/WebSocketClientPool.cpp
@@ -1,278 +1,278 @@
#include "StdAfx.h"
#include "StdAfx.h"
#include "WebSocketClientPool.h"
WebSocketClientPool::WebSocketClientPool(size_t thread_count) : m_thread_pool(thread_count) {}
WebSocketClientPool::~WebSocketClientPool() {}
void WebSocketClientPool::init_clients(const std::vector<std::string>& uris) {
    m_clients.clear();
    for (const auto& uri : uris) {
        try {
            if (!uri.empty() && uri.size() > 8)
            {
                // "ws://localhost:8080"
                auto client = std::make_shared<WebSocketClient>();
                client->connect(uri);
                m_clients.push_back(client);
            }
        }
        catch (const std::exception& e) {
            std::cerr << "Error connecting to " << uri << ": " << e.what() << std::endl;
            exit_clients();
        }
    }
}
void WebSocketClientPool::exit_clients() {
   for (auto& client : m_clients) {
        if (client->is_connected())
        {
            client->close();
        }
   }
    // 清空之前的客户端,确保是干净的状态
    m_clients.clear();
}
void WebSocketClientPool::post_to_all(const std::string& message) {
    for (auto& client : m_clients) {
        m_thread_pool.enqueue([client, message]() {
            if (client->is_connected())
            {
                client->send(message);
            }
        });
    }
}
void WebSocketClientPool::post_binary_to_all(const std::vector<char>& binary_data) {
    for (auto& client : m_clients) {
        m_thread_pool.enqueue([client, binary_data]() {
            if (client->is_connected())
            {
                client->send_binary(binary_data);
            }
        });
    }
}
bool WebSocketClientPool::post_to_idle_client(const std::string& message) {
    for (auto& client : m_clients) {
        if (client->is_connected()) {
            // 提交发送任务到线程池
            m_thread_pool.enqueue([client, message]() {
                client->set_busy(true);   // 标记为忙碌
                client->send(message);    // 发送消息
                client->set_busy(false);  // 完成后标记为空闲
            });
            return true; // 成功提交任务
        }
    }
    return false; // 没有可用客户端
}
bool WebSocketClientPool::post_binary_to_idle_client(const std::vector<char>& binary_data) {
    for (auto& client : m_clients) {
        if (client->is_connected()) {
            // 提交发送任务到线程池
            m_thread_pool.enqueue([client, binary_data]() {
                client->set_busy(true);   // 标记为忙碌
                client->send_binary(binary_data); // 发送二进制数据
                client->set_busy(false);  // 完成后标记为空闲
            });
            return true; // 成功提交任务
        }
    }
    return false; // 没有可用客户端
}
bool WebSocketClientPool::send_to_idle_client(const std::string& message, int timeout_ms) {
    for (auto& client : m_clients) {
        if (!client->is_busy() && client->is_connected()) {
            // 标记为忙
            client->set_busy(true);
            // 使用线程池发送消息
            m_thread_pool.enqueue([client, message, timeout_ms]() {
                // 发送消息并等待服务器回复
                client->send_and_wait(message, timeout_ms); // 5秒超时
                client->set_busy(false); // 标记为空闲
            });
            return true; // 成功发送消息,返回 true
        }
    }
    return false; // 没有空闲客户端
}
bool WebSocketClientPool::send_binary_to_idle_client(const std::vector<char>& binary_data, int timeout_ms) {
    for (auto& client : m_clients) {
        if (!client->is_busy() && client->is_connected()) {
            // 标记为忙
            client->set_busy(true);
            // 使用线程池发送消息
            m_thread_pool.enqueue([client, binary_data, timeout_ms]() {
                // 发送消息并等待服务器回复
                client->send_binary_and_wait(binary_data, timeout_ms);
                client->set_busy(false); // 标记为空闲
            });
            return true; // 成功发送消息,返回 true
        }
    }
    return false; // 没有空闲客户端
}
bool WebSocketClientPool::send_to_idle_client_and_wait(const std::string& message, int timeout_ms) {
    for (auto& client : m_clients) {
        if (!client->is_busy() && client->is_connected()) {
            client->set_busy(true); // 标记为忙
            try {
                // 直接在当前线程调用 send_and_wait,阻塞直到任务完成
                bool result = client->send_and_wait(message, timeout_ms);
                client->set_busy(false); // 设置为空闲状态
                return result; // 返回结果
            }
            catch (const std::exception& e) {
                std::cerr << "Exception during sending: " << e.what() << std::endl;
                client->set_busy(false); // 即使发生异常,也要重置为未忙状态
                return false; // 发生异常,返回失败
            }
        }
    }
    return false; // 没有空闲客户端
}
bool WebSocketClientPool::send_binary_to_idle_client_and_wait(const std::vector<char>& binary_data, int timeout_ms) {
    for (auto& client : m_clients) {
        if (!client->is_busy() && client->is_connected()) {
            client->set_busy(true); // 标记为忙
            try {
                // 直接在当前线程调用 send_and_wait,阻塞直到任务完成
                bool result = client->send_binary_and_wait(binary_data, timeout_ms);
                client->set_busy(false); // 设置为空闲状态
                return result; // 返回结果
            }
            catch (const std::exception& e) {
                std::cerr << "Exception during sending: " << e.what() << std::endl;
                client->set_busy(false); // 即使发生异常,也要重置为未忙状态
                return false; // 发生异常,返回失败
            }
        }
    }
    return false; // 没有空闲客户端
}
bool WebSocketClientPool::send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms) {
   auto start_time = std::chrono::steady_clock::now();
   while (true) {
      // 查找空闲客户端
      for (auto& client : m_clients) {
         if (!client->is_busy() && client->is_connected()) {
            // 标记为忙
            client->set_busy(true);
            // 使用线程池发送消息
            m_thread_pool.enqueue([client, message]() {
               try {
                  // 发送消息并等待服务器回复
                  client->send_and_wait(message, 5000); // 这里的超时可以根据需要调整
               }
               catch (const std::exception& e) {
                  std::cerr << "Error sending data: " << e.what() << std::endl;
               }
               client->set_busy(false); // 标记为空闲
             });
            return true; // 成功发送
         }
      }
      // 计算已使用的时间
      auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
      if (elapsed_time >= total_timeout_ms) {
         std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl;
         return false; // 超时,无法发送
      }
      // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试
      std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
   }
}
bool WebSocketClientPool::send_binary_to_idle_client_with_retry(std::vector<char>&& binary_data, int total_timeout_ms, int retry_interval_ms) {
    auto start_time = std::chrono::steady_clock::now();
    while (true) {
        // 查找空闲客户端
        for (auto& client : m_clients) {
            if (!client->is_busy() && client->is_connected()) {
                // 标记为忙
                client->set_busy(true);
                // 使用线程池发送消息
                m_thread_pool.enqueue([client, binary_data = std::move(binary_data)]() mutable {
                    try {
                        // 发送消息并等待服务器回复
                        client->send_binary_and_wait(binary_data, 5000); // 这里的超时可以根据需要调整
                    }
                    catch (const std::exception& e) {
                        std::cerr << "Error sending binary data: " << e.what() << std::endl;
                    }
                    client->set_busy(false); // 标记为空闲
                });
                return true; // 成功发送
            }
        }
        // 计算已使用的时间
        auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
        if (elapsed_time >= total_timeout_ms) {
            std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl;
            return false; // 超时,无法发送
        }
        // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试
        std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
    }
}
void WebSocketClientPool::set_open_handler(WebSocketClient::OpenHandler handler) {
    for (auto& client : m_clients) {
        client->set_open_handler(handler);
    }
}
void WebSocketClientPool::set_message_handler(WebSocketClient::MessageHandler handler) {
    for (auto& client : m_clients) {
        client->set_message_handler(handler);
    }
}
void WebSocketClientPool::set_close_handler(WebSocketClient::CloseHandler handler) {
    for (auto& client : m_clients) {
        client->set_close_handler(handler);
    }
}
void WebSocketClientPool::set_fail_handler(WebSocketClient::FailHandler handler) {
    for (auto& client : m_clients) {
        client->set_fail_handler(handler);
    }
}
void WebSocketClientPool::set_pong_handler(WebSocketClient::PongHandler handler) {
   for (auto& client : m_clients) {
      client->set_pong_handler(handler);
   }
}
//WebSocketClientPool::WebSocketClientPool(size_t thread_count) : m_thread_pool(thread_count) {}
//
//WebSocketClientPool::~WebSocketClientPool() {}
//
//void WebSocketClientPool::init_clients(const std::vector<std::string>& uris) {
//    m_clients.clear();
//
//    for (const auto& uri : uris) {
//        try {
//            if (!uri.empty() && uri.size() > 8)
//            {
//                // "ws://localhost:8080"
//                auto client = std::make_shared<WebSocketClient>();
//                client->connect(uri);
//                m_clients.push_back(client);
//            }
//        }
//        catch (const std::exception& e) {
//            std::cerr << "Error connecting to " << uri << ": " << e.what() << std::endl;
//            exit_clients();
//        }
//    }
//}
//
//void WebSocketClientPool::exit_clients() {
//   for (auto& client : m_clients) {
//        if (client->is_connected())
//        {
//            client->close();
//        }
//   }
//
//    // 清空之前的客户端,确保是干净的状态
//    m_clients.clear();
//}
//
//void WebSocketClientPool::post_to_all(const std::string& message) {
//    for (auto& client : m_clients) {
//        m_thread_pool.enqueue([client, message]() {
//            if (client->is_connected())
//            {
//                client->send(message);
//            }
//        });
//    }
//}
//
//void WebSocketClientPool::post_binary_to_all(const std::vector<char>& binary_data) {
//    for (auto& client : m_clients) {
//        m_thread_pool.enqueue([client, binary_data]() {
//            if (client->is_connected())
//            {
//                client->send_binary(binary_data);
//            }
//        });
//    }
//}
//
//bool WebSocketClientPool::post_to_idle_client(const std::string& message) {
//    for (auto& client : m_clients) {
//        if (client->is_connected()) {
//            // 提交发送任务到线程池
//            m_thread_pool.enqueue([client, message]() {
//                client->set_busy(true);   // 标记为忙碌
//                client->send(message);    // 发送消息
//                client->set_busy(false);  // 完成后标记为空闲
//            });
//            return true; // 成功提交任务
//        }
//    }
//    return false; // 没有可用客户端
//}
//
//bool WebSocketClientPool::post_binary_to_idle_client(const std::vector<char>& binary_data) {
//    for (auto& client : m_clients) {
//        if (client->is_connected()) {
//            // 提交发送任务到线程池
//            m_thread_pool.enqueue([client, binary_data]() {
//                client->set_busy(true);   // 标记为忙碌
//                client->send_binary(binary_data); // 发送二进制数据
//                client->set_busy(false);  // 完成后标记为空闲
//            });
//            return true; // 成功提交任务
//        }
//    }
//    return false; // 没有可用客户端
//}
//
//bool WebSocketClientPool::send_to_idle_client(const std::string& message, int timeout_ms) {
//    for (auto& client : m_clients) {
//        if (!client->is_busy() && client->is_connected()) {
//            // 标记为忙
//            client->set_busy(true);
//
//            // 使用线程池发送消息
//            m_thread_pool.enqueue([client, message, timeout_ms]() {
//                // 发送消息并等待服务器回复
//                client->send_and_wait(message, timeout_ms); // 5秒超时
//                client->set_busy(false); // 标记为空闲
//            });
//
//            return true; // 成功发送消息,返回 true
//        }
//    }
//    return false; // 没有空闲客户端
//}
//
//bool WebSocketClientPool::send_binary_to_idle_client(const std::vector<char>& binary_data, int timeout_ms) {
//    for (auto& client : m_clients) {
//        if (!client->is_busy() && client->is_connected()) {
//            // 标记为忙
//            client->set_busy(true);
//
//            // 使用线程池发送消息
//            m_thread_pool.enqueue([client, binary_data, timeout_ms]() {
//                // 发送消息并等待服务器回复
//                client->send_binary_and_wait(binary_data, timeout_ms);
//                client->set_busy(false); // 标记为空闲
//            });
//
//            return true; // 成功发送消息,返回 true
//        }
//    }
//    return false; // 没有空闲客户端
//}
//
//bool WebSocketClientPool::send_to_idle_client_and_wait(const std::string& message, int timeout_ms) {
//    for (auto& client : m_clients) {
//        if (!client->is_busy() && client->is_connected()) {
//            client->set_busy(true); // 标记为忙
//
//            try {
//                // 直接在当前线程调用 send_and_wait,阻塞直到任务完成
//                bool result = client->send_and_wait(message, timeout_ms);
//                client->set_busy(false); // 设置为空闲状态
//                return result; // 返回结果
//            }
//            catch (const std::exception& e) {
//                std::cerr << "Exception during sending: " << e.what() << std::endl;
//                client->set_busy(false); // 即使发生异常,也要重置为未忙状态
//                return false; // 发生异常,返回失败
//            }
//        }
//    }
//    return false; // 没有空闲客户端
//}
//
//bool WebSocketClientPool::send_binary_to_idle_client_and_wait(const std::vector<char>& binary_data, int timeout_ms) {
//    for (auto& client : m_clients) {
//        if (!client->is_busy() && client->is_connected()) {
//            client->set_busy(true); // 标记为忙
//
//            try {
//                // 直接在当前线程调用 send_and_wait,阻塞直到任务完成
//                bool result = client->send_binary_and_wait(binary_data, timeout_ms);
//                client->set_busy(false); // 设置为空闲状态
//                return result; // 返回结果
//            }
//            catch (const std::exception& e) {
//                std::cerr << "Exception during sending: " << e.what() << std::endl;
//                client->set_busy(false); // 即使发生异常,也要重置为未忙状态
//                return false; // 发生异常,返回失败
//            }
//        }
//    }
//    return false; // 没有空闲客户端
//}
//
//bool WebSocketClientPool::send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms) {
//   auto start_time = std::chrono::steady_clock::now();
//
//   while (true) {
//      // 查找空闲客户端
//      for (auto& client : m_clients) {
//         if (!client->is_busy() && client->is_connected()) {
//            // 标记为忙
//            client->set_busy(true);
//
//            // 使用线程池发送消息
//            m_thread_pool.enqueue([client, message]() {
//               try {
//                  // 发送消息并等待服务器回复
//                  client->send_and_wait(message, 5000); // 这里的超时可以根据需要调整
//               }
//               catch (const std::exception& e) {
//                  std::cerr << "Error sending data: " << e.what() << std::endl;
//               }
//               client->set_busy(false); // 标记为空闲
//             });
//
//            return true; // 成功发送
//         }
//      }
//
//      // 计算已使用的时间
//      auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
//
//      if (elapsed_time >= total_timeout_ms) {
//         std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl;
//         return false; // 超时,无法发送
//      }
//
//      // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试
//      std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
//   }
//}
//
//bool WebSocketClientPool::send_binary_to_idle_client_with_retry(std::vector<char>&& binary_data, int total_timeout_ms, int retry_interval_ms) {
//    auto start_time = std::chrono::steady_clock::now();
//
//    while (true) {
//        // 查找空闲客户端
//        for (auto& client : m_clients) {
//            if (!client->is_busy() && client->is_connected()) {
//                // 标记为忙
//                client->set_busy(true);
//
//                // 使用线程池发送消息
//                m_thread_pool.enqueue([client, binary_data = std::move(binary_data)]() mutable {
//                    try {
//                        // 发送消息并等待服务器回复
//                        client->send_binary_and_wait(binary_data, 5000); // 这里的超时可以根据需要调整
//                    }
//                    catch (const std::exception& e) {
//                        std::cerr << "Error sending binary data: " << e.what() << std::endl;
//                    }
//                    client->set_busy(false); // 标记为空闲
//                });
//
//                return true; // 成功发送
//            }
//        }
//
//        // 计算已使用的时间
//        auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count();
//
//        if (elapsed_time >= total_timeout_ms) {
//            std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl;
//            return false; // 超时,无法发送
//        }
//
//        // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试
//        std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
//    }
//}
//
//void WebSocketClientPool::set_open_handler(WebSocketClient::OpenHandler handler) {
//    for (auto& client : m_clients) {
//        client->set_open_handler(handler);
//    }
//}
//
//void WebSocketClientPool::set_message_handler(WebSocketClient::MessageHandler handler) {
//    for (auto& client : m_clients) {
//        client->set_message_handler(handler);
//    }
//}
//
//void WebSocketClientPool::set_close_handler(WebSocketClient::CloseHandler handler) {
//    for (auto& client : m_clients) {
//        client->set_close_handler(handler);
//    }
//}
//
//void WebSocketClientPool::set_fail_handler(WebSocketClient::FailHandler handler) {
//    for (auto& client : m_clients) {
//        client->set_fail_handler(handler);
//    }
//}
//
//void WebSocketClientPool::set_pong_handler(WebSocketClient::PongHandler handler) {
//   for (auto& client : m_clients) {
//      client->set_pong_handler(handler);
//   }
//}