fix socket multithread compete

pull/1/head
deffpuzzl 2018-07-25 15:15:59 +08:00
parent e8248418ca
commit 3bd719bc31
2 changed files with 60 additions and 18 deletions

View File

@ -45,20 +45,23 @@ void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuff
if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \ if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \
{ \ { \
s->m_lErrno = RECD_TOO_LONG; \ s->m_lErrno = RECD_TOO_LONG; \
goto LISTEN_ERROR; \ lSendBuffer(c->m_skSock, f, sizeof(TFace)); \
goto LISTEN_REDO; \
} \ } \
c->m_lBuffer = MAX(f->m_lRows, f->m_lDLen); \ c->m_lBuffer = MAX(f->m_lRows, f->m_lDLen); \
if(NULL == (c->pstFace = (void *)realloc(c->pstFace, c->m_lBuffer + sizeof(TFace)))) \ if(NULL == (c->pstFace = (void *)realloc(c->pstFace, c->m_lBuffer + sizeof(TFace)))) \
{ \ { \
s->m_lErrno = MALLC_MEM_ERR; \ s->m_lErrno = MALLC_MEM_ERR; \
goto LISTEN_ERROR; \ lSendBuffer(c->m_skSock, f, sizeof(TFace)); \
goto LISTEN_REDO; \
} \ } \
f = (TFace *)c->pstFace; \ f = (TFace *)c->pstFace; \
c->pvData = c->pstFace + sizeof(TFace); \ c->pvData = c->pstFace + sizeof(TFace); \
if(NULL == (c->pstVoid = (void *)realloc(c->pstVoid, c->m_lBuffer + sizeof(TFace)))) \ if(NULL == (c->pstVoid = (void *)realloc(c->pstVoid, c->m_lBuffer + sizeof(TFace)))) \
{ \ { \
s->m_lErrno = MALLC_MEM_ERR; \ s->m_lErrno = MALLC_MEM_ERR; \
goto LISTEN_ERROR; \ lSendBuffer(c->m_skSock, f, sizeof(TFace)); \
goto LISTEN_REDO; \
} \ } \
} }
@ -149,9 +152,16 @@ void vTraceLog(const char *pszFile, int nLine, const char *fmt, ...)
ftime(&tb); ftime(&tb);
ptm = localtime(&tb.time); ptm = localtime(&tb.time);
#ifdef DEBUG
fprintf(fp, "F=%-8s L=%-5d P=%-7d T=%-7ld T=%04d%02d%02d %02d%02d%02d:%03d %s\n", fprintf(fp, "F=%-8s L=%-5d P=%-7d T=%-7ld T=%04d%02d%02d %02d%02d%02d:%03d %s\n",
pszFile, nLine, getpid(), syscall(SYS_gettid), ptm->tm_year + 1900, ptm->tm_mon + 1, pszFile, nLine, getpid(), syscall(SYS_gettid), ptm->tm_year + 1900, ptm->tm_mon + 1,
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, tb.millitm, szMsg); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, tb.millitm, szMsg);
#else
fprintf(fp, "P=%-7d T=%-7ld T=%04d%02d%02d %02d%02d%02d:%03d %s\n",
getpid(), syscall(SYS_gettid), ptm->tm_year + 1900, ptm->tm_mon + 1,
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, tb.millitm, szMsg);
#endif
fclose(fp); fclose(fp);
return ; return ;
} }
@ -562,6 +572,7 @@ int lRecvUnblock(SATvm *pstSavm, int skSock, char *so, int read)
*************************************************************************************************/ *************************************************************************************************/
int lRecvBuffer(int skSock, char *pszRecv, int lRead) int lRecvBuffer(int skSock, char *pszRecv, int lRead)
{ {
extern int errno;
int lByte = 0, lRecv = 0; int lByte = 0, lRecv = 0;
errno = 0; errno = 0;
@ -600,6 +611,7 @@ int lRecvBuffer(int skSock, char *pszRecv, int lRead)
*************************************************************************************************/ *************************************************************************************************/
int lSendBuffer(BSock skSock, void *pszSend, int lSend) int lSendBuffer(BSock skSock, void *pszSend, int lSend)
{ {
extern int errno;
long lByte = 0, lLeft = lSend, lWrite = 0; long lByte = 0, lLeft = lSend, lWrite = 0;
errno = 0; errno = 0;
@ -1366,7 +1378,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_bWork = pstCon->m_bWork;
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC != lReplace(pstSavm, pvData)) if(RC_SUCC != lReplace(pstSavm, pvData))
{
pstFace->m_lErrno = pstSavm->m_lErrno; pstFace->m_lErrno = pstSavm->m_lErrno;
Tlog("TvmReplace error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
}
else else
{ {
pstFace->m_lRows = pstSavm->m_lEffect; pstFace->m_lRows = pstSavm->m_lEffect;
@ -1379,7 +1394,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_bWork = pstCon->m_bWork;
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC != lUpdate(pstSavm, pvData)) if(RC_SUCC != lUpdate(pstSavm, pvData))
{
pstFace->m_lErrno = pstSavm->m_lErrno; pstFace->m_lErrno = pstSavm->m_lErrno;
Tlog("TvmUpdate error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
}
else else
{ {
pstFace->m_lRows = pstSavm->m_lEffect; pstFace->m_lRows = pstSavm->m_lEffect;
@ -1392,7 +1410,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_bWork = pstCon->m_bWork;
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC != lDelete(pstSavm)) if(RC_SUCC != lDelete(pstSavm))
{
pstFace->m_lErrno = pstSavm->m_lErrno; pstFace->m_lErrno = pstSavm->m_lErrno;
Tlog("TvmDelete error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
}
else else
{ {
pstFace->m_lDLen = pstSavm->m_lEType; pstFace->m_lDLen = pstSavm->m_lEType;
@ -1406,18 +1427,24 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC == lReplace(pstSavm, pvData)) if(RC_SUCC == lReplace(pstSavm, pvData))
pstCon->m_pstWork = pstSavm->m_pstWork; pstCon->m_pstWork = pstSavm->m_pstWork;
else
Tlog("AsyReplace error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
return RC_SUCC; return RC_SUCC;
case OPERAYS_UPDATE: case OPERAYS_UPDATE:
pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_bWork = pstCon->m_bWork;
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC == lUpdate(pstSavm, pvData)) if(RC_SUCC == lUpdate(pstSavm, pvData))
pstCon->m_pstWork = pstSavm->m_pstWork; pstCon->m_pstWork = pstSavm->m_pstWork;
else
Tlog("AsyUpdate error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
return RC_SUCC; return RC_SUCC;
case OPERAYS_DELETE: case OPERAYS_DELETE:
pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_bWork = pstCon->m_bWork;
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC == lDelete(pstSavm)) if(RC_SUCC == lDelete(pstSavm))
pstCon->m_pstWork = pstSavm->m_pstWork; pstCon->m_pstWork = pstSavm->m_pstWork;
else
Tlog("AsyDelete error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
return RC_SUCC; return RC_SUCC;
case OPERAYS_QUEPUSH: case OPERAYS_QUEPUSH:
if(RC_SUCC != lPush(pstSavm)) if(RC_SUCC != lPush(pstSavm))
@ -1431,6 +1458,8 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC == lInsert(pstSavm)) if(RC_SUCC == lInsert(pstSavm))
pstCon->m_pstWork = pstSavm->m_pstWork; pstCon->m_pstWork = pstSavm->m_pstWork;
else
Tlog("AsyInsert error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
return RC_SUCC; return RC_SUCC;
case OPERATE_QUEPOPS: case OPERATE_QUEPOPS:
lPopup(pstSavm, pstFace->m_lFind, pstFace->m_lErrno, (size_t *)&pstFace->m_lRows, lPopup(pstSavm, pstFace->m_lFind, pstFace->m_lErrno, (size_t *)&pstFace->m_lRows,
@ -1462,7 +1491,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
return RC_SUCC; return RC_SUCC;
case OPERATE_QUEPUSH: case OPERATE_QUEPUSH:
if(RC_SUCC != lPush(pstSavm)) if(RC_SUCC != lPush(pstSavm))
{
pstFace->m_lErrno = pstSavm->m_lErrno; pstFace->m_lErrno = pstSavm->m_lErrno;
Tlog("TvmPush error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
}
else else
pstFace->m_lRows = pstSavm->m_lEffect; pstFace->m_lRows = pstSavm->m_lEffect;
lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace)); lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace));
@ -1471,7 +1503,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_bWork = pstCon->m_bWork;
pstSavm->m_pstWork = pstCon->m_pstWork; pstSavm->m_pstWork = pstCon->m_pstWork;
if(RC_SUCC != lInsert(pstSavm)) if(RC_SUCC != lInsert(pstSavm))
{
pstFace->m_lErrno = pstSavm->m_lErrno; pstFace->m_lErrno = pstSavm->m_lErrno;
Tlog("TvmInsert error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
}
else else
pstFace->m_lRows = pstSavm->m_lEffect; pstFace->m_lRows = pstSavm->m_lEffect;
pstCon->m_pstWork = pstSavm->m_pstWork; pstCon->m_pstWork = pstSavm->m_pstWork;
@ -1479,7 +1514,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
return RC_SUCC; return RC_SUCC;
case OPERATE_TRCATE: case OPERATE_TRCATE:
if(RC_SUCC != lTruncate(pstSavm, pstFace->m_table)) if(RC_SUCC != lTruncate(pstSavm, pstFace->m_table))
{
pstFace->m_lErrno = pstSavm->m_lErrno; pstFace->m_lErrno = pstSavm->m_lErrno;
Tlog("TvmTruncate error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno));
}
else else
pstFace->m_lRows = pstSavm->m_lEffect; pstFace->m_lRows = pstSavm->m_lEffect;
lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace)); lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace));
@ -1754,7 +1792,7 @@ long lEpollAccept(SATvm *pstSavm, BSock epfd, SKCon *pc)
memset(&event, 0, sizeof(event)); memset(&event, 0, sizeof(event));
event.data.ptr = pstCon; event.data.ptr = pstCon;
// event.events = EPOLLIN | EPOLLET; // event.events = EPOLLIN | EPOLLET;
event.events = EPOLLIN; event.events = EPOLLIN | EPOLLONESHOT;
if(0 != epoll_ctl(epfd, EPOLL_CTL_ADD, skAccept, &event)) if(0 != epoll_ctl(epfd, EPOLL_CTL_ADD, skAccept, &event))
{ {
close(skAccept); close(skAccept);
@ -1911,6 +1949,7 @@ void* vEpollListen(void *pvParam)
TThread *pstTrd = (TThread *)pvParam; TThread *pstTrd = (TThread *)pvParam;
SATvm *pstSavm = (SATvm *)pCloneSATvm(); SATvm *pstSavm = (SATvm *)pCloneSATvm();
signal(SIGPIPE, SIG_IGN);
pthread_detach(pthread_self()); pthread_detach(pthread_self());
if(RC_SUCC != lTvmBuffer(pstSavm)) if(RC_SUCC != lTvmBuffer(pstSavm))
return NULL; return NULL;
@ -1939,23 +1978,24 @@ void* vEpollListen(void *pvParam)
} }
pstCon->m_pstWork = NULL; pstCon->m_pstWork = NULL;
epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_DEL, pstCon->m_skSock, &events[i]); epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_DEL, pstCon->m_skSock, &events[i]);
close(pstCon->m_skSock);
TFree(pstCon->pstVoid); TFree(pstCon->pstVoid);
TFree(pstCon->pstFace); TFree(pstCon->pstFace);
close(pstCon->m_skSock);
TFree(pstCon); TFree(pstCon);
continue; continue;
} }
pstCon->m_lRead += lRet; pstCon->m_lRead += lRet;
if(sizeof(TFace) != pstCon->m_lRead) // more data wait to read if(sizeof(TFace) != pstCon->m_lRead) // more data wait to read
continue; goto LISTEN_REDO;
pstFace = (TFace *)pstCon->pstFace; pstFace = (TFace *)pstCon->pstFace;
if(TVM_MAX_TABLE <= pstFace->m_table) if(TVM_MAX_TABLE <= pstFace->m_table)
{ {
pstCon->m_lRead = 0; pstCon->m_lRead = 0;
pstFace->m_lErrno = RESOU_DISABLE; pstFace->m_lErrno = RESOU_DISABLE;
goto LISTEN_ERROR; lSendBuffer(pstCon->m_skSock, pstCon->pstFace, sizeof(TFace));
goto LISTEN_REDO;
} }
checkrequest(pstSavm, pstCon, pstFace); checkrequest(pstSavm, pstCon, pstFace);
@ -1975,18 +2015,17 @@ void* vEpollListen(void *pvParam)
} }
pstCon->m_pstWork = NULL; pstCon->m_pstWork = NULL;
epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_DEL, pstCon->m_skSock, &events[i]); epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_DEL, pstCon->m_skSock, &events[i]);
close(pstCon->m_skSock);
TFree(pstCon->pstVoid); TFree(pstCon->pstVoid);
TFree(pstCon->pstFace); TFree(pstCon->pstFace);
close(pstCon->m_skSock);
TFree(pstCon); TFree(pstCon);
} }
} }
}
continue;
LISTEN_ERROR: LISTEN_REDO:
lSendBuffer(pstCon->m_skSock, pstCon->pstFace, sizeof(TFace)); events[i].events = EPOLLIN|EPOLLONESHOT;
continue; epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_MOD, pstCon->m_skSock, &events[i]);
}
} }
close(pstTrd->m_epfd); close(pstTrd->m_epfd);
@ -2491,9 +2530,9 @@ long lBootLocal(SATvm *pstSavm, TBoot *pstBoot, Benum eMode)
vRemoteResouce(pstSavm, eMode, pstBoot->m_lBootPort); vRemoteResouce(pstSavm, eMode, pstBoot->m_lBootPort);
vTvmDisconnect(pstSavm); vTvmDisconnect(pstSavm);
for(i = 0; i < pstBoot->m_lBootExec; i ++) for(i = 0; i < pstBoot->m_lBootExec && i < 500; i ++)
{ {
usleep(500); usleep(5000);
if(!pstTrd[i].m_bRun) if(!pstTrd[i].m_bRun)
continue; continue;
@ -5015,6 +5054,9 @@ long lTvmInsert(SATvm *pstSavm)
checkbuffer(pstSavm, pstRun, 1); checkbuffer(pstSavm, pstRun, 1);
memcpy(pstRun->pstVoid + sizeof(TFace), pstSavm->pstVoid, pstSavm->lSize); memcpy(pstRun->pstVoid + sizeof(TFace), pstSavm->pstVoid, pstSavm->lSize);
fprintf(stderr, "Benum:%d, Uenum:%d, TABLE:%d, uint:%d, uint:%d, size_t:%d\n", pstFace->m_enum,
pstFace->m_lFind, pstFace->m_table, pstFace->m_lDLen, pstFace->m_lErrno, pstFace->m_lRows);
if(lWrite != lSendBuffer(pstSavm->m_skSock, (void *)pstRun->pstVoid, lWrite)) if(lWrite != lSendBuffer(pstSavm->m_skSock, (void *)pstRun->pstVoid, lWrite))
{ {
pstSavm->m_lErrno = SOCK_COM_EXCP; pstSavm->m_lErrno = SOCK_COM_EXCP;

View File

@ -1,5 +1,5 @@
set TABLE=20 set TABLE=20
set TABLESPACE=10000 set TABLESPACE=50000
create table TBL_USER_INFO create table TBL_USER_INFO
( (