GrabBag/VrNets/tcpServer/Src/CYServerTask.cpp
2025-09-14 14:51:38 +08:00

264 lines
5.4 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "CYServerTask.h"
/// Builds an idle task: no worker thread, no registered clients, empty
/// select() descriptor sets, and freshly allocated receive buffers.
CYServerTask::CYServerTask()
    : m_maxSocket(INVALID_SOCKET)
    , m_tTask(nullptr)
    , m_bWork(false)
    , m_eWorkStatus(WORK_INIT)
    , m_fRecv(nullptr)
    , m_fException(nullptr)
    , m_bUseProtocol(false)
{
    // The descriptor sets later copied for select() start out empty,
    // and so does the client list.
    FD_ZERO(&m_fdExp);
    FD_ZERO(&m_fdRead);
    m_vClient.clear();

    // Buffer for raw (non-protocol) reads, followed by the buffer that
    // framed messages are received into. Both are freed by the destructor.
    m_pRecvBuf = new char[RECV_DATA_LEN];
    m_pProtocalHead = new ProtocolHead;
}
/// Shuts the worker thread down (if one was ever started) and then releases
/// the thread handle, client bookkeeping and receive buffers.
///
/// The worker is detached and touches this object's mutex, condition variable
/// and buffers, so it must have left _OnProcessEvent() before anything is
/// freed. This blocks until the worker reports WORK_EXIT.
/// NOTE(review): must not be invoked from one of the worker's own callbacks,
/// since the worker cannot exit while it is waiting for itself.
CYServerTask::~CYServerTask()
{
    // A worker exists if we still hold its handle, or if the status ever left
    // WORK_INIT (only the worker moves it to WORK_WAITSINGAL / WORK_RUNING).
    const bool bWorkerMayExist = (nullptr != m_tTask) || (WORK_INIT != m_eWorkStatus);
    if (bWorkerMayExist)
    {
        // Ask the worker to leave its select() loop and park on the condition variable.
        m_bWork = false;
        while (WORK_EXIT != m_eWorkStatus)
        {
            {
                std::unique_lock<std::mutex> lock(m_mutexWork);
                // Only overwrite the parked state: the worker publishes
                // WORK_EXIT without the lock, and that must never be clobbered.
                if (WORK_WAITSINGAL == m_eWorkStatus)
                {
                    m_eWorkStatus = WORK_CLOSE;
                }
                // Re-notify on every pass; a notification sent before the
                // worker actually entered wait() would otherwise be lost.
                m_cvWork.notify_all();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    delete m_tTask;
    m_tTask = nullptr;
    m_vClient.clear();
    FD_ZERO(&m_fdRead);
    FD_ZERO(&m_fdExp);
    delete[] m_pRecvBuf;
    delete m_pProtocalHead;
}
/// Starts the worker thread, or resumes it if it is parked after a stop.
///
/// @param fRecv              Callback invoked by the worker for received data.
/// @param bRecvSelfProtocol  true to receive framed messages (header + payload),
///                           false to deliver raw recv() chunks.
/// @return Always true.
bool CYServerTask::StartTask(FunTCPServerRecv fRecv, bool bRecvSelfProtocol)
{
    // Publish the callback and mode before m_bWork, so a worker that starts
    // polling sockets already sees the configuration it should use.
    m_fRecv = fRecv;
    m_bUseProtocol = bRecvSelfProtocol;
    m_bWork = true;

    if (!m_tTask)
    {
        // First start: spawn the worker. It is detached and keeps running
        // until it is told to close.
        m_tTask = new std::thread(&CYServerTask::_OnProcessEvent, this);
        m_tTask->detach();
        return true;
    }

    // A worker already exists. If it is parked, keep signalling until it has
    // left the waiting state; a single notify can be lost when the worker has
    // announced WORK_WAITSINGAL but not yet entered wait().
    // If it is already running (WORK_INIT on its first run, or WORK_RUNING),
    // there is nothing to wake. Waiting for WORK_RUNING here would spin
    // forever, because a worker on its first run never publishes that state.
    while (WORK_WAITSINGAL == m_eWorkStatus)
    {
        {
            std::unique_lock<std::mutex> lock(m_mutexWork);
            m_cvWork.notify_one();
        }
        // Give the worker a chance to grab the mutex and update the status.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return true;
}
/// Installs the callback the worker invokes when processing a client's data
/// fails. Passing an empty function disables the notification.
///
/// @param fException  Callback receiving the client whose read failed.
void CYServerTask::SetExceptionCallback(std::function<void(const TCPClient*)> fException)
{
    // Exchange with the by-value parameter; the previously installed callback
    // is destroyed when the parameter goes out of scope.
    m_fException.swap(fException);
}
/// Pauses the worker: asks it to leave its polling loop and waits until it
/// has parked itself, then drops the receive callback.
///
/// The thread handle is deliberately kept. The detached worker is still alive
/// (parked on the condition variable), and StartTask() uses the presence of
/// the handle to resume that worker instead of spawning another one.
/// Releasing the handle here made every restart leak a parked thread.
///
/// @return Always true.
bool CYServerTask::StopTask()
{
    m_bWork = false;
    if (m_tTask)
    {
        // Wait until the worker reports that it is parked. It sets this state
        // at the top of its loop, so no callback is in flight any more.
        while (WORK_WAITSINGAL != m_eWorkStatus)
        {
            std::chrono::milliseconds milTime(10);
            std::this_thread::sleep_for(milTime);
        }
        m_fRecv = nullptr;
    }
    return true;
}
/// Registers a client with this task so the worker polls its socket.
///
/// @param pClient  Client to add; its m_nFD is added to the read and
///                 exception descriptor sets.
/// @return true if the client was added; false if pClient is null or the task
///         already holds FD_SETSIZE clients.
bool CYServerTask::AddClient(TCPClient * pClient)
{
    if (nullptr == pClient)
    {
        return false;
    }

    // The capacity check must happen under the same lock as the insertion;
    // otherwise it races with concurrent AddClient/DelClient calls.
    std::lock_guard<std::mutex> mLck(m_mClient);
    if (m_vClient.size() >= FD_SETSIZE)
    {
        return false;
    }

    // Record the client in this task.
    m_vClient.push_back(pClient);
    // Add its descriptor to the sets the worker passes to select().
    FD_SET(pClient->m_nFD, &m_fdRead);
    FD_SET(pClient->m_nFD, &m_fdExp);
    // Track the highest descriptor. INVALID_SOCKET means "none yet" and is
    // handled explicitly, because with an unsigned socket type it compares
    // greater than every real descriptor and would never be replaced.
    if (INVALID_SOCKET == m_maxSocket || pClient->m_nFD > m_maxSocket)
    {
        m_maxSocket = pClient->m_nFD;
    }
    return true;
}
///<2F>Ƴ<EFBFBD><C6B3>ͻ<EFBFBD><CDBB><EFBFBD>
bool CYServerTask::DelClient(const TCPClient * pClient)
{
bool bRet = false;
std::lock_guard<std::mutex> mLck(m_mClient);
std::vector<TCPClient*>::iterator iter = m_vClient.begin();
m_maxSocket = INVALID_SOCKET;
while (iter != m_vClient.end())
{
if (*iter == pClient)
{
m_vClient.erase(iter);
FD_CLR(pClient->m_nFD, &m_fdRead);
FD_CLR(pClient->m_nFD, &m_fdExp);
bRet = true;
break;
}
}
iter = m_vClient.begin();
while (iter != m_vClient.end())
{
//<2F><><EFBFBD><EFBFBD>ͳ<EFBFBD><CDB3><EFBFBD><EFBFBD><EFBFBD>ֵ
m_maxSocket = m_maxSocket > (*iter)->m_nFD ? m_maxSocket : (*iter)->m_nFD;
iter++;
}
return bRet;
}
///<2F><>ȡTask<73>пͻ<D0BF><CDBB>˵<EFBFBD><CBB5><EFBFBD>Ŀ
int CYServerTask::GetClientNum()
{
return (int)m_vClient.size();
}
void CYServerTask::_OnProcessEvent()
{
while (true)
{
if (!m_bWork)
{
m_eWorkStatus = WORK_WAITSINGAL;
std::unique_lock<std::mutex> lock(m_mutexWork);
m_cvWork.wait(lock);
if (WORK_CLOSE == m_eWorkStatus)
{
break;
}
else
{
m_eWorkStatus = WORK_RUNING;
}
}
else
{
if (m_vClient.empty())
{
std::chrono::milliseconds milTime(1);
std::this_thread::sleep_for(milTime);
continue;
}
///<2F><>ʱ<EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD>vector
std::vector<TCPClient*> vTCPClient;
fd_set fdRead;
fd_set fdExp;
{
std::unique_lock<std::mutex> lock(m_mClient);
vTCPClient = m_vClient;
fdRead = m_fdRead;
fdExp = m_fdExp;
}
struct timeval sWaitTime = { 0, 1000 };
int nCount = select((int)m_maxSocket + 1, &fdRead, nullptr, &fdExp, &sWaitTime);
if (nCount <= 0)
{
continue;
}
for (int i = (int)vTCPClient.size() - 1; i >= 0; i--)
{
TCPClient* tmpClient = vTCPClient[i];
if (FD_ISSET(tmpClient->m_nFD, &fdRead))
{
//<2F><><EFBFBD>ܲ<EFBFBD><DCB2>ص<EFBFBD>
if (!_OnProcessData(tmpClient))
{
if (m_fException)
{
m_fException(tmpClient);
}
}
}
}
}
}
m_eWorkStatus = WORK_EXIT;
}
/// Receives pending data from one client and forwards it to m_fRecv.
///
/// Raw mode (m_bUseProtocol == false): performs a single recv() of up to
/// RECV_DATA_LEN bytes and forwards whatever arrived.
/// Protocol mode: reads a fixed header of six ints, then nLen payload bytes
/// directly behind it in the ProtocolHead buffer, and forwards header plus
/// payload as one message.
///
/// @param pClient  Client whose socket (m_nFD) is read.
/// @return true if data was received; false if recv() failed or returned 0,
///         or if the announced payload does not fit the buffer.
bool CYServerTask::_OnProcessData(TCPClient* pClient)
{
    // Raw (non-protocol) receive.
    if (!m_bUseProtocol)
    {
        const int nRecvLen = RECV_DATA_LEN;
        int nCount = recv(pClient->m_nFD, m_pRecvBuf, nRecvLen, 0);
        if (nCount > 0 && m_fRecv)
        {
            m_fRecv(pClient, m_pRecvBuf, nCount);
        }
        return nCount > 0;
    }

    // Protocol receive: header and payload share the ProtocolHead allocation.
    const int nHeadLen = static_cast<int>(6 * sizeof(int));
    const int nBufLen = static_cast<int>(sizeof(ProtocolHead));
    char* const pMsg = reinterpret_cast<char*>(m_pProtocalHead);
    if (nBufLen < nHeadLen)
    {
        // The buffer cannot even hold the header; reading would overflow it.
        printf("protocol buffer too small for head \n");
        return false;
    }

    // recv head
    int nGot = 0;
    do
    {
        int recv_len = recv(pClient->m_nFD, pMsg + nGot, nHeadLen - nGot, 0);
        if (recv_len <= 0)
        {
            printf("read head failed \n");
            return false;
        }
        nGot += recv_len;
    } while (nGot < nHeadLen);

    // The payload length comes from the peer and is used as a write length
    // into our buffer, so it must be validated before any payload byte is
    // read: a negative or oversized value would corrupt the heap.
    const int nBodyLen = static_cast<int>(m_pProtocalHead->nLen);
    if (nBodyLen < 0 || nBodyLen > nBufLen - nHeadLen)
    {
        printf("invalid data len : %d (max %d)\n", nBodyLen, nBufLen - nHeadLen);
        return false;
    }

    // recv data
    nGot = 0;
    while (nGot < nBodyLen)
    {
        int recv_len = recv(pClient->m_nFD, pMsg + nHeadLen + nGot, nBodyLen - nGot, 0);
        if (recv_len <= 0)
        {
            printf("read data len : %d failed [%d]\n", nBodyLen - nGot, recv_len);
            return false;
        }
        nGot += recv_len;
    }

    if (m_fRecv)
    {
        m_fRecv(pClient, pMsg, nBodyLen + nHeadLen);
    }
    return true;
}