fix thread pop from queue

pull/1/head
deffpuzzl 2018-06-12 18:26:59 +08:00
parent ac3ec27fd5
commit faa99c11a2
5 changed files with 13 additions and 17 deletions

View File

@ -32,7 +32,7 @@ int main(int argc, char *argv[])
SATvm *pstSavm = (SATvm *)pGetSATvm(); SATvm *pstSavm = (SATvm *)pGetSATvm();
// if(RC_SUCC != lCreateQueue(pstSavm, QUEUE_USER_INFO, 1000000, sizeof(szMsg), "")) // if(RC_SUCC != lCreateQueue(pstSavm, QUEUE_USER_INFO, 1000000, sizeof(szMsg), ""))
if(RC_SUCC != lTableQueue(pstSavm, QUEUE_USER_INFO, 1000000, lQueueUserInfo)) if(RC_SUCC != lTableQueue(pstSavm, QUEUE_USER_INFO, 50000, lQueueUserInfo))
{ {
fprintf(stderr, "create queue %d failed, err: %s\n", QUEUE_USER_INFO, sGetTError(pstSavm->m_lErrno)); fprintf(stderr, "create queue %d failed, err: %s\n", QUEUE_USER_INFO, sGetTError(pstSavm->m_lErrno));
return RC_FAIL; return RC_FAIL;

View File

@ -717,6 +717,7 @@ extern void* pGetBoot();
extern long lDefaultBoot(); extern long lDefaultBoot();
extern TBoot* pBootInitial(); extern TBoot* pBootInitial();
extern size_t lGetTblRow(TABLE t); extern size_t lGetTblRow(TABLE t);
extern size_t lGetTableSize(TABLE t);
extern long lGetPermit(TABLE t); extern long lGetPermit(TABLE t);
extern long lGetRowSize(TABLE t); extern long lGetRowSize(TABLE t);
extern TblDef* pGetTblDef(TABLE t); extern TblDef* pGetTblDef(TABLE t);

View File

@ -117,6 +117,7 @@ long _lPop(SATvm *pstSavm, void *pvAddr, void *pvOut, Timesp *tm)
break; break;
} }
retry:
/* at least cost one vaild */ /* at least cost one vaild */
if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lListOfs, 1))) if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lListOfs, 1)))
; ;
@ -125,10 +126,7 @@ long _lPop(SATvm *pstSavm, void *pvAddr, void *pvOut, Timesp *tm)
ps = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * nPos); ps = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * nPos);
if(IS_TRUCK_NULL(ps)) if(IS_TRUCK_NULL(ps))
{ goto retry;
pstSavm->m_lErrno = SVR_EXCEPTION;
return RC_FAIL;
}
memcpy(pvOut, ps->m_pvData, pv->m_lReSize); memcpy(pvOut, ps->m_pvData, pv->m_lReSize);
SET_DATA_TRUCK(ps, DATA_TRUCK_NULL); SET_DATA_TRUCK(ps, DATA_TRUCK_NULL);

View File

@ -39,6 +39,7 @@ extern long lShutdownTvm();
extern void vSetNode(char *s); extern void vSetNode(char *s);
extern long lStartupTvm(TBoot *pstBoot); extern long lStartupTvm(TBoot *pstBoot);
extern long lMountTable(SATvm *pstSavm, char *pszFile); extern long lMountTable(SATvm *pstSavm, char *pszFile);
extern long lUnuseDump(SATvm *pstSavm, TABLE t);
/************************************************************************************************* /*************************************************************************************************
descriptionget stvm version descriptionget stvm version
@ -4051,9 +4052,7 @@ long lStopSystem(TBoot *pstBoot, char *pszApp)
snprintf(szCmd, sizeof(szCmd), "ps -u %s|grep -E \"%s|%s\"|awk '{print $1}'", snprintf(szCmd, sizeof(szCmd), "ps -u %s|grep -E \"%s|%s\"|awk '{print $1}'",
getenv("LOGNAME"), TVM_LOCAL_SERV, TVM_REMOTE_DOM); getenv("LOGNAME"), TVM_LOCAL_SERV, TVM_REMOTE_DOM);
if(!bIsTvmBoot()) return RC_SUCC; if(TVM_BOOT_SIMPLE != pstBoot->m_lBootType)
if(TVM_BOOT_CLUSTER == pstBoot->m_lBootType)
lOfflineNotify(pstSavm, pstBoot->m_lBootPort); lOfflineNotify(pstSavm, pstBoot->m_lBootPort);
if(NULL == (fp = popen(szCmd, "r"))) if(NULL == (fp = popen(szCmd, "r")))

View File

@ -1841,7 +1841,7 @@ long lPollRequest(SATvm *pstSovm, SKCon *pstCon, TFace *pstFace, void *pstVoi
if(!pstRun->m_bAttch || !pstRun->m_pvAddr) if(!pstRun->m_bAttch || !pstRun->m_pvAddr)
{ {
//Tlog("initial table:%d, %d, %d", pstFace->m_table, pstFace->m_enum, pstRun->m_bAttch); //Tlog("initial table:%d, %d, %d", pstFace->m_table, pstFace->m_enum, pstRun->m_bAttch);
if(RC_SUCC != lInitSATvm(pstSavm, pstFace->m_table)) if(RC_SUCC != lAttchTable(pstSavm, pstFace->m_table))
{ {
pstFace->m_lRows = 0; pstFace->m_lRows = 0;
pstFace->m_lErrno = pstSovm->m_lErrno; pstFace->m_lErrno = pstSovm->m_lErrno;
@ -1942,6 +1942,7 @@ void* vEpollListen(void *pvParam)
TFree(pstCon->pstFace); TFree(pstCon->pstFace);
TFree(pstCon->pstVoid); TFree(pstCon->pstVoid);
close(pstCon->m_skSock); close(pstCon->m_skSock);
TFree(pstCon);
continue; continue;
} }
@ -1977,6 +1978,7 @@ void* vEpollListen(void *pvParam)
TFree(pstCon->pstFace); TFree(pstCon->pstFace);
TFree(pstCon->pstVoid); TFree(pstCon->pstVoid);
close(pstCon->m_skSock); close(pstCon->m_skSock);
TFree(pstCon);
} }
} }
} }
@ -2629,11 +2631,7 @@ long lOfflineNotify(SATvm *pstSavm, long lPort)
stFace.m_table = SYS_TVM_INDEX; stFace.m_table = SYS_TVM_INDEX;
stFace.m_enum = OPERATE_DOMLOFF; stFace.m_enum = OPERATE_DOMLOFF;
if(RC_FAIL == (skSock = skConnectServer(pstSavm, LOCAL_HOST_IP, lPort, false, 5))) if(RC_FAIL == (skSock = skConnectServer(pstSavm, LOCAL_HOST_IP, lPort, false, 5)))
{
fprintf(stderr, "Connect server %s:%ld error, %s\n", LOCAL_HOST_IP, lPort,
sGetTError(pstSavm->m_lErrno));
return RC_FAIL; return RC_FAIL;
}
if(sizeof(TFace) != lSendBuffer(skSock, (void *)&stFace, sizeof(TFace))) if(sizeof(TFace) != lSendBuffer(skSock, (void *)&stFace, sizeof(TFace)))
{ {