SATTCPServer.cpp 32 KB


  1. #include "StdAfx.h"
  2. #include "SATTCPServer.h"
  3. //#include "MainDlg.h"
  4. #include "SATExecutor.h"
  5. #include "SATDevices.h"
  6. // 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)
  7. #define WORKER_THREADS_PER_PROCESSOR 2
  8. // 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)
  9. #define MAX_POST_ACCEPT 10
  10. // 传递给Worker线程的退出信号
  11. #define EXIT_CODE NULL
  12. // 释放指针和句柄资源的宏
  13. // 释放指针宏
  14. #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
  15. // 释放句柄宏
  16. #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
  17. // 释放Socket宏
  18. #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
  19. CSATTCPServer::CSATTCPServer(void):
  20. m_nThreads(0),
  21. m_hShutdownEvent(NULL),
  22. m_hIOCompletionPort(NULL),
  23. m_phWorkerThreads(NULL),
  24. m_strIP(DEFAULT_IP),
  25. m_nPort(DEFAULT_PORT),
  26. m_pMain(NULL),
  27. m_lpfnAcceptEx( NULL ),
  28. m_pListenContext( NULL )
  29. {
  30. }
  31. CSATTCPServer::~CSATTCPServer(void)
  32. {
  33. // 确保资源彻底释放
  34. this->Stop();
  35. }
  36. ///////////////////////////////////////////////////////////////////
  37. // 工作者线程: 为IOCP请求服务的工作者线程
  38. // 也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程
  39. ///////////////////////////////////////////////////////////////////
  40. DWORD WINAPI CSATTCPServer::_WorkerThread(LPVOID lpParam)
  41. {
  42. THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
  43. CSATTCPServer* pIOCPModel = (CSATTCPServer*)pParam->pIOCPModel;
  44. int nThreadNo = (int)pParam->nThreadNo;
  45. pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
  46. OVERLAPPED *pOverlapped = NULL;
  47. PER_SOCKET_CONTEXT *pSocketContext = NULL;
  48. DWORD dwBytesTransfered = 0;
  49. // 循环处理请求,知道接收到Shutdown信息为止
  50. while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
  51. {
  52. BOOL bReturn = GetQueuedCompletionStatus(
  53. pIOCPModel->m_hIOCompletionPort,
  54. &dwBytesTransfered,
  55. (PULONG_PTR)&pSocketContext,
  56. &pOverlapped,
  57. INFINITE);
  58. // 如果收到的是退出标志,则直接退出
  59. if ( EXIT_CODE==(DWORD)pSocketContext )
  60. {
  61. pIOCPModel->_ShowMessage(_T("收到的是退出标志,则直接退出,ID: %d."),nThreadNo);
  62. break;
  63. }
  64. // 判断是否出现了错误
  65. if( !bReturn )
  66. {
  67. DWORD dwErr = GetLastError();
  68. // 显示一下提示信息
  69. if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )
  70. {
  71. break;
  72. }
  73. continue;
  74. }
  75. else
  76. {
  77. // 读取传入的参数
  78. PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
  79. // 判断是否有客户端断开了
  80. if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType))
  81. {
  82. pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
  83. inet_ntoa(pSocketContext->m_ClientAddr.sin_addr),
  84. ntohs(pSocketContext->m_ClientAddr.sin_port) );
  85. // 释放掉对应的资源
  86. pIOCPModel->_RemoveContext( pSocketContext );
  87. continue;
  88. }
  89. else
  90. {
  91. switch( pIoContext->m_OpType )
  92. {
  93. // Accept
  94. case ACCEPT_POSTED:
  95. {
  96. // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求
  97. pIOCPModel->_DoAccpet( pSocketContext, pIoContext );
  98. }
  99. break;
  100. // RECV
  101. case RECV_POSTED:
  102. {
  103. // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求
  104. pIOCPModel->_DoRecv( pSocketContext,pIoContext );
  105. }
  106. break;
  107. // SEND
  108. // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些
  109. case SEND_POSTED:
  110. {
  111. }
  112. break;
  113. default:
  114. // 不应该执行到这里
  115. TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
  116. break;
  117. } //switch
  118. }//if
  119. }//if
  120. }//while
  121. TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
  122. // 释放线程参数
  123. RELEASE(lpParam);
  124. return 0;
  125. }
  126. //====================================================================================
  127. //
  128. // 系统初始化和终止
  129. //
  130. //====================================================================================
  131. ////////////////////////////////////////////////////////////////////
  132. // 初始化WinSock 2.2
  133. bool CSATTCPServer::LoadSocketLib()
  134. {
  135. WSADATA wsaData;
  136. int nResult;
  137. nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
  138. // 错误(一般都不可能出现)
  139. if (NO_ERROR != nResult)
  140. {
  141. this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));
  142. return false;
  143. }
  144. return true;
  145. }
  146. //////////////////////////////////////////////////////////////////
  147. // 启动服务器
  148. bool CSATTCPServer::Start(unsigned int port)
  149. {
  150. // 初始化线程互斥量
  151. InitializeCriticalSection(&m_csContextList);
  152. // 建立系统退出的事件通知
  153. m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  154. // 初始化IOCP
  155. if (false == _InitializeIOCP())
  156. {
  157. this->_ShowMessage(_T("初始化IOCP失败!\n"));
  158. return false;
  159. }
  160. else
  161. {
  162. this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
  163. }
  164. // 初始化Socket
  165. if( false==_InitializeListenSocket(port) )
  166. {
  167. this->_ShowMessage(_T("Listen Socket初始化失败!\n"));
  168. this->_DeInitialize();
  169. return false;
  170. }
  171. else
  172. {
  173. this->_ShowMessage(_T("Listen Socket初始化完毕."));
  174. }
  175. this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
  176. return true;
  177. }
  178. ////////////////////////////////////////////////////////////////////
  179. // 开始发送系统退出消息,退出完成端口和线程资源
  180. void CSATTCPServer::Stop()
  181. {
  182. if( m_pListenContext != NULL && m_pListenContext->m_Socket != INVALID_SOCKET )
  183. {
  184. // 激活关闭消息通知
  185. SetEvent(m_hShutdownEvent);
  186. for (int i = 0; i < m_nThreads; i++)
  187. {
  188. // 通知所有的完成端口操作退出
  189. PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
  190. }
  191. // 等待所有的客户端资源退出
  192. WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
  193. // 清除客户端列表信息
  194. this->_ClearContextList();
  195. // 释放其他资源
  196. this->_DeInitialize();
  197. this->_ShowMessage(_T("停止监听\n"));
  198. }
  199. }
  200. ////////////////////////////////
  201. // 初始化完成端口
  202. bool CSATTCPServer::_InitializeIOCP()
  203. {
  204. // 建立第一个完成端口
  205. m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
  206. if ( NULL == m_hIOCompletionPort)
  207. {
  208. this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError());
  209. return false;
  210. }
  211. // 根据本机中的处理器数量,建立对应的线程数
  212. //m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
  213. m_nThreads = _GetNoOfProcessors();
  214. // 为工作者线程初始化句柄
  215. m_phWorkerThreads = new HANDLE[m_nThreads];
  216. // 根据计算出来的数量建立工作者线程
  217. DWORD nThreadID;
  218. for (int i = 0; i < m_nThreads; i++)
  219. {
  220. THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
  221. pThreadParams->pIOCPModel = this;
  222. pThreadParams->nThreadNo = i+1;
  223. m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
  224. }
  225. TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
  226. return true;
  227. }
  228. /////////////////////////////////////////////////////////////////
  229. // 初始化Socket
  230. bool CSATTCPServer::_InitializeListenSocket(unsigned int port)
  231. {
  232. // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针
  233. GUID GuidAcceptEx = WSAID_ACCEPTEX;
  234. GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
  235. // 服务器地址信息,用于绑定Socket
  236. struct sockaddr_in ServerAddress;
  237. // 生成用于监听的Socket的信息
  238. m_pListenContext = new PER_SOCKET_CONTEXT;
  239. // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作
  240. m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  241. if (INVALID_SOCKET == m_pListenContext->m_Socket)
  242. {
  243. this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"), WSAGetLastError());
  244. return false;
  245. }
  246. else
  247. {
  248. TRACE("WSASocket() 完成.\n");
  249. }
  250. // 将Listen Socket绑定至完成端口中
  251. if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(DWORD)m_pListenContext, 0))
  252. {
  253. this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"), WSAGetLastError());
  254. RELEASE_SOCKET( m_pListenContext->m_Socket );
  255. return false;
  256. }
  257. else
  258. {
  259. TRACE("Listen Socket绑定完成端口 完成.\n");
  260. }
  261. // 填充地址信息
  262. ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
  263. ServerAddress.sin_family = AF_INET;
  264. // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址
  265. ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
  266. //ServerAddress.sin_addr.s_addr = inet_addr(m_strIP.GetString());
  267. ServerAddress.sin_port = htons(port);
  268. #if 0
  269. // SO_REUSEADDR:端口复用,可以让n个套接字绑定在一个端口上;
  270. // 将套接字属性设置为 SO_REUSEADDR (允许套接口和一个已在使用中的地址捆绑),可以解决绑定失败问题
  271. BOOL bReuseaddr = TRUE;
  272. setsockopt(m_pListenContext->m_Socket,SOL_SOCKET ,SO_REUSEADDR,(const char*)&bReuseaddr,sizeof(BOOL));
  273. #endif
  274. // 绑定地址和端口
  275. if (SOCKET_ERROR == ::bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
  276. {
  277. TRACE("bind() 失败.\n");
  278. this->_ShowMessage(_T("bind()函数执行错误! 错误代码: %d/n"), WSAGetLastError());
  279. return false;
  280. }
  281. else
  282. {
  283. TRACE("bind() 完成.\n");
  284. }
  285. // 开始进行监听
  286. if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))
  287. {
  288. this->_ShowMessage(_T("Listen()函数执行出现错误; 错误代码: %d/n"), WSAGetLastError());
  289. return false;
  290. }
  291. else
  292. {
  293. TRACE("Listen() 完成.\n");
  294. }
  295. // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数
  296. // 所以需要额外获取一下函数的指针,
  297. // 获取AcceptEx函数指针
  298. DWORD dwBytes = 0;
  299. if(SOCKET_ERROR == WSAIoctl(
  300. m_pListenContext->m_Socket,
  301. SIO_GET_EXTENSION_FUNCTION_POINTER,
  302. &GuidAcceptEx,
  303. sizeof(GuidAcceptEx),
  304. &m_lpfnAcceptEx,
  305. sizeof(m_lpfnAcceptEx),
  306. &dwBytes,
  307. NULL,
  308. NULL))
  309. {
  310. this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"), WSAGetLastError());
  311. this->_DeInitialize();
  312. return false;
  313. }
  314. // 获取GetAcceptExSockAddrs函数指针,也是同理
  315. if(SOCKET_ERROR == WSAIoctl(
  316. m_pListenContext->m_Socket,
  317. SIO_GET_EXTENSION_FUNCTION_POINTER,
  318. &GuidGetAcceptExSockAddrs,
  319. sizeof(GuidGetAcceptExSockAddrs),
  320. &m_lpfnGetAcceptExSockAddrs,
  321. sizeof(m_lpfnGetAcceptExSockAddrs),
  322. &dwBytes,
  323. NULL,
  324. NULL))
  325. {
  326. this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"), WSAGetLastError());
  327. this->_DeInitialize();
  328. return false;
  329. }
  330. // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求
  331. for( int i=0;i<MAX_POST_ACCEPT;i++ )
  332. {
  333. // 新建一个IO_CONTEXT
  334. PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
  335. if( false==this->_PostAccept( pAcceptIoContext ) )
  336. {
  337. m_pListenContext->RemoveContext(pAcceptIoContext);
  338. return false;
  339. }
  340. }
  341. this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );
  342. return true;
  343. }
  344. ////////////////////////////////////////////////////////////
  345. // 最后释放掉所有资源
  346. void CSATTCPServer::_DeInitialize()
  347. {
  348. // 删除客户端列表的互斥量
  349. DeleteCriticalSection(&m_csContextList);
  350. // 关闭系统退出事件句柄
  351. RELEASE_HANDLE(m_hShutdownEvent);
  352. // 释放工作者线程句柄指针
  353. for( int i=0;i<m_nThreads;i++ )
  354. {
  355. RELEASE_HANDLE(m_phWorkerThreads[i]);
  356. }
  357. RELEASE(m_phWorkerThreads);
  358. // 关闭IOCP句柄
  359. RELEASE_HANDLE(m_hIOCompletionPort);
  360. int nRet = shutdown(m_pListenContext->m_Socket, SD_BOTH);
  361. if ( nRet != 0 )
  362. OutputDebugString(_T("================>m_pListenContext->m_Socket套接字出错。<================\n"));
  363. else
  364. OutputDebugString(_T("================>m_pListenContext->m_Socket套接字成功。<================\n"));
  365. // 先关闭端口;
  366. RELEASE_SOCKET( m_pListenContext->m_Socket );
  367. // 关闭监听Socket
  368. RELEASE(m_pListenContext);
  369. this->_ShowMessage(_T("释放资源完毕.\n"));
  370. }
  371. //====================================================================================
  372. //
  373. // 投递完成端口请求
  374. //
  375. //====================================================================================
  376. //////////////////////////////////////////////////////////////////
  377. // 投递Accept请求
  378. bool CSATTCPServer::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
  379. {
  380. ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
  381. // 准备参数
  382. DWORD dwBytes = 0;
  383. pAcceptIoContext->m_OpType = ACCEPT_POSTED;
  384. WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
  385. OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
  386. // 为以后新连入的客户端先准备好Socket( 这个是与传统accept最大的区别 )
  387. pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  388. if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
  389. {
  390. _ShowMessage(_T("创建用于Accept的Socket失败!错误代码: %d"), WSAGetLastError());
  391. return false;
  392. }
  393. // 投递AcceptEx
  394. if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
  395. sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol))
  396. {
  397. if(WSA_IO_PENDING != WSAGetLastError())
  398. {
  399. _ShowMessage(_T("投递 AcceptEx 请求失败,错误代码: %d"), WSAGetLastError());
  400. return false;
  401. }
  402. }
  403. return true;
  404. }
  405. ////////////////////////////////////////////////////////////
  406. // 在有客户端连入的时候,进行处理
  407. // 流程有点复杂,你要是看不懂的话,就看配套的文档吧....
  408. // 如果能理解这里的话,完成端口的机制你就消化了一大半了
  409. // 总之你要知道,传入的是ListenSocket的Context,我们需要复制一份出来给新连入的Socket用
  410. // 原来的Context还是要在上面继续投递下一个Accept请求
  411. //
  412. bool CSATTCPServer::_DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
  413. {
  414. SOCKADDR_IN* ClientAddr = NULL;
  415. SOCKADDR_IN* LocalAddr = NULL;
  416. int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
  417. ///////////////////////////////////////////////////////////////////////////
  418. // 1. 首先取得连入客户端的地址信息
  419. // 这个 m_lpfnGetAcceptExSockAddrs 不得了啊~~~~~~
  420. // 不但可以取得客户端和本地端的地址信息,还能顺便取出客户端发来的第一组数据
  421. this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
  422. sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
  423. _RecvProcess(pSocketContext, pIoContext);
  424. this->_ShowMessage( _T("客户端 %s:%d 连入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) );
  425. //////////////////////////////////////////////////////////////////////////////////////////////////////
  426. // 2. 这里需要注意,这里传入的这个是ListenSocket上的Context,这个Context我们还需要用于监听下一个连接
  427. // 所以我还得要将ListenSocket上的Context复制出来一份为新连入的Socket新建一个SocketContext
  428. PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
  429. pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
  430. memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
  431. // 参数设置完毕,将这个Socket和完成端口绑定(这也是一个关键步骤)
  432. if( false==this->_AssociateWithIOCP( pNewSocketContext ) )
  433. {
  434. RELEASE( pNewSocketContext );
  435. return false;
  436. }
  437. ///////////////////////////////////////////////////////////////////////////////////////////////////
  438. // 3. 继续,建立其下的IoContext,用于在这个Socket上投递第一个Recv数据请求
  439. PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
  440. pNewIoContext->m_OpType = RECV_POSTED;
  441. pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
  442. // 如果Buffer需要保留,就自己拷贝一份出来
  443. //memcpy( pNewIoContext->m_szBuffer,pIoContext->m_szBuffer,MAX_BUFFER_LEN );
  444. // 绑定完毕之后,就可以开始在这个Socket上投递完成请求了
  445. if( false==this->_PostRecv( pNewIoContext) )
  446. {
  447. pNewSocketContext->RemoveContext( pNewIoContext );
  448. return false;
  449. }
  450. /////////////////////////////////////////////////////////////////////////////////////////////////
  451. // 4. 如果投递成功,那么就把这个有效的客户端信息,加入到ContextList中去(需要统一管理,方便释放资源)
  452. this->_AddToContextList( pNewSocketContext );
  453. ////////////////////////////////////////////////////////////////////////////////////////////////
  454. // 5. 使用完毕之后,把Listen Socket的那个IoContext重置,然后准备投递新的AcceptEx
  455. pIoContext->ResetBuffer();
  456. return this->_PostAccept( pIoContext );
  457. }
  458. ////////////////////////////////////////////////////////////////////
  459. // 投递接收数据请求
  460. bool CSATTCPServer::_PostRecv( PER_IO_CONTEXT* pIoContext )
  461. {
  462. // 初始化变量
  463. DWORD dwFlags = 0;
  464. DWORD dwBytes = 0;
  465. WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
  466. OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
  467. pIoContext->ResetBuffer();
  468. pIoContext->m_OpType = RECV_POSTED;
  469. // 初始化完成后,,投递WSARecv请求
  470. int nBytesRecv = WSARecv( pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL );
  471. // 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了
  472. if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
  473. {
  474. this->_ShowMessage(_T("投递第一个WSARecv失败!"));
  475. return false;
  476. }
  477. return true;
  478. }
  479. /////////////////////////////////////////////////////////////////
  480. // 在有接收的数据到达的时候,进行处理
  481. bool CSATTCPServer::_DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
  482. {
  483. // 先把上一次的数据显示出现,然后就重置状态,发出下一个Recv请求
  484. SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr;
  485. //this->_ShowMessage( _T("收到 %s:%d 信息:%s"),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port), pIoContext->m_wsaBuf.buf );
  486. _RecvProcess(pSocketContext, pIoContext);
  487. // 然后开始投递下一个WSARecv请求
  488. return _PostRecv( pIoContext );
  489. }
  490. /////////////////////////////////////////////////////
  491. // 将句柄(Socket)绑定到完成端口中
  492. bool CSATTCPServer::_AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext )
  493. {
  494. // 将用于和客户端通信的SOCKET绑定到完成端口中
  495. HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (DWORD)pContext, 0);
  496. if (NULL == hTemp)
  497. {
  498. this->_ShowMessage(_T("执行CreateIoCompletionPort()出现错误.错误代码:%d"),GetLastError());
  499. return false;
  500. }
  501. return true;
  502. }
  503. //====================================================================================
  504. //
  505. // ContextList 相关操作
  506. //
  507. //====================================================================================
  508. //////////////////////////////////////////////////////////////
  509. // 将客户端的相关信息存储到数组中
  510. void CSATTCPServer::_AddToContextList( PER_SOCKET_CONTEXT *pHandleData )
  511. {
  512. EnterCriticalSection(&m_csContextList);
  513. m_arrayClientContext.Add(pHandleData);
  514. LeaveCriticalSection(&m_csContextList);
  515. }
  516. ////////////////////////////////////////////////////////////////
  517. // 移除某个特定的Context
  518. void CSATTCPServer::_RemoveContext( PER_SOCKET_CONTEXT *pSocketContext )
  519. {
  520. EnterCriticalSection(&m_csContextList);
  521. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  522. {
  523. if( pSocketContext==m_arrayClientContext.GetAt(i) )
  524. {
  525. RELEASE( pSocketContext );
  526. m_arrayClientContext.RemoveAt(i);
  527. break;
  528. }
  529. }
  530. LeaveCriticalSection(&m_csContextList);
  531. }
  532. ////////////////////////////////////////////////////////////////
  533. // 清空客户端信息
  534. void CSATTCPServer::_ClearContextList()
  535. {
  536. EnterCriticalSection(&m_csContextList);
  537. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  538. {
  539. delete m_arrayClientContext.GetAt(i);
  540. }
  541. m_arrayClientContext.RemoveAll();
  542. LeaveCriticalSection(&m_csContextList);
  543. }
  544. //====================================================================================
  545. //
  546. // 其他辅助函数定义
  547. //
  548. //====================================================================================
  549. ////////////////////////////////////////////////////////////////////
  550. // 获得本机的IP地址
  551. CString CSATTCPServer::GetLocalIP()
  552. {
  553. // 获得本机主机名
  554. char hostname[MAX_PATH] = {0};
  555. gethostname(hostname,MAX_PATH);
  556. struct hostent FAR* lpHostEnt = gethostbyname(hostname);
  557. if(lpHostEnt == NULL)
  558. {
  559. return DEFAULT_IP;
  560. }
  561. // 取得IP地址列表中的第一个为返回的IP(因为一台主机可能会绑定多个IP)
  562. LPSTR lpAddr = lpHostEnt->h_addr_list[0];
  563. // 将IP地址转化成字符串形式
  564. struct in_addr inAddr;
  565. memmove(&inAddr,lpAddr,4);
  566. m_strIP = CString( inet_ntoa(inAddr) );
  567. return m_strIP;
  568. }
  569. ///////////////////////////////////////////////////////////////////
  570. // 获得本机中处理器的数量
  571. int CSATTCPServer::_GetNoOfProcessors()
  572. {
  573. SYSTEM_INFO si;
  574. GetSystemInfo(&si);
  575. return si.dwNumberOfProcessors;
  576. }
  577. /////////////////////////////////////////////////////////////////////
  578. // 在主界面中显示提示信息
  579. void CSATTCPServer::_ShowMessage(const CString szFormat,...) const
  580. {
  581. // 根据传入的参数格式化字符串
  582. CString strMessage;
  583. va_list arglist;
  584. // 处理变长参数
  585. va_start(arglist, szFormat);
  586. strMessage.FormatV(szFormat,arglist);
  587. va_end(arglist);
  588. GLOBAL::WriteTextLog(GLOBAL::SAT_TCP, strMessage);
  589. }
  590. #define PAK_LEN sizeof(SATPROTO::DataHeader)
  591. void CSATTCPServer::_RecvProcess(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext)
  592. {
  593. // 小于包头;
  594. SATPROTO::DataHeader* phead = NULL;
  595. if (pSocketContext->lastData.size() == 0)
  596. {
  597. // 不足包头;
  598. if (PAK_LEN > pIoContext->m_Overlapped.InternalHigh)
  599. {
  600. //OutputDebugString("A:不足包头;\n");
  601. //GLOBAL::WriteTextLog("A:不足包头");
  602. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  603. }
  604. else
  605. {
  606. // 包头充足;
  607. phead = (SATPROTO::DataHeader*)pIoContext->m_wsaBuf.buf;
  608. if ( !CheckDataHeader(phead) )
  609. {
  610. //OutputDebugString("A:包头损坏;\n");
  611. //GLOBAL::WriteTextLog("A:包头损坏");
  612. return;
  613. }
  614. // 完整的包;
  615. if (phead->len == pIoContext->m_Overlapped.InternalHigh)
  616. {
  617. //OutputDebugString("A:完整的包;\n");
  618. //GLOBAL::WriteTextLog("A:完整的包");
  619. _TaskProcess(pIoContext, (SATPROTO::Package*)pIoContext->m_wsaBuf.buf);
  620. }
  621. // 小包;
  622. else if (phead->len > pIoContext->m_Overlapped.InternalHigh)
  623. {
  624. //OutputDebugString("A:小包;\n");
  625. //GLOBAL::WriteTextLog("A:小包");
  626. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  627. }
  628. // 超包;
  629. else if (phead->len < pIoContext->m_Overlapped.InternalHigh)
  630. {
  631. //OutputDebugString("A:超包;\n");
  632. //GLOBAL::WriteTextLog("A:超包");
  633. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + phead->len, pIoContext->m_Overlapped.InternalHigh - phead->len);
  634. _TaskProcess(pIoContext, (SATPROTO::Package*)pIoContext->m_wsaBuf.buf);
  635. }
  636. }
  637. }
  638. else
  639. {
  640. int lastlen = pIoContext->m_Overlapped.InternalHigh;
  641. if (pSocketContext->lastData.size() >= PAK_LEN)
  642. {
  643. phead = (SATPROTO::DataHeader*)pSocketContext->lastData.data();
  644. if ( !CheckDataHeader(phead) )
  645. {
  646. OutputDebugString("C:包头损坏;\n");
  647. //GLOBAL::WriteTextLog("C:包头损坏");
  648. pSocketContext->lastData.clear();
  649. return;
  650. }
  651. if (phead->len <= pSocketContext->lastData.size() + pIoContext->m_Overlapped.InternalHigh)
  652. {
  653. if ( phead->len <= pSocketContext->lastData.size() )
  654. {
  655. //OutputDebugString("C:超包;\n");
  656. //GLOBAL::WriteTextLog("C:超包");
  657. // 完整包;
  658. _TaskProcess(pIoContext, (SATPROTO::Package*)pSocketContext->lastData.substr(0, phead->len).data());
  659. pSocketContext->lastData = pSocketContext->lastData.substr(phead->len);
  660. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  661. }
  662. else
  663. {
  664. //OutputDebugString("D:超包;\n");
  665. //GLOBAL::WriteTextLog("D:超包");
  666. lastlen = pSocketContext->lastData.size() + pIoContext->m_Overlapped.InternalHigh - phead->len;
  667. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh - lastlen);
  668. // 完整包;
  669. _TaskProcess(pIoContext, (SATPROTO::Package*)pSocketContext->lastData.data());
  670. // 剩余包;
  671. pSocketContext->lastData.clear();
  672. if (lastlen)
  673. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + pIoContext->m_Overlapped.InternalHigh - lastlen, lastlen);
  674. }
  675. }
  676. else
  677. {
  678. //OutputDebugString("C:仍不足一个包;\n");
  679. //GLOBAL::WriteTextLog("C:仍不足一个包");
  680. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  681. }
  682. }
  683. else
  684. {
  685. // 包头剩余长度;
  686. int diflen = PAK_LEN - pSocketContext->lastData.size();
  687. // 仍不足一个包头;
  688. if ( diflen > pIoContext->m_Overlapped.InternalHigh )
  689. {
  690. //OutputDebugString("B:仍不足一个包头;\n");
  691. //GLOBAL::WriteTextLog("B:仍不足一个包头");
  692. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  693. }
  694. else
  695. {
  696. // 拼成完整包头;
  697. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, diflen);
  698. phead = (SATPROTO::DataHeader*)pSocketContext->lastData.data();
  699. if ( !CheckDataHeader(phead) )
  700. {
  701. //OutputDebugString("B:包头损坏;\n");
  702. //GLOBAL::WriteTextLog("B:包头损坏");
  703. pSocketContext->lastData.clear();
  704. return;
  705. }
  706. // 完整包;
  707. if ( phead->len == PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen )
  708. {
  709. //OutputDebugString("B:完整包;\n");
  710. //GLOBAL::WriteTextLog("B:完整包");
  711. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, pIoContext->m_Overlapped.InternalHigh - diflen);
  712. _TaskProcess(pIoContext, (SATPROTO::Package*)pSocketContext->lastData.data());
  713. pSocketContext->lastData.clear();
  714. }
  715. // 小包;
  716. else if ( phead->len > PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen)
  717. {
  718. //OutputDebugString("B:小包;\n");
  719. //GLOBAL::WriteTextLog("B:小包");
  720. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, pIoContext->m_Overlapped.InternalHigh - diflen);
  721. }
  722. // 超包;
  723. else if (phead->len < PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen)
  724. {
  725. //OutputDebugString("B:超包;\n");
  726. //GLOBAL::WriteTextLog("B:超包");
  727. // 组完成包;
  728. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, phead->len - PAK_LEN);
  729. _TaskProcess(pIoContext, (SATPROTO::Package*)pSocketContext->lastData.data());
  730. pSocketContext->lastData.clear();
  731. int last = pIoContext->m_Overlapped.InternalHigh - diflen - phead->len + PAK_LEN;
  732. if (last)
  733. {
  734. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + pIoContext->m_Overlapped.InternalHigh - last, last);
  735. }
  736. }
  737. }
  738. }
  739. }
  740. }
  741. void CSATTCPServer::_TaskProcess(PER_IO_CONTEXT* pIoContext, SATPROTO::Package* pak)
  742. {
  743. SATPROTO::DataHeader *pHeader = &pak->header;
  744. if ( !pHeader )
  745. return;
  746. if ( pHeader->protocol == 0xAA ) {
  747. switch ( pHeader->cmd )
  748. {
  749. case SATPROTO::CMD_LOGIN:
  750. case SATPROTO::CMD_LOGOUT:
  751. {
  752. // 登录;
  753. SATPROTO::UserInfo *pLogin = (SATPROTO::UserInfo*)pak->buf;
  754. GLOBAL::WriteTextLog(GLOBAL::SAT_TCP, _T("User=%s, psw=%s, Actuator=%s"), pLogin->szUserName, pLogin->szPassword, pLogin->szActuatorName);
  755. bool bRet = CSATExecutor::GetInstance()->Login(pLogin->szUserName, pLogin->szPassword, pLogin->szActuatorName, !pHeader->cmd);
  756. // 计算数据包长度;
  757. long len = sizeof(SATPROTO::DataHeader)+sizeof(SATPROTO::LoginResp);
  758. byte *pbuff = new byte[len];
  759. memset(pbuff, 0, len);
  760. SATPROTO::Package *pSendPak = (SATPROTO::Package *)pbuff;
  761. pSendPak->header.protocol = 0xAA;
  762. pSendPak->header.cmd = pHeader->cmd;
  763. pSendPak->header.len = len;
  764. #if 0
  765. // 复制内容到指针中;
  766. memcpy(pSendPak->buf, &bRet, sizeof(bool));
  767. char szMessage[MAX_PATH] = {0};
  768. _stprintf_s(szMessage, _T("登录……"));
  769. memcpy(pSendPak->buf + sizeof(bool), szMessage, MAX_PATH);
  770. #else
  771. const SATHTTP::STLoginResp *pstLoginResp = CSATExecutor::GetInstance()->GetLoginResp();
  772. SATPROTO::LoginResp *pLoginResp = (SATPROTO::LoginResp*)(pbuff+sizeof(SATPROTO::DataHeader));
  773. pLoginResp->bStatus = bRet;
  774. memset(pLoginResp->szMessage, 0, MAX_PATH);
  775. //_stprintf_s(pLoginResp->szMessage, _T("%s"), pstLoginResp->strMessage.c_str());
  776. //_tcscpy_s(pLoginResp->szMessage, _T("登录描述……"));
  777. // 如果使用上方两种方式赋值,会导致'\0'后面的字节不为0;
  778. memcpy(pLoginResp->szMessage, pstLoginResp->strMessage.c_str(), pstLoginResp->strMessage.size());
  779. #endif
  780. // 发送,无须判断是否发送成功;
  781. send(pIoContext->m_sockAccept, (const char*)pbuff, len, 0);
  782. // 释放内存;
  783. delete []pbuff;
  784. pbuff = NULL;
  785. }
  786. break;
  787. case SATPROTO::CMD_ADD_DEVICE:
  788. case SATPROTO::CMD_DEL_DEVICE:
  789. {
  790. std::string ip = (char*)pak->buf;
  791. if (pHeader->cmd == SATPROTO::CMD_DEL_DEVICE )
  792. CSATDevices::DelDevices(ip);
  793. else
  794. CSATDevices::AddReticleDevices(ip);
  795. SATPROTO::DataHeader header;
  796. header.cmd = pHeader->cmd;
  797. header.len = sizeof(SATPROTO::DataHeader);
  798. header.protocol = 0xAA;
  799. send(pIoContext->m_sockAccept, (const char*)&header, sizeof(SATPROTO::DataHeader), 0);
  800. }
  801. break;
  802. case SATPROTO::CMD_QUERY_DEVICES:
  803. {
  804. // 计算数据包长度;
  805. long len = sizeof(SATPROTO::DataHeader)+sizeof(SATPROTO::DeviceResp);
  806. byte *pbuff = new byte[len];
  807. memset(pbuff, 0, len);
  808. SATPROTO::Package *pSendPak = (SATPROTO::Package *)pbuff;
  809. pSendPak->header.protocol = 0xAA;
  810. pSendPak->header.cmd = pHeader->cmd;
  811. pSendPak->header.len = len;
  812. // 转换pak->buf为结构体;
  813. SATPROTO::DeviceResp *pDevResp = (SATPROTO::DeviceResp*)(pbuff+sizeof(SATPROTO::DataHeader));
  814. memset(pDevResp->ssDevs, 0, SATPROTO::MAX_DEVS*sizeof(SATPROTO::Device));
  815. // 获取设备列表;
  816. pDevResp->nSize = CSATDevices::AttachDeviceName2Buffer(pDevResp->ssDevs);
  817. // 返回给客户端;
  818. send(pIoContext->m_sockAccept, (const char*)pbuff, len, 0);
  819. // 释放内存;
  820. delete []pbuff;
  821. pbuff = NULL;
  822. }
  823. break;
  824. case SATPROTO::CMD_QUERY_TASK:
  825. {
  826. long len = sizeof(SATPROTO::DataHeader) + sizeof(SATPROTO::TaskInfoResp);
  827. byte *pbuff = new byte[len];
  828. memset(pbuff, 0, len);
  829. SATPROTO::Package *pSendPak = (SATPROTO::Package *)pbuff;
  830. pSendPak->header.protocol = 0xAA;
  831. pSendPak->header.cmd = pHeader->cmd;
  832. pSendPak->header.len = len;
  833. // 转换pak->buf为结构体;
  834. SATPROTO::TaskInfoResp *pTaskResp = (SATPROTO::TaskInfoResp*)(pbuff+sizeof(SATPROTO::DataHeader));
  835. memset(pTaskResp->ssTasks, 0, SATPROTO::MAX_TASKS*sizeof(SATPROTO::TaskInfo));
  836. // 获取设备列表;
  837. pTaskResp->nSize = CSATExecutor::GetInstance()->AttachTaskInfo2Buffer(pTaskResp->ssTasks);
  838. // 返回给客户端;
  839. send(pIoContext->m_sockAccept, (const char*)pbuff, len, 0);
  840. // 释放内存;
  841. delete []pbuff;
  842. pbuff = NULL;
  843. }
  844. break;
  845. case SATPROTO::CMD_PY_NOTICE:
  846. {
  847. SATPROTO::PyNotice *notice = (SATPROTO::PyNotice*)pak->buf;
  848. GLOBAL::g_PyNotify.notify = true;
  849. GLOBAL::g_PyNotify.datetime = notice->nTimeStamp;
  850. if ( notice->nNoticeType == 0 ) { // 表示正常关机;
  851. GLOBAL::g_PyNotify.report_type = "shutdown";
  852. }
  853. else if ( notice->nNoticeType == 1 ) { // 表示正常重启;
  854. GLOBAL::g_PyNotify.report_type = "reboot";
  855. }
  856. GLOBAL::WriteTextLog(GLOBAL::SAT_TCP, "收到异常关机或重启:%s", GLOBAL::g_PyNotify.report_type.c_str());
  857. #if 0 // 不作任何返回;
  858. // 封装返回数据;
  859. long len = sizeof(SATPROTO::DataHeader);
  860. byte *pbuff = new byte[len];
  861. memset(pbuff, 0, len);
  862. SATPROTO::Package *pSendPak = (SATPROTO::Package *)pbuff;
  863. pSendPak->header.protocol = 0xAA;
  864. pSendPak->header.cmd = pHeader->cmd;
  865. pSendPak->header.len = len;
  866. // 返回给客户端;
  867. send(pIoContext->m_sockAccept, (const char*)pbuff, len, 0);
  868. // 释放内存;
  869. delete []pbuff;
  870. pbuff = NULL;
  871. #endif
  872. }
  873. break;
  874. default:
  875. break;
  876. }
  877. }
  878. }
  879. bool CSATTCPServer::CheckDataHeader(SATPROTO::DataHeader* header)
  880. {
  881. if ( header->protocol != 0xAA )
  882. return false;
  883. return true;
  884. }
  885. /////////////////////////////////////////////////////////////////////
  886. // 判断客户端Socket是否已经断开,否则在一个无效的Socket上投递WSARecv操作会出现异常
  887. // 使用的方法是尝试向这个socket发送数据,判断这个socket调用的返回值
  888. // 因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的
  889. bool CSATTCPServer::_IsSocketAlive(SOCKET s)
  890. {
  891. int nByteSent=send(s,"",0,0);
  892. if (-1 == nByteSent) return false;
  893. return true;
  894. }
  895. ///////////////////////////////////////////////////////////////////
  896. // 显示并处理完成端口上的错误
  897. bool CSATTCPServer::HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr )
  898. {
  899. // 如果是超时了,就再继续等吧
  900. if(WAIT_TIMEOUT == dwErr)
  901. {
  902. // 确认客户端是否还活着...
  903. if( !_IsSocketAlive( pContext->m_Socket) )
  904. {
  905. this->_ShowMessage( _T("检测到客户端异常退出!") );
  906. this->_RemoveContext( pContext );
  907. return true;
  908. }
  909. else
  910. {
  911. this->_ShowMessage( _T("网络操作超时!重试中...") );
  912. return true;
  913. }
  914. }
  915. // 可能是客户端异常退出了
  916. else if( ERROR_NETNAME_DELETED==dwErr )
  917. {
  918. this->_ShowMessage( _T("检测到客户端异常退出!") );
  919. this->_RemoveContext( pContext );
  920. return true;
  921. }
  922. else
  923. {
  924. this->_ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr );
  925. return false;
  926. }
  927. }