| Common_Class/AutoFileCleanupTool/AutoFileCleanupToolDlg.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| Common_Class/AutoFileCleanupTool/AutoFileCleanupToolDlg.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| EdgeInspector_App/Process/InspectCamera.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| EdgeInspector_App/View/ViewMain_HWSetting.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| EdgeInspector_App/WebSocket/WebSocketClient.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| EdgeInspector_App/WebSocket/WebSocketClient.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| EdgeInspector_App/WebSocket/WebSocketClientPool.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| EdgeInspector_App/WebSocket/WebSocketClientPool.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
Common_Class/AutoFileCleanupTool/AutoFileCleanupToolDlg.cpp
@@ -9,6 +9,9 @@ #include "afxdialogex.h" #include "Config.h" #include <string> #include <functional> #ifdef _DEBUG #define new DEBUG_NEW #endif @@ -82,6 +85,88 @@ , m_bDeleteEmptyFolders(FALSE) // 默认不删除空文件夹 { m_hIcon = AfxGetApp()->LoadIcon(IDR_MAINFRAME); } void CAutoFileCleanupToolDlg::PerformCleanupTask() { // 遍历源目录 CFileFind finder; CString strSearchPath = m_strSourceFolder + _T("\\*.*"); if (!finder.FindFile(strSearchPath)) { AppendLogLineRichStyled(_T("Source folder not found or empty."), LOG_COLOR_WARNING); return; } std::function<void(const CString&)> ProcessFolder; ProcessFolder = [&](const CString& strFolderPath) { CFileFind subFinder; BOOL bWorking = subFinder.FindFile(strFolderPath + _T("\\*.*")); while (bWorking) { bWorking = subFinder.FindNextFile(); if (subFinder.IsDots()) { continue; } CString srcPath = subFinder.GetFilePath(); CString relPath = srcPath.Mid(m_strSourceFolder.GetLength() + 1); CString dstPath = m_strTargetFolder + _T("\\") + relPath; if (subFinder.IsDirectory()) { ProcessFolder(srcPath); // 递归处理子文件夹 continue; } // 比较文件 CFileStatus srcStatus, dstStatus; if (CFile::GetStatus(srcPath, srcStatus) && CFile::GetStatus(dstPath, dstStatus)) { if (srcStatus.m_size == dstStatus.m_size) { if (m_bSafeMode) { AppendLogLineRichStyled(_T("Simulated delete: ") + srcPath, LOG_COLOR_WARNING); } else { CFile::Remove(srcPath); if (PathFileExists(srcPath)) { AppendLogLineRichStyled(_T("Failed to delete: ") + srcPath, LOG_COLOR_ERROR); } else { AppendLogLineRichStyled(_T("Deleted: ") + srcPath, LOG_COLOR_SUCCESS); } } } } } subFinder.Close(); // 删除空文件夹(可选) if (m_bDeleteEmptyFolders) { BOOL bFound = finder.FindFile(strFolderPath + _T("\\*.*")); BOOL bEmpty = TRUE; while (bFound) { bFound = finder.FindNextFile(); if (finder.IsDots()) { continue; } // 有文件或文件夹,不是空的 bEmpty = FALSE; break; } finder.Close(); if (bEmpty) { if (RemoveDirectory(strFolderPath)) { AppendLogLineRichStyled(_T("Deleted empty folder: ") + strFolderPath, LOG_COLOR_SUCCESS); } else { 
AppendLogLineRichStyled(_T("Failed to delete empty folder: ") + strFolderPath, LOG_COLOR_ERROR); } } } }; ProcessFolder(m_strSourceFolder); } void CAutoFileCleanupToolDlg::SaveSettings() @@ -458,12 +543,7 @@ // 清理逻辑放这里 AppendLogLineRichStyled(_T("Performing auto cleanup..."), LOG_COLOR_NORMAL); if (m_bSafeMode) { AppendLogLineRichStyled(_T("Safe mode: No files will be deleted."), LOG_COLOR_NORMAL); } else { AppendLogLineRichStyled(_T("Scanning and cleaning files..."), LOG_COLOR_NORMAL); } PerformCleanupTask(); AppendLogLineRichStyled(_T("Cleanup cycle completed."), LOG_COLOR_SUCCESS); } Common_Class/AutoFileCleanupTool/AutoFileCleanupToolDlg.h
@@ -13,6 +13,7 @@ CAutoFileCleanupToolDlg(CWnd* pParent = nullptr); // 标准构造函数 private: void PerformCleanupTask(); void SaveSettings(); void LoadSettings(); void StartCleanup(); EdgeInspector_App/Process/InspectCamera.cpp
@@ -3934,7 +3934,7 @@ double th = (double) m_pRecipe->m_SideParam[(int) emDim].m_NotchPrm[nNotchIdx].m_nReferece_Line_Threshold; int nNotchCenterOffset = m_pRecipe->m_SideParam[(int) emDim].m_NotchPrm[nNotchIdx].m_nNotchCenter_Offset_pxl; cvThreshold(scr, img_Bin_MeasureLine, th, 50, CV_THRESH_BINARY); cvThreshold(scr, img_Bin_MeasureLine, th, 255, CV_THRESH_BINARY); // 1. Find Seed CvPoint ptSeed; @@ -6240,46 +6240,52 @@ BOOL CInspectCamera::ProcessFrame(int iThread,DimensionDir emDim,stFrameIndex stFrame) { // 1. Find End Line.. (Corner를 제외한 Side라인을 찾기 위해 End Line부터 찾는다) FindEndLine(iThread, emDim, stFrame); try { // 1. Find End Line.. (Corner를 제외한 Side라인을 찾기 위해 End Line부터 찾는다) FindEndLine(iThread, emDim, stFrame); // 2. Find Side Line.. (해당 프레임에서 Corner/Notch 를 제외한 Side 라인 찾기) // 2. Find Side Line.. (해당 프레임에서 Corner/Notch 를 제외한 Side 라인 찾기) #if USE_AI_DETECT FindSideLine(iThread, emDim, stFrame); FindSideLine(iThread, emDim, stFrame); #else FindSideLine_ExceptNotch(iThread, emDim, stFrame); FindSideLine_ExceptNotch(iThread, emDim, stFrame); #endif // USE_AI_DETECT // 3. Find Top Corner FindTopCorner(iThread,emDim,stFrame); // 3. Find Top Corner FindTopCorner(iThread, emDim, stFrame); // 4. Find Top Align Mark FindTopAlignMark(iThread,emDim,stFrame); // 4. Find Top Align Mark FindTopAlignMark(iThread, emDim, stFrame); // 5. Find Bot Corner FindBotCorner(iThread,emDim,stFrame); // 5. Find Bot Corner FindBotCorner(iThread, emDim, stFrame); // 6. Find Bot Align Mark FindBotAlignMark(iThread,emDim,stFrame); // 6. Find Bot Align Mark FindBotAlignMark(iThread, emDim, stFrame); #if USE_AI_DETECT SendFrameScanDataOverAI(iThread, emDim, stFrame); SendFrameScanDataOverAI(iThread, emDim, stFrame); #else // 7. Inspect Defect InspectDefect_Side(iThread, emDim, stFrame); // 7. Inspect Defect InspectDefect_Side(iThread, emDim, stFrame); // 8. Notch Process Notch_Process(iThread, emDim, stFrame); // 8.
Notch Process Notch_Process(iThread, emDim, stFrame); #endif // USE_WEBSOCKET // 9. Measure Measure(iThread,emDim,stFrame); // 10. User Defect UserDefect_Process(iThread,emDim,stFrame); // 9. Measure Measure(iThread, emDim, stFrame); // 11. Exception Area Process ExceptionArea_Process(iThread,emDim,stFrame); // 10. User Defect UserDefect_Process(iThread, emDim, stFrame); // 11. Exception Area Process ExceptionArea_Process(iThread, emDim, stFrame); } catch (const std::exception&) { g_pLog->DisplayMessage(_T("Exception in ProcessFrame for Camera %d, Dimension %d"), m_iCamera, (int)emDim); return FALSE; } return TRUE; } EdgeInspector_App/View/ViewMain_HWSetting.cpp
@@ -806,7 +806,7 @@ void CViewMain_HWSetting::OnClickBrowseCopyToolConfig() { CString strFilePath; CFileDialog dlg(TRUE, NULL, NULL, OFN_HIDEREADONLY | OFN_OVERWRITEPROMPT, _T("Config Files (*.ffs_real;*.json;*.xml)|*.ffs_real;*.json;*.xml||"), this); CFileDialog dlg(TRUE, NULL, NULL, OFN_HIDEREADONLY | OFN_OVERWRITEPROMPT, _T("Config Files (*.ffs_real;*.json;*.xml;*.ini;*.cfg)|*.ffs_real;*.json;*.xml;*.ini;*.cfg||"), this); if (dlg.DoModal() == IDOK) { strFilePath = dlg.GetPathName(); m_pDlgHDSettingParm->m_strCopyToolConfigPath = strFilePath; @@ -828,7 +828,7 @@ void CViewMain_HWSetting::OnClickBrowseDeleteToolConfig() { CString strFilePath; CFileDialog dlg(TRUE, NULL, NULL, OFN_HIDEREADONLY | OFN_OVERWRITEPROMPT, _T("Config Files (*.ffs_real;*.json;*.xml;*.ini)|*.ffs_real;*.json;*.xml;*.ini||"), this); CFileDialog dlg(TRUE, NULL, NULL, OFN_HIDEREADONLY | OFN_OVERWRITEPROMPT, _T("Config Files (*.ffs_real;*.json;*.xml;*.ini;*.cfg)|*.ffs_real;*.json;*.xml;*.ini;*.cfg||"), this); if (dlg.DoModal() == IDOK) { strFilePath = dlg.GetPathName(); m_pDlgHDSettingParm->m_strDeleteToolConfigPath = strFilePath; EdgeInspector_App/WebSocket/WebSocketClient.cpp
@@ -1,326 +1,326 @@ #include "stdafx.h" #include "WebSocketClient.h" WebSocketClient::WebSocketClient() : m_connected(false), m_reconnect(true), m_close_done(false), m_message_received(false), m_heartbeat_interval(5) { // 清除之前的客户端配置并重新初始化 m_client.clear_access_channels(websocketpp::log::alevel::all); m_client.clear_error_channels(websocketpp::log::elevel::all); m_client.init_asio(); // 设置事件回调 m_client.set_open_handler([this](websocketpp::connection_hdl hdl) { on_open(hdl); }); m_client.set_message_handler([this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); }); m_client.set_close_handler([this](websocketpp::connection_hdl hdl) { on_close(hdl); }); m_client.set_fail_handler([this](websocketpp::connection_hdl hdl) { on_fail(hdl); }); m_client.set_pong_handler([this](websocketpp::connection_hdl hdl, const std::string& payload) { on_pong(hdl, payload); }); } WebSocketClient::~WebSocketClient() { close(); } void WebSocketClient::connect(const std::string& uri) { websocketpp::lib::error_code ec; { m_connected.store(false); m_reconnect.store(false); m_client.clear_access_channels(websocketpp::log::alevel::all); m_client.clear_error_channels(websocketpp::log::elevel::all); std::lock_guard<std::mutex> lock(m_mutex); // 保护共享资源 auto con = m_client.get_connection(uri, ec); if (ec) { std::cerr << "Get connection failed: " << ec.message() << std::endl; return; } m_hdl = con->get_handle(); m_client.connect(con); } // 启动工作线程 m_thread = std::thread(&WebSocketClient::run, this); } bool WebSocketClient::is_busy() const { return m_busy.load(); } void WebSocketClient::set_busy(bool busy) { m_busy.store(busy); } void WebSocketClient::send(const std::string& message) { std::lock_guard<std::mutex> lock(m_mutex); if (m_connected.load()) { websocketpp::lib::error_code ec; m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec); if (ec) { std::cerr << "Send failed: " << ec.message() << std::endl; } } else { std::cerr << "Not connected, cannot send 
message." << std::endl; } } void WebSocketClient::send_binary(const std::vector<char>& binary_data) { std::lock_guard<std::mutex> lock(m_mutex); if (m_connected.load()) { websocketpp::lib::error_code ec; m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec); if (ec) { std::cerr << "Send binary data failed: " << ec.message() << std::endl; } } else { std::cerr << "Not connected, cannot send binary data." << std::endl; } } void WebSocketClient::enqueue_message(const std::string& message) { m_message_queue.push(message); process_next_message(); } void WebSocketClient::enqueue_binary(const std::vector<char>& binary_data) { m_binary_queue.push(binary_data); process_next_binary(); } void WebSocketClient::process_next_message() { if (!m_message_queue.empty() && !is_busy()) { set_busy(true); std::string message = m_message_queue.front(); m_message_queue.pop(); send(message); // 发送消息 set_busy(false); // 设置为空闲状态,并处理下一个 process_next_message(); } } void WebSocketClient::process_next_binary() { if (!m_binary_queue.empty() && !is_busy()) { set_busy(true); std::vector<char> binary_data = m_binary_queue.front(); m_binary_queue.pop(); send_binary(binary_data); // 发送二进制数据 set_busy(false); // 设置为空闲状态,并处理下一个 process_next_binary(); } } bool WebSocketClient::send_and_wait(const std::string& message, int timeout_ms) { std::unique_lock<std::mutex> lock(m_mutex); m_message_received = false; if (m_connected.load()) { // 发送消息 websocketpp::lib::error_code ec; m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec); if (ec) { std::cerr << "Send failed: " << ec.message() << std::endl; return false; } // 等待消息或超时 if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) { std::cout << "Server response received." << std::endl; return true; } else { std::cerr << "Timeout: No response from server." << std::endl; return false; } } else { std::cerr << "Not connected, cannot send message." 
<< std::endl; } } bool WebSocketClient::send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms) { std::unique_lock<std::mutex> lock(m_mutex); m_message_received = false; if (m_connected.load()) { // 发送消息 websocketpp::lib::error_code ec; m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec); if (ec) { std::cerr << "Send failed: " << ec.message() << std::endl; return false; } // 等待消息或超时 if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) { std::cout << "Server response received." << std::endl; return true; } else { std::cerr << "Timeout: No response from server." << std::endl; return false; } } else { std::cerr << "Not connected, cannot send binary data." << std::endl; } } void WebSocketClient::close() { if (!m_connected.load()) { return; } try { m_reconnect.store(false); m_client.close(m_hdl, websocketpp::close::status::normal, "Closing connection"); } catch (const std::exception& e) { std::cerr << "Error during close: " << e.what() << std::endl; } std::unique_lock<std::mutex> lock(m_mutex); m_cv.wait(lock, [this] { return m_close_done; }); m_close_done = false; m_client.stop(); if (m_thread.joinable()) { m_thread.join(); } m_client.reset(); } void WebSocketClient::set_open_handler(OpenHandler handler) { std::lock_guard<std::mutex> lock(m_mutex); m_open_handler = handler; } void WebSocketClient::set_message_handler(MessageHandler handler) { std::lock_guard<std::mutex> lock(m_mutex); m_message_handler = handler; } void WebSocketClient::set_close_handler(CloseHandler handler) { std::lock_guard<std::mutex> lock(m_mutex); m_close_handler = handler; } void WebSocketClient::set_fail_handler(FailHandler handler) { std::lock_guard<std::mutex> lock(m_mutex); m_fail_handler = handler; } void WebSocketClient::set_pong_handler(PongHandler handler) { std::lock_guard<std::mutex> lock(m_mutex); m_pong_handler = handler; } void 
WebSocketClient::set_heartbeat_interval(int interval) { std::lock_guard<std::mutex> lock(m_mutex); m_heartbeat_interval = interval; } std::string WebSocketClient::get_current_url() { std::lock_guard<std::mutex> lock(m_mutex); auto con = m_client.get_con_from_hdl(m_hdl); return con->get_uri()->str(); } bool WebSocketClient::is_connected() const { return m_connected.load(); } void WebSocketClient::run() { try { m_client.run(); } catch (const std::exception& e) { std::cerr << "Error in run: " << e.what() << std::endl; } m_connected.store(false); } void WebSocketClient::heartbeat() { while (m_reconnect.load()) { std::this_thread::sleep_for(std::chrono::seconds(m_heartbeat_interval)); if (!m_connected.load()) { std::cerr << "Attempting to reconnect..." << std::endl; connect(get_current_url()); } } } void WebSocketClient::on_open(websocketpp::connection_hdl hdl) { std::cout << "Connection established" << std::endl; std::lock_guard<std::mutex> lock(m_mutex); m_connected.store(true); m_reconnect.store(true); if (m_open_handler) { m_open_handler(); } } void WebSocketClient::on_message(connection_hdl hdl, message_ptr msg) { std::cout << "Received message: " << msg->get_payload() << std::endl; std::lock_guard<std::mutex> lock(m_mutex); if (m_message_handler) { m_message_handler(msg->get_payload()); } m_message_received = true; m_cv.notify_one(); // 唤醒等待线程 } void WebSocketClient::on_close(connection_hdl hdl) { std::cout << "Connection closed." << std::endl; std::lock_guard<std::mutex> lock(m_mutex); m_hdl.reset(); // 清空连接句柄 m_connected.store(false); if (m_close_handler) { m_close_handler(); } // 如果需要重连 if (m_reconnect.load()) { // 停止当前运行的事件循环 //m_client.stop(); //if (m_thread.joinable()) { // m_thread.join(); //} std::cerr << "Connection lost. Attempting to reconnect..." 
<< std::endl; //reconnect(); // 启动重连机制 } else { m_cv.notify_one(); m_close_done = true; } } void WebSocketClient::on_fail(websocketpp::connection_hdl hdl) { std::cerr << "Connection failed" << std::endl; std::lock_guard<std::mutex> lock(m_mutex); m_connected.store(false); if (m_fail_handler) { m_fail_handler(); } } void WebSocketClient::on_pong(websocketpp::connection_hdl hdl, const std::string& payload) { std::cout << "Received pong response with payload: " << payload << std::endl; std::lock_guard<std::mutex> lock(m_mutex); m_connected.store(true); if (m_pong_handler) { m_pong_handler(payload); } } //WebSocketClient::WebSocketClient() : m_connected(false), m_reconnect(true), m_close_done(false), m_message_received(false), m_heartbeat_interval(5) { // // 清除之前的客户端配置并重新初始化 // m_client.clear_access_channels(websocketpp::log::alevel::all); // m_client.clear_error_channels(websocketpp::log::elevel::all); // m_client.init_asio(); // // // 设置事件回调 // m_client.set_open_handler([this](websocketpp::connection_hdl hdl) { on_open(hdl); }); // m_client.set_message_handler([this](websocketpp::connection_hdl hdl, message_ptr msg) { on_message(hdl, msg); }); // m_client.set_close_handler([this](websocketpp::connection_hdl hdl) { on_close(hdl); }); // m_client.set_fail_handler([this](websocketpp::connection_hdl hdl) { on_fail(hdl); }); // m_client.set_pong_handler([this](websocketpp::connection_hdl hdl, const std::string& payload) { on_pong(hdl, payload); }); //} // //WebSocketClient::~WebSocketClient() { // close(); //} // //void WebSocketClient::connect(const std::string& uri) { // websocketpp::lib::error_code ec; // { // m_connected.store(false); // m_reconnect.store(false); // m_client.clear_access_channels(websocketpp::log::alevel::all); // m_client.clear_error_channels(websocketpp::log::elevel::all); // // std::lock_guard<std::mutex> lock(m_mutex); // 保护共享资源 // auto con = m_client.get_connection(uri, ec); // // if (ec) { // std::cerr << "Get connection failed: " << 
ec.message() << std::endl; // return; // } // // m_hdl = con->get_handle(); // m_client.connect(con); // } // // // 启动工作线程 // m_thread = std::thread(&WebSocketClient::run, this); //} // //bool WebSocketClient::is_busy() const { // return m_busy.load(); //} // //void WebSocketClient::set_busy(bool busy) { // m_busy.store(busy); //} // //void WebSocketClient::send(const std::string& message) { // std::lock_guard<std::mutex> lock(m_mutex); // if (m_connected.load()) { // websocketpp::lib::error_code ec; // m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec); // if (ec) { // std::cerr << "Send failed: " << ec.message() << std::endl; // } // } // else { // std::cerr << "Not connected, cannot send message." << std::endl; // } //} // //void WebSocketClient::send_binary(const std::vector<char>& binary_data) { // std::lock_guard<std::mutex> lock(m_mutex); // if (m_connected.load()) { // websocketpp::lib::error_code ec; // m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec); // if (ec) { // std::cerr << "Send binary data failed: " << ec.message() << std::endl; // } // } // else { // std::cerr << "Not connected, cannot send binary data." 
<< std::endl; // } //} // //void WebSocketClient::enqueue_message(const std::string& message) { // m_message_queue.push(message); // process_next_message(); //} // //void WebSocketClient::enqueue_binary(const std::vector<char>& binary_data) { // m_binary_queue.push(binary_data); // process_next_binary(); //} // //void WebSocketClient::process_next_message() { // if (!m_message_queue.empty() && !is_busy()) { // set_busy(true); // std::string message = m_message_queue.front(); // m_message_queue.pop(); // // send(message); // 发送消息 // // set_busy(false); // 设置为空闲状态,并处理下一个 // process_next_message(); // } //} // //void WebSocketClient::process_next_binary() { // if (!m_binary_queue.empty() && !is_busy()) { // set_busy(true); // std::vector<char> binary_data = m_binary_queue.front(); // m_binary_queue.pop(); // // send_binary(binary_data); // 发送二进制数据 // // set_busy(false); // 设置为空闲状态,并处理下一个 // process_next_binary(); // } //} // //bool WebSocketClient::send_and_wait(const std::string& message, int timeout_ms) { // std::unique_lock<std::mutex> lock(m_mutex); // m_message_received = false; // // if (m_connected.load()) { // // 发送消息 // websocketpp::lib::error_code ec; // m_client.send(m_hdl, message, websocketpp::frame::opcode::text, ec); // if (ec) { // std::cerr << "Send failed: " << ec.message() << std::endl; // return false; // } // // // 等待消息或超时 // if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) { // std::cout << "Server response received." << std::endl; // return true; // } // else { // std::cerr << "Timeout: No response from server." << std::endl; // return false; // } // } // else { // std::cerr << "Not connected, cannot send message." 
<< std::endl; // } //} // //bool WebSocketClient::send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms) { // std::unique_lock<std::mutex> lock(m_mutex); // m_message_received = false; // // if (m_connected.load()) { // // 发送消息 // websocketpp::lib::error_code ec; // m_client.send(m_hdl, binary_data.data(), binary_data.size(), websocketpp::frame::opcode::binary, ec); // if (ec) { // std::cerr << "Send failed: " << ec.message() << std::endl; // return false; // } // // // 等待消息或超时 // if (m_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return m_message_received; })) { // std::cout << "Server response received." << std::endl; // return true; // } // else { // std::cerr << "Timeout: No response from server." << std::endl; // return false; // } // } // else { // std::cerr << "Not connected, cannot send binary data." << std::endl; // } //} // //void WebSocketClient::close() { // if (!m_connected.load()) { // return; // } // // try { // m_reconnect.store(false); // m_client.close(m_hdl, websocketpp::close::status::normal, "Closing connection"); // } // catch (const std::exception& e) { // std::cerr << "Error during close: " << e.what() << std::endl; // } // // std::unique_lock<std::mutex> lock(m_mutex); // m_cv.wait(lock, [this] { return m_close_done; }); // m_close_done = false; // // m_client.stop(); // if (m_thread.joinable()) { // m_thread.join(); // } // // m_client.reset(); //} // //void WebSocketClient::set_open_handler(OpenHandler handler) { // std::lock_guard<std::mutex> lock(m_mutex); // m_open_handler = handler; //} // //void WebSocketClient::set_message_handler(MessageHandler handler) { // std::lock_guard<std::mutex> lock(m_mutex); // m_message_handler = handler; //} // //void WebSocketClient::set_close_handler(CloseHandler handler) { // std::lock_guard<std::mutex> lock(m_mutex); // m_close_handler = handler; //} // //void WebSocketClient::set_fail_handler(FailHandler handler) { // std::lock_guard<std::mutex> 
lock(m_mutex); // m_fail_handler = handler; //} // //void WebSocketClient::set_pong_handler(PongHandler handler) { // std::lock_guard<std::mutex> lock(m_mutex); // m_pong_handler = handler; //} // //void WebSocketClient::set_heartbeat_interval(int interval) { // std::lock_guard<std::mutex> lock(m_mutex); // m_heartbeat_interval = interval; //} // //std::string WebSocketClient::get_current_url() { // std::lock_guard<std::mutex> lock(m_mutex); // auto con = m_client.get_con_from_hdl(m_hdl); // return con->get_uri()->str(); //} // //bool WebSocketClient::is_connected() const { // return m_connected.load(); //} // //void WebSocketClient::run() { // try { // m_client.run(); // } // catch (const std::exception& e) { // std::cerr << "Error in run: " << e.what() << std::endl; // } // // m_connected.store(false); //} // //void WebSocketClient::heartbeat() { // while (m_reconnect.load()) { // std::this_thread::sleep_for(std::chrono::seconds(m_heartbeat_interval)); // if (!m_connected.load()) { // std::cerr << "Attempting to reconnect..." << std::endl; // connect(get_current_url()); // } // } //} // //void WebSocketClient::on_open(websocketpp::connection_hdl hdl) { // std::cout << "Connection established" << std::endl; // std::lock_guard<std::mutex> lock(m_mutex); // m_connected.store(true); // m_reconnect.store(true); // if (m_open_handler) { // m_open_handler(); // } //} // //void WebSocketClient::on_message(connection_hdl hdl, message_ptr msg) { // std::cout << "Received message: " << msg->get_payload() << std::endl; // std::lock_guard<std::mutex> lock(m_mutex); // if (m_message_handler) { // m_message_handler(msg->get_payload()); // } // // m_message_received = true; // m_cv.notify_one(); // 唤醒等待线程 //} // //void WebSocketClient::on_close(connection_hdl hdl) { // std::cout << "Connection closed." 
<< std::endl; // std::lock_guard<std::mutex> lock(m_mutex); // // m_hdl.reset(); // 清空连接句柄 // m_connected.store(false); // if (m_close_handler) { // m_close_handler(); // } // // // 如果需要重连 // if (m_reconnect.load()) { // // // 停止当前运行的事件循环 // //m_client.stop(); // //if (m_thread.joinable()) { // // m_thread.join(); // //} // std::cerr << "Connection lost. Attempting to reconnect..." << std::endl; // //reconnect(); // 启动重连机制 // } // else { // m_cv.notify_one(); // m_close_done = true; // } //} // //void WebSocketClient::on_fail(websocketpp::connection_hdl hdl) { // std::cerr << "Connection failed" << std::endl; // std::lock_guard<std::mutex> lock(m_mutex); // m_connected.store(false); // // if (m_fail_handler) { // m_fail_handler(); // } //} // //void WebSocketClient::on_pong(websocketpp::connection_hdl hdl, const std::string& payload) { // std::cout << "Received pong response with payload: " << payload << std::endl; // std::lock_guard<std::mutex> lock(m_mutex); // m_connected.store(true); // // if (m_pong_handler) { // m_pong_handler(payload); // } //} EdgeInspector_App/WebSocket/WebSocketClient.h
@@ -1,129 +1,129 @@ #ifndef WEBSOCKETCLIENT_H #define WEBSOCKETCLIENT_H #include <websocketpp/config/asio_no_tls_client.hpp> #include <websocketpp/client.hpp> #include <string> #include <vector> #include <iostream> #include <thread> #include <atomic> #include <mutex> #include <functional> #include <chrono> typedef websocketpp::client<websocketpp::config::asio_client> client; class WebSocketClient { public: using message_ptr = websocketpp::config::asio_client::message_type::ptr; using connection_hdl = websocketpp::connection_hdl; using OpenHandler = std::function<void()>; using MessageHandler = std::function<void(const std::string&)>; using CloseHandler = std::function<void()>; using FailHandler = std::function<void()>; using PongHandler = std::function<void(const std::string&)>; WebSocketClient(); ~WebSocketClient(); // 连接到指定的URI void connect(const std::string& uri); // 增加客户端的状态标识 bool is_busy() const; void set_busy(bool busy); // 发送文本消息 void send(const std::string& message); // 发送二进制数据 void send_binary(const std::vector<char>& binary_data); // 通过队列发送文本消息 void enqueue_message(const std::string& message); // 通过队列发送二进制数据 void enqueue_binary(const std::vector<char>& binary_data); // 处理下一个消息 void process_next_message(); void process_next_binary(); // 发送文本消息并等待回复 bool send_and_wait(const std::string& message, int timeout_ms); // 发送二进制数据并等待回复 bool send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms); // 关闭连接 void close(); // 设置各种回调函数 void set_open_handler(OpenHandler handler); void set_message_handler(MessageHandler handler); void set_close_handler(CloseHandler handler); void set_fail_handler(FailHandler handler); void set_pong_handler(PongHandler handler); // 设置心跳间隔(单位:秒) void set_heartbeat_interval(int interval); // 获取当前连接的URL std::string get_current_url(); // 检查当前连接是否仍然有效 bool is_connected() const; private: // WebSocket 运行函数 void run(); // 心跳检查函数 void heartbeat(); // 回调函数触发器 void on_open(websocketpp::connection_hdl hdl); void 
on_message(connection_hdl hdl, message_ptr msg); void on_close(websocketpp::connection_hdl hdl); void on_fail(websocketpp::connection_hdl hdl); void on_pong(websocketpp::connection_hdl hdl, const std::string& payload); private: // WebSocket客户端实例 client m_client; // WebSocket 连接句柄 connection_hdl m_hdl; // 线程相关 std::thread m_thread; std::thread m_heartbeat_thread; std::atomic<bool> m_connected; std::atomic<bool> m_reconnect; int m_heartbeat_interval; bool m_message_received; // 发送队列 std::queue<std::string> m_message_queue; std::queue<std::vector<char>> m_binary_queue; // 标识客户端是否忙碌 std::atomic<bool> m_busy; // 线程同步相关 bool m_close_done; std::condition_variable m_cv; // 互斥锁,用于保护共享资源 std::mutex m_mutex; // 回调函数 OpenHandler m_open_handler; MessageHandler m_message_handler; CloseHandler m_close_handler; FailHandler m_fail_handler; PongHandler m_pong_handler; }; //#include <websocketpp/config/asio_no_tls_client.hpp> //#include <websocketpp/client.hpp> //#include <string> //#include <vector> //#include <iostream> //#include <thread> //#include <atomic> //#include <mutex> //#include <functional> //#include <chrono> // //typedef websocketpp::client<websocketpp::config::asio_client> client; // //class WebSocketClient { //public: // using message_ptr = websocketpp::config::asio_client::message_type::ptr; // using connection_hdl = websocketpp::connection_hdl; // using OpenHandler = std::function<void()>; // using MessageHandler = std::function<void(const std::string&)>; // using CloseHandler = std::function<void()>; // using FailHandler = std::function<void()>; // using PongHandler = std::function<void(const std::string&)>; // // WebSocketClient(); // ~WebSocketClient(); // // // 连接到指定的URI // void connect(const std::string& uri); // // // 增加客户端的状态标识 // bool is_busy() const; // void set_busy(bool busy); // // // 发送文本消息 // void send(const std::string& message); // // // 发送二进制数据 // void send_binary(const std::vector<char>& binary_data); // // // 通过队列发送文本消息 // void 
enqueue_message(const std::string& message); // // // 通过队列发送二进制数据 // void enqueue_binary(const std::vector<char>& binary_data); // // // 处理下一个消息 // void process_next_message(); // void process_next_binary(); // // // 发送文本消息并等待回复 // bool send_and_wait(const std::string& message, int timeout_ms); // // // 发送二进制数据并等待回复 // bool send_binary_and_wait(const std::vector<char>& binary_data, int timeout_ms); // // // 关闭连接 // void close(); // // // 设置各种回调函数 // void set_open_handler(OpenHandler handler); // void set_message_handler(MessageHandler handler); // void set_close_handler(CloseHandler handler); // void set_fail_handler(FailHandler handler); // void set_pong_handler(PongHandler handler); // // // 设置心跳间隔(单位:秒) // void set_heartbeat_interval(int interval); // // // 获取当前连接的URL // std::string get_current_url(); // // // 检查当前连接是否仍然有效 // bool is_connected() const; // //private: // // WebSocket 运行函数 // void run(); // // // 心跳检查函数 // void heartbeat(); // // // 回调函数触发器 // void on_open(websocketpp::connection_hdl hdl); // void on_message(connection_hdl hdl, message_ptr msg); // void on_close(websocketpp::connection_hdl hdl); // void on_fail(websocketpp::connection_hdl hdl); // void on_pong(websocketpp::connection_hdl hdl, const std::string& payload); // //private: // // WebSocket客户端实例 // client m_client; // // // WebSocket 连接句柄 // connection_hdl m_hdl; // // // 线程相关 // std::thread m_thread; // std::thread m_heartbeat_thread; // std::atomic<bool> m_connected; // std::atomic<bool> m_reconnect; // int m_heartbeat_interval; // bool m_message_received; // // // 发送队列 // std::queue<std::string> m_message_queue; // std::queue<std::vector<char>> m_binary_queue; // // // 标识客户端是否忙碌 // std::atomic<bool> m_busy; // // // 线程同步相关 // bool m_close_done; // std::condition_variable m_cv; // // // 互斥锁,用于保护共享资源 // std::mutex m_mutex; // // // 回调函数 // OpenHandler m_open_handler; // MessageHandler m_message_handler; // CloseHandler m_close_handler; // FailHandler m_fail_handler; // PongHandler 
m_pong_handler; //}; #endif // WEBSOCKETCLIENT_H EdgeInspector_App/WebSocket/WebSocketClientPool.cpp
@@ -1,278 +1,278 @@ #include "StdAfx.h" #include "WebSocketClientPool.h" WebSocketClientPool::WebSocketClientPool(size_t thread_count) : m_thread_pool(thread_count) {} WebSocketClientPool::~WebSocketClientPool() {} void WebSocketClientPool::init_clients(const std::vector<std::string>& uris) { m_clients.clear(); for (const auto& uri : uris) { try { if (!uri.empty() && uri.size() > 8) { // "ws://localhost:8080" auto client = std::make_shared<WebSocketClient>(); client->connect(uri); m_clients.push_back(client); } } catch (const std::exception& e) { std::cerr << "Error connecting to " << uri << ": " << e.what() << std::endl; exit_clients(); } } } void WebSocketClientPool::exit_clients() { for (auto& client : m_clients) { if (client->is_connected()) { client->close(); } } // 清空之前的客户端,确保是干净的状态 m_clients.clear(); } void WebSocketClientPool::post_to_all(const std::string& message) { for (auto& client : m_clients) { m_thread_pool.enqueue([client, message]() { if (client->is_connected()) { client->send(message); } }); } } void WebSocketClientPool::post_binary_to_all(const std::vector<char>& binary_data) { for (auto& client : m_clients) { m_thread_pool.enqueue([client, binary_data]() { if (client->is_connected()) { client->send_binary(binary_data); } }); } } bool WebSocketClientPool::post_to_idle_client(const std::string& message) { for (auto& client : m_clients) { if (client->is_connected()) { // 提交发送任务到线程池 m_thread_pool.enqueue([client, message]() { client->set_busy(true); // 标记为忙碌 client->send(message); // 发送消息 client->set_busy(false); // 完成后标记为空闲 }); return true; // 成功提交任务 } } return false; // 没有可用客户端 } bool WebSocketClientPool::post_binary_to_idle_client(const std::vector<char>& binary_data) { for (auto& client : m_clients) { if (client->is_connected()) { // 提交发送任务到线程池 m_thread_pool.enqueue([client, binary_data]() { client->set_busy(true); // 标记为忙碌 client->send_binary(binary_data); // 发送二进制数据 client->set_busy(false); // 完成后标记为空闲 }); return true; // 成功提交任务 } } return 
false; // 没有可用客户端 } bool WebSocketClientPool::send_to_idle_client(const std::string& message, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // 标记为忙 client->set_busy(true); // 使用线程池发送消息 m_thread_pool.enqueue([client, message, timeout_ms]() { // 发送消息并等待服务器回复 client->send_and_wait(message, timeout_ms); // 5秒超时 client->set_busy(false); // 标记为空闲 }); return true; // 成功发送消息,返回 true } } return false; // 没有空闲客户端 } bool WebSocketClientPool::send_binary_to_idle_client(const std::vector<char>& binary_data, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // 标记为忙 client->set_busy(true); // 使用线程池发送消息 m_thread_pool.enqueue([client, binary_data, timeout_ms]() { // 发送消息并等待服务器回复 client->send_binary_and_wait(binary_data, timeout_ms); client->set_busy(false); // 标记为空闲 }); return true; // 成功发送消息,返回 true } } return false; // 没有空闲客户端 } bool WebSocketClientPool::send_to_idle_client_and_wait(const std::string& message, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { client->set_busy(true); // 标记为忙 try { // 直接在当前线程调用 send_and_wait,阻塞直到任务完成 bool result = client->send_and_wait(message, timeout_ms); client->set_busy(false); // 设置为空闲状态 return result; // 返回结果 } catch (const std::exception& e) { std::cerr << "Exception during sending: " << e.what() << std::endl; client->set_busy(false); // 即使发生异常,也要重置为未忙状态 return false; // 发生异常,返回失败 } } } return false; // 没有空闲客户端 } bool WebSocketClientPool::send_binary_to_idle_client_and_wait(const std::vector<char>& binary_data, int timeout_ms) { for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { client->set_busy(true); // 标记为忙 try { // 直接在当前线程调用 send_and_wait,阻塞直到任务完成 bool result = client->send_binary_and_wait(binary_data, timeout_ms); client->set_busy(false); // 设置为空闲状态 return result; // 返回结果 } catch (const std::exception& e) { std::cerr << "Exception during 
sending: " << e.what() << std::endl; client->set_busy(false); // 即使发生异常,也要重置为未忙状态 return false; // 发生异常,返回失败 } } } return false; // 没有空闲客户端 } bool WebSocketClientPool::send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms) { auto start_time = std::chrono::steady_clock::now(); while (true) { // 查找空闲客户端 for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // 标记为忙 client->set_busy(true); // 使用线程池发送消息 m_thread_pool.enqueue([client, message]() { try { // 发送消息并等待服务器回复 client->send_and_wait(message, 5000); // 这里的超时可以根据需要调整 } catch (const std::exception& e) { std::cerr << "Error sending data: " << e.what() << std::endl; } client->set_busy(false); // 标记为空闲 }); return true; // 成功发送 } } // 计算已使用的时间 auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count(); if (elapsed_time >= total_timeout_ms) { std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." 
<< std::endl; return false; // 超时,无法发送 } // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试 std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms)); } } bool WebSocketClientPool::send_binary_to_idle_client_with_retry(std::vector<char>&& binary_data, int total_timeout_ms, int retry_interval_ms) { auto start_time = std::chrono::steady_clock::now(); while (true) { // 查找空闲客户端 for (auto& client : m_clients) { if (!client->is_busy() && client->is_connected()) { // 标记为忙 client->set_busy(true); // 使用线程池发送消息 m_thread_pool.enqueue([client, binary_data = std::move(binary_data)]() mutable { try { // 发送消息并等待服务器回复 client->send_binary_and_wait(binary_data, 5000); // 这里的超时可以根据需要调整 } catch (const std::exception& e) { std::cerr << "Error sending binary data: " << e.what() << std::endl; } client->set_busy(false); // 标记为空闲 }); return true; // 成功发送 } } // 计算已使用的时间 auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count(); if (elapsed_time >= total_timeout_ms) { std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." 
<< std::endl; return false; // 超时,无法发送 } // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试 std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms)); } } void WebSocketClientPool::set_open_handler(WebSocketClient::OpenHandler handler) { for (auto& client : m_clients) { client->set_open_handler(handler); } } void WebSocketClientPool::set_message_handler(WebSocketClient::MessageHandler handler) { for (auto& client : m_clients) { client->set_message_handler(handler); } } void WebSocketClientPool::set_close_handler(WebSocketClient::CloseHandler handler) { for (auto& client : m_clients) { client->set_close_handler(handler); } } void WebSocketClientPool::set_fail_handler(WebSocketClient::FailHandler handler) { for (auto& client : m_clients) { client->set_fail_handler(handler); } } void WebSocketClientPool::set_pong_handler(WebSocketClient::PongHandler handler) { for (auto& client : m_clients) { client->set_pong_handler(handler); } } //WebSocketClientPool::WebSocketClientPool(size_t thread_count) : m_thread_pool(thread_count) {} // //WebSocketClientPool::~WebSocketClientPool() {} // //void WebSocketClientPool::init_clients(const std::vector<std::string>& uris) { // m_clients.clear(); // // for (const auto& uri : uris) { // try { // if (!uri.empty() && uri.size() > 8) // { // // "ws://localhost:8080" // auto client = std::make_shared<WebSocketClient>(); // client->connect(uri); // m_clients.push_back(client); // } // } // catch (const std::exception& e) { // std::cerr << "Error connecting to " << uri << ": " << e.what() << std::endl; // exit_clients(); // } // } //} // //void WebSocketClientPool::exit_clients() { // for (auto& client : m_clients) { // if (client->is_connected()) // { // client->close(); // } // } // // // 清空之前的客户端,确保是干净的状态 // m_clients.clear(); //} // //void WebSocketClientPool::post_to_all(const std::string& message) { // for (auto& client : m_clients) { // m_thread_pool.enqueue([client, message]() { // if (client->is_connected()) // { // 
client->send(message); // } // }); // } //} // //void WebSocketClientPool::post_binary_to_all(const std::vector<char>& binary_data) { // for (auto& client : m_clients) { // m_thread_pool.enqueue([client, binary_data]() { // if (client->is_connected()) // { // client->send_binary(binary_data); // } // }); // } //} // //bool WebSocketClientPool::post_to_idle_client(const std::string& message) { // for (auto& client : m_clients) { // if (client->is_connected()) { // // 提交发送任务到线程池 // m_thread_pool.enqueue([client, message]() { // client->set_busy(true); // 标记为忙碌 // client->send(message); // 发送消息 // client->set_busy(false); // 完成后标记为空闲 // }); // return true; // 成功提交任务 // } // } // return false; // 没有可用客户端 //} // //bool WebSocketClientPool::post_binary_to_idle_client(const std::vector<char>& binary_data) { // for (auto& client : m_clients) { // if (client->is_connected()) { // // 提交发送任务到线程池 // m_thread_pool.enqueue([client, binary_data]() { // client->set_busy(true); // 标记为忙碌 // client->send_binary(binary_data); // 发送二进制数据 // client->set_busy(false); // 完成后标记为空闲 // }); // return true; // 成功提交任务 // } // } // return false; // 没有可用客户端 //} // //bool WebSocketClientPool::send_to_idle_client(const std::string& message, int timeout_ms) { // for (auto& client : m_clients) { // if (!client->is_busy() && client->is_connected()) { // // 标记为忙 // client->set_busy(true); // // // 使用线程池发送消息 // m_thread_pool.enqueue([client, message, timeout_ms]() { // // 发送消息并等待服务器回复 // client->send_and_wait(message, timeout_ms); // 5秒超时 // client->set_busy(false); // 标记为空闲 // }); // // return true; // 成功发送消息,返回 true // } // } // return false; // 没有空闲客户端 //} // //bool WebSocketClientPool::send_binary_to_idle_client(const std::vector<char>& binary_data, int timeout_ms) { // for (auto& client : m_clients) { // if (!client->is_busy() && client->is_connected()) { // // 标记为忙 // client->set_busy(true); // // // 使用线程池发送消息 // m_thread_pool.enqueue([client, binary_data, timeout_ms]() { // // 发送消息并等待服务器回复 // 
client->send_binary_and_wait(binary_data, timeout_ms); // client->set_busy(false); // 标记为空闲 // }); // // return true; // 成功发送消息,返回 true // } // } // return false; // 没有空闲客户端 //} // //bool WebSocketClientPool::send_to_idle_client_and_wait(const std::string& message, int timeout_ms) { // for (auto& client : m_clients) { // if (!client->is_busy() && client->is_connected()) { // client->set_busy(true); // 标记为忙 // // try { // // 直接在当前线程调用 send_and_wait,阻塞直到任务完成 // bool result = client->send_and_wait(message, timeout_ms); // client->set_busy(false); // 设置为空闲状态 // return result; // 返回结果 // } // catch (const std::exception& e) { // std::cerr << "Exception during sending: " << e.what() << std::endl; // client->set_busy(false); // 即使发生异常,也要重置为未忙状态 // return false; // 发生异常,返回失败 // } // } // } // return false; // 没有空闲客户端 //} // //bool WebSocketClientPool::send_binary_to_idle_client_and_wait(const std::vector<char>& binary_data, int timeout_ms) { // for (auto& client : m_clients) { // if (!client->is_busy() && client->is_connected()) { // client->set_busy(true); // 标记为忙 // // try { // // 直接在当前线程调用 send_and_wait,阻塞直到任务完成 // bool result = client->send_binary_and_wait(binary_data, timeout_ms); // client->set_busy(false); // 设置为空闲状态 // return result; // 返回结果 // } // catch (const std::exception& e) { // std::cerr << "Exception during sending: " << e.what() << std::endl; // client->set_busy(false); // 即使发生异常,也要重置为未忙状态 // return false; // 发生异常,返回失败 // } // } // } // return false; // 没有空闲客户端 //} // //bool WebSocketClientPool::send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms) { // auto start_time = std::chrono::steady_clock::now(); // // while (true) { // // 查找空闲客户端 // for (auto& client : m_clients) { // if (!client->is_busy() && client->is_connected()) { // // 标记为忙 // client->set_busy(true); // // // 使用线程池发送消息 // m_thread_pool.enqueue([client, message]() { // try { // // 发送消息并等待服务器回复 // client->send_and_wait(message, 5000); // 
这里的超时可以根据需要调整 // } // catch (const std::exception& e) { // std::cerr << "Error sending data: " << e.what() << std::endl; // } // client->set_busy(false); // 标记为空闲 // }); // // return true; // 成功发送 // } // } // // // 计算已使用的时间 // auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count(); // // if (elapsed_time >= total_timeout_ms) { // std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." << std::endl; // return false; // 超时,无法发送 // } // // // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试 // std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms)); // } //} // //bool WebSocketClientPool::send_binary_to_idle_client_with_retry(std::vector<char>&& binary_data, int total_timeout_ms, int retry_interval_ms) { // auto start_time = std::chrono::steady_clock::now(); // // while (true) { // // 查找空闲客户端 // for (auto& client : m_clients) { // if (!client->is_busy() && client->is_connected()) { // // 标记为忙 // client->set_busy(true); // // // 使用线程池发送消息 // m_thread_pool.enqueue([client, binary_data = std::move(binary_data)]() mutable { // try { // // 发送消息并等待服务器回复 // client->send_binary_and_wait(binary_data, 5000); // 这里的超时可以根据需要调整 // } // catch (const std::exception& e) { // std::cerr << "Error sending binary data: " << e.what() << std::endl; // } // client->set_busy(false); // 标记为空闲 // }); // // return true; // 成功发送 // } // } // // // 计算已使用的时间 // auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count(); // // if (elapsed_time >= total_timeout_ms) { // std::cerr << "Timeout: failed to send data within the specified timeout of " << total_timeout_ms << " ms." 
<< std::endl; // return false; // 超时,无法发送 // } // // // 没有找到空闲客户端,等待 retry_interval_ms 毫秒后重试 // std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms)); // } //} // //void WebSocketClientPool::set_open_handler(WebSocketClient::OpenHandler handler) { // for (auto& client : m_clients) { // client->set_open_handler(handler); // } //} // //void WebSocketClientPool::set_message_handler(WebSocketClient::MessageHandler handler) { // for (auto& client : m_clients) { // client->set_message_handler(handler); // } //} // //void WebSocketClientPool::set_close_handler(WebSocketClient::CloseHandler handler) { // for (auto& client : m_clients) { // client->set_close_handler(handler); // } //} // //void WebSocketClientPool::set_fail_handler(WebSocketClient::FailHandler handler) { // for (auto& client : m_clients) { // client->set_fail_handler(handler); // } //} // //void WebSocketClientPool::set_pong_handler(WebSocketClient::PongHandler handler) { // for (auto& client : m_clients) { // client->set_pong_handler(handler); // } //} EdgeInspector_App/WebSocket/WebSocketClientPool.h
@@ -1,50 +1,50 @@ #ifndef WEBSOCKETCLIENTWITHPOOL_H #define WEBSOCKETCLIENTWITHPOOL_H #include "WebSocketClient.h" #include "ThreadPool.h" class WebSocketClientPool { public: explicit WebSocketClientPool(size_t thread_count); ~WebSocketClientPool(); // 初始化客户端 void init_clients(const std::vector<std::string>& uris); // 退出所有客户端 void exit_clients(); // 使用线程池发送消息 void post_to_all(const std::string& message); void post_binary_to_all(const std::vector<char>& binary_data); // 从池中选择一个空闲的客户端进行发送,不需要服务器响应 bool post_to_idle_client(const std::string& message); bool post_binary_to_idle_client(const std::vector<char>& binary_data); // 从池中选择一个空闲的客户端进行发送,并需要服务器响应 bool send_to_idle_client(const std::string& message, int timeout_ms); bool send_binary_to_idle_client(const std::vector<char>& binary_data, int timeout_ms); // 从池中选择一个空闲的客户端进行发送,并等待服务器响应 bool send_to_idle_client_and_wait(const std::string& message, int timeout_ms); bool send_binary_to_idle_client_and_wait(const std::vector<char>& binary_data, int timeout_ms); // 从池中选择一个空闲的客户端进行发送,并等待服务器响应,如果超时则重试 bool send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms); bool send_binary_to_idle_client_with_retry(std::vector<char>&& binary_data, int total_timeout_ms, int retry_interval_ms); // 设置 WebSocket 客户端事件处理程序 void set_open_handler(WebSocketClient::OpenHandler handler); void set_message_handler(WebSocketClient::MessageHandler handler); void set_close_handler(WebSocketClient::CloseHandler handler); void set_fail_handler(WebSocketClient::FailHandler handler); void set_pong_handler(WebSocketClient::PongHandler handler); private: std::vector<std::shared_ptr<WebSocketClient>> m_clients; ThreadPool m_thread_pool; }; //#include "WebSocketClient.h" //#include "ThreadPool.h" // //class WebSocketClientPool { //public: // explicit WebSocketClientPool(size_t thread_count); // ~WebSocketClientPool(); // // // 初始化客户端 // void init_clients(const std::vector<std::string>& uris); // // // 退出所有客户端 // 
void exit_clients(); // // // 使用线程池发送消息 // void post_to_all(const std::string& message); // void post_binary_to_all(const std::vector<char>& binary_data); // // // 从池中选择一个空闲的客户端进行发送,不需要服务器响应 // bool post_to_idle_client(const std::string& message); // bool post_binary_to_idle_client(const std::vector<char>& binary_data); // // // 从池中选择一个空闲的客户端进行发送,并需要服务器响应 // bool send_to_idle_client(const std::string& message, int timeout_ms); // bool send_binary_to_idle_client(const std::vector<char>& binary_data, int timeout_ms); // // // 从池中选择一个空闲的客户端进行发送,并等待服务器响应 // bool send_to_idle_client_and_wait(const std::string& message, int timeout_ms); // bool send_binary_to_idle_client_and_wait(const std::vector<char>& binary_data, int timeout_ms); // // // 从池中选择一个空闲的客户端进行发送,并等待服务器响应,如果超时则重试 // bool send_to_idle_client_with_retry(const std::string& message, int total_timeout_ms, int retry_interval_ms); // bool send_binary_to_idle_client_with_retry(std::vector<char>&& binary_data, int total_timeout_ms, int retry_interval_ms); // // // 设置 WebSocket 客户端事件处理程序 // void set_open_handler(WebSocketClient::OpenHandler handler); // void set_message_handler(WebSocketClient::MessageHandler handler); // void set_close_handler(WebSocketClient::CloseHandler handler); // void set_fail_handler(WebSocketClient::FailHandler handler); // void set_pong_handler(WebSocketClient::PongHandler handler); // //private: // std::vector<std::shared_ptr<WebSocketClient>> m_clients; // ThreadPool m_thread_pool; //}; #endif // WEBSOCKETCLIENTWITHPOOL_H