123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- #include "StdAfx.h"
- #include "IServerImpl.h"
- #include <comdef.h>
- #include <atlbase.h>
- #include <strsafe.h>
- #include "ThreadPool.hpp"
- #include "ClientProcess.h"
- // #ifdef _DEBUG
- // #define new DEBUG_NEW
- // #endif
- const int AF_IPV4 = 0;
- const int AF_IPV6 = 1;
- const int SOCK_TCP = SOCK_STREAM-1;
- const int SOCK_UDP = SOCK_DGRAM-1;
- namespace ServerSocketImpl
- {
- IServerImpl* g_pServerSocket = NULL;
- IServerImpl::IServerImpl():m_nMode(AF_IPV4)
- ,m_nSockType(SOCK_TCP)
- ,m_strPort(_T("64320"))
- ,m_nSocketIndex(0)
- {
- m_bStopbeat = FALSE;
- InitializeCriticalSection( &m_csProcessData );
- m_SocketServer.SetInterface(this);
- }
- IServerImpl::~IServerImpl()
- {
- m_SocketServer.Terminate();
- DeleteCriticalSection( &m_csProcessData );
- }
- BOOL IServerImpl::Initialize()
- {
- TCHAR szIPAddr[MAX_PATH] = { 0 };
- CSocketHandle::GetLocalAddress(szIPAddr, MAX_PATH, AF_INET);
- //AppendText(_T("Local Address (IPv4): %s\r\n"), szIPAddr);
- CSocketHandle::GetLocalAddress(szIPAddr, MAX_PATH, AF_INET6);
- //AppendText(_T("Local Address (IPv6): %s\r\n"), szIPAddr);
- return TRUE;
- }
- void IServerImpl::Start(IN LPCTSTR strPort,IN const int &nMode)
- {
- m_nMode = nMode;
- int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6;
- if (!m_SocketServer.StartServer(NULL, strPort, nFamily, (m_nSockType+1)))
- {
- //OutputDebugString(_T("\n连接服务器失败!\n"));
- AfxMessageBox(_T("Failed to start server."), NULL, MB_ICONSTOP);
- return;
- }
-
- //CClientProcess::GetInstance()->StartMsgWork();
- //SyncControls();
- }
- void IServerImpl::Stop()
- {
- if(m_hRunObject)
- SetEvent(m_hRunObject);
- if( m_hClearInvalidateSocketThread )
- {
- if (WaitForSingleObject(m_hClearInvalidateSocketThread,INFINITE) != WAIT_FAILED)
- {
- CloseHandle(m_hClearInvalidateSocketThread);
- m_hClearInvalidateSocketThread = NULL;
- }
- }
- if ( m_hRunObject )
- CloseHandle( m_hRunObject );
- m_hRunObject = NULL;
- m_SocketServer.Terminate();
- //SyncControls();
- }
- void IServerImpl::Send()
- {
- if ( m_SocketServer.IsOpen() )
- {
- CString strMsg;
- //m_ctlMessage.GetWindowText( strMsg );
- if ( strMsg.IsEmpty() )
- {
- //AppendText( _T("Please enter the message to send.\r\n") );
- return;
- }
- USES_CONVERSION;
- if (m_nSockType == SOCK_TCP)
- {
- const LPBYTE lpbData = (const LPBYTE)(T2CA(strMsg));
- // unsafe access to Socket list!
- #ifdef SOCKHANDLE_USE_OVERLAPPED
- const SocketContextList& sl = m_SocketServer.GetSocketList();
- for(SocketContextList::const_iterator citer = sl.begin(); citer != sl.end(); ++citer)
- #else
- const SocketList& sl = m_SocketServer.GetSocketList();
- for(SocketList::const_iterator citer = sl.begin(); citer != sl.end(); ++citer)
- #endif
- {
- CSocketHandle sockHandle;
- sockHandle.Attach( (*citer) );
- sockHandle.Write(lpbData, strMsg.GetLength(), NULL);
- sockHandle.Detach();
- }
- }
- else
- {
- SockAddrIn servAddr, sockAddr;
- m_SocketServer->GetSockName(servAddr);
- GetDestination(sockAddr);
- if ( servAddr != sockAddr )
- {
- m_SocketServer.Write((const LPBYTE)(T2CA(strMsg)), strMsg.GetLength(), sockAddr);
- }
- else
- {
- //AppendText( _T("Please change the port number to send message to a client.\r\n") );
- }
- }
- }
- else
- {
- AfxMessageBox(_T("Socket is not connected"));
- }
- }
- void IServerImpl::SendAll(CSocketHandle &sockHandle, unsigned char *pMsg, int nLength)
- {
- if ( m_SocketServer.IsOpen() )
- {
- USES_CONVERSION;
- if (m_nSockType == SOCK_TCP)
- {
- // unsafe access to Socket list!
- const LPBYTE lpbData = (const LPBYTE)(pMsg);
- sockHandle.Write(lpbData, nLength, NULL);
- }
- else
- {
- SockAddrIn servAddr, sockAddr;
- m_SocketServer->GetSockName(servAddr);
- GetDestination(sockAddr);
- if ( servAddr != sockAddr )
- {
- m_SocketServer.Write((const LPBYTE)*pMsg, nLength, sockAddr);
- }
- else
- {
- }
- }
- }
- else
- {
- }
- }
- int IServerImpl::OnIntegrityPacket(IN PerSocketContext &sockHandle, IN void *pIntegrityPacket)
- {
-
- return 0;
- }
- void IServerImpl::GetAddress(const SockAddrIn& addrIn, CString& rString) const
- {
- TCHAR szIPAddr[MAX_PATH] = { 0 };
- CSocketHandle::FormatIP(szIPAddr, MAX_PATH, addrIn);
- rString.Format(_T("%s : %d"), szIPAddr, static_cast<int>(static_cast<UINT>(ntohs(addrIn.GetPort()))) );
- }
- void IServerImpl::AppendText(LPCTSTR lpszFormat, ...)
- {
- // if ( !::IsWindow(m_ctlMsgList.GetSafeHwnd()) ) return;
- // TCHAR szBuffer[512];
- // HWND hWnd = m_ctlMsgList.GetSafeHwnd();
- // DWORD dwResult = 0;
- // if (SendMessageTimeout(hWnd, WM_GETTEXTLENGTH, 0, 0, SMTO_NORMAL, 500L, &dwResult) != 0)
- // {
- // int nLen = (int) dwResult;
- // if (SendMessageTimeout(hWnd, EM_SETSEL, nLen, nLen, SMTO_NORMAL, 500L, &dwResult) != 0)
- // {
- // size_t cb = 0;
- // va_list args;
- // va_start(args, lpszFormat);
- // ::StringCchVPrintfEx(szBuffer, 512, NULL, &cb, 0, lpszFormat, args);
- // va_end(args);
- // SendMessageTimeout(hWnd, EM_REPLACESEL, FALSE, reinterpret_cast<LPARAM>(szBuffer), SMTO_NORMAL, 500L, &dwResult);
- // }
- // }
- }
- bool IServerImpl::GetDestination(SockAddrIn& addrIn) const
- {
- CString strPort;
- //GetDlgItemText(IDC_SVR_PORT, strPort);
- int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6;
- return addrIn.CreateFrom(NULL, strPort, nFamily);
- }
- bool IServerImpl::SetupMCAST()
- {
- const TCHAR szIPv4MCAST[] = TEXT("239.121.1.2");
- const TCHAR szIPv6MCAST[] = TEXT("FF02:0:0:0:0:0:0:1"); // All Nodes local address
- bool result = false;
- if ( m_nSockType == SOCK_UDP )
- {
- if ( m_nMode == AF_IPV4 ) {
- result = m_SocketServer->AddMembership(szIPv4MCAST, NULL);
- } else {
- result = m_SocketServer->AddMembership(szIPv6MCAST, NULL);
- HRESULT hr = HRESULT_FROM_WIN32(GetLastError());
- hr = hr;
- }
- }
- return result;
- }
- ///////////////////////////////////////////////////////////////////////////////
- void IServerImpl::OnThreadBegin(CSocketHandle* pSH)
- {
- ASSERT( pSH == m_SocketServer );
- (pSH);
- CString strAddr;
- SockAddrIn sockAddr;
- m_SocketServer->GetSockName(sockAddr);
- GetAddress( sockAddr, strAddr );
- //AppendText( _T("Server Running on: %s\r\n"), strAddr);
- }
- void IServerImpl::OnThreadExit(CSocketHandle* pSH)
- {
- ASSERT( pSH == m_SocketServer );
- (pSH);
- //AppendText( _T("Server Down!\r\n"));
- }
- void IServerImpl::OnConnectionFailure(CSocketHandle* pSH, SOCKET newSocket)
- {
- ASSERT( pSH == m_SocketServer );
- (pSH);
- CString strAddr;
- CSocketHandle sockHandle;
- SockAddrIn sockAddr;
- if (newSocket != INVALID_SOCKET)
- {
- sockHandle.Attach( newSocket );
- sockHandle.GetPeerName( sockAddr );
- GetAddress( sockAddr, strAddr );
- sockHandle.Close();
- //AppendText( _T("Connection abandoned: %s\r\n"), strAddr );
- //LOG4C_NO_FILENUM((LOG_NOTICE,"Connection abandoned:%s",strAddr));
- //OutputDebugString(_T("\n客户端连接中断:"));
- //OutputDebugString(strAddr);
- //OutputDebugString(_T("\n\n"));
- }
- else
- {
- //OutputDebugString(_T("\n客户端连接中断,不是一个有效的套接字:"));
- //OutputDebugString(strAddr);
- //OutputDebugString(_T("\n\n"));
- //AppendText( _T("Connection abandoned. Not a valid socket.\r\n"), strAddr );
- //LOG4C_NO_FILENUM((LOG_NOTICE,"Connection abandoned. Not a valid socket:%s",strAddr));
- }
- }
- void IServerImpl::OnAddConnection(CSocketHandle* pSH, SOCKET newSocket)
- {
- ASSERT( pSH == m_SocketServer );
- (pSH);
- CString strAddr;
- CSocketHandle sockHandle;
- SockAddrIn sockAddr;
- sockHandle.Attach( newSocket );
- sockHandle.GetPeerName( sockAddr );
- GetAddress( sockAddr, strAddr );
- sockHandle.Detach();
- OutputDebugString(_T("\n新的连接接入:"));
- OutputDebugString(strAddr);
- OutputDebugString(_T("\n"));
- //LOG4C_NO_FILENUM((LOG_NOTICE,"新的连接接入:%s",strAddr));
- //AppendText( _T("Connection established: %s\r\n"), strAddr );
- }
- //void IServerImpl::OnDataReceived(CSocketHandle* pSH, const SOCKET sClient, const BYTE* pbData, DWORD dwCount, const SockAddrIn& addr, BYTE **pendingbuf, unsigned int& npendingSize, unsigned int& ncursize)
- void IServerImpl::OnDataReceived(LPWSAOVERLAPPED pSH, const BYTE* pbData, DWORD dwCount, const SockAddrIn& addr)
- {
- //TCHAR szcount[10]= {0};
- //itoa(dwCount, szcount, 10);
- //OutputDebugString(szcount);
- //OutputDebugString("\n");
- // return;
- //ASSERT( pSH == NULL );
- if ( pSH == NULL )
- return ;
- //CString strAddr, strText;
- //USES_CONVERSION;
- //LPTSTR pszText = strText.GetBuffer(dwCount+1);
- //::StringCchCopyN(pszText, dwCount+1, A2CT(reinterpret_cast<LPCSTR>(pbData)), dwCount);
- //strText.ReleaseBuffer();
- //GetAddress( addr, strAddr );
- //AppendText( _T("%s>(%s)\r\n"), strAddr, strText);
- //return;
- static ULONGLONG ulCount = 0;
- ulCount += dwCount;
- static TCHAR szcount[40]= {0};
- _ui64toa(ulCount, szcount, 10);
- //OutputDebugString(szcount);
- //OutputDebugString("\n");
- LOG4C_NO_FILENUM((LOG_NOTICE,"累积:%s",szcount));
- if (m_nSockType == SOCK_TCP)
- {
- PerSocketContext *psio = (PerSocketContext*)pSH;
- EnterCriticalSection( &m_csProcessData );
- ToprocessRecivebuf(*psio,pbData,dwCount);
- LeaveCriticalSection( &m_csProcessData );
- }
- else
- {
- SockAddrIn servAddr, sockAddr;
- m_SocketServer->GetSockName(servAddr);
- GetDestination(sockAddr);
- if ( servAddr != sockAddr )
- {
- //m_SocketServer.Write((const LPBYTE)(T2CA(strMsg)), strMsg.GetLength(), sockAddr);
- }
- else
- {
- //AppendText( _T("Please change the port number to send message to a client.\r\n") );
- }
- }
- }
- void IServerImpl::OnConnectionDropped(CSocketHandle* pSH)
- {
- ASSERT( pSH == m_SocketServer );
- (pSH);
- CString strAddr;
- CSocketHandle sockHandle;
- SockAddrIn sockAddr;
- //sockHandle.Attach( pSH->GetSocket() );
- sockHandle.GetPeerName( sockAddr );
- GetAddress( sockAddr, strAddr );
- //sockHandle.Detach();
- //AppendText( _T("Connection lost with client.\r\n") );
- //LOG4C_NO_FILENUM((LOG_NOTICE,"Connection lost with client"));
- }
- void IServerImpl::OnConnectionError(CSocketHandle* pSH, DWORD dwError)
- {
- ASSERT( pSH == m_SocketServer );
- (pSH);
- _com_error err(dwError);
- //AppendText( _T("Communication Error:\r\n%s\r\n"), err.ErrorMessage() );
- CString strAddr;
- CSocketHandle sockHandle;
- SockAddrIn sockAddr;
- sockHandle.GetPeerName( sockAddr );
- GetAddress( sockAddr, strAddr );
- //LOG4C((LOG_NOTICE,"IP:%s Communication Error:%s", strAddr, err.ErrorMessage()));
- }
- #if defined(SOCKHANDLE_USE_OVERLAPPED)
- void IServerImpl::OnRemoveConnection(CSocketHandle* pSH, SOCKET dropSocket)
- {
- return;
- ASSERT( pSH == m_SocketServer );
- (pSH);
- CString strAddr;
- CSocketHandle sockHandle;
- SockAddrIn sockAddr;
- sockHandle.Attach( dropSocket );
- sockHandle.GetPeerName( sockAddr );
- GetAddress( sockAddr, strAddr );
- sockHandle.Detach();
- //LOG4C_NO_FILENUM((LOG_NOTICE,"Connection abandoned. Not a valid socket:%s",strAddr));
- }
- #endif
- DWORD IServerImpl::GetClientConnectCount()
- {
- //DWORD dwClientSize = 0;
- //m_SocketServer->GetConnectionCount();
- return m_SocketServer.GetConnectionCount();
- }
- void IServerImpl::ToprocessRecivebuf(IN PerSocketContext &sockHandle, IN const BYTE* pReceivebuf, IN DWORD dwReceiveSize)
- {
- // 当前接收的包被处理的长度;
- DWORD dwIndexOfProcessed = 0;
- // 循环一次处理了多少当前包;
- DWORD dwOnceProcessedLen = 0;
- TheProhead *ptphead;
- while( dwIndexOfProcessed < dwReceiveSize )
- {
- ptphead = NULL;
- dwOnceProcessedLen = 0;
- // 第一次接收或完整组包后接收剩余包;
- if ( sockHandle.ncurSize == 0 )
- {
- // 接收的数据不足一个包头;
- if ( dwReceiveSize - dwIndexOfProcessed < sizeof( TheProhead ) )
- {
- sockHandle.ncurSize = dwReceiveSize - dwIndexOfProcessed;
- sockHandle.SetPendingCurPack(sockHandle.ncurSize);
- memcpy(sockHandle.pendingbuf, pReceivebuf+dwIndexOfProcessed, sockHandle.ncurSize);
- dwOnceProcessedLen = sockHandle.ncurSize;
- //OutputDebugString("A:接收的数据不足一个包头\n");
- }
- else
- {// 接收的数据等于或大于一个包头;
- // 先组完一个包头;
- ptphead = (TheProhead*)(pReceivebuf+dwIndexOfProcessed);
- if ( dwReceiveSize - dwIndexOfProcessed > ptphead->nDataLen + sizeof(TheProhead) )
- {
- sockHandle.ncurSize = sockHandle.npendingSize = ptphead->nDataLen + sizeof(TheProhead);
- sockHandle.SetPendingPack(sockHandle.ncurSize);
- memcpy(sockHandle.pendingbuf, pReceivebuf+dwIndexOfProcessed, sockHandle.ncurSize);
- dwOnceProcessedLen = sockHandle.ncurSize;
- //OutputDebugString("A:接收的数据仍不足组成一个包头\n");
- }
- else
- {
- sockHandle.npendingSize = ptphead->nDataLen + sizeof(TheProhead);
- sockHandle.ncurSize = dwReceiveSize - dwIndexOfProcessed;
- sockHandle.SetPendingPack(sockHandle.npendingSize);
- memcpy(sockHandle.pendingbuf, pReceivebuf+dwIndexOfProcessed, sockHandle.ncurSize);
- dwOnceProcessedLen = sockHandle.ncurSize;
- //OutputDebugString("A:接收的数据可以组成一个包头\n");
- }
- }
- }
- else// 接收整包剩余数据;
- {
- if ( sockHandle.ncurSize >= sizeof(TheProhead) )
- {// 已接收的数据大于一个完整的包头;
- ptphead = (TheProhead*)sockHandle.pendingbuf;
- if ( sockHandle.npendingSize == ptphead->nDataLen + sizeof(TheProhead) )
- {
- if ( sockHandle.npendingSize - sockHandle.ncurSize > dwReceiveSize - dwIndexOfProcessed )
- {// 接收的数据不足组成一个完整的包;
- memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pReceivebuf + dwIndexOfProcessed, dwReceiveSize - dwIndexOfProcessed);
- sockHandle.ncurSize += dwReceiveSize - dwIndexOfProcessed;
- dwOnceProcessedLen = dwReceiveSize - dwIndexOfProcessed;
- //OutputDebugString("B:接收的数据不足组成一个协议包\n");
- }
- else
- {// 接收的数据足够组成一个完整的包;
- memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pReceivebuf + dwIndexOfProcessed, sockHandle.npendingSize - sockHandle.ncurSize);
- dwOnceProcessedLen = sockHandle.npendingSize - sockHandle.ncurSize;
- sockHandle.ncurSize = sockHandle.npendingSize;
- //OutputDebugString("B:接收的数据可以组成一个协议包\n");
- }
- }
- else
- {
- // 错误数据,丢包;
- LOG4C_NO_FILENUM((LOG_NOTICE, "数据包错误,丢包"));
- //OutputDebugString("B:数据包错误,丢包\n");
- }
- }
- else
- {// 已接收的数据不足一个包头;
- if ( sizeof(TheProhead) - sockHandle.ncurSize > dwReceiveSize - dwIndexOfProcessed )
- {// 本次接收的数据未能组成一个包头;
- sockHandle.ReSetPengingPack(sockHandle.ncurSize + dwReceiveSize - dwIndexOfProcessed);
- memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pReceivebuf + dwIndexOfProcessed, dwReceiveSize - dwIndexOfProcessed);
- sockHandle.ncurSize += dwReceiveSize - dwIndexOfProcessed;
- dwOnceProcessedLen = dwReceiveSize - dwIndexOfProcessed;
- //OutputDebugString("C:本次接收的数据未能组成一个包头\n");
- }
- else
- {// 本次接收的数据能组成一个包头;
- // 先组完一个包头;
- sockHandle.ReSetPengingPack(sizeof(TheProhead));
- memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pReceivebuf + dwIndexOfProcessed, sizeof(TheProhead) - sockHandle.ncurSize);
- dwOnceProcessedLen = sizeof(TheProhead) - sockHandle.ncurSize;
- sockHandle.ncurSize = sizeof(TheProhead);
- ptphead = (TheProhead*)sockHandle.pendingbuf;
- sockHandle.npendingSize = ptphead->nDataLen + sizeof(TheProhead);
- sockHandle.ReSetPengingPack(sockHandle.npendingSize);
- if ( sockHandle.npendingSize >= dwReceiveSize - dwIndexOfProcessed - dwOnceProcessedLen )
- {// 剩余数据能组成一个完整的包;
- memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pReceivebuf + dwIndexOfProcessed + dwOnceProcessedLen, sockHandle.npendingSize - sockHandle.ncurSize);
- dwOnceProcessedLen += sockHandle.npendingSize - sockHandle.ncurSize;
- sockHandle.ncurSize = sockHandle.npendingSize;
- //OutputDebugString("C:接收的数据未能组成一个协议包\n");
- }
- else
- {// 剩余数据未能组成一个完整的包;
- memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pReceivebuf + dwIndexOfProcessed + dwOnceProcessedLen, dwReceiveSize - dwIndexOfProcessed - dwOnceProcessedLen);
- sockHandle.ncurSize += dwReceiveSize - dwIndexOfProcessed - dwOnceProcessedLen;
- dwOnceProcessedLen = dwReceiveSize - dwIndexOfProcessed;
- //OutputDebugString("C:接收的数据可以组成一个协议包\n");
- }
- }
- }
- }
- if ( sockHandle.ncurSize == sockHandle.npendingSize )
- {
- //OutputDebugString("D:接收的数据已完成一次处理过程\n");
- TheProbody *ptpb = (TheProbody*)sockHandle.pendingbuf;
- CFile cf;
- static int a = 0;
- CString strFile = _T("");
- strFile.Format(_T("D:\\接收文件\\%d-%d.dat"), (SOCKET)sockHandle,a);
- if ( cf.Open(strFile, CFile::modeCreate|CFile::modeReadWrite) )
- {
- cf.Write(ptpb->szBody, ptpb->tphead.nDataLen);
- cf.Close();
- a++;
- }
- else
- {
- LOG4C_NO_FILENUM((LOG_NOTICE, "创建文件失败"));
- }
- delete []sockHandle.pendingbuf;
- sockHandle.pendingbuf = NULL;
- sockHandle.ncurSize = sockHandle.npendingSize = 0;
- }
- dwIndexOfProcessed += dwOnceProcessedLen;
- }
- }
- };
|