fix unload from queue and multithread process
parent
13a0ccb70b
commit
221404ec87
|
@ -39,11 +39,11 @@ typedef long (*FUNCEXEC)(SATvm *pstSavm, void *arg);
|
||||||
#define MAX_CON_EVENTS 65535
|
#define MAX_CON_EVENTS 65535
|
||||||
#define TVM_PORT_LISTEN 1801
|
#define TVM_PORT_LISTEN 1801
|
||||||
#define TVM_PORT_DOMAIN 1800
|
#define TVM_PORT_DOMAIN 1800
|
||||||
|
#define MAX_LOCK_TIME 1800
|
||||||
#define TVM_LOCAL_SERV "LIS.tvm"
|
#define TVM_LOCAL_SERV "LIS.tvm"
|
||||||
#define TVM_REMOTE_DOM "RDS.tvm"
|
#define TVM_REMOTE_DOM "RDS.tvm"
|
||||||
#define LOCAL_HOST_IP "127.0.0.1"
|
#define LOCAL_HOST_IP "127.0.0.1"
|
||||||
|
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
表结构&索引定义区
|
表结构&索引定义区
|
||||||
*************************************************************************************************/
|
*************************************************************************************************/
|
||||||
|
@ -77,6 +77,7 @@ typedef struct __TVM_INTERFACE
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
macro
|
macro
|
||||||
*************************************************************************************************/
|
*************************************************************************************************/
|
||||||
|
#define Tlog(...) vTraceLog(__FILE__, __LINE__, __VA_ARGS__)
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
function
|
function
|
||||||
|
@ -84,6 +85,7 @@ typedef struct __TVM_INTERFACE
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
extern void vTraceLog(const char *pszFile, int nLine, const char *fmt, ...);
|
||||||
|
|
||||||
extern void lInitTitle(int argc, char **argv, char **envp);
|
extern void lInitTitle(int argc, char **argv, char **envp);
|
||||||
extern void vSetTitile(const char *name);
|
extern void vSetTitile(const char *name);
|
||||||
|
|
45
src/queue.c
45
src/queue.c
|
@ -295,6 +295,51 @@ retrys:
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:pop data from queue
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
psvOut --out data
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait)
|
||||||
|
{
|
||||||
|
long lRet;
|
||||||
|
Timesp tm = {0, 1};
|
||||||
|
RunTime *pstRun = NULL;
|
||||||
|
|
||||||
|
if(!pstSavm)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = CONDIT_IS_NIL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(NULL == (pstRun = (RunTime *)pInitMemTable(pstSavm, pstSavm->tblName)))
|
||||||
|
return RC_FAIL;
|
||||||
|
|
||||||
|
if(TYPE_MQUEUE != pstRun->m_lType)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = NOT_SUPPT_OPT;
|
||||||
|
vTblDisconnect(pstSavm, pstSavm->tblName);
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(RES_REMOT_SID == pstRun->m_lLocal)
|
||||||
|
{
|
||||||
|
Tremohold(pstSavm, pstRun);
|
||||||
|
return _lPopByRt(pstSavm, pvOut);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if(QUE_NORMAL == eWait) tm.tv_sec = MAX_LOCK_TIME;
|
||||||
|
if(QUE_NORMAL == eWait) tm.tv_sec = 5;
|
||||||
|
lRet = _lPop(pstSavm, pstRun->m_pvAddr, pvOut, &tm);
|
||||||
|
vTblDisconnect(pstSavm, pstSavm->tblName);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
description:pop data from queue
|
description:pop data from queue
|
||||||
parameters:
|
parameters:
|
||||||
|
|
|
@ -4387,7 +4387,8 @@ void vInitialCustom()
|
||||||
//select nextval from SEQUENCE@SEQ_TEST
|
//select nextval from SEQUENCE@SEQ_TEST
|
||||||
snprintf(g_stCustom.m_pszWord, ALLOC_CMD_LEN, "SET,FROM,WHERE,COUNT(1),MAX,MIN,NEXTVAL,"
|
snprintf(g_stCustom.m_pszWord, ALLOC_CMD_LEN, "SET,FROM,WHERE,COUNT(1),MAX,MIN,NEXTVAL,"
|
||||||
"ORDER BY,GROUP BY,SEQUENCE@,SYS_TVM_FIELD,SYS_TVM_DOMAIN,SYS_TVM_SEQUE,TABLE,INTO,"
|
"ORDER BY,GROUP BY,SEQUENCE@,SYS_TVM_FIELD,SYS_TVM_DOMAIN,SYS_TVM_SEQUE,TABLE,INTO,"
|
||||||
"ON,INFO,INDEX,VALUES,DEBUG [ON|OFF],SHOWMODE [ROW|COLUMN],SHOWSIZE [NUM],CLICK,TO,");
|
"ON,INFO,INDEX,VALUES,DEBUG [ON|OFF],SHOWMODE [ROW|COLUMN],SHOWSIZE [NUM],CLICK,TO,"
|
||||||
|
"DELIMITER,");
|
||||||
g_stCustom.m_lWord = lgetstrnum(g_stCustom.m_pszWord, ",");
|
g_stCustom.m_lWord = lgetstrnum(g_stCustom.m_pszWord, ",");
|
||||||
|
|
||||||
rl_attempted_completion_function = pMatchCompletion;
|
rl_attempted_completion_function = pMatchCompletion;
|
||||||
|
|
|
@ -33,14 +33,13 @@ Rowgrp *g_pstDomgrp = NULL, *g_pstTblgrp = NULL;
|
||||||
extern char* pGetLog();
|
extern char* pGetLog();
|
||||||
extern long lGetBootType();
|
extern long lGetBootType();
|
||||||
extern void vSetBootType(long lType);
|
extern void vSetBootType(long lType);
|
||||||
|
extern long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait);
|
||||||
void* pParsePacket(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen);
|
void* pParsePacket(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen);
|
||||||
void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen);
|
void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen);
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
macro
|
macro
|
||||||
*************************************************************************************************/
|
*************************************************************************************************/
|
||||||
#define Tlog(...) vTraceLog(__FILE__, __LINE__, __VA_ARGS__)
|
|
||||||
|
|
||||||
#define checkrequest(s,c,f) if(MAX(f->m_lRows, f->m_lDLen) > c->m_lBuffer) \
|
#define checkrequest(s,c,f) if(MAX(f->m_lRows, f->m_lDLen) > c->m_lBuffer) \
|
||||||
{ \
|
{ \
|
||||||
if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \
|
if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \
|
||||||
|
@ -1448,7 +1447,7 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
|
||||||
TFree(pvOut);
|
TFree(pvOut);
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
case OPERATE_QUEPOP:
|
case OPERATE_QUEPOP:
|
||||||
if(RC_SUCC != lPop(pstSavm, (void *)pvData, pstFace->m_lFind))
|
if(RC_SUCC != lTimePop(pstSavm, (void *)pvData, pstFace->m_lFind))
|
||||||
{
|
{
|
||||||
pstFace->m_lErrno = pstSavm->m_lErrno;
|
pstFace->m_lErrno = pstSavm->m_lErrno;
|
||||||
lData = sizeof(TFace);
|
lData = sizeof(TFace);
|
||||||
|
@ -1852,6 +1851,7 @@ long lPollRequest(SATvm *pstSovm, SKCon *pstCon, TFace *pstFace, void *pstVoi
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
pstSovm->stRunTime[pstFace->m_table].m_lType = pstRun->m_lType;
|
||||||
pstSovm->stRunTime[pstFace->m_table].m_bAttch = pstRun->m_bAttch;
|
pstSovm->stRunTime[pstFace->m_table].m_bAttch = pstRun->m_bAttch;
|
||||||
pstSovm->stRunTime[pstFace->m_table].m_pvAddr = pstRun->m_pvAddr;
|
pstSovm->stRunTime[pstFace->m_table].m_pvAddr = pstRun->m_pvAddr;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9237,7 +9237,12 @@ long lImportFile(TABLE t, char *pszFile, char *pszFlag)
|
||||||
|
|
||||||
memset(pvData, 0, lGetRowSize(t));
|
memset(pvData, 0, lGetRowSize(t));
|
||||||
if(TYPE_MQUEUE == pstRun->m_lType)
|
if(TYPE_MQUEUE == pstRun->m_lType)
|
||||||
|
{
|
||||||
|
if(lGetFldNum(t) > 0)
|
||||||
|
_lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag);
|
||||||
|
else
|
||||||
memcpy(pvData, szLine, lGetRowSize(t));
|
memcpy(pvData, szLine, lGetRowSize(t));
|
||||||
|
}
|
||||||
else
|
else
|
||||||
_lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag);
|
_lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag);
|
||||||
if(RC_SUCC != __lInsert(pstSavm, pstRun, pstSavm->tblName, 0))
|
if(RC_SUCC != __lInsert(pstSavm, pstRun, pstSavm->tblName, 0))
|
||||||
|
|
11
stvm.conf
11
stvm.conf
|
@ -4,7 +4,7 @@ MAXTABLE=255
|
||||||
MAXFILED=3000
|
MAXFILED=3000
|
||||||
MAXDOMAIN=1024
|
MAXDOMAIN=1024
|
||||||
MAXSEQUE=1024
|
MAXSEQUE=1024
|
||||||
SERVER_EXEC=4
|
SERVER_EXEC=2
|
||||||
#SERVER_EXEC=1
|
#SERVER_EXEC=1
|
||||||
DEPLOY=cluster
|
DEPLOY=cluster
|
||||||
#DEPLOY=local
|
#DEPLOY=local
|
||||||
|
@ -12,16 +12,7 @@ SERVER_PORT=5050
|
||||||
LOGNAME="/home/stvm/log/stvm.log"
|
LOGNAME="/home/stvm/log/stvm.log"
|
||||||
|
|
||||||
*LOCAL_RESOURCE
|
*LOCAL_RESOURCE
|
||||||
TABLE=15 PERMIT=15
|
|
||||||
|
|
||||||
*REMOTE_DOMAIN
|
*REMOTE_DOMAIN
|
||||||
GROUP=1 DOMAINID="DBS" WSADDR="192.168.5.20:5010" TIMEOUT=2 MAXTRY=3 KEEPALIVE=30
|
|
||||||
GROUP=2 DOMAINID="CTS" WSADDR="192.168.5.20:5011" TIMEOUT=2 MAXTRY=3 KEEPALIVE=30
|
|
||||||
|
|
||||||
*REMOTE_TABLE
|
*REMOTE_TABLE
|
||||||
TABLE=8 TABLENAME="TBL_BRH_INFO"
|
|
||||||
MTABLE=17 DOMAINID="DBS"
|
|
||||||
MTABLE=17 DOMAINID="CTS"
|
|
||||||
TABLE=9 TABLENAME="TBL_ACCT_INFO"
|
|
||||||
MTABLE=16 DOMAINID="DBS"
|
|
||||||
MTABLE=16 DOMAINID="CTS"
|
|
||||||
|
|
Loading…
Reference in New Issue