From 3bd719bc31c407f638aeb11a15ea61d178b4701c Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Wed, 25 Jul 2018 15:15:59 +0800 Subject: [PATCH] fix socket multithread compete --- src/tcp.c | 76 ++++++++++++++++++++++++++++++++++++----------- tbl_user_info.def | 2 +- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/tcp.c b/src/tcp.c index e9a22fd..9186a76 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -45,20 +45,23 @@ void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuff if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \ { \ s->m_lErrno = RECD_TOO_LONG; \ - goto LISTEN_ERROR; \ + lSendBuffer(c->m_skSock, f, sizeof(TFace)); \ + goto LISTEN_REDO; \ } \ c->m_lBuffer = MAX(f->m_lRows, f->m_lDLen); \ if(NULL == (c->pstFace = (void *)realloc(c->pstFace, c->m_lBuffer + sizeof(TFace)))) \ { \ - s->m_lErrno = MALLC_MEM_ERR; \ - goto LISTEN_ERROR; \ + s->m_lErrno = MALLC_MEM_ERR; \ + lSendBuffer(c->m_skSock, f, sizeof(TFace)); \ + goto LISTEN_REDO; \ } \ f = (TFace *)c->pstFace; \ c->pvData = c->pstFace + sizeof(TFace); \ if(NULL == (c->pstVoid = (void *)realloc(c->pstVoid, c->m_lBuffer + sizeof(TFace)))) \ { \ s->m_lErrno = MALLC_MEM_ERR; \ - goto LISTEN_ERROR; \ + lSendBuffer(c->m_skSock, f, sizeof(TFace)); \ + goto LISTEN_REDO; \ } \ } @@ -149,9 +152,16 @@ void vTraceLog(const char *pszFile, int nLine, const char *fmt, ...) ftime(&tb); ptm = localtime(&tb.time); + +#ifdef DEBUG fprintf(fp, "F=%-8s L=%-5d P=%-7d T=%-7ld T=%04d%02d%02d %02d%02d%02d:%03d %s\n", pszFile, nLine, getpid(), syscall(SYS_gettid), ptm->tm_year + 1900, ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, tb.millitm, szMsg); +#else + fprintf(fp, "P=%-7d T=%-7ld T=%04d%02d%02d %02d%02d%02d:%03d %s\n", + getpid(), syscall(SYS_gettid), ptm->tm_year + 1900, ptm->tm_mon + 1, + ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, tb.millitm, szMsg); +#endif fclose(fp); return ; } @@ -562,6 +572,7 @@ int lRecvUnblock(SATvm *pstSavm, int skSock, char *so, int read) *************************************************************************************************/ int lRecvBuffer(int skSock, char *pszRecv, int lRead) { + extern int errno; int lByte = 0, lRecv = 0; errno = 0; @@ -600,6 +611,7 @@ int lRecvBuffer(int skSock, char *pszRecv, int lRead) *************************************************************************************************/ int lSendBuffer(BSock skSock, void *pszSend, int lSend) { + extern int errno; long lByte = 0, lLeft = lSend, lWrite = 0; errno = 0; @@ -1366,7 +1378,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC != lReplace(pstSavm, pvData)) + { pstFace->m_lErrno = pstSavm->m_lErrno; + Tlog("TvmReplace error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); + } else { pstFace->m_lRows = pstSavm->m_lEffect; @@ -1379,7 +1394,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC != lUpdate(pstSavm, pvData)) + { pstFace->m_lErrno = pstSavm->m_lErrno; + Tlog("TvmUpdate error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); + } else { pstFace->m_lRows = pstSavm->m_lEffect; @@ -1392,7 +1410,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC != lDelete(pstSavm)) + { pstFace->m_lErrno = pstSavm->m_lErrno; + Tlog("TvmDelete error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); + } else { pstFace->m_lDLen = pstSavm->m_lEType; @@ -1406,18 +1427,24 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC == lReplace(pstSavm, pvData)) pstCon->m_pstWork = pstSavm->m_pstWork; + else + Tlog("AsyReplace error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); return RC_SUCC; case OPERAYS_UPDATE: pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC == lUpdate(pstSavm, pvData)) pstCon->m_pstWork = pstSavm->m_pstWork; + else + Tlog("AsyUpdate error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); return RC_SUCC; case OPERAYS_DELETE: pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC == lDelete(pstSavm)) pstCon->m_pstWork = pstSavm->m_pstWork; + else + Tlog("AsyDelete error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); return RC_SUCC; case OPERAYS_QUEPUSH: if(RC_SUCC != lPush(pstSavm)) @@ -1431,6 +1458,8 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC == lInsert(pstSavm)) pstCon->m_pstWork = pstSavm->m_pstWork; + else + Tlog("AsyInsert error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); return RC_SUCC; case OPERATE_QUEPOPS: lPopup(pstSavm, pstFace->m_lFind, pstFace->m_lErrno, (size_t *)&pstFace->m_lRows, @@ -1462,7 +1491,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat return RC_SUCC; case OPERATE_QUEPUSH: if(RC_SUCC != lPush(pstSavm)) + { pstFace->m_lErrno = pstSavm->m_lErrno; + Tlog("TvmPush error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); + } else pstFace->m_lRows = pstSavm->m_lEffect; lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace)); @@ -1471,7 +1503,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat pstSavm->m_bWork = pstCon->m_bWork; pstSavm->m_pstWork = pstCon->m_pstWork; if(RC_SUCC != lInsert(pstSavm)) + { pstFace->m_lErrno = pstSavm->m_lErrno; + Tlog("TvmInsert error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); + } else pstFace->m_lRows = pstSavm->m_lEffect; pstCon->m_pstWork = pstSavm->m_pstWork; @@ -1479,7 +1514,10 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat return RC_SUCC; case OPERATE_TRCATE: if(RC_SUCC != lTruncate(pstSavm, pstFace->m_table)) + { pstFace->m_lErrno = pstSavm->m_lErrno; + Tlog("TvmTruncate error, %d, %s", pstSavm->m_lErrno, sGetTError(pstSavm->m_lErrno)); + } else pstFace->m_lRows = pstSavm->m_lEffect; lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace)); @@ -1754,7 +1792,7 @@ long lEpollAccept(SATvm *pstSavm, BSock epfd, SKCon *pc) memset(&event, 0, sizeof(event)); event.data.ptr = pstCon; // event.events = EPOLLIN | EPOLLET; - event.events = EPOLLIN; + event.events = EPOLLIN | EPOLLONESHOT; if(0 != epoll_ctl(epfd, EPOLL_CTL_ADD, skAccept, &event)) { close(skAccept); @@ -1911,6 +1949,7 @@ void* vEpollListen(void *pvParam) TThread *pstTrd = (TThread *)pvParam; SATvm *pstSavm = (SATvm *)pCloneSATvm(); + signal(SIGPIPE, SIG_IGN); pthread_detach(pthread_self()); if(RC_SUCC != lTvmBuffer(pstSavm)) return NULL; @@ -1939,23 +1978,24 @@ void* vEpollListen(void *pvParam) } pstCon->m_pstWork = NULL; epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_DEL, pstCon->m_skSock, &events[i]); + close(pstCon->m_skSock); TFree(pstCon->pstVoid); TFree(pstCon->pstFace); - close(pstCon->m_skSock); TFree(pstCon); continue; } pstCon->m_lRead += lRet; if(sizeof(TFace) != pstCon->m_lRead) // more data wait to read - continue; - + goto LISTEN_REDO; + pstFace = (TFace *)pstCon->pstFace; if(TVM_MAX_TABLE <= pstFace->m_table) { pstCon->m_lRead = 0; pstFace->m_lErrno = RESOU_DISABLE; - goto LISTEN_ERROR; + lSendBuffer(pstCon->m_skSock, pstCon->pstFace, sizeof(TFace)); + goto LISTEN_REDO; } checkrequest(pstSavm, pstCon, pstFace); @@ -1975,18 +2015,17 @@ void* vEpollListen(void *pvParam) } pstCon->m_pstWork = NULL; epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_DEL, pstCon->m_skSock, &events[i]); + close(pstCon->m_skSock); TFree(pstCon->pstVoid); TFree(pstCon->pstFace); - close(pstCon->m_skSock); TFree(pstCon); } } - } - continue; -LISTEN_ERROR: - lSendBuffer(pstCon->m_skSock, pstCon->pstFace, sizeof(TFace)); - continue; +LISTEN_REDO: + events[i].events = EPOLLIN|EPOLLONESHOT; + epoll_ctl(pstTrd->m_epfd, EPOLL_CTL_MOD, pstCon->m_skSock, &events[i]); + } } close(pstTrd->m_epfd); @@ -2491,9 +2530,9 @@ long lBootLocal(SATvm *pstSavm, TBoot *pstBoot, Benum eMode) vRemoteResouce(pstSavm, eMode, pstBoot->m_lBootPort); vTvmDisconnect(pstSavm); - for(i = 0; i < pstBoot->m_lBootExec; i ++) + for(i = 0; i < pstBoot->m_lBootExec && i < 500; i ++) { - usleep(500); + usleep(5000); if(!pstTrd[i].m_bRun) continue; @@ -5015,6 +5054,9 @@ long lTvmInsert(SATvm *pstSavm) checkbuffer(pstSavm, pstRun, 1); memcpy(pstRun->pstVoid + sizeof(TFace), pstSavm->pstVoid, pstSavm->lSize); + +fprintf(stderr, "Benum:%d, Uenum:%d, TABLE:%d, uint:%d, uint:%d, size_t:%d\n", pstFace->m_enum, + pstFace->m_lFind, pstFace->m_table, pstFace->m_lDLen, pstFace->m_lErrno, pstFace->m_lRows); if(lWrite != lSendBuffer(pstSavm->m_skSock, (void *)pstRun->pstVoid, lWrite)) { pstSavm->m_lErrno = SOCK_COM_EXCP; diff --git a/tbl_user_info.def b/tbl_user_info.def index ebf7908..1c3e145 100644 --- a/tbl_user_info.def +++ b/tbl_user_info.def @@ -1,5 +1,5 @@ set TABLE=20 -set TABLESPACE=10000 +set TABLESPACE=50000 create table TBL_USER_INFO (