TcpAgent.cpp 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387
  1. /*
  2. * Copyright: JessMA Open Source (ldcsaa@gmail.com)
  3. *
  4. * Version : 3.6.1
  5. * Author : Bruce Liang
  6. * Website : http://www.jessma.org
  7. * Project : https://github.com/ldcsaa
  8. * Blog : http://www.cnblogs.com/ldcsaa
  9. * Wiki : http://www.oschina.net/p/hp-socket
  10. * QQ Group : 75375912
  11. *
  12. * Licensed under the Apache License, Version 2.0 (the "License");
  13. * you may not use this file except in compliance with the License.
  14. * You may obtain a copy of the License at
  15. *
  16. * http://www.apache.org/licenses/LICENSE-2.0
  17. *
  18. * Unless required by applicable law or agreed to in writing, software
  19. * distributed under the License is distributed on an "AS IS" BASIS,
  20. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  21. * See the License for the specific language governing permissions and
  22. * limitations under the License.
  23. */
  24. #include "stdafx.h"
  25. #include <atlfile.h>
  26. #include "TcpAgent.h"
  27. #include "../../Common/Src/WaitFor.h"
  28. #include <malloc.h>
  29. #include <process.h>
  30. EnHandleResult CTcpAgent::TriggerFireConnect(TSocketObj* pSocketObj)
  31. {
  32. CReentrantSpinLock locallock(pSocketObj->csRecv);
  33. return FireConnect(pSocketObj);
  34. }
  35. EnHandleResult CTcpAgent::TriggerFireReceive(TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  36. {
  37. EnHandleResult rs = (EnHandleResult)HR_CLOSED;
  38. if(TSocketObj::IsValid(pSocketObj))
  39. {
  40. CReentrantSpinLock locallock(pSocketObj->csRecv);
  41. if(TSocketObj::IsValid(pSocketObj))
  42. {
  43. rs = FireReceive(pSocketObj, (BYTE*)pBufferObj->buff.buf, pBufferObj->buff.len);
  44. }
  45. }
  46. return rs;
  47. }
  48. EnHandleResult CTcpAgent::TriggerFireSend(TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  49. {
  50. EnHandleResult rs = FireSend(pSocketObj, (BYTE*)pBufferObj->buff.buf, pBufferObj->buff.len);
  51. if(rs == HR_ERROR)
  52. {
  53. TRACE("<S-CNNID: %Iu> OnSend() event should not return 'HR_ERROR' !!\n", pSocketObj->connID);
  54. ASSERT(FALSE);
  55. }
  56. AddFreeBufferObj(pBufferObj);
  57. return rs;
  58. }
  59. EnHandleResult CTcpAgent::TriggerFireClose(TSocketObj* pSocketObj, EnSocketOperation enOperation, int iErrorCode)
  60. {
  61. CReentrantSpinLock locallock(pSocketObj->csRecv);
  62. return FireClose(pSocketObj, enOperation, iErrorCode);
  63. }
  64. void CTcpAgent::SetLastError(EnSocketError code, LPCSTR func, int ec)
  65. {
  66. m_enLastError = code;
  67. TRACE("%s --> Error: %d, EC: %d\n", func, code, ec);
  68. }
  69. BOOL CTcpAgent::Start(LPCTSTR lpszBindAddress, BOOL bAsyncConnect)
  70. {
  71. if(!CheckParams() || !CheckStarting())
  72. return FALSE;
  73. PrepareStart();
  74. if(ParseBindAddress(lpszBindAddress))
  75. if(CreateCompletePort())
  76. if(CreateWorkerThreads())
  77. {
  78. m_bAsyncConnect = bAsyncConnect;
  79. m_enState = SS_STARTED;
  80. return TRUE;
  81. }
  82. Stop();
  83. return FALSE;
  84. }
  85. BOOL CTcpAgent::CheckParams()
  86. {
  87. if ((m_enSendPolicy >= SP_PACK && m_enSendPolicy <= SP_DIRECT) &&
  88. ((int)m_dwMaxConnectionCount > 0) &&
  89. ((int)m_dwWorkerThreadCount > 0 && m_dwWorkerThreadCount <= MAX_WORKER_THREAD_COUNT) &&
  90. ((int)m_dwSocketBufferSize >= MIN_SOCKET_BUFFER_SIZE) &&
  91. ((int)m_dwFreeSocketObjLockTime >= 0) &&
  92. ((int)m_dwFreeSocketObjPool >= 0) &&
  93. ((int)m_dwFreeBufferObjPool >= 0) &&
  94. ((int)m_dwFreeSocketObjHold >= m_dwFreeSocketObjPool) &&
  95. ((int)m_dwFreeBufferObjHold >= m_dwFreeBufferObjPool) &&
  96. ((int)m_dwKeepAliveTime >= 1000 || m_dwKeepAliveTime == 0) &&
  97. ((int)m_dwKeepAliveInterval >= 1000 || m_dwKeepAliveInterval == 0) )
  98. return TRUE;
  99. SetLastError(SE_INVALID_PARAM, __FUNCTION__, ERROR_INVALID_PARAMETER);
  100. return FALSE;
  101. }
  102. void CTcpAgent::PrepareStart()
  103. {
  104. m_bfActiveSockets.Reset(m_dwMaxConnectionCount);
  105. m_lsFreeSocket.Reset(m_dwFreeSocketObjHold);
  106. m_bfObjPool.SetItemCapacity((int)m_dwSocketBufferSize);
  107. m_bfObjPool.SetPoolSize((int)m_dwFreeBufferObjPool);
  108. m_bfObjPool.SetPoolHold((int)m_dwFreeBufferObjHold);
  109. m_bfObjPool.Prepare();
  110. }
  111. BOOL CTcpAgent::CheckStarting()
  112. {
  113. CSpinLock locallock(m_csState);
  114. if(m_enState == SS_STOPPED)
  115. m_enState = SS_STARTING;
  116. else
  117. {
  118. SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_OPERATION);
  119. return FALSE;
  120. }
  121. return TRUE;
  122. }
  123. BOOL CTcpAgent::CheckStoping()
  124. {
  125. if(m_enState == SS_STOPPED)
  126. return FALSE;
  127. CSpinLock locallock(m_csState);
  128. if(HasStarted())
  129. {
  130. m_enState = SS_STOPPING;
  131. return TRUE;
  132. }
  133. else if(m_enState == SS_STOPPING)
  134. {
  135. while(m_enState != SS_STOPPED)
  136. ::Sleep(30);
  137. SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_OPERATION);
  138. }
  139. return FALSE;
  140. }
  141. BOOL CTcpAgent::ParseBindAddress(LPCTSTR lpszBindAddress)
  142. {
  143. BOOL isOK = FALSE;
  144. SOCKET sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  145. if(sock != INVALID_SOCKET)
  146. {
  147. if(!lpszBindAddress)
  148. lpszBindAddress = DEFAULT_BIND_ADDRESS;
  149. ::sockaddr_A_2_IN(AF_INET, lpszBindAddress, 0, m_soAddrIN);
  150. if(::bind(sock, (SOCKADDR*)&m_soAddrIN, sizeof(SOCKADDR_IN)) != SOCKET_ERROR)
  151. {
  152. m_pfnConnectEx = ::Get_ConnectEx_FuncPtr(sock);
  153. m_pfnDisconnectEx = ::Get_DisconnectEx_FuncPtr(sock);
  154. ASSERT(m_pfnConnectEx);
  155. ASSERT(m_pfnDisconnectEx);
  156. isOK = TRUE;
  157. }
  158. else
  159. SetLastError(SE_SOCKET_BIND, __FUNCTION__, ::WSAGetLastError());
  160. ::ManualCloseSocket(sock);
  161. }
  162. else
  163. SetLastError(SE_SOCKET_CREATE, __FUNCTION__, ::WSAGetLastError());
  164. return isOK;
  165. }
  166. BOOL CTcpAgent::CreateCompletePort()
  167. {
  168. m_hCompletePort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
  169. if(m_hCompletePort == nullptr)
  170. SetLastError(SE_CP_CREATE, __FUNCTION__, ::GetLastError());
  171. return (m_hCompletePort != nullptr);
  172. }
  173. BOOL CTcpAgent::CreateWorkerThreads()
  174. {
  175. BOOL isOK = TRUE;
  176. for(DWORD i = 0; i < m_dwWorkerThreadCount; i++)
  177. {
  178. HANDLE hThread = (HANDLE)_beginthreadex(nullptr, 0, WorkerThreadProc, (LPVOID)this, 0, nullptr);
  179. if(hThread)
  180. m_vtWorkerThreads.push_back(hThread);
  181. else
  182. {
  183. SetLastError(SE_WORKER_THREAD_CREATE, __FUNCTION__, ::GetLastError());
  184. isOK = FALSE;
  185. break;
  186. }
  187. }
  188. return isOK;
  189. }
  190. BOOL CTcpAgent::Stop()
  191. {
  192. if(!CheckStoping())
  193. return FALSE;
  194. ::WaitWithMessageLoop(150);
  195. DisconnectClientSocket();
  196. WaitForClientSocketClose();
  197. WaitForWorkerThreadEnd();
  198. ReleaseClientSocket();
  199. FireShutdown();
  200. ReleaseFreeSocket();
  201. ReleaseFreeBuffer();
  202. CloseCompletePort();
  203. Reset();
  204. return TRUE;
  205. }
  206. void CTcpAgent::Reset()
  207. {
  208. m_phSocket.Reset();
  209. ::ZeroMemory((void*)&m_soAddrIN, sizeof(SOCKADDR_IN));
  210. m_pfnConnectEx = nullptr;
  211. m_pfnDisconnectEx = nullptr;
  212. m_enState = SS_STOPPED;
  213. }
  214. void CTcpAgent::DisconnectClientSocket()
  215. {
  216. DWORD size = 0;
  217. unique_ptr<CONNID[]> ids = m_bfActiveSockets.GetAllElementIndexes(size);
  218. for(DWORD i = 0; i < size; i++)
  219. Disconnect(ids[i]);
  220. }
  221. void CTcpAgent::ReleaseClientSocket()
  222. {
  223. VERIFY(m_bfActiveSockets.IsEmpty());
  224. m_bfActiveSockets.Reset();
  225. }
  226. TSocketObj* CTcpAgent::GetFreeSocketObj(CONNID dwConnID, SOCKET soClient)
  227. {
  228. DWORD dwIndex;
  229. TSocketObj* pSocketObj = nullptr;
  230. if(m_lsFreeSocket.TryLock(&pSocketObj, dwIndex))
  231. {
  232. if(::GetTimeGap32(pSocketObj->freeTime) >= m_dwFreeSocketObjLockTime)
  233. m_lsFreeSocket.ReleaseLock(nullptr, dwIndex);
  234. else
  235. {
  236. m_lsFreeSocket.ReleaseLock(pSocketObj, dwIndex);
  237. pSocketObj = nullptr;
  238. }
  239. }
  240. if(!pSocketObj) pSocketObj = CreateSocketObj();
  241. pSocketObj->Reset(dwConnID, soClient);
  242. return pSocketObj;
  243. }
  244. void CTcpAgent::AddFreeSocketObj(TSocketObj* pSocketObj, EnSocketCloseFlag enFlag, EnSocketOperation enOperation, int iErrorCode)
  245. {
  246. if(!InvalidSocketObj(pSocketObj))
  247. return;
  248. CloseClientSocketObj(pSocketObj, enFlag, enOperation, iErrorCode);
  249. TSocketObj::Release(pSocketObj);
  250. m_bfActiveSockets.Remove(pSocketObj->connID);
  251. if(!m_lsFreeSocket.TryPut(pSocketObj))
  252. {
  253. m_lsGCSocket.PushBack(pSocketObj);
  254. if(m_lsGCSocket.Size() > m_dwFreeSocketObjPool)
  255. ReleaseGCSocketObj();
  256. }
  257. }
  258. void CTcpAgent::ReleaseGCSocketObj(BOOL bForce)
  259. {
  260. TSocketObj* pSocketObj = nullptr;
  261. DWORD now = ::TimeGetTime();
  262. while(m_lsGCSocket.PopFront(&pSocketObj))
  263. {
  264. if(bForce || (int)(now - pSocketObj->freeTime) >= (int)m_dwFreeSocketObjLockTime)
  265. DeleteSocketObj(pSocketObj);
  266. else
  267. {
  268. m_lsGCSocket.PushBack(pSocketObj);
  269. break;
  270. }
  271. }
  272. }
  273. BOOL CTcpAgent::InvalidSocketObj(TSocketObj* pSocketObj)
  274. {
  275. BOOL bDone = FALSE;
  276. if(TSocketObj::IsValid(pSocketObj))
  277. {
  278. CReentrantSpinLock locallock(pSocketObj->csRecv);
  279. CCriSecLock locallock2(pSocketObj->csSend);
  280. if(TSocketObj::IsValid(pSocketObj))
  281. {
  282. TSocketObj::Invalid(pSocketObj);
  283. bDone = TRUE;
  284. }
  285. }
  286. return bDone;
  287. }
  288. void CTcpAgent::AddClientSocketObj(CONNID dwConnID, TSocketObj* pSocketObj)
  289. {
  290. ASSERT(FindSocketObj(dwConnID) == nullptr);
  291. pSocketObj->connTime = ::TimeGetTime();
  292. pSocketObj->activeTime = pSocketObj->connTime;
  293. VERIFY(m_bfActiveSockets.ReleaseLock(dwConnID, pSocketObj));
  294. }
  295. void CTcpAgent::ReleaseFreeSocket()
  296. {
  297. TSocketObj* pSocketObj = nullptr;
  298. while(m_lsFreeSocket.TryGet(&pSocketObj))
  299. DeleteSocketObj(pSocketObj);
  300. VERIFY(m_lsFreeSocket.IsEmpty());
  301. m_lsFreeSocket.Reset();
  302. ReleaseGCSocketObj(TRUE);
  303. VERIFY(m_lsGCSocket.IsEmpty());
  304. }
  305. TSocketObj* CTcpAgent::CreateSocketObj()
  306. {
  307. TSocketObj* pSocketObj = (TSocketObj*)m_phSocket.Alloc(sizeof(TSocketObj));
  308. ASSERT(pSocketObj);
  309. pSocketObj->TSocketObj::TSocketObj(m_bfObjPool);
  310. return pSocketObj;
  311. }
  312. void CTcpAgent::DeleteSocketObj(TSocketObj* pSocketObj)
  313. {
  314. ASSERT(pSocketObj);
  315. pSocketObj->TSocketObj::~TSocketObj();
  316. m_phSocket.Free(pSocketObj);
  317. }
  318. TBufferObj* CTcpAgent::GetFreeBufferObj(int iLen)
  319. {
  320. ASSERT(iLen >= -1 && iLen <= (int)m_dwSocketBufferSize);
  321. TBufferObj* pBufferObj = m_bfObjPool.PickFreeItem();
  322. if(iLen < 0) iLen = m_dwSocketBufferSize;
  323. pBufferObj->buff.len = iLen;
  324. return pBufferObj;
  325. }
  326. void CTcpAgent::AddFreeBufferObj(TBufferObj* pBufferObj)
  327. {
  328. m_bfObjPool.PutFreeItem(pBufferObj);
  329. }
  330. void CTcpAgent::ReleaseFreeBuffer()
  331. {
  332. m_bfObjPool.Clear();
  333. }
  334. TSocketObj* CTcpAgent::FindSocketObj(CONNID dwConnID)
  335. {
  336. TSocketObj* pSocketObj = nullptr;
  337. if(!m_bfActiveSockets.Get(dwConnID, &pSocketObj))
  338. pSocketObj = nullptr;
  339. return pSocketObj;
  340. }
  341. void CTcpAgent::CloseClientSocketObj(TSocketObj* pSocketObj, EnSocketCloseFlag enFlag, EnSocketOperation enOperation, int iErrorCode, int iShutdownFlag)
  342. {
  343. ASSERT(TSocketObj::IsExist(pSocketObj));
  344. if(enFlag == SCF_CLOSE)
  345. TriggerFireClose(pSocketObj, SO_CLOSE, SE_OK);
  346. else if(enFlag == SCF_ERROR)
  347. TriggerFireClose(pSocketObj, enOperation, iErrorCode);
  348. SOCKET socket = pSocketObj->socket;
  349. pSocketObj->socket = INVALID_SOCKET;
  350. ::ManualCloseSocket(socket, iShutdownFlag);
  351. }
  352. BOOL CTcpAgent::GetLocalAddress(CONNID dwConnID, TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort)
  353. {
  354. ASSERT(lpszAddress != nullptr && iAddressLen > 0);
  355. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  356. if(TSocketObj::IsValid(pSocketObj))
  357. return ::GetSocketLocalAddress(pSocketObj->socket, lpszAddress, iAddressLen, usPort);
  358. return FALSE;
  359. }
  360. BOOL CTcpAgent::GetRemoteAddress(CONNID dwConnID, TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort)
  361. {
  362. ASSERT(lpszAddress != nullptr && iAddressLen > 0);
  363. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  364. if(TSocketObj::IsExist(pSocketObj))
  365. {
  366. ADDRESS_FAMILY usFamily;
  367. return ::sockaddr_IN_2_A(pSocketObj->remoteAddr, usFamily, lpszAddress, iAddressLen, usPort);
  368. }
  369. return FALSE;
  370. }
  371. BOOL CTcpAgent::SetConnectionExtra(CONNID dwConnID, PVOID pExtra)
  372. {
  373. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  374. return SetConnectionExtra(pSocketObj, pExtra);
  375. }
  376. BOOL CTcpAgent::SetConnectionExtra(TSocketObj* pSocketObj, PVOID pExtra)
  377. {
  378. if(TSocketObj::IsExist(pSocketObj))
  379. {
  380. pSocketObj->extra = pExtra;
  381. return TRUE;
  382. }
  383. return FALSE;
  384. }
  385. BOOL CTcpAgent::GetConnectionExtra(CONNID dwConnID, PVOID* ppExtra)
  386. {
  387. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  388. return GetConnectionExtra(pSocketObj, ppExtra);
  389. }
  390. BOOL CTcpAgent::GetConnectionExtra(TSocketObj* pSocketObj, PVOID* ppExtra)
  391. {
  392. ASSERT(ppExtra != nullptr);
  393. if(TSocketObj::IsExist(pSocketObj))
  394. {
  395. *ppExtra = pSocketObj->extra;
  396. return TRUE;
  397. }
  398. return FALSE;
  399. }
  400. BOOL CTcpAgent::SetConnectionReserved(CONNID dwConnID, PVOID pReserved)
  401. {
  402. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  403. return SetConnectionReserved(pSocketObj, pReserved);
  404. }
  405. BOOL CTcpAgent::SetConnectionReserved(TSocketObj* pSocketObj, PVOID pReserved)
  406. {
  407. if(TSocketObj::IsExist(pSocketObj))
  408. {
  409. pSocketObj->reserved = pReserved;
  410. return TRUE;
  411. }
  412. return FALSE;
  413. }
  414. BOOL CTcpAgent::GetConnectionReserved(CONNID dwConnID, PVOID* ppReserved)
  415. {
  416. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  417. return GetConnectionReserved(pSocketObj, ppReserved);
  418. }
  419. BOOL CTcpAgent::GetConnectionReserved(TSocketObj* pSocketObj, PVOID* ppReserved)
  420. {
  421. ASSERT(ppReserved != nullptr);
  422. if(TSocketObj::IsExist(pSocketObj))
  423. {
  424. *ppReserved = pSocketObj->reserved;
  425. return TRUE;
  426. }
  427. return FALSE;
  428. }
  429. BOOL CTcpAgent::SetConnectionReserved2(CONNID dwConnID, PVOID pReserved2)
  430. {
  431. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  432. return SetConnectionReserved2(pSocketObj, pReserved2);
  433. }
  434. BOOL CTcpAgent::SetConnectionReserved2(TSocketObj* pSocketObj, PVOID pReserved2)
  435. {
  436. if(TSocketObj::IsExist(pSocketObj))
  437. {
  438. pSocketObj->reserved2 = pReserved2;
  439. return TRUE;
  440. }
  441. return FALSE;
  442. }
  443. BOOL CTcpAgent::GetConnectionReserved2(CONNID dwConnID, PVOID* ppReserved2)
  444. {
  445. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  446. return GetConnectionReserved2(pSocketObj, ppReserved2);
  447. }
  448. BOOL CTcpAgent::GetConnectionReserved2(TSocketObj* pSocketObj, PVOID* ppReserved2)
  449. {
  450. ASSERT(ppReserved2 != nullptr);
  451. if(TSocketObj::IsExist(pSocketObj))
  452. {
  453. *ppReserved2 = pSocketObj->reserved2;
  454. return TRUE;
  455. }
  456. return FALSE;
  457. }
  458. BOOL CTcpAgent::GetPendingDataLength(CONNID dwConnID, int& iPending)
  459. {
  460. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  461. if(TSocketObj::IsValid(pSocketObj))
  462. {
  463. iPending = pSocketObj->Pending();
  464. return TRUE;
  465. }
  466. return FALSE;
  467. }
  468. DWORD CTcpAgent::GetConnectionCount()
  469. {
  470. return m_bfActiveSockets.Elements();
  471. }
  472. BOOL CTcpAgent::GetAllConnectionIDs(CONNID pIDs[], DWORD& dwCount)
  473. {
  474. return m_bfActiveSockets.GetAllElementIndexes(pIDs, dwCount);
  475. }
  476. BOOL CTcpAgent::GetConnectPeriod(CONNID dwConnID, DWORD& dwPeriod)
  477. {
  478. BOOL isOK = TRUE;
  479. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  480. if(TSocketObj::IsValid(pSocketObj))
  481. dwPeriod = ::GetTimeGap32(pSocketObj->connTime);
  482. else
  483. isOK = FALSE;
  484. return isOK;
  485. }
  486. BOOL CTcpAgent::GetSilencePeriod(CONNID dwConnID, DWORD& dwPeriod)
  487. {
  488. if(!m_bMarkSilence)
  489. return FALSE;
  490. BOOL isOK = TRUE;
  491. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  492. if(TSocketObj::IsValid(pSocketObj))
  493. dwPeriod = ::GetTimeGap32(pSocketObj->activeTime);
  494. else
  495. isOK = FALSE;
  496. return isOK;
  497. }
  498. BOOL CTcpAgent::Disconnect(CONNID dwConnID, BOOL bForce)
  499. {
  500. BOOL isOK = FALSE;
  501. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  502. if(TSocketObj::IsValid(pSocketObj))
  503. {
  504. if(bForce)
  505. isOK = ::PostIocpDisconnect(m_hCompletePort, dwConnID);
  506. else
  507. isOK = m_pfnDisconnectEx(pSocketObj->socket, nullptr, 0, 0);
  508. }
  509. return isOK;
  510. }
  511. BOOL CTcpAgent::DisconnectLongConnections(DWORD dwPeriod, BOOL bForce)
  512. {
  513. if(dwPeriod > MAX_CONNECTION_PERIOD)
  514. return FALSE;
  515. DWORD size = 0;
  516. unique_ptr<CONNID[]> ids = m_bfActiveSockets.GetAllElementIndexes(size);
  517. DWORD now = ::TimeGetTime();
  518. for(DWORD i = 0; i < size; i++)
  519. {
  520. CONNID connID = ids[i];
  521. TSocketObj* pSocketObj = FindSocketObj(connID);
  522. if(TSocketObj::IsValid(pSocketObj) && (int)(now - pSocketObj->connTime) >= (int)dwPeriod)
  523. Disconnect(connID, bForce);
  524. }
  525. return TRUE;
  526. }
  527. BOOL CTcpAgent::DisconnectSilenceConnections(DWORD dwPeriod, BOOL bForce)
  528. {
  529. if(!m_bMarkSilence)
  530. return FALSE;
  531. if(dwPeriod > MAX_CONNECTION_PERIOD)
  532. return FALSE;
  533. DWORD size = 0;
  534. unique_ptr<CONNID[]> ids = m_bfActiveSockets.GetAllElementIndexes(size);
  535. DWORD now = ::TimeGetTime();
  536. for(DWORD i = 0; i < size; i++)
  537. {
  538. CONNID connID = ids[i];
  539. TSocketObj* pSocketObj = FindSocketObj(connID);
  540. if(TSocketObj::IsValid(pSocketObj) && (int)(now - pSocketObj->activeTime) >= (int)dwPeriod)
  541. Disconnect(connID, bForce);
  542. }
  543. return TRUE;
  544. }
  545. void CTcpAgent::WaitForClientSocketClose()
  546. {
  547. while(m_bfActiveSockets.Elements() > 0)
  548. ::WaitWithMessageLoop(100);
  549. }
  550. void CTcpAgent::WaitForWorkerThreadEnd()
  551. {
  552. int count = (int)m_vtWorkerThreads.size();
  553. for(int i = 0; i < count; i++)
  554. ::PostIocpExit(m_hCompletePort);
  555. int remain = count;
  556. int index = 0;
  557. while(remain > 0)
  558. {
  559. int wait = min(remain, MAXIMUM_WAIT_OBJECTS);
  560. HANDLE* pHandles = (HANDLE*)_alloca(sizeof(HANDLE) * wait);
  561. for(int i = 0; i < wait; i++)
  562. pHandles[i] = m_vtWorkerThreads[i + index];
  563. VERIFY(::WaitForMultipleObjects((DWORD)wait, pHandles, TRUE, INFINITE) == WAIT_OBJECT_0);
  564. for(int i = 0; i < wait; i++)
  565. ::CloseHandle(pHandles[i]);
  566. remain -= wait;
  567. index += wait;
  568. }
  569. m_vtWorkerThreads.clear();
  570. }
  571. void CTcpAgent::CloseCompletePort()
  572. {
  573. if(m_hCompletePort != nullptr)
  574. {
  575. ::CloseHandle(m_hCompletePort);
  576. m_hCompletePort = nullptr;
  577. }
  578. }
  579. UINT WINAPI CTcpAgent::WorkerThreadProc(LPVOID pv)
  580. {
  581. CTcpAgent* pAgent = (CTcpAgent*)pv;
  582. while(TRUE)
  583. {
  584. DWORD dwErrorCode = NO_ERROR;
  585. DWORD dwBytes;
  586. OVERLAPPED* pOverlapped;
  587. TSocketObj* pSocketObj;
  588. BOOL result = ::GetQueuedCompletionStatus
  589. (
  590. pAgent->m_hCompletePort,
  591. &dwBytes,
  592. (PULONG_PTR)&pSocketObj,
  593. &pOverlapped,
  594. INFINITE
  595. );
  596. if(pOverlapped == nullptr)
  597. {
  598. EnIocpAction action = pAgent->CheckIocpCommand(pOverlapped, dwBytes, (ULONG_PTR)pSocketObj);
  599. if(action == IOCP_ACT_CONTINUE)
  600. continue;
  601. else if(action == IOCP_ACT_BREAK)
  602. break;
  603. }
  604. TBufferObj* pBufferObj = CONTAINING_RECORD(pOverlapped, TBufferObj, ov);
  605. CONNID dwConnID = pSocketObj->connID;
  606. if (!result)
  607. {
  608. DWORD dwFlag = 0;
  609. DWORD dwSysCode = ::GetLastError();
  610. if(pAgent->HasStarted())
  611. {
  612. SOCKET sock = pBufferObj->client;
  613. result = ::WSAGetOverlappedResult(sock, &pBufferObj->ov, &dwBytes, FALSE, &dwFlag);
  614. if (!result)
  615. {
  616. dwErrorCode = ::WSAGetLastError();
  617. TRACE("GetQueuedCompletionStatus error (<A-CNNID: %Iu> SYS: %d, SOCK: %d, FLAG: %d)\n", dwConnID, dwSysCode, dwErrorCode, dwFlag);
  618. }
  619. }
  620. else
  621. dwErrorCode = dwSysCode;
  622. ASSERT(dwSysCode != 0 && dwErrorCode != 0);
  623. }
  624. pAgent->HandleIo(dwConnID, pSocketObj, pBufferObj, dwBytes, dwErrorCode);
  625. }
  626. pAgent->OnWorkerThreadEnd(::GetCurrentThreadId());
  627. return 0;
  628. }
  629. EnIocpAction CTcpAgent::CheckIocpCommand(OVERLAPPED* pOverlapped, DWORD dwBytes, ULONG_PTR ulCompKey)
  630. {
  631. ASSERT(pOverlapped == nullptr);
  632. EnIocpAction action = IOCP_ACT_CONTINUE;
  633. if(dwBytes == IOCP_CMD_SEND)
  634. DoSend((CONNID)ulCompKey);
  635. else if(dwBytes == IOCP_CMD_DISCONNECT)
  636. ForceDisconnect((CONNID)ulCompKey);
  637. else if(dwBytes == IOCP_CMD_EXIT && ulCompKey == 0)
  638. action = IOCP_ACT_BREAK;
  639. else
  640. VERIFY(FALSE);
  641. return action;
  642. }
  643. void CTcpAgent::ForceDisconnect(CONNID dwConnID)
  644. {
  645. AddFreeSocketObj(FindSocketObj(dwConnID), SCF_CLOSE);
  646. }
  647. void CTcpAgent::HandleIo(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj, DWORD dwBytes, DWORD dwErrorCode)
  648. {
  649. ASSERT(pBufferObj != nullptr);
  650. ASSERT(pSocketObj != nullptr);
  651. if(dwErrorCode != NO_ERROR)
  652. {
  653. HandleError(dwConnID, pSocketObj, pBufferObj, dwErrorCode);
  654. return;
  655. }
  656. if(dwBytes == 0 && pBufferObj->operation != SO_CONNECT)
  657. {
  658. AddFreeSocketObj(pSocketObj, SCF_CLOSE);
  659. AddFreeBufferObj(pBufferObj);
  660. return;
  661. }
  662. pBufferObj->buff.len = dwBytes;
  663. switch(pBufferObj->operation)
  664. {
  665. case SO_CONNECT:
  666. HandleConnect(dwConnID, pSocketObj, pBufferObj);
  667. break;
  668. case SO_SEND:
  669. HandleSend(dwConnID, pSocketObj, pBufferObj);
  670. break;
  671. case SO_RECEIVE:
  672. HandleReceive(dwConnID, pSocketObj, pBufferObj);
  673. break;
  674. default:
  675. ASSERT(FALSE);
  676. }
  677. }
  678. void CTcpAgent::HandleError(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj, DWORD dwErrorCode)
  679. {
  680. CheckError(pSocketObj, pBufferObj->operation, dwErrorCode);
  681. AddFreeBufferObj(pBufferObj);
  682. }
  683. void CTcpAgent::HandleConnect(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  684. {
  685. ::SSO_UpdateConnectContext(pBufferObj->client, 0);
  686. if(TriggerFireConnect(pSocketObj) != HR_ERROR)
  687. DoReceive(dwConnID, pSocketObj, pBufferObj);
  688. else
  689. {
  690. AddFreeSocketObj(pSocketObj, SCF_NONE);
  691. AddFreeBufferObj(pBufferObj);
  692. }
  693. }
  694. void CTcpAgent::HandleSend(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  695. {
  696. switch(m_enSendPolicy)
  697. {
  698. case SP_PACK:
  699. {
  700. long sndCount = ::InterlockedDecrement(&pSocketObj->sndCount);
  701. TriggerFireSend(pSocketObj, pBufferObj);
  702. if(sndCount == 0) DoSendPack(pSocketObj);
  703. }
  704. break;
  705. case SP_SAFE:
  706. {
  707. long sndCount = ::InterlockedDecrement(&pSocketObj->sndCount);
  708. if(sndCount == 0 && !pSocketObj->smooth)
  709. {
  710. CCriSecLock locallock(pSocketObj->csSend);
  711. if((sndCount = pSocketObj->sndCount) == 0)
  712. pSocketObj->smooth = TRUE;
  713. }
  714. TriggerFireSend(pSocketObj, pBufferObj);
  715. if(sndCount == 0) DoSendSafe(pSocketObj);
  716. }
  717. break;
  718. case SP_DIRECT:
  719. {
  720. TriggerFireSend(pSocketObj, pBufferObj);
  721. }
  722. break;
  723. default:
  724. ASSERT(FALSE);
  725. }
  726. }
  727. void CTcpAgent::HandleReceive(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  728. {
  729. if(m_bMarkSilence) pSocketObj->activeTime = ::TimeGetTime();
  730. EnHandleResult hr = TriggerFireReceive(pSocketObj, pBufferObj);
  731. if(hr == HR_OK || hr == HR_IGNORE)
  732. DoReceive(dwConnID, pSocketObj, pBufferObj);
  733. else if(hr == HR_CLOSED)
  734. {
  735. AddFreeBufferObj(pBufferObj);
  736. }
  737. else
  738. {
  739. TRACE("<A-CNNID: %Iu> OnReceive() event return 'HR_ERROR', connection will be closed !\n", dwConnID);
  740. AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ERROR_CANCELLED);
  741. AddFreeBufferObj(pBufferObj);
  742. }
  743. }
  744. int CTcpAgent::DoReceive(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  745. {
  746. pBufferObj->buff.len = m_dwSocketBufferSize;
  747. int result =::PostReceive(pSocketObj, pBufferObj);
  748. if(result != NO_ERROR)
  749. {
  750. CheckError(pSocketObj, SO_RECEIVE, result);
  751. AddFreeBufferObj(pBufferObj);
  752. }
  753. return result;
  754. }
  755. BOOL CTcpAgent::Connect(LPCTSTR lpszRemoteAddress, USHORT usPort, CONNID* pdwConnID)
  756. {
  757. ASSERT(lpszRemoteAddress && usPort != 0);
  758. DWORD result = NO_ERROR;
  759. CONNID dwConnID = 0;
  760. SOCKET soClient = INVALID_SOCKET;
  761. SOCKADDR_IN addr;
  762. if(!HasStarted())
  763. result = ERROR_INVALID_STATE;
  764. else
  765. {
  766. result = CreateClientSocket(lpszRemoteAddress, usPort, soClient, addr);
  767. if(result == NO_ERROR)
  768. {
  769. result = PrepareConnect(dwConnID, soClient);
  770. if(result == NO_ERROR)
  771. result = ConnectToServer(dwConnID, soClient, addr);
  772. }
  773. }
  774. if(result != NO_ERROR)
  775. {
  776. if(soClient != INVALID_SOCKET)
  777. ::ManualCloseSocket(soClient);
  778. ::SetLastError(result);
  779. }
  780. if(pdwConnID) *pdwConnID = dwConnID;
  781. return (result == NO_ERROR);
  782. }
  783. DWORD CTcpAgent::CreateClientSocket(LPCTSTR lpszRemoteAddress, USHORT usPort, SOCKET& soClient, SOCKADDR_IN& addr)
  784. {
  785. TCHAR szAddress[40];
  786. int iAddressLen = sizeof(szAddress) / sizeof(TCHAR);
  787. if(!::GetIPAddress(lpszRemoteAddress, szAddress, iAddressLen))
  788. return WSAEADDRNOTAVAIL;
  789. if(!::sockaddr_A_2_IN(AF_INET, szAddress, usPort, addr))
  790. return WSAEADDRNOTAVAIL;
  791. DWORD result = NO_ERROR;
  792. soClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  793. if(soClient != INVALID_SOCKET)
  794. {
  795. VERIFY(::SSO_ReuseAddress(soClient, m_bReuseAddress) != SOCKET_ERROR);
  796. BOOL bOnOff = (m_dwKeepAliveTime > 0 && m_dwKeepAliveInterval > 0);
  797. ::SSO_KeepAliveVals(soClient, bOnOff, m_dwKeepAliveTime, m_dwKeepAliveInterval);
  798. if(::bind(soClient, (SOCKADDR*)&m_soAddrIN, sizeof(SOCKADDR_IN)) == SOCKET_ERROR)
  799. result = ::WSAGetLastError();
  800. }
  801. else
  802. result = ::WSAGetLastError();
  803. return result;
  804. }
  805. DWORD CTcpAgent::PrepareConnect(CONNID& dwConnID, SOCKET soClient)
  806. {
  807. if(!m_bfActiveSockets.AcquireLock(dwConnID))
  808. return ERROR_CONNECTION_COUNT_LIMIT;
  809. if(FirePrepareConnect(dwConnID, soClient) == HR_ERROR)
  810. {
  811. VERIFY(m_bfActiveSockets.ReleaseLock(dwConnID, nullptr));
  812. return ERROR_CANCELLED;
  813. }
  814. return NO_ERROR;
  815. }
  816. DWORD CTcpAgent::ConnectToServer(CONNID dwConnID, SOCKET soClient, const SOCKADDR_IN& addr)
  817. {
  818. TBufferObj* pBufferObj = GetFreeBufferObj();
  819. TSocketObj* pSocketObj = GetFreeSocketObj(dwConnID, soClient);
  820. memcpy(&pSocketObj->remoteAddr, &addr, sizeof(SOCKADDR_IN));
  821. AddClientSocketObj(dwConnID, pSocketObj);
  822. DWORD result = NO_ERROR;
  823. BOOL bNeedFree = TRUE;
  824. if(m_bAsyncConnect)
  825. {
  826. if(::CreateIoCompletionPort((HANDLE)soClient, m_hCompletePort, (ULONG_PTR)pSocketObj, 0))
  827. {
  828. result = DoConnect(dwConnID, pSocketObj, pBufferObj);
  829. bNeedFree = FALSE;
  830. }
  831. else
  832. result = ::GetLastError();
  833. }
  834. else
  835. {
  836. if(::connect(soClient, (SOCKADDR*)&addr, sizeof(SOCKADDR_IN)) != SOCKET_ERROR)
  837. {
  838. if(::CreateIoCompletionPort((HANDLE)soClient, m_hCompletePort, (ULONG_PTR)pSocketObj, 0))
  839. {
  840. if(TriggerFireConnect(pSocketObj) != HR_ERROR)
  841. {
  842. result = DoReceive(dwConnID, pSocketObj, pBufferObj);
  843. bNeedFree = FALSE;
  844. }
  845. else
  846. result = ERROR_CANCELLED;
  847. }
  848. else
  849. result = ::GetLastError();
  850. }
  851. else
  852. result = ::WSAGetLastError();
  853. }
  854. if(result != NO_ERROR)
  855. {
  856. if(bNeedFree)
  857. {
  858. AddFreeSocketObj(pSocketObj, SCF_NONE);
  859. AddFreeBufferObj(pBufferObj);
  860. }
  861. }
  862. return result;
  863. }
  864. int CTcpAgent::DoConnect(CONNID dwConnID, TSocketObj* pSocketObj, TBufferObj* pBufferObj)
  865. {
  866. int result = ::PostConnect(m_pfnConnectEx, pSocketObj->socket, pSocketObj->remoteAddr, pBufferObj);
  867. if(result != NO_ERROR)
  868. {
  869. CheckError(pSocketObj, SO_CONNECT, result);
  870. AddFreeBufferObj(pBufferObj);
  871. }
  872. return result;
  873. }
  874. BOOL CTcpAgent::Send(CONNID dwConnID, const BYTE* pBuffer, int iLength, int iOffset)
  875. {
  876. ASSERT(pBuffer && iLength > 0);
  877. if(iOffset != 0) pBuffer += iOffset;
  878. WSABUF buffer;
  879. buffer.len = iLength;
  880. buffer.buf = (char*)pBuffer;
  881. return SendPackets(dwConnID, &buffer, 1);
  882. }
  883. BOOL CTcpAgent::DoSendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount)
  884. {
  885. ASSERT(pBuffers && iCount > 0);
  886. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  887. if(!TSocketObj::IsValid(pSocketObj))
  888. {
  889. ::SetLastError(ERROR_OBJECT_NOT_FOUND);
  890. return FALSE;
  891. }
  892. return DoSendPackets(pSocketObj, pBuffers, iCount);
  893. }
  894. BOOL CTcpAgent::DoSendPackets(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount)
  895. {
  896. ASSERT(pSocketObj && pBuffers && iCount > 0);
  897. int result = NO_ERROR;
  898. if(pBuffers && iCount > 0)
  899. {
  900. CCriSecLock locallock(pSocketObj->csSend);
  901. if(TSocketObj::IsValid(pSocketObj))
  902. result = SendInternal(pSocketObj, pBuffers, iCount);
  903. else
  904. result = ERROR_OBJECT_NOT_FOUND;
  905. }
  906. else
  907. result = ERROR_INVALID_PARAMETER;
  908. if(result != NO_ERROR)
  909. {
  910. if(m_enSendPolicy == SP_DIRECT && TSocketObj::IsValid(pSocketObj))
  911. CheckError(pSocketObj, SO_SEND, result);
  912. ::SetLastError(result);
  913. }
  914. return (result == NO_ERROR);
  915. }
  916. int CTcpAgent::SendInternal(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount)
  917. {
  918. int result = NO_ERROR;
  919. for(int i = 0; i < iCount; i++)
  920. {
  921. int iBufLen = pBuffers[i].len;
  922. if(iBufLen > 0)
  923. {
  924. BYTE* pBuffer = (BYTE*)pBuffers[i].buf;
  925. ASSERT(pBuffer);
  926. switch(m_enSendPolicy)
  927. {
  928. case SP_PACK: result = SendPack(pSocketObj, pBuffer, iBufLen); break;
  929. case SP_SAFE: result = SendSafe(pSocketObj, pBuffer, iBufLen); break;
  930. case SP_DIRECT: result = SendDirect(pSocketObj, pBuffer, iBufLen); break;
  931. default: ASSERT(FALSE); result = ERROR_INVALID_INDEX; break;
  932. }
  933. if(result != NO_ERROR)
  934. break;
  935. }
  936. }
  937. return result;
  938. }
  939. int CTcpAgent::SendPack(TSocketObj* pSocketObj, const BYTE* pBuffer, int iLength)
  940. {
  941. BOOL isPostSend = !TSocketObj::IsPending(pSocketObj);
  942. return CatAndPost(pSocketObj, pBuffer, iLength, isPostSend);
  943. }
  944. int CTcpAgent::SendSafe(TSocketObj* pSocketObj, const BYTE* pBuffer, int iLength)
  945. {
  946. BOOL isPostSend = !TSocketObj::IsPending(pSocketObj) && TSocketObj::IsSmooth(pSocketObj);
  947. return CatAndPost(pSocketObj, pBuffer, iLength, isPostSend);
  948. }
  949. int CTcpAgent::SendDirect(TSocketObj* pSocketObj, const BYTE* pBuffer, int iLength)
  950. {
  951. int result = NO_ERROR;
  952. int iRemain = iLength;
  953. while(iRemain > 0)
  954. {
  955. int iBufferSize = min(iRemain, (int)m_dwSocketBufferSize);
  956. TBufferObj* pBufferObj = GetFreeBufferObj(iBufferSize);
  957. memcpy(pBufferObj->buff.buf, pBuffer, iBufferSize);
  958. result = ::PostSend(pSocketObj, pBufferObj);
  959. if(result != NO_ERROR)
  960. {
  961. AddFreeBufferObj(pBufferObj);
  962. break;
  963. }
  964. iRemain -= iBufferSize;
  965. pBuffer += iBufferSize;
  966. }
  967. return result;
  968. }
  969. int CTcpAgent::CatAndPost(TSocketObj* pSocketObj, const BYTE* pBuffer, int iLength, BOOL isPostSend)
  970. {
  971. int result = NO_ERROR;
  972. pSocketObj->sndBuff.Cat(pBuffer, iLength);
  973. pSocketObj->pending += iLength;
  974. if(isPostSend && !::PostIocpSend(m_hCompletePort, pSocketObj->connID))
  975. result = ::GetLastError();
  976. return result;
  977. }
  978. int CTcpAgent::DoSend(CONNID dwConnID)
  979. {
  980. TSocketObj* pSocketObj = FindSocketObj(dwConnID);
  981. if(TSocketObj::IsValid(pSocketObj))
  982. return DoSend(pSocketObj);
  983. return ERROR_OBJECT_NOT_FOUND;
  984. }
  985. int CTcpAgent::DoSend(TSocketObj* pSocketObj)
  986. {
  987. switch(m_enSendPolicy)
  988. {
  989. case SP_PACK: return DoSendPack(pSocketObj);
  990. case SP_SAFE: return DoSendSafe(pSocketObj);
  991. default: ASSERT(FALSE); return ERROR_INVALID_INDEX;
  992. }
  993. }
  994. int CTcpAgent::DoSendPack(TSocketObj* pSocketObj)
  995. {
  996. int result = NO_ERROR;
  997. if(TSocketObj::IsPending(pSocketObj))
  998. {
  999. CCriSecLock locallock(pSocketObj->csSend);
  1000. if(TSocketObj::IsValid(pSocketObj))
  1001. result = SendItem(pSocketObj);
  1002. }
  1003. if(!IOCP_SUCCESS(result))
  1004. CheckError(pSocketObj, SO_SEND, result);
  1005. return result;
  1006. }
  1007. int CTcpAgent::DoSendSafe(TSocketObj* pSocketObj)
  1008. {
  1009. int result = NO_ERROR;
  1010. if(TSocketObj::IsPending(pSocketObj) && TSocketObj::IsSmooth(pSocketObj))
  1011. {
  1012. CCriSecLock locallock(pSocketObj->csSend);
  1013. if(TSocketObj::IsPending(pSocketObj) && TSocketObj::IsSmooth(pSocketObj))
  1014. {
  1015. pSocketObj->smooth = FALSE;
  1016. result = SendItem(pSocketObj);
  1017. if(result == NO_ERROR)
  1018. pSocketObj->smooth = TRUE;
  1019. }
  1020. }
  1021. if(!IOCP_SUCCESS(result))
  1022. CheckError(pSocketObj, SO_SEND, result);
  1023. return result;
  1024. }
  1025. int CTcpAgent::SendItem(TSocketObj* pSocketObj)
  1026. {
  1027. int result = NO_ERROR;
  1028. while(pSocketObj->sndBuff.Size() > 0)
  1029. {
  1030. ::InterlockedIncrement(&pSocketObj->sndCount);
  1031. TBufferObj* pBufferObj = pSocketObj->sndBuff.PopFront();
  1032. int iBufferSize = pBufferObj->buff.len;
  1033. ASSERT(iBufferSize > 0 && iBufferSize <= (int)m_dwSocketBufferSize);
  1034. pSocketObj->pending -= iBufferSize;
  1035. result = ::PostSendNotCheck(pSocketObj, pBufferObj);
  1036. if(result != NO_ERROR)
  1037. {
  1038. if(result != WSA_IO_PENDING)
  1039. AddFreeBufferObj(pBufferObj);
  1040. break;
  1041. }
  1042. }
  1043. return result;
  1044. }
  1045. BOOL CTcpAgent::SendSmallFile(CONNID dwConnID, LPCTSTR lpszFileName, const LPWSABUF pHead, const LPWSABUF pTail)
  1046. {
  1047. ASSERT(lpszFileName != nullptr);
  1048. CAtlFile file;
  1049. HRESULT hr = file.Create(lpszFileName, GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
  1050. if(SUCCEEDED(hr))
  1051. {
  1052. ULONGLONG ullLen;
  1053. hr = file.GetSize(ullLen);
  1054. if(SUCCEEDED(hr))
  1055. {
  1056. ULONGLONG ullTotal = ullLen + (pHead ? pHead->len : 0) + (pTail ? pTail->len : 0);
  1057. if(ullLen > 0 && ullTotal <= MAX_SMALL_FILE_SIZE)
  1058. {
  1059. CAtlFileMapping<> fmap;
  1060. hr = fmap.MapFile(file);
  1061. if(SUCCEEDED(hr))
  1062. {
  1063. WSABUF bufs[3] = {0};
  1064. bufs[1].len = (ULONG)ullLen;
  1065. bufs[1].buf = fmap;
  1066. if(pHead) memcpy(&bufs[0], pHead, sizeof(WSABUF));
  1067. if(pTail) memcpy(&bufs[2], pTail, sizeof(WSABUF));
  1068. return SendPackets(dwConnID, bufs, 3);
  1069. }
  1070. }
  1071. else if(ullLen == 0)
  1072. hr = HRESULT_FROM_WIN32(ERROR_FILE_INVALID);
  1073. else
  1074. hr = HRESULT_FROM_WIN32(ERROR_FILE_TOO_LARGE);
  1075. }
  1076. }
  1077. ::SetLastError(hr & 0x0000FFFF);
  1078. return FALSE;
  1079. }
  1080. void CTcpAgent::CheckError(TSocketObj* pSocketObj, EnSocketOperation enOperation, int iErrorCode)
  1081. {
  1082. if(iErrorCode != WSAENOTSOCK && iErrorCode != ERROR_OPERATION_ABORTED)
  1083. AddFreeSocketObj(pSocketObj, SCF_ERROR, enOperation, iErrorCode);
  1084. }