#ifndef __ASYNCSOCKETSERVERIMPL_20160228__ #define __ASYNCSOCKETSERVERIMPL_20160228__ #pragma once #pragma warning(push) #pragma warning(disable:4995) #include #pragma warning(pop) #include "CritSection.h" #include "SocketHandle.h" #include "PerSocketContext.h" static DWORD WM_ADD_CONNECTION = WM_USER+0x101; /************************************************************************/ /* Copyright (C), 2016-2020, [IT], 保留所有权利; /* 模 块 名:IASocketServerHandler; /* 描 述:Event handler that ASocketServerImpl must implement; /* This class is not required, you can do the same thing as long your class exposes these functions. /* (These functions are not pure to save you some typing) /* /* 版 本:[V]; /* 作 者:[IT]; /* 日 期:[2/23/2016]; /* /* /* 注 意:; /* /* 修改记录:[IT]; /* 修改日期:; /* 修改版本:; /* 修改内容:; /************************************************************************/ class IASocketServerHandler { public: virtual void OnThreadBegin(CSocketHandle* ) {} virtual void OnThreadExit(CSocketHandle* ) {} virtual void OnThreadLoopEnter(CSocketHandle* ) {} virtual void OnThreadLoopLeave(CSocketHandle* ) {} virtual void OnAddConnection(CSocketHandle* , SOCKET ) {} virtual void OnRemoveConnection(CSocketHandle* , SOCKET ) {} virtual void OnDataReceived(CSocketHandle* , const SOCKET sClient, const BYTE* pData, DWORD dwCount, const SockAddrIn&, BYTE **pendingbuf, unsigned int& npendingSize, unsigned int& ncursize) {} virtual void OnConnectionFailure(CSocketHandle*, SOCKET) {} virtual void OnConnectionDropped(CSocketHandle* ) {} virtual void OnConnectionError(CSocketHandle* , DWORD ) {} }; /************************************************************************/ /* Copyright (C), 2016-2020, [IT], 保留所有权利; /* 模 块 名:ASocketServerImpl; /* 描 述:Because may refer to any class of your choosing,Server Communication wrapper; /* /* 版 本:[V]; /* 作 者:[IT]; /* 日 期:[2/23/2016]; /* /* /* 注 意:; /* /* 修改记录:[IT]; /* 修改日期:; /* 修改版本:; /* 修改内容:; /************************************************************************/ template class ASocketServerImpl { typedef ASocketServerImpl thisClass; public: ASocketServerImpl(): _pInterface(0), _thread(0), _threadId(0) { } void SetInterface(T* pInterface) { ::InterlockedExchangePointer(reinterpret_cast(&_pInterface), pInterface); } operator CSocketHandle*() throw() { return( &_socket ); } CSocketHandle* operator->() throw() { return( &_socket ); } bool IsOpen() const { return _socket.IsOpen(); } bool CreateSocket(LPCTSTR pszHost, LPCTSTR pszServiceName, int nFamily, int nType, UINT uOptions = 0) { return _socket.CreateSocket(pszHost, pszServiceName, nFamily, nType, uOptions); } void Close() { _socket.Close(); } DWORD Read(LPBYTE lpBuffer, DWORD dwSize, LPSOCKADDR lpAddrIn = NULL, DWORD dwTimeout = INFINITE) { return _socket.Read(lpBuffer, dwSize, lpAddrIn, dwTimeout); } DWORD Write(const LPBYTE lpBuffer, DWORD dwCount, const LPSOCKADDR lpAddrIn = NULL, DWORD dwTimeout = INFINITE) { return _socket.Write(lpBuffer, dwCount, lpAddrIn, dwTimeout); } const SocketContextList& GetSocketList() const { // direct access! - use Lock/Unlock to protect return _sockets; } SocketContextList& GetClientSocketList() { // direct access! - use Lock/Unlock to protect return _sockets; } bool Lock() { return _critSection.Lock(); } bool Unlock() { return _critSection.Unlock(); } void ResetConnectionList() { AutoThreadSection aSection(&_critSection); _sockets.clear(); } size_t GetConnectionCount() //const { AutoThreadSection aSection(&_critSection); return _sockets.size(); } void AddConnection(SOCKET sock) { AutoThreadSection aSection(&_critSection); _sockets.push_back( sock ); } void RemoveConnection(SOCKET sock) { AutoThreadSection aSection(&_critSection); _sockets.remove( sock ); } PerSocketContext* GetConnectionBuffer(SOCKET sock) { AutoThreadSection aSection(&_critSection); SocketContextList::iterator iter = std::find(_sockets.begin(), _sockets.end(), sock); return ((iter != _sockets.end()) ? &(*iter) : NULL); } bool CloseConnection(SOCKET sock) { return CSocketHandle::ShutdownConnection( sock ); } void CloseAllConnections(); bool StartServer(LPCTSTR pszHost, LPCTSTR pszServiceName, int nFamily, int nType, UINT uOptions = 0); void Terminate(DWORD dwTimeout = INFINITE); static bool IsConnectionDropped(DWORD dwError); ThreadSection *ReturnSection() { return &_critSection; } protected: void Run(); void IoRun(); bool AsyncRead(PerSocketContext* pBuffer); void OnIOComplete(DWORD dwError, PerSocketContext* pBuffer, DWORD cbTransferred, DWORD dwFlags); static DWORD WINAPI SocketServerProc(thisClass* _this); static DWORD WINAPI SocketServerIOProc(thisClass* _this); static void WINAPI CompletionRoutine(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags); T* _pInterface; HANDLE _thread; DWORD _threadId; ThreadSection _critSection; CSocketHandle _socket; // 服务端SOCKET; SocketContextList _sockets; // 客户端SOCKET连接列表; }; template void ASocketServerImpl::CloseAllConnections() { AutoThreadSection aSection(&_critSection); if ( !_sockets.empty() ) { // NOTE(elaurentin): this function closes all connections but handles are kept inside of list // (socket handles are removed by the pooling thread) SocketContextList::iterator iter; for(iter = _sockets.begin(); iter != _sockets.end(); ++iter) { CloseConnection( (*iter) ); } } } template bool ASocketServerImpl::StartServer(LPCTSTR pszHost, LPCTSTR pszServiceName, int nFamily, int nType, UINT uOptions) { // must be closed first... if ( IsOpen() ) return false; bool result = false; result = _socket.CreateSocket(pszHost, pszServiceName, nFamily, nType, uOptions); if ( result ) { _thread = AtlCreateThread(SocketServerProc, this); if ( _thread == NULL ) { DWORD dwError = GetLastError(); _socket.Close(); SetLastError(dwError); result = false; } } return result; } template void ASocketServerImpl::Run() { _ASSERTE( _pInterface != NULL && "Need an interface to pass events"); SOCKET sock = _socket.GetSocket(); int type = _socket.GetSocketType(); // Notification: OnThreadBegin if ( _pInterface != NULL ) { _pInterface->OnThreadBegin(*this); } if (type == SOCK_STREAM) { HANDLE ioThread = NULL; DWORD ioThreadId = 0L; ioThread = CreateThreadT(0, 0, SocketServerIOProc, this, 0, &ioThreadId); if ( ioThread == NULL ) { DWORD dwError = GetLastError(); if ( _pInterface != NULL ) { _pInterface->OnConnectionError(*this, dwError); } } // In TCP mode, use an I/O thread to process all requests while( _socket.IsOpen() ) { SOCKET newSocket = CSocketHandle::WaitForConnection(sock); if (!_socket.IsOpen()) break; if (!PostThreadMessage(ioThreadId, WM_ADD_CONNECTION, 0, static_cast(newSocket))) { // Notification: OnConnectionFailure if ( _pInterface != NULL ) { _pInterface->OnConnectionFailure(*this, newSocket); } } } // close all connections CloseAllConnections(); // wait for io thread if ( ioThread != NULL ) { PostThreadMessage(ioThreadId, WM_QUIT, 0, 0); WaitForSingleObject(ioThread, INFINITE); CloseHandle(ioThread); } ResetConnectionList(); } else { // UDP mode - let's reuse our thread here try { PerSocketContext ioBuffer(_socket.GetSocket(), tBufferSize); AsyncRead( &ioBuffer ); // Save our thread id so we can exit gracefully _threadId = GetCurrentThreadId(); // Process UDP packets IoRun(); } catch(std::bad_alloc&) { /* memory exception */ if ( _pInterface != NULL ) { _pInterface->OnConnectionError(*this, ERROR_NOT_ENOUGH_MEMORY); } } } // Notification: OnThreadExit if ( _pInterface != NULL ) { _pInterface->OnThreadExit(*this); } } template void ASocketServerImpl::IoRun() { _ASSERTE( _pInterface != NULL && "Need an interface to pass events"); MSG msg; ::PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE); // Notification: OnThreadLoopEnter if ( _pInterface != NULL ) { _pInterface->OnThreadLoopEnter(*this); } DWORD dwResult; while( (dwResult = MsgWaitForMultipleObjectsEx(0, NULL, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE)) != WAIT_FAILED) { msg.message = 0; PeekMessage(&msg, NULL, 0, 0, PM_REMOVE); // exit on WM_QUIT or main socket closed if ( msg.message == WM_QUIT || !_socket.IsOpen() ) break; else if ( msg.message == WM_ADD_CONNECTION ) { SOCKET sock = static_cast(msg.lParam); AddConnection(sock); PerSocketContext* pBuffer = GetConnectionBuffer(sock); //pBuffer->dwTime = GetTickCount(); // Jeff add 2015.10.22 _ASSERTE( pBuffer != NULL ); if ( pBuffer != NULL ) { pBuffer->ReAlloc( tBufferSize ); if (!AsyncRead(pBuffer)) { // remove and close connection // 清除SOCK不在这里做,因为清除线程里会有清除,这里也清除; RemoveConnection( sock );//Jeff.wasn't remove at here; CloseConnection( sock );// Jeff.doesn't close socket here; } else { // Notification: OnAddConnection if ( _pInterface != NULL ) { _pInterface->OnAddConnection(*this, sock); } } } } } // Notification: OnThreadLoopLeave if ( _pInterface != NULL ) { _pInterface->OnThreadLoopLeave(*this); } } template bool ASocketServerImpl::AsyncRead(PerSocketContext* pBuffer) { CSocketHandle sockHandle; DWORD dwRead; PerSocketContext& sbuf = (*pBuffer); SOCKET sock = static_cast(sbuf); sockHandle.Attach( sock ); int type = sockHandle.GetSocketType(); LPWSAOVERLAPPED lpOverlapped = static_cast(sbuf); lpOverlapped->hEvent = reinterpret_cast(this); lpOverlapped->Pointer = reinterpret_cast(pBuffer); if (type == SOCK_STREAM) { // TCP - save current peer address sockHandle.GetPeerName( sbuf ); dwRead = sockHandle.ReadEx(static_cast(sbuf), // buffer static_cast(sbuf.BufferSize()), // buffer size NULL, // sockaddr lpOverlapped, // overlapped thisClass::CompletionRoutine ); } else { dwRead = sockHandle.ReadEx(static_cast(sbuf), // buffer static_cast(sbuf.BufferSize()), // buffer size static_cast(sbuf), // sockaddr lpOverlapped, // overlapped thisClass::CompletionRoutine ); } sockHandle.Detach(); return ( dwRead != -1L ); } template void ASocketServerImpl::OnIOComplete(DWORD dwError, PerSocketContext* pBuffer, DWORD cbTransferred, DWORD /*dwFlags*/) { _ASSERTE( _pInterface != NULL && "Need an interface to pass events"); _ASSERTE( pBuffer != NULL && "Invalid Buffer"); if ( pBuffer != NULL ) { //pBuffer->dwTime = GetTickCount(); // Jeff add 2015.10.22 CSocketHandle sockHandle; SOCKET sock = static_cast(*pBuffer); sockHandle.Attach( sock ); int type = sockHandle.GetSocketType(); if ( dwError == NOERROR ) { if (type == SOCK_STREAM && cbTransferred == 0L ) { // connection broken if ( _pInterface != NULL ) { _pInterface->OnConnectionDropped(*this); } // remove connection // 清除SOCK不在这里做,因为清除线程里会有清除,这里也清除; RemoveConnection( sock );// Jeff.doesn't remove socket here; CloseConnection(sock); if ( _pInterface != NULL ) { // Notification: OnRemoveConnection _pInterface->OnRemoveConnection(*this, sock); } } else { // Notification: OnDataReceived if ( _pInterface != NULL ) { _pInterface->OnDataReceived(*this, static_cast(*pBuffer), static_cast(*pBuffer), // Data cbTransferred, // Number of bytes static_cast(*pBuffer), (BYTE**)(&pBuffer->_pendingbuf), pBuffer->_npendingSize, pBuffer->_ncurSize); } // schedule another read for this socket AsyncRead( pBuffer ); } } else { for ( ; _pInterface != NULL; ) { if (IsConnectionDropped( dwError) ) { // Notification: OnConnectionDropped if (type == SOCK_STREAM || (dwError == WSAENOTSOCK || dwError == WSAENETDOWN)) //if (type == SOCK_STREAM && (dwError == WSAENOTSOCK || dwError == WSAENETDOWN))// Jeff.set. { _pInterface->OnConnectionDropped(*this); // 清除SOCK不在这里做,因为清除线程里会有清除,这里也清除; CloseConnection(sock); RemoveConnection( sock ); //CloseConnection( sock ); //_pInterface->OnConnectionError(*this, dwError); // Jeff.add type = -1; // don't schedule other request break; } } // Notification: OnConnectionError _pInterface->OnConnectionError(*this, dwError); break; } // Schedule read request if ((type == SOCK_STREAM || type == SOCK_DGRAM) && _socket.IsOpen() ) { AsyncRead( pBuffer ); } } // Detach or Close this socket (TCP-mode only) if (type != SOCK_STREAM || (type == SOCK_STREAM && cbTransferred != 0L) ) { sockHandle.Detach(); } } } template void ASocketServerImpl::Terminate(DWORD dwTimeout /*= INFINITE*/) { _socket.Close(); if ( _thread != NULL ) { if ( _threadId != 0 ) { PostThreadMessage(_threadId, WM_QUIT, 0, 0); } if ( WaitForSingleObject(_thread, dwTimeout) == WAIT_TIMEOUT ) { TerminateThread(_thread, 1); } CloseHandle(_thread); _thread = NULL; _threadId = 0L; } } template DWORD WINAPI ASocketServerImpl::SocketServerProc(thisClass* _this) { if ( _this != NULL ) { _this->Run(); } return 0; } template DWORD WINAPI ASocketServerImpl::SocketServerIOProc(thisClass* _this) { if ( _this != NULL ) { _this->IoRun(); } return 0; } template void WINAPI ASocketServerImpl::CompletionRoutine(DWORD dwError, DWORD cbTransferred, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags) { thisClass* _this = reinterpret_cast( lpOverlapped->hEvent ); if ( _this != NULL ) { PerSocketContext* pBuffer = reinterpret_cast(lpOverlapped->Pointer); _this->OnIOComplete(dwError, pBuffer, cbTransferred, dwFlags); } } template bool ASocketServerImpl::IsConnectionDropped(DWORD dwError) { // see: winerror.h for definition switch( dwError ) { case WSAENOTSOCK: case WSAENETDOWN: case WSAENETUNREACH: case WSAENETRESET: case WSAECONNABORTED: case WSAECONNRESET: case WSAESHUTDOWN: case WSAEHOSTDOWN: case WSAEHOSTUNREACH: return true; default: TRACE("--------------%d\r\n",dwError); break; } return false; } #endif // __ASYNCSOCKETSERVERIMPL_20160228__