PipeService.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. #include "StdAfx.h"
  2. #include "PipeService.h"
  3. //#include "MainDlg.h"
  4. #define PIPENAME _T("\\\\.\\pipe\\Assist")
  5. // 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)
  6. #define WORKER_THREADS_PER_PROCESSOR 2
  7. // 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)
  8. #define MAX_POST_ACCEPT 10
  9. // 传递给Worker线程的退出信号
  10. #define EXIT_CODE NULL
  11. // 释放指针和句柄资源的宏
  12. // 释放指针宏
  13. #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
  14. // 释放句柄宏
  15. #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
  16. // 释放Socket宏
  17. #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
  18. CIOCPPipe::CIOCPPipe(void)
  19. {
  20. m_nThreads = 0;
  21. m_hShutdownEvent = NULL;
  22. m_hIOCompletionPort = NULL;
  23. m_phWorkerThreads= NULL;
  24. m_pMain = NULL;
  25. }
  26. CIOCPPipe::~CIOCPPipe(void)
  27. {
  28. // 确保资源彻底释放
  29. this->Stop();
  30. }
  31. DWORD WINAPI CIOCPPipe::_WorkerThread(LPVOID lpParam)
  32. {
  33. THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
  34. CIOCPPipe* pIOCPModel = (CIOCPPipe*)pParam->pIOCPModel;
  35. int nThreadNo = (int)pParam->nThreadNo;
  36. pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
  37. OVERLAPPED *pOverlapped = NULL;
  38. ULONG_PTR pCompletionKey = NULL;
  39. DWORD dwBytesTransfered = 0;
  40. // 循环处理请求,知道接收到Shutdown信息为止
  41. while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
  42. {
  43. BOOL bReturn = GetQueuedCompletionStatus(
  44. pIOCPModel->m_hIOCompletionPort,
  45. &dwBytesTransfered,
  46. &pCompletionKey,
  47. &pOverlapped,
  48. INFINITE);
  49. dprintf(_T("IOCP有消息"));
  50. // 如果收到的是退出标志,则直接退出
  51. if ( EXIT_CODE==(DWORD)pCompletionKey )
  52. {
  53. //break;
  54. continue;
  55. }
  56. // 判断是否出现了错误
  57. if( !bReturn )
  58. {
  59. DWORD dwErr = GetLastError();
  60. // 显示一下提示信息
  61. // 读取传入的参数
  62. PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
  63. if( !pIOCPModel->HandleError( pIoContext,dwErr ) )
  64. {
  65. //break;
  66. continue;
  67. }
  68. continue;
  69. }
  70. else
  71. {
  72. // 读取传入的参数
  73. PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
  74. // 判断是否有客户端断开了
  75. if((0 == dwBytesTransfered) && ( OP_RECV==pIoContext->m_OpType || OP_SEND==pIoContext->m_OpType))
  76. {
  77. pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),pIoContext->chClientName, pIoContext->dwProcessId );
  78. // 释放掉对应的资源
  79. pIOCPModel->_RemoveContext( pIoContext );
  80. continue;
  81. }
  82. else
  83. {
  84. switch( pIoContext->m_OpType )
  85. {
  86. case OP_ACCEPT:
  87. {
  88. pIOCPModel->_DoAccpet( pIoContext );
  89. dprintf(_T("客户端连接成功"));
  90. }
  91. break;
  92. case OP_RECV:
  93. {
  94. //pIOCPModel->_DoRecv( pSocketContext,pIoContext );
  95. }
  96. break;
  97. case OP_SEND:
  98. break;
  99. default:
  100. // 不应该执行到这里
  101. TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
  102. break;
  103. }
  104. }
  105. }
  106. }
  107. TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
  108. // 释放线程参数
  109. RELEASE(lpParam);
  110. return 0;
  111. }
  112. bool CIOCPPipe::Start()
  113. {
  114. // 初始化线程互斥量
  115. InitializeCriticalSection(&m_csContextList);
  116. // 建立系统退出的事件通知
  117. m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  118. // 初始化IOCP
  119. if (false == _InitializeIOCP())
  120. {
  121. this->_ShowMessage(_T("初始化IOCP失败!\n"));
  122. return false;
  123. }
  124. else
  125. {
  126. this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
  127. }
  128. this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
  129. return true;
  130. }
  131. void CIOCPPipe::Stop()
  132. {
  133. // 激活关闭消息通知
  134. SetEvent(m_hShutdownEvent);
  135. for (int i = 0; i < m_nThreads; i++)
  136. {
  137. // 通知所有的完成端口操作退出
  138. PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
  139. }
  140. // 等待所有的客户端资源退出
  141. WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
  142. // 清除客户端列表信息
  143. this->_ClearContextList();
  144. // 释放其他资源
  145. this->_DeInitialize();
  146. this->_ShowMessage(_T("停止监听\n"));
  147. }
  148. bool CIOCPPipe::_InitializeIOCP()
  149. {
  150. // 建立第一个完成端口
  151. m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
  152. if ( NULL == m_hIOCompletionPort)
  153. {
  154. this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError());
  155. return false;
  156. }
  157. // 根据本机中的处理器数量,建立对应的线程数
  158. #ifdef _DEBUG
  159. m_nThreads = 1;
  160. #else
  161. m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
  162. #endif
  163. // 为工作者线程初始化句柄
  164. m_phWorkerThreads = new HANDLE[m_nThreads];
  165. // 根据计算出来的数量建立工作者线程
  166. DWORD nThreadID;
  167. for (int i = 0; i < m_nThreads; i++)
  168. {
  169. THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
  170. pThreadParams->pIOCPModel = this;
  171. pThreadParams->nThreadNo = i+1;
  172. m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
  173. }
  174. _PostAccept();
  175. TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
  176. return true;
  177. }
  178. void CIOCPPipe::_DeInitialize()
  179. {
  180. // 删除客户端列表的互斥量
  181. DeleteCriticalSection(&m_csContextList);
  182. // 关闭系统退出事件句柄
  183. RELEASE_HANDLE(m_hShutdownEvent);
  184. // 释放工作者线程句柄指针
  185. for( int i=0;i<m_nThreads;i++ )
  186. {
  187. RELEASE_HANDLE(m_phWorkerThreads[i]);
  188. }
  189. RELEASE(m_phWorkerThreads);
  190. // 关闭IOCP句柄
  191. RELEASE_HANDLE(m_hIOCompletionPort);
  192. _ShowMessage(_T("释放资源完毕.\n"));
  193. }
  194. bool CIOCPPipe::_PostAccept()
  195. {
  196. HANDLE hPipeAccept = CreateNamedPipe(
  197. PIPENAME, // pipe name
  198. PIPE_ACCESS_DUPLEX | // read/write access
  199. FILE_FLAG_OVERLAPPED, // overlapped mode
  200. PIPE_TYPE_MESSAGE | // message-type pipe
  201. PIPE_READMODE_MESSAGE | // message-read mode
  202. PIPE_WAIT, // blocking mode
  203. PIPE_UNLIMITED_INSTANCES, // number of instances
  204. 0, // output buffer size
  205. 0, // input buffer size
  206. NMPWAIT_WAIT_FOREVER, // client time-out
  207. NULL); // default security attributes
  208. if ( hPipeAccept == INVALID_HANDLE_VALUE )
  209. return false;
  210. HANDLE hIOCP = CreateIoCompletionPort(hPipeAccept, m_hIOCompletionPort, 10001, 0);
  211. if ( hIOCP == NULL )
  212. {
  213. CloseHandle(hPipeAccept);
  214. return false;
  215. }
  216. PER_IO_CONTEXT *pMyOverlapped = new PER_IO_CONTEXT;
  217. memset(pMyOverlapped, 0, sizeof(PER_IO_CONTEXT));
  218. pMyOverlapped->m_OpType = OP_ACCEPT;
  219. pMyOverlapped->m_PipeAccept = hPipeAccept;
  220. BOOL bRet = ConnectNamedPipe(hPipeAccept, (LPOVERLAPPED)pMyOverlapped);
  221. if( bRet )
  222. {// 重叠IO或完成端口,返回的是Fase;
  223. CloseHandle(hPipeAccept);
  224. CloseHandle(hIOCP);
  225. delete pMyOverlapped;
  226. pMyOverlapped = NULL;
  227. _ShowMessage(_T("创建用于Accept的Pipe失败!错误代码: %d"), GetLastError());
  228. return false;
  229. }
  230. return true;
  231. }
  232. bool CIOCPPipe::_DoAccpet( PER_IO_CONTEXT* pIoContext )
  233. {
  234. // 客户端连接后,发出指令:获取客户端注入进程信息;
  235. pIoContext->m_OpType = OP_SEND;
  236. _stprintf_s(pIoContext->chReply, _T("%s"), _T("1001\n"));
  237. pIoContext->cbToWrite = sizeof(pIoContext->chReply);
  238. BOOL fWrite = WriteFile(
  239. pIoContext->m_PipeAccept,
  240. pIoContext->chReply,
  241. pIoContext->cbToWrite,
  242. &pIoContext->cbToWrite,
  243. (LPOVERLAPPED)pIoContext);
  244. if (!fWrite) {
  245. dprintf("WriteFie: dwWritten=%d,GLE=%d\n", pIoContext->cbToWrite, GetLastError());
  246. }
  247. dprintf(_T("向客户端发消息"));
  248. ////////////////////////////////////////////////////////////////////////////////////////////////
  249. // 5. 使用完毕之后,把Listen Socket的那个IoContext重置,然后准备投递新的AcceptEx
  250. pIoContext->ResetBuffer();
  251. return this->_PostAccept( );
  252. }
  253. bool CIOCPPipe::_PostRecv( PER_IO_CONTEXT* pIoContext )
  254. {
  255. // 初始化变量
  256. DWORD dwFlags = 0;
  257. DWORD dwBytes = 0;
  258. OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
  259. pIoContext->ResetBuffer();
  260. pIoContext->m_OpType = OP_RECV;
  261. return true;
  262. }
  263. bool CIOCPPipe::_DoRecv( PER_IO_CONTEXT* pIoContext )
  264. {
  265. // 先把上一次的数据显示出现,然后就重置状态,发出下一个Recv请求
  266. // 然后开始投递下一个WSARecv请求
  267. return _PostRecv( pIoContext );
  268. }
  269. bool CIOCPPipe::_AssociateWithIOCP( PER_IO_CONTEXT* pIoContext )
  270. {
  271. // 将用于和客户端通信的SOCKET绑定到完成端口中
  272. return true;
  273. }
  274. void CIOCPPipe::_AddToContextList( PER_IO_CONTEXT* pIoContext )
  275. {
  276. EnterCriticalSection(&m_csContextList);
  277. //m_arrayClientContext.Add(pHandleData);
  278. LeaveCriticalSection(&m_csContextList);
  279. }
  280. void CIOCPPipe::_RemoveContext( PER_IO_CONTEXT* pIoContext )
  281. {
  282. EnterCriticalSection(&m_csContextList);
  283. LeaveCriticalSection(&m_csContextList);
  284. }
  285. void CIOCPPipe::_ClearContextList()
  286. {
  287. EnterCriticalSection(&m_csContextList);
  288. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  289. {
  290. delete m_arrayClientContext.GetAt(i);
  291. }
  292. m_arrayClientContext.RemoveAll();
  293. LeaveCriticalSection(&m_csContextList);
  294. }
  295. int CIOCPPipe::_GetNoOfProcessors()
  296. {
  297. SYSTEM_INFO si;
  298. GetSystemInfo(&si);
  299. return si.dwNumberOfProcessors;
  300. }
  301. void CIOCPPipe::_ShowMessage(const CString szFormat,...) const
  302. {
  303. // 根据传入的参数格式化字符串
  304. CString strMessage;
  305. va_list arglist;
  306. // 处理变长参数
  307. va_start(arglist, szFormat);
  308. strMessage.FormatV(szFormat,arglist);
  309. va_end(arglist);
  310. }
  311. /////////////////////////////////////////////////////////////////////
  312. // 判断客户端Socket是否已经断开,否则在一个无效的Socket上投递WSARecv操作会出现异常
  313. // 使用的方法是尝试向这个socket发送数据,判断这个socket调用的返回值
  314. // 因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的
  315. bool CIOCPPipe::_IsSocketAlive(SOCKET s)
  316. {
  317. int nByteSent=send(s,"",0,0);
  318. if (-1 == nByteSent) return false;
  319. return true;
  320. }
  321. bool CIOCPPipe::HandleError( PER_IO_CONTEXT *pIoContext,const DWORD& dwErr )
  322. {
  323. // 如果是超时了,就再继续等吧
  324. if(WAIT_TIMEOUT == dwErr)
  325. {
  326. }
  327. // 可能是客户端异常退出了
  328. else if( ERROR_NETNAME_DELETED==dwErr )
  329. {
  330. this->_ShowMessage( _T("检测到客户端异常退出!") );
  331. return true;
  332. }
  333. else
  334. {
  335. this->_ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr );
  336. return false;
  337. }
  338. }