#include "stdafx.h"
// NOTE(review): a bare `#include` with no header name appears at this position
// in the reviewed text (the `<...>` part was presumably lost in extraction).
// Restore the original directive from version control before merging.
//#include "Client2SrvType.h"
#include "Global.h"
#include "WatchServerSocket.h"
#include "crc32.h"

#pragma warning(push)
#pragma warning(disable:4995)
#pragma warning(pop)

// Values stored in m_nMode; mapped to AF_INET / AF_INET6 in Connection().
const int AF_IPV4 = 0;
const int AF_IPV6 = 1;

// Values stored in m_nSockType; Connection() passes (m_nSockType + 1) to
// StartClient(), which undoes the "-1" applied here.
const int SOCK_TCP = SOCK_STREAM-1;
const int SOCK_UDP = SOCK_DGRAM-1;

// Returns true when a packet length taken from a ProtocolHeader is large
// enough to contain the header itself and small enough to be assembled in
// CSocketHandle::m_PendingBuffer. Connection() clears SOCKET_BUFFSIZE bytes of
// that buffer, so SOCKET_BUFFSIZE is used as its capacity here.
static bool IsBufferablePacketLen( DWORD nPacketLen )
{
    return nPacketLen >= sizeof( ProtocolHeader )
        && nPacketLen <= static_cast<DWORD>( SOCKET_BUFFSIZE );
}

//---------------------------------------------- CWatchServerSocket ----

// Defaults to IPv4 + TCP and registers this object as the socket client's
// event handler.
CWatchServerSocket::CWatchServerSocket()
{
    m_nMode = AF_IPV4;
    m_nSockType = SOCK_TCP;
    m_SocketClient.SetInterface(this);
    m_bSocket = FALSE;
}

CWatchServerSocket::~CWatchServerSocket()
{
}

// Starts the socket client against strAddr:strPort.
// Returns FALSE when StartClient() fails. On success the reassembly state of
// the socket handle is cleared, SetupMCAST() is called (its result is
// ignored) and TRUE is returned.
BOOL CWatchServerSocket::Connection(LPCTSTR strAddr, LPCTSTR strPort)
{
    int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6;

    if ( !m_SocketClient.StartClient(NULL, strAddr, strPort, nFamily, (m_nSockType+1) ) )
    {
        // A "failed to connect to server" message box used to be shown here.
        return FALSE;
    }

    // Start with an empty pending (partial packet) buffer.
    CSocketHandle* pSH = (CSocketHandle *)m_SocketClient;
    pSH->m_nPendingSize = 0;
    memset(pSH->m_PendingBuffer, 0, SOCKET_BUFFSIZE);

    SetupMCAST();
    return TRUE;
}

// Terminates the socket client, then sleeps for one second.
void CWatchServerSocket::DisConnection()
{
    m_SocketClient.Terminate();
    Sleep(1000);
}

// Joins the multicast group matching the address family, for UDP sockets only.
// Returns the result of AddMembership(), or false for non-UDP sockets.
bool CWatchServerSocket::SetupMCAST()
{
    const TCHAR szIPv4MCAST[] = TEXT("239.121.1.2");
    const TCHAR szIPv6MCAST[] = TEXT("FF02:0:0:0:0:0:0:1"); // All Nodes local address

    bool result = false;
    if ( m_nSockType == SOCK_UDP )
    {
        if ( m_nMode == AF_IPV4 )
        {
            result = m_SocketClient->AddMembership(szIPv4MCAST, NULL);
        }
        else
        {
            result = m_SocketClient->AddMembership(szIPv6MCAST, NULL);
        }
    }
    return result;
}

// Formats addrIn as "<ip> : <port>" into rString.
void CWatchServerSocket::GetAddress(const SockAddrIn& addrIn, CString& rString) const
{
    TCHAR szIPAddr[MAX_PATH] = { 0 };
    CSocketHandle::FormatIP(szIPAddr, MAX_PATH, addrIn);
    // The port is converted to host byte order and widened to int to match "%d".
    rString.Format(_T("%s : %d"), szIPAddr, static_cast<int>(ntohs(addrIn.GetPort())) );
}

///////////////////////////////////////////////////////////////////////////////
// ISocketClientHandler communication callbacks

// Called when the socket thread starts: creates the critical section that
// ProcessData() uses to serialise packet reassembly.
void CWatchServerSocket::OnThreadBegin(CSocketHandle* pSH)
{
    ASSERT( pSH == m_SocketClient );
    (pSH);

    CString strAddr;
    SockAddrIn sockAddr;
    m_SocketClient->GetSockName(sockAddr);
    GetAddress( sockAddr, strAddr );

    InitializeCriticalSection(&pSH->m_hClient2SrvSection);
    // AppendText( _T("Client Running on: %s\r\n"), strAddr);
}

// Called when the socket thread exits: destroys the critical section created
// in OnThreadBegin().
void CWatchServerSocket::OnThreadExit(CSocketHandle* pSH)
{
    ASSERT( pSH == m_SocketClient );
    DeleteCriticalSection( &pSH->m_hClient2SrvSection );
    (pSH);
}

// Called for every chunk of received data; forwards it to ProcessData() as
// long as the socket is still open. The sender address is not used.
void CWatchServerSocket::OnDataReceived(CSocketHandle* pSH, const BYTE* pbData, DWORD dwCount, const SockAddrIn& addr)
{
    ASSERT( pSH == m_SocketClient );
    (pSH);

    if( !m_SocketClient->IsOpen() )
        return;

    ProcessData( pSH, pbData, dwCount );
}

// Called when the connection is dropped: clears m_bSocket and logs the event.
void CWatchServerSocket::OnConnectionDropped(CSocketHandle* pSH)
{
    ASSERT( pSH == m_SocketClient );
    (pSH);

    m_bSocket = FALSE;
    LogEvent( _T("Connection lost with server. Need restart.\r\n") );
}

// Called on a communication error: clears m_bSocket. dwError is currently not
// reported anywhere.
void CWatchServerSocket::OnConnectionError(CSocketHandle* pSH, DWORD dwError)
{
    ASSERT( pSH == m_SocketClient );
    (pSH);

    //_com_error err(dwError);
    m_bSocket = FALSE;
    //LogEvent( _T("Communication Error:\r\n%s\r\n"), err.ErrorMessage() );
}

// Returns the CRC32 of the packet body, i.e. the bytes of pData that follow
// the ProtocolHeader. Returns 0 when nSize does not exceed the header size.
unsigned int CWatchServerSocket::CalcCheckSum( void *pData, unsigned int nSize )
{
    unsigned int checksum = 0;

    if ( nSize <= sizeof( ProtocolHeader ) )
    {
        return 0;
    }

    unsigned char *pBody = &( ( unsigned char* )pData )[ sizeof( ProtocolHeader ) ];
    nSize -= sizeof( ProtocolHeader );
    checksum = crc32( 0, pBody, nSize );

    return checksum;
}

// Handles a heartbeat packet: records the current tick count for the sender's
// nCmdType in g_dwServiceOnlineTick. Out-of-range types are ignored.
// nLen is not used.
void CWatchServerSocket::ProcessHeart(void *pData, int nLen)
{
    //m_SocketClient->Write((const LPBYTE)pData, nLen, NULL);
    ProtocolHeader *pHeader = (ProtocolHeader *)pData;
    if( pHeader != NULL )
    {
        if( pHeader->nCmdType >= 0 && pHeader->nCmdType < MAX_DLL_TYPE )
        {
            g_dwServiceOnlineTick[pHeader->nCmdType] = GetTickCount();
        }
    }
}

// Dispatches one complete packet by command id.
// Returns -1 when pData is NULL or the header length is outside [0, 65535],
// otherwise 0 (unknown commands are silently ignored).
int CWatchServerSocket::OnCmdProcess(void *pData)
{
    ProtocolHeader *pHeader = (ProtocolHeader *)pData;
    if( pHeader == NULL )
        return -1;

    if( pHeader->nLen < 0 || pHeader->nLen > 65535 )
        return -1;

    switch( pHeader->nCmd )
    {
    case CMD_WATCH_HEART:
        ProcessHeart(pData, pHeader->nLen);
        break;
    }

    return 0;
}

// Splits a received chunk into protocol packets and hands every complete
// packet to OnCmdProcess().
//
// pSH    socket handle that owns the pending buffer and the critical section
// pData  received bytes
// nLen   number of bytes in pData
//
// A packet that is only partially present is kept in pSH->m_PendingBuffer
// (m_nPendingSize bytes so far) and completed by the following call(s).
// Buffered state can only exist when a call starts: every path that stores
// pending data also consumes the rest of pData, which is why the resume
// branch reads from the beginning of pData.
//
// The packet length comes from the network, so it is validated before it is
// used as a copy size; an invalid length drops the buffered data and the rest
// of the current chunk instead of overrunning m_PendingBuffer.
void CWatchServerSocket::ProcessData(CSocketHandle *pSH, const BYTE* pData, DWORD nLen)
{
    DWORD nBuffIndex = 0;

    EnterCriticalSection( &(pSH->m_hClient2SrvSection) );

    while( nBuffIndex < nLen )
    {
        ProtocolHeader *pHeader = NULL;   // header of the packet handled in this iteration
        DWORD nProcessedLen = 0;          // bytes of pData consumed in this iteration

        if( pSH->m_nPendingSize > 0 )
        {
            // Resume assembling the packet started by a previous call.
            pHeader = (ProtocolHeader *)pSH->m_PendingBuffer;

            if( pSH->m_nPendingSize < sizeof(ProtocolHeader) )
            {
                // Not even the header was complete last time.
                DWORD nLinkHeaderLen = sizeof( ProtocolHeader ) - pSH->m_nPendingSize;

                if( nLinkHeaderLen <= nLen )
                {
                    // This chunk completes the header.
                    memcpy( &pSH->m_PendingBuffer[ pSH->m_nPendingSize ], pData, nLinkHeaderLen );

                    // The length field is readable only now; reject values that
                    // would underflow the body size or overflow the buffer.
                    if( !IsBufferablePacketLen( static_cast<DWORD>( pHeader->nLen ) ) )
                    {
                        TRACE("ProcessData: invalid packet length in reassembled header, dropping data\r\n");
                        pSH->m_nPendingSize = 0;
                        break;
                    }

                    nProcessedLen = pHeader->nLen - pSH->m_nPendingSize;
                    if( nProcessedLen <= nLen )
                    {
                        // The rest of the packet is available: copy the body and
                        // mark the packet as complete.
                        memcpy( &pSH->m_PendingBuffer[ sizeof( ProtocolHeader ) ],
                                pData + nLinkHeaderLen,
                                pHeader->nLen - sizeof( ProtocolHeader ) );
                        pSH->m_nPendingSize = 0;
                    }
                    else
                    {
                        // Still incomplete: keep whatever follows the header.
                        // nProcessedLen stays larger than nLen, which ends the loop.
                        int nTemp = nLen - nLinkHeaderLen;
                        if ( nTemp > 0 )   // nothing to copy when the chunk ended exactly at the header
                        {
                            memcpy( &pSH->m_PendingBuffer[ sizeof( ProtocolHeader ) ],
                                    pData + nLinkHeaderLen,
                                    nTemp );
                        }
                        pSH->m_nPendingSize += nLen;
                    }
                }
                else
                {
                    // The header is still incomplete: keep buffering.
                    memcpy( &pSH->m_PendingBuffer[ pSH->m_nPendingSize ], pData, nLen );
                    pSH->m_nPendingSize += nLen;
                    nProcessedLen = nLen;
                }
            }
            else
            {
                // The complete header is already buffered.
                const DWORD nPacketLen = static_cast<DWORD>( pHeader->nLen );
                if( !IsBufferablePacketLen( nPacketLen )
                    || static_cast<DWORD>( pSH->m_nPendingSize ) >= nPacketLen )
                {
                    // Inconsistent buffered state; the subtraction below would
                    // wrap around and the copy would overrun the buffer.
                    TRACE("ProcessData: invalid packet length in buffered header, dropping data\r\n");
                    pSH->m_nPendingSize = 0;
                    break;
                }

                nProcessedLen = pHeader->nLen - pSH->m_nPendingSize;
                if ( nProcessedLen <= nLen )
                {
                    // This chunk completes the packet.
                    memcpy( &pSH->m_PendingBuffer[ pSH->m_nPendingSize ], pData, nProcessedLen );
                    pSH->m_nPendingSize = 0;
                }
                else
                {
                    // Still incomplete: buffer the whole chunk.
                    // nProcessedLen stays larger than nLen, which ends the loop.
                    memcpy( &pSH->m_PendingBuffer[ pSH->m_nPendingSize ], pData, nLen );
                    pSH->m_nPendingSize += nLen;
                }
            }
        }
        else
        {
            // No buffered data: a packet starts at pData[nBuffIndex].
            const DWORD nRemaining = nLen - nBuffIndex;
            pHeader = (ProtocolHeader *)( pData + nBuffIndex );

            if( nRemaining < sizeof(ProtocolHeader) )
            {
                // Only part of the header arrived. The bytes are stored as
                // pending data; nProcessedLen stays 0, so the check below ends
                // the loop.
                pSH->m_nPendingSize = nRemaining;
                memcpy(pSH->m_PendingBuffer, pHeader, pSH->m_nPendingSize);
            }
            else
            {
                nProcessedLen = pHeader->nLen;
                if( static_cast<DWORD>( pHeader->nLen ) > nRemaining )
                {
                    // The packet claims more bytes than this chunk holds.
                    if( nBuffIndex == 0 )
                    {
                        // Very first packet of the chunk: treated as an illegal
                        // packet and discarded (nothing is buffered).
                        // NOTE(review): this also drops legitimate packets that
                        // span two receives and start a chunk - confirm intent.
                        TRACE("pHeader->nLen大于当前包的总长,认为是非法包,扔掉\r\n");
                        break;
                    }

                    if( !IsBufferablePacketLen( static_cast<DWORD>( pHeader->nLen ) ) )
                    {
                        // Could never be completed inside m_PendingBuffer.
                        TRACE("ProcessData: packet length exceeds pending buffer, dropping data\r\n");
                        break;
                    }

                    // Keep the fragment and wait for the rest.
                    memcpy(pSH->m_PendingBuffer, pHeader, nRemaining);
                    pSH->m_nPendingSize = nRemaining;
                    nProcessedLen = nRemaining;
                }
                else
                {
                    // The whole packet is inside this chunk.
                    pSH->m_nPendingSize = 0;
                }
            }
        }

        if ( nProcessedLen == 0 )
        {
            // Nothing was consumed (incomplete header or zero length): stop.
            TRACE("没有收够包头,认为是非法包,扔掉\r\n");
            break;
        }

        if ( pSH->m_nPendingSize == 0 )
        {
            // A complete packet is available behind pHeader.
            if ( pHeader->nLen > SOCKET_BUFFSIZE )
            {
                // Packet length exceeds the limit (reported only).
                TRACE("pHeader->nLen超过限制\r\n");
            }

            if(-1 == OnCmdProcess( pHeader ))
            {
                TRACE("crc校验错误!\r\n");
                break;
            }
        }

        nBuffIndex += nProcessedLen;
    }

    LeaveCriticalSection( &(pSH->m_hClient2SrvSection) );
}