#include "StdAfx.h"
#include "IServerImpl.h"
#include <comdef.h>
#include <atlbase.h>
#include <strsafe.h>
#include "ThreadPool.hpp"
#include "ClientProcess.h"

// #ifdef _DEBUG
// #define new DEBUG_NEW
// #endif

// Values stored in m_nMode; translated to AF_INET / AF_INET6 wherever a socket family is needed.
const int AF_IPV4 = 0;
const int AF_IPV6 = 1;
// Values stored in m_nSockType; the real socket type passed to StartServer is (m_nSockType + 1).
const int SOCK_TCP = SOCK_STREAM-1;
const int SOCK_UDP = SOCK_DGRAM-1;

namespace ServerSocketImpl
{

IServerImpl* g_pServerSocket = NULL;

/// Builds a server defaulting to IPv4 / TCP / port "64320" and registers
/// this object as the callback interface of m_SocketServer.
IServerImpl::IServerImpl():m_nMode(AF_IPV4)
,m_nSockType(SOCK_TCP)
,m_strPort(_T("64320"))
,m_nSocketIndex(0)
{
	m_bStopbeat = FALSE;
	// Stop() tests these handles before using them, so they must start out
	// as NULL rather than as indeterminate values.
	m_hRunObject = NULL;
	m_hClearInvalidateSocketThread = NULL;
	InitializeCriticalSection( &m_csProcessData );
	m_SocketServer.SetInterface(this);
}

/// Terminates the socket server and releases the critical section.
IServerImpl::~IServerImpl()
{
	m_SocketServer.Terminate();
	DeleteCriticalSection( &m_csProcessData );
}

/// Queries the local IPv4 and IPv6 addresses (results are currently unused).
/// @return Always TRUE.
BOOL IServerImpl::Initialize()
{
	TCHAR szIPAddr[MAX_PATH] = { 0 };
	CSocketHandle::GetLocalAddress(szIPAddr, MAX_PATH, AF_INET);
	//AppendText(_T("Local Address (IPv4): %s\r\n"), szIPAddr);
	CSocketHandle::GetLocalAddress(szIPAddr, MAX_PATH, AF_INET6);
	//AppendText(_T("Local Address (IPv6): %s\r\n"), szIPAddr);
	return TRUE;
}

/// Starts listening on the given port.
/// @param strPort  Port (service) string handed to StartServer.
/// @param nMode    AF_IPV4 or AF_IPV6 (file-local constants); stored in m_nMode.
/// Shows a message box and returns if the server cannot be started.
void IServerImpl::Start(IN LPCTSTR strPort,IN const int &nMode)
{
	m_nMode = nMode;
	int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6;
	if (!m_SocketServer.StartServer(NULL, strPort, nFamily, (m_nSockType+1)))
	{
		AfxMessageBox(_T("Failed to start server."), NULL, MB_ICONSTOP);
		return;
	}
	//CClientProcess::GetInstance()->StartMsgWork();
	//SyncControls();
}

/// Signals the run event, waits for the clean-up thread to finish, closes
/// both handles and terminates the socket server.
void IServerImpl::Stop()
{
	if(m_hRunObject)
		SetEvent(m_hRunObject);

	if( m_hClearInvalidateSocketThread )
	{
		// The thread handle is only closed when the wait itself did not fail.
		if (WaitForSingleObject(m_hClearInvalidateSocketThread,INFINITE) != WAIT_FAILED)
		{
			CloseHandle(m_hClearInvalidateSocketThread);
			m_hClearInvalidateSocketThread = NULL;
		}
	}

	if ( m_hRunObject )
		CloseHandle( m_hRunObject );
	m_hRunObject = NULL;

	m_SocketServer.Terminate();
	//SyncControls();
}

/// Sends a text message to every connected client (TCP) or to the configured
/// destination (UDP).
/// NOTE(review): the message source (a dialog control) is commented out, so
/// strMsg is always empty and the function currently returns early whenever
/// the server is open.
void IServerImpl::Send()
{
	if ( m_SocketServer.IsOpen() )
	{
		CString strMsg;
		//m_ctlMessage.GetWindowText( strMsg );
		if ( strMsg.IsEmpty() ) {
			//AppendText( _T("Please enter the message to send.\r\n") );
			return;
		}
		USES_CONVERSION;
		if (m_nSockType == SOCK_TCP)
		{
			const LPBYTE lpbData = (const LPBYTE)(T2CA(strMsg));
			// unsafe access to Socket list!
#ifdef SOCKHANDLE_USE_OVERLAPPED
			const SocketContextList& sl = m_SocketServer.GetSocketList();
			for(SocketContextList::const_iterator citer = sl.begin(); citer != sl.end(); ++citer)
#else
			const SocketList& sl = m_SocketServer.GetSocketList();
			for(SocketList::const_iterator citer = sl.begin(); citer != sl.end(); ++citer)
#endif
			{
				// Borrow the socket just long enough to write; Detach() keeps
				// the temporary handle from owning it.
				CSocketHandle sockHandle;
				sockHandle.Attach( (*citer) );
				sockHandle.Write(lpbData, strMsg.GetLength(), NULL);
				sockHandle.Detach();
			}
		}
		else
		{
			SockAddrIn servAddr, sockAddr;
			m_SocketServer->GetSockName(servAddr);
			GetDestination(sockAddr);
			if ( servAddr != sockAddr ) {
				m_SocketServer.Write((const LPBYTE)(T2CA(strMsg)), strMsg.GetLength(), sockAddr);
			} else {
				//AppendText( _T("Please change the port number to send message to a client.\r\n") );
			}
		}
	}
	else
	{
		AfxMessageBox(_T("Socket is not connected"));
	}
}

/// Writes a raw buffer to a client.
/// @param sockHandle  Socket written to when the server runs in TCP mode.
/// @param pMsg        Buffer to send.
/// @param nLength     Number of bytes of pMsg to send.
/// In UDP mode the buffer is sent to the address produced by GetDestination(),
/// unless that address equals the server's own address. Does nothing when the
/// server is not open.
void IServerImpl::SendAll(CSocketHandle &sockHandle, unsigned char *pMsg, int nLength)
{
	if ( !m_SocketServer.IsOpen() )
		return;

	const LPBYTE lpbData = (const LPBYTE)(pMsg);
	if (m_nSockType == SOCK_TCP)
	{
		sockHandle.Write(lpbData, nLength, NULL);
	}
	else
	{
		SockAddrIn servAddr, sockAddr;
		m_SocketServer->GetSockName(servAddr);
		GetDestination(sockAddr);
		if ( servAddr != sockAddr ) {
			// Pass the buffer pointer itself; the previous code dereferenced
			// pMsg and cast the first byte's value to a pointer.
			m_SocketServer.Write(lpbData, nLength, sockAddr);
		}
	}
}

/// Hook for a fully reassembled packet; currently a stub.
/// @return Always 0.
int IServerImpl::OnIntegrityPacket(IN PerSocketContext &sockHandle, IN void *pIntegrityPacket)
{
	return 0;
}

/// Formats an address as "<ip> : <port>" into rString.
void IServerImpl::GetAddress(const SockAddrIn& addrIn, CString& rString) const
{
	TCHAR szIPAddr[MAX_PATH] = { 0 };
	CSocketHandle::FormatIP(szIPAddr, MAX_PATH, addrIn);
	rString.Format(_T("%s : %d"), szIPAddr, static_cast<int>(static_cast<UINT>(ntohs(addrIn.GetPort()))) );
}

/// Formerly appended formatted text to a message-list control; the
/// implementation is disabled, so this is a no-op.
void IServerImpl::AppendText(LPCTSTR lpszFormat, ...)
{
// 	if ( !::IsWindow(m_ctlMsgList.GetSafeHwnd()) ) return;
// 	TCHAR szBuffer[512];
// 	HWND hWnd = m_ctlMsgList.GetSafeHwnd();
// 	DWORD dwResult = 0;
// 	if (SendMessageTimeout(hWnd, WM_GETTEXTLENGTH, 0, 0, SMTO_NORMAL, 500L, &dwResult) != 0)
// 	{
// 		int nLen = (int) dwResult;
// 		if (SendMessageTimeout(hWnd, EM_SETSEL, nLen, nLen, SMTO_NORMAL, 500L, &dwResult) != 0)
// 		{
// 			size_t cb = 0;
// 			va_list args;
// 			va_start(args, lpszFormat);
// 			::StringCchVPrintfEx(szBuffer, 512, NULL, &cb, 0, lpszFormat, args);
// 			va_end(args);
// 			SendMessageTimeout(hWnd, EM_REPLACESEL, FALSE, reinterpret_cast<LPARAM>(szBuffer), SMTO_NORMAL, 500L, &dwResult);
// 		}
// 	}
}

/// Builds the destination address from the current family.
/// NOTE(review): the port used to come from a dialog control that is now
/// commented out, so strPort is always empty here.
/// @return Result of SockAddrIn::CreateFrom.
bool IServerImpl::GetDestination(SockAddrIn& addrIn) const
{
	CString strPort;
	//GetDlgItemText(IDC_SVR_PORT, strPort);
	int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6;
	return addrIn.CreateFrom(NULL, strPort, nFamily);
}

/// Joins the multicast group matching the current family.
/// @return Result of AddMembership; false when the server is not in UDP mode.
bool IServerImpl::SetupMCAST()
{
	const TCHAR szIPv4MCAST[] = TEXT("239.121.1.2");
	const TCHAR szIPv6MCAST[] = TEXT("FF02:0:0:0:0:0:0:1"); // All Nodes local address
	bool result = false;
	if ( m_nSockType == SOCK_UDP )
	{
		if ( m_nMode == AF_IPV4 ) {
			result = m_SocketServer->AddMembership(szIPv4MCAST, NULL);
		} else {
			result = m_SocketServer->AddMembership(szIPv6MCAST, NULL);
			// The value is not acted on; it is only captured so it can be
			// inspected in a debugger.
			HRESULT hr = HRESULT_FROM_WIN32(GetLastError());
			hr = hr;
		}
	}
	return result;
}

///////////////////////////////////////////////////////////////////////////////
/// Callback: the server thread has started.
void IServerImpl::OnThreadBegin(CSocketHandle* pSH)
{
	ASSERT( pSH == m_SocketServer );
	(pSH);	// referenced so release builds (ASSERT compiled out) do not warn
	CString strAddr;
	SockAddrIn sockAddr;
	m_SocketServer->GetSockName(sockAddr);
	GetAddress( sockAddr, strAddr );
	//AppendText( _T("Server Running on: %s\r\n"), strAddr);
}

/// Callback: the server thread is exiting.
void IServerImpl::OnThreadExit(CSocketHandle* pSH)
{
	ASSERT( pSH == m_SocketServer );
	(pSH);
	//AppendText( _T("Server Down!\r\n"));
}

/// Callback: an incoming connection could not be accepted. A valid socket is
/// attached to a temporary handle, its peer address is formatted, and the
/// socket is closed.
void IServerImpl::OnConnectionFailure(CSocketHandle* pSH, SOCKET newSocket)
{
	ASSERT( pSH == m_SocketServer );
	(pSH);
	CString strAddr;
	CSocketHandle sockHandle;
	SockAddrIn sockAddr;
	if (newSocket != INVALID_SOCKET)
	{
		sockHandle.Attach( newSocket );
		sockHandle.GetPeerName( sockAddr );
		GetAddress( sockAddr, strAddr );
		sockHandle.Close();
		//AppendText( _T("Connection abandoned: %s\r\n"), strAddr );
	}
	else
	{
		//AppendText( _T("Connection abandoned. Not a valid socket.\r\n"), strAddr );
	}
}

/// Callback: a client connected. Emits the peer address to the debugger output.
void IServerImpl::OnAddConnection(CSocketHandle* pSH, SOCKET newSocket)
{
	ASSERT( pSH == m_SocketServer );
	(pSH);
	CString strAddr;
	CSocketHandle sockHandle;
	SockAddrIn sockAddr;
	// Attach only to query the peer name; Detach() leaves the socket open.
	sockHandle.Attach( newSocket );
	sockHandle.GetPeerName( sockAddr );
	GetAddress( sockAddr, strAddr );
	sockHandle.Detach();
	OutputDebugString(_T("\n�µ����ӽ���:"));
	OutputDebugString(strAddr);
	OutputDebugString(_T("\n"));
	//AppendText( _T("Connection established: %s\r\n"), strAddr );
}

/// Callback: data arrived.
/// @param pSH      Overlapped pointer; in TCP mode it is cast to the
///                 PerSocketContext of the connection. NULL is ignored.
/// @param pbData   Received bytes.
/// @param dwCount  Number of received bytes.
/// @param addr     Sender address (unused here).
/// Logs the running byte total and, in TCP mode, feeds the data to
/// ToprocessRecivebuf under m_csProcessData.
void IServerImpl::OnDataReceived(LPWSAOVERLAPPED pSH, const BYTE* pbData, DWORD dwCount, const SockAddrIn& addr)
{
	if ( pSH == NULL )
		return ;

	// NOTE(review): this running total is updated outside m_csProcessData;
	// if this callback runs on several threads the count can be off.
	static ULONGLONG ulCount = 0;
	ulCount += dwCount;
	// _ui64toa writes narrow characters and the log format below is a narrow
	// "%s", so the buffer is char (a TCHAR buffer breaks Unicode builds).
	char szcount[40]= {0};
	_ui64toa(ulCount, szcount, 10);
	LOG4C_NO_FILENUM((LOG_NOTICE,"�ۻ���%s",szcount));

	if (m_nSockType == SOCK_TCP)
	{
		PerSocketContext *psio = (PerSocketContext*)pSH;
		EnterCriticalSection( &m_csProcessData );
		ToprocessRecivebuf(*psio,pbData,dwCount);
		LeaveCriticalSection( &m_csProcessData );
	}
	else
	{
		SockAddrIn servAddr, sockAddr;
		m_SocketServer->GetSockName(servAddr);
		GetDestination(sockAddr);
		if ( servAddr != sockAddr ) {
			//m_SocketServer.Write((const LPBYTE)(T2CA(strMsg)), strMsg.GetLength(), sockAddr);
		} else {
			//AppendText( _T("Please change the port number to send message to a client.\r\n") );
		}
	}
}

/// Callback: a connection was dropped.
/// NOTE(review): the Attach() call is commented out, so GetPeerName() runs on
/// a handle that was never attached and strAddr is not used afterwards.
void IServerImpl::OnConnectionDropped(CSocketHandle* pSH)
{
	ASSERT( pSH == m_SocketServer );
	(pSH);
	CString strAddr;
	CSocketHandle sockHandle;
	SockAddrIn sockAddr;
	//sockHandle.Attach( pSH->GetSocket() );
	sockHandle.GetPeerName( sockAddr );
	GetAddress( sockAddr, strAddr );
	//sockHandle.Detach();
	//AppendText( _T("Connection lost with client.\r\n") );
}

/// Callback: a communication error occurred.
/// NOTE(review): as in OnConnectionDropped, sockHandle is never attached, so
/// the formatted address is not that of the failing peer.
void IServerImpl::OnConnectionError(CSocketHandle* pSH, DWORD dwError)
{
	ASSERT( pSH == m_SocketServer );
	(pSH);
	_com_error err(dwError);
	//AppendText( _T("Communication Error:\r\n%s\r\n"), err.ErrorMessage() );
	CString strAddr;
	CSocketHandle sockHandle;
	SockAddrIn sockAddr;
	sockHandle.GetPeerName( sockAddr );
	GetAddress( sockAddr, strAddr );
}

#if defined(SOCKHANDLE_USE_OVERLAPPED)
/// Callback: a connection is being removed. Deliberately disabled by the
/// leading return; the remaining statements are kept for reference only.
void IServerImpl::OnRemoveConnection(CSocketHandle* pSH, SOCKET dropSocket)
{
	return;
	ASSERT( pSH == m_SocketServer );
	(pSH);
	CString strAddr;
	CSocketHandle sockHandle;
	SockAddrIn sockAddr;
	sockHandle.Attach( dropSocket );
	sockHandle.GetPeerName( sockAddr );
	GetAddress( sockAddr, strAddr );
	sockHandle.Detach();
}
#endif

/// @return Connection count reported by the socket server.
DWORD IServerImpl::GetClientConnectCount()
{
	return m_SocketServer.GetConnectionCount();
}

/// Reassembles length-prefixed packets from a TCP byte stream.
///
/// Each packet is a TheProhead followed by nDataLen payload bytes. Partial
/// packets are accumulated in sockHandle.pendingbuf across calls:
/// sockHandle.ncurSize is the number of bytes buffered so far and
/// sockHandle.npendingSize is the total packet size once the header is known.
/// Every completed packet's body is written to a numbered .dat file and the
/// pending state is reset.
///
/// @param sockHandle     Per-connection context holding the pending buffer.
/// @param pReceivebuf    Newly received bytes.
/// @param dwReceiveSize  Number of bytes in pReceivebuf.
///
/// NOTE(review): nDataLen comes straight from the wire and is not range
/// checked here; consider enforcing a maximum packet size.
void IServerImpl::ToprocessRecivebuf(IN PerSocketContext &sockHandle, IN const BYTE* pReceivebuf, IN DWORD dwReceiveSize)
{
	// Offset of the first byte of pReceivebuf that has not been consumed yet.
	DWORD dwIndexOfProcessed = 0;
	// Number of bytes consumed by the current loop iteration.
	DWORD dwOnceProcessedLen = 0;
	TheProhead *ptphead;

	while( dwIndexOfProcessed < dwReceiveSize )
	{
		ptphead = NULL;
		dwOnceProcessedLen = 0;
		const BYTE* const pCur = pReceivebuf + dwIndexOfProcessed;
		const DWORD dwRemain = dwReceiveSize - dwIndexOfProcessed;

		if ( sockHandle.ncurSize == 0 )
		{
			// Case A: nothing buffered, a new packet starts at pCur.
			if ( dwRemain < sizeof( TheProhead ) )
			{
				// Not even a full header yet: stash what we have.
				sockHandle.ncurSize = dwRemain;
				sockHandle.SetPendingCurPack(sockHandle.ncurSize);
				memcpy(sockHandle.pendingbuf, pCur, sockHandle.ncurSize);
				dwOnceProcessedLen = sockHandle.ncurSize;
			}
			else
			{
				// A full header is available; read the packet length from it.
				ptphead = (TheProhead*)(pCur);
				if ( dwRemain > ptphead->nDataLen + sizeof(TheProhead) )
				{
					// More than one whole packet is available: take exactly one.
					sockHandle.ncurSize = sockHandle.npendingSize = ptphead->nDataLen + sizeof(TheProhead);
					sockHandle.SetPendingPack(sockHandle.ncurSize);
					memcpy(sockHandle.pendingbuf, pCur, sockHandle.ncurSize);
					dwOnceProcessedLen = sockHandle.ncurSize;
				}
				else
				{
					// At most one packet is available: buffer everything that is left.
					sockHandle.npendingSize = ptphead->nDataLen + sizeof(TheProhead);
					sockHandle.ncurSize = dwRemain;
					sockHandle.SetPendingPack(sockHandle.npendingSize);
					memcpy(sockHandle.pendingbuf, pCur, sockHandle.ncurSize);
					dwOnceProcessedLen = sockHandle.ncurSize;
				}
			}
		}
		else if ( sockHandle.ncurSize >= sizeof(TheProhead) )
		{
			// Case B: the header is already buffered; keep filling the body.
			ptphead = (TheProhead*)sockHandle.pendingbuf;
			if ( sockHandle.npendingSize == ptphead->nDataLen + sizeof(TheProhead) )
			{
				if ( sockHandle.npendingSize - sockHandle.ncurSize > dwRemain )
				{
					// Still not enough for a whole packet: append everything.
					memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pCur, dwRemain);
					sockHandle.ncurSize += dwRemain;
					dwOnceProcessedLen = dwRemain;
				}
				else
				{
					// Enough data to finish the packet: append just the missing part.
					memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pCur, sockHandle.npendingSize - sockHandle.ncurSize);
					dwOnceProcessedLen = sockHandle.npendingSize - sockHandle.ncurSize;
					sockHandle.ncurSize = sockHandle.npendingSize;
				}
			}
			else
			{
				// The buffered header disagrees with the recorded packet size.
				// Nothing can be consumed safely, and leaving the state as-is
				// would make this loop spin forever, so drop the pending packet
				// together with the rest of this receive buffer.
				LOG4C_NO_FILENUM((LOG_NOTICE, "���ݰ�������"));
				delete []sockHandle.pendingbuf;
				sockHandle.pendingbuf = NULL;
				sockHandle.ncurSize = sockHandle.npendingSize = 0;
				return;
			}
		}
		else
		{
			// Case C: only part of the header is buffered.
			if ( sizeof(TheProhead) - sockHandle.ncurSize > dwRemain )
			{
				// The header is still incomplete after this chunk.
				sockHandle.ReSetPengingPack(sockHandle.ncurSize + dwRemain);
				memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pCur, dwRemain);
				sockHandle.ncurSize += dwRemain;
				dwOnceProcessedLen = dwRemain;
			}
			else
			{
				// Complete the header first so the packet length can be read.
				sockHandle.ReSetPengingPack(sizeof(TheProhead));
				memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pCur, sizeof(TheProhead) - sockHandle.ncurSize);
				dwOnceProcessedLen = sizeof(TheProhead) - sockHandle.ncurSize;
				sockHandle.ncurSize = sizeof(TheProhead);

				ptphead = (TheProhead*)sockHandle.pendingbuf;
				sockHandle.npendingSize = ptphead->nDataLen + sizeof(TheProhead);
				sockHandle.ReSetPengingPack(sockHandle.npendingSize);

				// Bytes of this receive buffer left after the header part.
				const DWORD dwLeft = dwRemain - dwOnceProcessedLen;
				// Compare what is left against what the packet still needs.
				// (The old test compared against the whole packet size the
				// wrong way round, over-reading the receive buffer or
				// overflowing pendingbuf.)
				if ( dwLeft >= sockHandle.npendingSize - sockHandle.ncurSize )
				{
					// The rest of the packet is available: finish it.
					memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pCur + dwOnceProcessedLen, sockHandle.npendingSize - sockHandle.ncurSize);
					dwOnceProcessedLen += sockHandle.npendingSize - sockHandle.ncurSize;
					sockHandle.ncurSize = sockHandle.npendingSize;
				}
				else
				{
					// Only part of the body is available: buffer all of it.
					memcpy(sockHandle.pendingbuf + sockHandle.ncurSize, pCur + dwOnceProcessedLen, dwLeft);
					sockHandle.ncurSize += dwLeft;
					dwOnceProcessedLen = dwRemain;
				}
			}
		}

		if ( sockHandle.ncurSize == sockHandle.npendingSize )
		{
			// A whole packet is buffered: dump its body to a file named after
			// the socket and a running counter, then reset the pending state.
			TheProbody *ptpb = (TheProbody*)sockHandle.pendingbuf;
			CFile cf;
			static int a = 0;
			CString strFile = _T("");
			strFile.Format(_T("D:\\�����ļ�\\%d-%d.dat"), (SOCKET)sockHandle,a);
			if ( cf.Open(strFile, CFile::modeCreate|CFile::modeReadWrite) )
			{
				cf.Write(ptpb->szBody, ptpb->tphead.nDataLen);
				cf.Close();
				a++;
			}
			else
			{
				LOG4C_NO_FILENUM((LOG_NOTICE, "�����ļ�ʧ��"));
			}
			delete []sockHandle.pendingbuf;
			sockHandle.pendingbuf = NULL;
			sockHandle.ncurSize = sockHandle.npendingSize = 0;
		}

		dwIndexOfProcessed += dwOnceProcessedLen;
	}
}

} // namespace ServerSocketImpl