AsyncSocketServerImpl.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. #ifndef __ASYNCSOCKETSERVERIMPL_20160228__
  2. #define __ASYNCSOCKETSERVERIMPL_20160228__
  3. #pragma once
  4. #pragma warning(push)
  5. #pragma warning(disable:4995)
  6. #include <algorithm>
  7. #pragma warning(pop)
  8. #include "CritSection.h"
  9. #include "SocketHandle.h"
  10. #include "PerSocketContext.h"
  11. static DWORD WM_ADD_CONNECTION = WM_USER+0x101;
  12. /************************************************************************/
  13. /* Copyright (C), 2016-2020, [IT], 保留所有权利;
  14. /* 模 块 名:IASocketServerHandler;
  15. /* 描 述:Event handler that ASocketServerImpl<T> must implement;
  16. /* This class is not required, you can do the same thing as long your class exposes these functions.
  17. /* (These functions are not pure to save you some typing)
  18. /*
  19. /* 版 本:[V];
  20. /* 作 者:[IT];
  21. /* 日 期:[2/23/2016];
  22. /*
  23. /*
  24. /* 注 意:;
  25. /*
  26. /* 修改记录:[IT];
  27. /* 修改日期:;
  28. /* 修改版本:;
  29. /* 修改内容:;
  30. /************************************************************************/
  31. class IASocketServerHandler
  32. {
  33. public:
  34. virtual void OnThreadBegin(CSocketHandle* ) {}
  35. virtual void OnThreadExit(CSocketHandle* ) {}
  36. virtual void OnThreadLoopEnter(CSocketHandle* ) {}
  37. virtual void OnThreadLoopLeave(CSocketHandle* ) {}
  38. virtual void OnAddConnection(CSocketHandle* , SOCKET ) {}
  39. virtual void OnRemoveConnection(CSocketHandle* , SOCKET ) {}
  40. virtual void OnDataReceived(CSocketHandle* , const SOCKET sClient, const BYTE* pData, DWORD dwCount, const SockAddrIn&, BYTE **pendingbuf, unsigned int& npendingSize, unsigned int& ncursize) {}
  41. virtual void OnConnectionFailure(CSocketHandle*, SOCKET) {}
  42. virtual void OnConnectionDropped(CSocketHandle* ) {}
  43. virtual void OnConnectionError(CSocketHandle* , DWORD ) {}
  44. };
  45. /************************************************************************/
  46. /* Copyright (C), 2016-2020, [IT], 保留所有权利;
  47. /* 模 块 名:ASocketServerImpl<T, tBufferSize>;
  48. /* 描 述:Because <typename T> may refer to any class of your choosing,Server Communication wrapper;
  49. /*
  50. /* 版 本:[V];
  51. /* 作 者:[IT];
  52. /* 日 期:[2/23/2016];
  53. /*
  54. /*
  55. /* 注 意:;
  56. /*
  57. /* 修改记录:[IT];
  58. /* 修改日期:;
  59. /* 修改版本:;
  60. /* 修改内容:;
  61. /************************************************************************/
  62. template <typename T, size_t tBufferSize = 1024*64>
  63. class ASocketServerImpl
  64. {
  65. typedef ASocketServerImpl<T, tBufferSize> thisClass;
  66. public:
  67. ASocketServerImpl(): _pInterface(0), _thread(0), _threadId(0)
  68. {
  69. }
  70. void SetInterface(T* pInterface)
  71. {
  72. ::InterlockedExchangePointer(reinterpret_cast<void**>(&_pInterface), pInterface);
  73. }
  74. operator CSocketHandle*() throw()
  75. {
  76. return( &_socket );
  77. }
  78. CSocketHandle* operator->() throw()
  79. {
  80. return( &_socket );
  81. }
  82. bool IsOpen() const
  83. {
  84. return _socket.IsOpen();
  85. }
  86. bool CreateSocket(LPCTSTR pszHost, LPCTSTR pszServiceName, int nFamily, int nType, UINT uOptions = 0)
  87. {
  88. return _socket.CreateSocket(pszHost, pszServiceName, nFamily, nType, uOptions);
  89. }
  90. void Close()
  91. {
  92. _socket.Close();
  93. }
  94. DWORD Read(LPBYTE lpBuffer, DWORD dwSize, LPSOCKADDR lpAddrIn = NULL, DWORD dwTimeout = INFINITE)
  95. {
  96. return _socket.Read(lpBuffer, dwSize, lpAddrIn, dwTimeout);
  97. }
  98. DWORD Write(const LPBYTE lpBuffer, DWORD dwCount, const LPSOCKADDR lpAddrIn = NULL, DWORD dwTimeout = INFINITE)
  99. {
  100. return _socket.Write(lpBuffer, dwCount, lpAddrIn, dwTimeout);
  101. }
  102. const SocketContextList& GetSocketList() const
  103. {
  104. // direct access! - use Lock/Unlock to protect
  105. return _sockets;
  106. }
  107. SocketContextList& GetClientSocketList()
  108. {
  109. // direct access! - use Lock/Unlock to protect
  110. return _sockets;
  111. }
  112. bool Lock()
  113. {
  114. return _critSection.Lock();
  115. }
  116. bool Unlock()
  117. {
  118. return _critSection.Unlock();
  119. }
  120. void ResetConnectionList()
  121. {
  122. AutoThreadSection aSection(&_critSection);
  123. _sockets.clear();
  124. }
  125. size_t GetConnectionCount() //const
  126. {
  127. AutoThreadSection aSection(&_critSection);
  128. return _sockets.size();
  129. }
  130. void AddConnection(SOCKET sock)
  131. {
  132. AutoThreadSection aSection(&_critSection);
  133. _sockets.push_back( sock );
  134. }
  135. void RemoveConnection(SOCKET sock)
  136. {
  137. AutoThreadSection aSection(&_critSection);
  138. _sockets.remove( sock );
  139. }
  140. PerSocketContext* GetConnectionBuffer(SOCKET sock)
  141. {
  142. AutoThreadSection aSection(&_critSection);
  143. SocketContextList::iterator iter = std::find(_sockets.begin(), _sockets.end(), sock);
  144. return ((iter != _sockets.end()) ? &(*iter) : NULL);
  145. }
  146. bool CloseConnection(SOCKET sock)
  147. {
  148. return CSocketHandle::ShutdownConnection( sock );
  149. }
  150. void CloseAllConnections();
  151. bool StartServer(LPCTSTR pszHost, LPCTSTR pszServiceName, int nFamily, int nType, UINT uOptions = 0);
  152. void Terminate(DWORD dwTimeout = INFINITE);
  153. static bool IsConnectionDropped(DWORD dwError);
  154. ThreadSection *ReturnSection() { return &_critSection; }
  155. protected:
  156. void Run();
  157. void IoRun();
  158. bool AsyncRead(PerSocketContext* pBuffer);
  159. void OnIOComplete(DWORD dwError, PerSocketContext* pBuffer, DWORD cbTransferred, DWORD dwFlags);
  160. static DWORD WINAPI SocketServerProc(thisClass* _this);
  161. static DWORD WINAPI SocketServerIOProc(thisClass* _this);
  162. static void WINAPI CompletionRoutine(DWORD dwError, DWORD cbTransferred,
  163. LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags);
  164. T* _pInterface;
  165. HANDLE _thread;
  166. DWORD _threadId;
  167. ThreadSection _critSection;
  168. CSocketHandle _socket; // 服务端SOCKET;
  169. SocketContextList _sockets; // 客户端SOCKET连接列表;
  170. };
  171. template <typename T, size_t tBufferSize>
  172. void ASocketServerImpl<T, tBufferSize>::CloseAllConnections()
  173. {
  174. AutoThreadSection aSection(&_critSection);
  175. if ( !_sockets.empty() )
  176. {
  177. // NOTE(elaurentin): this function closes all connections but handles are kept inside of list
  178. // (socket handles are removed by the pooling thread)
  179. SocketContextList::iterator iter;
  180. for(iter = _sockets.begin(); iter != _sockets.end(); ++iter)
  181. {
  182. CloseConnection( (*iter) );
  183. }
  184. }
  185. }
  186. template <typename T, size_t tBufferSize>
  187. bool ASocketServerImpl<T, tBufferSize>::StartServer(LPCTSTR pszHost, LPCTSTR pszServiceName, int nFamily, int nType, UINT uOptions)
  188. {
  189. // must be closed first...
  190. if ( IsOpen() ) return false;
  191. bool result = false;
  192. result = _socket.CreateSocket(pszHost, pszServiceName, nFamily, nType, uOptions);
  193. if ( result )
  194. {
  195. _thread = AtlCreateThread(SocketServerProc, this);
  196. if ( _thread == NULL )
  197. {
  198. DWORD dwError = GetLastError();
  199. _socket.Close();
  200. SetLastError(dwError);
  201. result = false;
  202. }
  203. }
  204. return result;
  205. }
  206. template <typename T, size_t tBufferSize>
  207. void ASocketServerImpl<T, tBufferSize>::Run()
  208. {
  209. _ASSERTE( _pInterface != NULL && "Need an interface to pass events");
  210. SOCKET sock = _socket.GetSocket();
  211. int type = _socket.GetSocketType();
  212. // Notification: OnThreadBegin
  213. if ( _pInterface != NULL ) {
  214. _pInterface->OnThreadBegin(*this);
  215. }
  216. if (type == SOCK_STREAM)
  217. {
  218. HANDLE ioThread = NULL;
  219. DWORD ioThreadId = 0L;
  220. ioThread = CreateThreadT(0, 0, SocketServerIOProc, this, 0, &ioThreadId);
  221. if ( ioThread == NULL )
  222. {
  223. DWORD dwError = GetLastError();
  224. if ( _pInterface != NULL ) {
  225. _pInterface->OnConnectionError(*this, dwError);
  226. }
  227. }
  228. // In TCP mode, use an I/O thread to process all requests
  229. while( _socket.IsOpen() )
  230. {
  231. SOCKET newSocket = CSocketHandle::WaitForConnection(sock);
  232. if (!_socket.IsOpen())
  233. break;
  234. if (!PostThreadMessage(ioThreadId, WM_ADD_CONNECTION, 0, static_cast<ULONG_PTR>(newSocket)))
  235. {
  236. // Notification: OnConnectionFailure
  237. if ( _pInterface != NULL ) {
  238. _pInterface->OnConnectionFailure(*this, newSocket);
  239. }
  240. }
  241. }
  242. // close all connections
  243. CloseAllConnections();
  244. // wait for io thread
  245. if ( ioThread != NULL )
  246. {
  247. PostThreadMessage(ioThreadId, WM_QUIT, 0, 0);
  248. WaitForSingleObject(ioThread, INFINITE);
  249. CloseHandle(ioThread);
  250. }
  251. ResetConnectionList();
  252. }
  253. else
  254. {
  255. // UDP mode - let's reuse our thread here
  256. try
  257. {
  258. PerSocketContext ioBuffer(_socket.GetSocket(), tBufferSize);
  259. AsyncRead( &ioBuffer );
  260. // Save our thread id so we can exit gracefully
  261. _threadId = GetCurrentThreadId();
  262. // Process UDP packets
  263. IoRun();
  264. }
  265. catch(std::bad_alloc&)
  266. { /* memory exception */
  267. if ( _pInterface != NULL )
  268. {
  269. _pInterface->OnConnectionError(*this, ERROR_NOT_ENOUGH_MEMORY);
  270. }
  271. }
  272. }
  273. // Notification: OnThreadExit
  274. if ( _pInterface != NULL ) {
  275. _pInterface->OnThreadExit(*this);
  276. }
  277. }
  278. template <typename T, size_t tBufferSize>
  279. void ASocketServerImpl<T, tBufferSize>::IoRun()
  280. {
  281. _ASSERTE( _pInterface != NULL && "Need an interface to pass events");
  282. MSG msg;
  283. ::PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  284. // Notification: OnThreadLoopEnter
  285. if ( _pInterface != NULL ) {
  286. _pInterface->OnThreadLoopEnter(*this);
  287. }
  288. DWORD dwResult;
  289. while( (dwResult = MsgWaitForMultipleObjectsEx(0, NULL, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE)) != WAIT_FAILED)
  290. {
  291. msg.message = 0;
  292. PeekMessage(&msg, NULL, 0, 0, PM_REMOVE);
  293. // exit on WM_QUIT or main socket closed
  294. if ( msg.message == WM_QUIT || !_socket.IsOpen() )
  295. break;
  296. else if ( msg.message == WM_ADD_CONNECTION )
  297. {
  298. SOCKET sock = static_cast<SOCKET>(msg.lParam);
  299. AddConnection(sock);
  300. PerSocketContext* pBuffer = GetConnectionBuffer(sock);
  301. //pBuffer->dwTime = GetTickCount(); // Jeff add 2015.10.22
  302. _ASSERTE( pBuffer != NULL );
  303. if ( pBuffer != NULL )
  304. {
  305. pBuffer->ReAlloc( tBufferSize );
  306. if (!AsyncRead(pBuffer))
  307. {
  308. // remove and close connection
  309. // 清除SOCK不在这里做,因为清除线程里会有清除,这里也清除;
  310. RemoveConnection( sock );//Jeff.wasn't remove at here;
  311. CloseConnection( sock );// Jeff.doesn't close socket here;
  312. }
  313. else
  314. {
  315. // Notification: OnAddConnection
  316. if ( _pInterface != NULL ) {
  317. _pInterface->OnAddConnection(*this, sock);
  318. }
  319. }
  320. }
  321. }
  322. }
  323. // Notification: OnThreadLoopLeave
  324. if ( _pInterface != NULL ) {
  325. _pInterface->OnThreadLoopLeave(*this);
  326. }
  327. }
  328. template <typename T, size_t tBufferSize>
  329. bool ASocketServerImpl<T, tBufferSize>::AsyncRead(PerSocketContext* pBuffer)
  330. {
  331. CSocketHandle sockHandle;
  332. DWORD dwRead;
  333. PerSocketContext& sbuf = (*pBuffer);
  334. SOCKET sock = static_cast<SOCKET>(sbuf);
  335. sockHandle.Attach( sock );
  336. int type = sockHandle.GetSocketType();
  337. LPWSAOVERLAPPED lpOverlapped = static_cast<LPWSAOVERLAPPED>(sbuf);
  338. lpOverlapped->hEvent = reinterpret_cast<HANDLE>(this);
  339. lpOverlapped->Pointer = reinterpret_cast<PVOID>(pBuffer);
  340. if (type == SOCK_STREAM) {
  341. // TCP - save current peer address
  342. sockHandle.GetPeerName( sbuf );
  343. dwRead = sockHandle.ReadEx(static_cast<LPBYTE>(sbuf), // buffer
  344. static_cast<DWORD>(sbuf.BufferSize()), // buffer size
  345. NULL, // sockaddr
  346. lpOverlapped, // overlapped
  347. thisClass::CompletionRoutine );
  348. } else {
  349. dwRead = sockHandle.ReadEx(static_cast<LPBYTE>(sbuf), // buffer
  350. static_cast<DWORD>(sbuf.BufferSize()), // buffer size
  351. static_cast<LPSOCKADDR>(sbuf), // sockaddr
  352. lpOverlapped, // overlapped
  353. thisClass::CompletionRoutine );
  354. }
  355. sockHandle.Detach();
  356. return ( dwRead != -1L );
  357. }
  358. template <typename T, size_t tBufferSize>
  359. void ASocketServerImpl<T, tBufferSize>::OnIOComplete(DWORD dwError, PerSocketContext* pBuffer, DWORD cbTransferred, DWORD /*dwFlags*/)
  360. {
  361. _ASSERTE( _pInterface != NULL && "Need an interface to pass events");
  362. _ASSERTE( pBuffer != NULL && "Invalid Buffer");
  363. if ( pBuffer != NULL )
  364. {
  365. //pBuffer->dwTime = GetTickCount(); // Jeff add 2015.10.22
  366. CSocketHandle sockHandle;
  367. SOCKET sock = static_cast<SOCKET>(*pBuffer);
  368. sockHandle.Attach( sock );
  369. int type = sockHandle.GetSocketType();
  370. if ( dwError == NOERROR )
  371. {
  372. if (type == SOCK_STREAM && cbTransferred == 0L )
  373. {
  374. // connection broken
  375. if ( _pInterface != NULL ) {
  376. _pInterface->OnConnectionDropped(*this);
  377. }
  378. // remove connection
  379. // 清除SOCK不在这里做,因为清除线程里会有清除,这里也清除;
  380. RemoveConnection( sock );// Jeff.doesn't remove socket here;
  381. CloseConnection(sock);
  382. if ( _pInterface != NULL ) {
  383. // Notification: OnRemoveConnection
  384. _pInterface->OnRemoveConnection(*this, sock);
  385. }
  386. }
  387. else
  388. {
  389. // Notification: OnDataReceived
  390. if ( _pInterface != NULL ) {
  391. _pInterface->OnDataReceived(*this,
  392. static_cast<SOCKET>(*pBuffer),
  393. static_cast<LPBYTE>(*pBuffer), // Data
  394. cbTransferred, // Number of bytes
  395. static_cast<SockAddrIn&>(*pBuffer),
  396. (BYTE**)(&pBuffer->_pendingbuf),
  397. pBuffer->_npendingSize,
  398. pBuffer->_ncurSize);
  399. }
  400. // schedule another read for this socket
  401. AsyncRead( pBuffer );
  402. }
  403. }
  404. else
  405. {
  406. for ( ; _pInterface != NULL; )
  407. {
  408. if (IsConnectionDropped( dwError) ) {
  409. // Notification: OnConnectionDropped
  410. if (type == SOCK_STREAM || (dwError == WSAENOTSOCK || dwError == WSAENETDOWN))
  411. //if (type == SOCK_STREAM && (dwError == WSAENOTSOCK || dwError == WSAENETDOWN))// Jeff.set.
  412. {
  413. _pInterface->OnConnectionDropped(*this);
  414. // 清除SOCK不在这里做,因为清除线程里会有清除,这里也清除;
  415. CloseConnection(sock);
  416. RemoveConnection( sock );
  417. //CloseConnection( sock );
  418. //_pInterface->OnConnectionError(*this, dwError); // Jeff.add
  419. type = -1; // don't schedule other request
  420. break;
  421. }
  422. }
  423. // Notification: OnConnectionError
  424. _pInterface->OnConnectionError(*this, dwError);
  425. break;
  426. }
  427. // Schedule read request
  428. if ((type == SOCK_STREAM || type == SOCK_DGRAM) && _socket.IsOpen() ) {
  429. AsyncRead( pBuffer );
  430. }
  431. }
  432. // Detach or Close this socket (TCP-mode only)
  433. if (type != SOCK_STREAM ||
  434. (type == SOCK_STREAM && cbTransferred != 0L) )
  435. {
  436. sockHandle.Detach();
  437. }
  438. }
  439. }
  440. template <typename T, size_t tBufferSize>
  441. void ASocketServerImpl<T, tBufferSize>::Terminate(DWORD dwTimeout /*= INFINITE*/)
  442. {
  443. _socket.Close();
  444. if ( _thread != NULL )
  445. {
  446. if ( _threadId != 0 ) {
  447. PostThreadMessage(_threadId, WM_QUIT, 0, 0);
  448. }
  449. if ( WaitForSingleObject(_thread, dwTimeout) == WAIT_TIMEOUT ) {
  450. TerminateThread(_thread, 1);
  451. }
  452. CloseHandle(_thread);
  453. _thread = NULL;
  454. _threadId = 0L;
  455. }
  456. }
  457. template <typename T, size_t tBufferSize>
  458. DWORD WINAPI ASocketServerImpl<T, tBufferSize>::SocketServerProc(thisClass* _this)
  459. {
  460. if ( _this != NULL )
  461. {
  462. _this->Run();
  463. }
  464. return 0;
  465. }
  466. template <typename T, size_t tBufferSize>
  467. DWORD WINAPI ASocketServerImpl<T, tBufferSize>::SocketServerIOProc(thisClass* _this)
  468. {
  469. if ( _this != NULL )
  470. {
  471. _this->IoRun();
  472. }
  473. return 0;
  474. }
  475. template <typename T, size_t tBufferSize>
  476. void WINAPI ASocketServerImpl<T, tBufferSize>::CompletionRoutine(DWORD dwError, DWORD cbTransferred,
  477. LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
  478. {
  479. thisClass* _this = reinterpret_cast<thisClass*>( lpOverlapped->hEvent );
  480. if ( _this != NULL )
  481. {
  482. PerSocketContext* pBuffer = reinterpret_cast<PerSocketContext*>(lpOverlapped->Pointer);
  483. _this->OnIOComplete(dwError, pBuffer, cbTransferred, dwFlags);
  484. }
  485. }
  486. template <typename T, size_t tBufferSize>
  487. bool ASocketServerImpl<T, tBufferSize>::IsConnectionDropped(DWORD dwError)
  488. {
  489. // see: winerror.h for definition
  490. switch( dwError )
  491. {
  492. case WSAENOTSOCK:
  493. case WSAENETDOWN:
  494. case WSAENETUNREACH:
  495. case WSAENETRESET:
  496. case WSAECONNABORTED:
  497. case WSAECONNRESET:
  498. case WSAESHUTDOWN:
  499. case WSAEHOSTDOWN:
  500. case WSAEHOSTUNREACH:
  501. return true;
  502. default:
  503. TRACE("--------------%d\r\n",dwError);
  504. break;
  505. }
  506. return false;
  507. }
  508. #endif // __ASYNCSOCKETSERVERIMPL_20160228__