From 221404ec875ae6c19de6be894e13d35af3f55c47 Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 12:47:32 +0800 Subject: [PATCH] fix unload from queue and multithread process --- include/tmain.h | 4 +++- src/queue.c | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/stvm.c | 3 ++- src/tcp.c | 6 +++--- src/tree.c | 7 ++++++- stvm.conf | 11 +---------- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/include/tmain.h b/include/tmain.h index 527ea10..9038209 100644 --- a/include/tmain.h +++ b/include/tmain.h @@ -39,11 +39,11 @@ typedef long (*FUNCEXEC)(SATvm *pstSavm, void *arg); #define MAX_CON_EVENTS 65535 #define TVM_PORT_LISTEN 1801 #define TVM_PORT_DOMAIN 1800 +#define MAX_LOCK_TIME 1800 #define TVM_LOCAL_SERV "LIS.tvm" #define TVM_REMOTE_DOM "RDS.tvm" #define LOCAL_HOST_IP "127.0.0.1" - /************************************************************************************************* 表结构&索引定义区 *************************************************************************************************/ @@ -77,6 +77,7 @@ typedef struct __TVM_INTERFACE /************************************************************************************************* macro *************************************************************************************************/ +#define Tlog(...) vTraceLog(__FILE__, __LINE__, __VA_ARGS__) /************************************************************************************************* function @@ -84,6 +85,7 @@ typedef struct __TVM_INTERFACE #ifdef __cplusplus extern "C" { #endif +extern void vTraceLog(const char *pszFile, int nLine, const char *fmt, ...); extern void lInitTitle(int argc, char **argv, char **envp); extern void vSetTitile(const char *name); diff --git a/src/queue.c b/src/queue.c index d1132b7..33fbb3e 100644 --- a/src/queue.c +++ b/src/queue.c @@ -295,6 +295,51 @@ retrys: return RC_SUCC; } +/************************************************************************************************* + description:pop data from queue + parameters: + pstSavm --stvm handle + psvOut --out data + return: + RC_SUCC --success + RC_FAIL --failure + *************************************************************************************************/ +long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait) +{ + long lRet; + Timesp tm = {0, 1}; + RunTime *pstRun = NULL; + + if(!pstSavm) + { + pstSavm->m_lErrno = CONDIT_IS_NIL; + return RC_FAIL; + } + + if(NULL == (pstRun = (RunTime *)pInitMemTable(pstSavm, pstSavm->tblName))) + return RC_FAIL; + + if(TYPE_MQUEUE != pstRun->m_lType) + { + pstSavm->m_lErrno = NOT_SUPPT_OPT; + vTblDisconnect(pstSavm, pstSavm->tblName); + return RC_FAIL; + } + + if(RES_REMOT_SID == pstRun->m_lLocal) + { + Tremohold(pstSavm, pstRun); + return _lPopByRt(pstSavm, pvOut); + } + +// if(QUE_NORMAL == eWait) tm.tv_sec = MAX_LOCK_TIME; + if(QUE_NORMAL == eWait) tm.tv_sec = 5; + lRet = _lPop(pstSavm, pstRun->m_pvAddr, pvOut, &tm); + vTblDisconnect(pstSavm, pstSavm->tblName); + return lRet; +} + + /************************************************************************************************* description:pop data from queue parameters: diff --git a/src/stvm.c b/src/stvm.c index ef9be2a..56c0c2e 100644 --- a/src/stvm.c +++ b/src/stvm.c @@ -4387,7 +4387,8 @@ void vInitialCustom() //select nextval from SEQUENCE@SEQ_TEST snprintf(g_stCustom.m_pszWord, ALLOC_CMD_LEN, "SET,FROM,WHERE,COUNT(1),MAX,MIN,NEXTVAL," "ORDER BY,GROUP BY,SEQUENCE@,SYS_TVM_FIELD,SYS_TVM_DOMAIN,SYS_TVM_SEQUE,TABLE,INTO," - "ON,INFO,INDEX,VALUES,DEBUG [ON|OFF],SHOWMODE [ROW|COLUMN],SHOWSIZE [NUM],CLICK,TO,"); + "ON,INFO,INDEX,VALUES,DEBUG [ON|OFF],SHOWMODE [ROW|COLUMN],SHOWSIZE [NUM],CLICK,TO," + "DELIMITER,"); g_stCustom.m_lWord = lgetstrnum(g_stCustom.m_pszWord, ","); rl_attempted_completion_function = pMatchCompletion; diff --git a/src/tcp.c b/src/tcp.c index 2fa7ab4..e9a22fd 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -33,14 +33,13 @@ Rowgrp *g_pstDomgrp = NULL, *g_pstTblgrp = NULL; extern char* pGetLog(); extern long lGetBootType(); extern void vSetBootType(long lType); +extern long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait); void* pParsePacket(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen); void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen); /************************************************************************************************* macro *************************************************************************************************/ -#define Tlog(...) vTraceLog(__FILE__, __LINE__, __VA_ARGS__) - #define checkrequest(s,c,f) if(MAX(f->m_lRows, f->m_lDLen) > c->m_lBuffer) \ { \ if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \ @@ -1448,7 +1447,7 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat TFree(pvOut); return RC_SUCC; case OPERATE_QUEPOP: - if(RC_SUCC != lPop(pstSavm, (void *)pvData, pstFace->m_lFind)) + if(RC_SUCC != lTimePop(pstSavm, (void *)pvData, pstFace->m_lFind)) { pstFace->m_lErrno = pstSavm->m_lErrno; lData = sizeof(TFace); @@ -1852,6 +1851,7 @@ long lPollRequest(SATvm *pstSovm, SKCon *pstCon, TFace *pstFace, void *pstVoi } else { + pstSovm->stRunTime[pstFace->m_table].m_lType = pstRun->m_lType; pstSovm->stRunTime[pstFace->m_table].m_bAttch = pstRun->m_bAttch; pstSovm->stRunTime[pstFace->m_table].m_pvAddr = pstRun->m_pvAddr; } diff --git a/src/tree.c b/src/tree.c index cb23d20..b1be5f7 100644 --- a/src/tree.c +++ b/src/tree.c @@ -9237,7 +9237,12 @@ long lImportFile(TABLE t, char *pszFile, char *pszFlag) memset(pvData, 0, lGetRowSize(t)); if(TYPE_MQUEUE == pstRun->m_lType) - memcpy(pvData, szLine, lGetRowSize(t)); + { + if(lGetFldNum(t) > 0) + _lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag); + else + memcpy(pvData, szLine, lGetRowSize(t)); + } else _lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag); if(RC_SUCC != __lInsert(pstSavm, pstRun, pstSavm->tblName, 0)) diff --git a/stvm.conf b/stvm.conf index 45b2022..998f732 100644 --- a/stvm.conf +++ b/stvm.conf @@ -4,7 +4,7 @@ MAXTABLE=255 MAXFILED=3000 MAXDOMAIN=1024 MAXSEQUE=1024 -SERVER_EXEC=4 +SERVER_EXEC=2 #SERVER_EXEC=1 DEPLOY=cluster #DEPLOY=local @@ -12,16 +12,7 @@ SERVER_PORT=5050 LOGNAME="/home/stvm/log/stvm.log" *LOCAL_RESOURCE -TABLE=15 PERMIT=15 *REMOTE_DOMAIN -GROUP=1 DOMAINID="DBS" WSADDR="192.168.5.20:5010" TIMEOUT=2 MAXTRY=3 KEEPALIVE=30 -GROUP=2 DOMAINID="CTS" WSADDR="192.168.5.20:5011" TIMEOUT=2 MAXTRY=3 KEEPALIVE=30 *REMOTE_TABLE -TABLE=8 TABLENAME="TBL_BRH_INFO" - MTABLE=17 DOMAINID="DBS" - MTABLE=17 DOMAINID="CTS" -TABLE=9 TABLENAME="TBL_ACCT_INFO" - MTABLE=16 DOMAINID="DBS" - MTABLE=16 DOMAINID="CTS"