From 221404ec875ae6c19de6be894e13d35af3f55c47 Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 12:47:32 +0800 Subject: [PATCH 1/7] fix unload from queue and multithread process --- include/tmain.h | 4 +++- src/queue.c | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/stvm.c | 3 ++- src/tcp.c | 6 +++--- src/tree.c | 7 ++++++- stvm.conf | 11 +---------- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/include/tmain.h b/include/tmain.h index 527ea10..9038209 100644 --- a/include/tmain.h +++ b/include/tmain.h @@ -39,11 +39,11 @@ typedef long (*FUNCEXEC)(SATvm *pstSavm, void *arg); #define MAX_CON_EVENTS 65535 #define TVM_PORT_LISTEN 1801 #define TVM_PORT_DOMAIN 1800 +#define MAX_LOCK_TIME 1800 #define TVM_LOCAL_SERV "LIS.tvm" #define TVM_REMOTE_DOM "RDS.tvm" #define LOCAL_HOST_IP "127.0.0.1" - /************************************************************************************************* 表结构&索引定义区 *************************************************************************************************/ @@ -77,6 +77,7 @@ typedef struct __TVM_INTERFACE /************************************************************************************************* macro *************************************************************************************************/ +#define Tlog(...) vTraceLog(__FILE__, __LINE__, __VA_ARGS__) /************************************************************************************************* function @@ -84,6 +85,7 @@ typedef struct __TVM_INTERFACE #ifdef __cplusplus extern "C" { #endif +extern void vTraceLog(const char *pszFile, int nLine, const char *fmt, ...); extern void lInitTitle(int argc, char **argv, char **envp); extern void vSetTitile(const char *name); diff --git a/src/queue.c b/src/queue.c index d1132b7..33fbb3e 100644 --- a/src/queue.c +++ b/src/queue.c @@ -295,6 +295,51 @@ retrys: return RC_SUCC; } +/************************************************************************************************* + description:pop data from queue + parameters: + pstSavm --stvm handle + psvOut --out data + return: + RC_SUCC --success + RC_FAIL --failure + *************************************************************************************************/ +long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait) +{ + long lRet; + Timesp tm = {0, 1}; + RunTime *pstRun = NULL; + + if(!pstSavm) + { + pstSavm->m_lErrno = CONDIT_IS_NIL; + return RC_FAIL; + } + + if(NULL == (pstRun = (RunTime *)pInitMemTable(pstSavm, pstSavm->tblName))) + return RC_FAIL; + + if(TYPE_MQUEUE != pstRun->m_lType) + { + pstSavm->m_lErrno = NOT_SUPPT_OPT; + vTblDisconnect(pstSavm, pstSavm->tblName); + return RC_FAIL; + } + + if(RES_REMOT_SID == pstRun->m_lLocal) + { + Tremohold(pstSavm, pstRun); + return _lPopByRt(pstSavm, pvOut); + } + +// if(QUE_NORMAL == eWait) tm.tv_sec = MAX_LOCK_TIME; + if(QUE_NORMAL == eWait) tm.tv_sec = 5; + lRet = _lPop(pstSavm, pstRun->m_pvAddr, pvOut, &tm); + vTblDisconnect(pstSavm, pstSavm->tblName); + return lRet; +} + + /************************************************************************************************* description:pop data from queue parameters: diff --git a/src/stvm.c b/src/stvm.c index ef9be2a..56c0c2e 100644 --- a/src/stvm.c +++ b/src/stvm.c @@ -4387,7 +4387,8 @@ void vInitialCustom() //select nextval from SEQUENCE@SEQ_TEST snprintf(g_stCustom.m_pszWord, ALLOC_CMD_LEN, "SET,FROM,WHERE,COUNT(1),MAX,MIN,NEXTVAL," "ORDER BY,GROUP BY,SEQUENCE@,SYS_TVM_FIELD,SYS_TVM_DOMAIN,SYS_TVM_SEQUE,TABLE,INTO," - "ON,INFO,INDEX,VALUES,DEBUG [ON|OFF],SHOWMODE [ROW|COLUMN],SHOWSIZE [NUM],CLICK,TO,"); + "ON,INFO,INDEX,VALUES,DEBUG [ON|OFF],SHOWMODE [ROW|COLUMN],SHOWSIZE [NUM],CLICK,TO," + "DELIMITER,"); g_stCustom.m_lWord = lgetstrnum(g_stCustom.m_pszWord, ","); rl_attempted_completion_function = pMatchCompletion; diff --git a/src/tcp.c b/src/tcp.c index 2fa7ab4..e9a22fd 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -33,14 +33,13 @@ Rowgrp *g_pstDomgrp = NULL, *g_pstTblgrp = NULL; extern char* pGetLog(); extern long lGetBootType(); extern void vSetBootType(long lType); +extern long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait); void* pParsePacket(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen); void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuffer, long lLen); /************************************************************************************************* macro *************************************************************************************************/ -#define Tlog(...) vTraceLog(__FILE__, __LINE__, __VA_ARGS__) - #define checkrequest(s,c,f) if(MAX(f->m_lRows, f->m_lDLen) > c->m_lBuffer) \ { \ if(MAX(f->m_lRows, f->m_lDLen) > DATA_MAX_LEN) \ @@ -1448,7 +1447,7 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat TFree(pvOut); return RC_SUCC; case OPERATE_QUEPOP: - if(RC_SUCC != lPop(pstSavm, (void *)pvData, pstFace->m_lFind)) + if(RC_SUCC != lTimePop(pstSavm, (void *)pvData, pstFace->m_lFind)) { pstFace->m_lErrno = pstSavm->m_lErrno; lData = sizeof(TFace); @@ -1852,6 +1851,7 @@ long lPollRequest(SATvm *pstSovm, SKCon *pstCon, TFace *pstFace, void *pstVoi } else { + pstSovm->stRunTime[pstFace->m_table].m_lType = pstRun->m_lType; pstSovm->stRunTime[pstFace->m_table].m_bAttch = pstRun->m_bAttch; pstSovm->stRunTime[pstFace->m_table].m_pvAddr = pstRun->m_pvAddr; } diff --git a/src/tree.c b/src/tree.c index cb23d20..b1be5f7 100644 --- a/src/tree.c +++ b/src/tree.c @@ -9237,7 +9237,12 @@ long lImportFile(TABLE t, char *pszFile, char *pszFlag) memset(pvData, 0, lGetRowSize(t)); if(TYPE_MQUEUE == pstRun->m_lType) - memcpy(pvData, szLine, lGetRowSize(t)); + { + if(lGetFldNum(t) > 0) + _lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag); + else + memcpy(pvData, szLine, lGetRowSize(t)); + } else _lImportContext(szLine, lGetFldNum(t), pGetTblKey(t), pvData, pszFlag); if(RC_SUCC != __lInsert(pstSavm, pstRun, pstSavm->tblName, 0)) diff --git a/stvm.conf b/stvm.conf index 45b2022..998f732 100644 --- a/stvm.conf +++ b/stvm.conf @@ -4,7 +4,7 @@ MAXTABLE=255 MAXFILED=3000 MAXDOMAIN=1024 MAXSEQUE=1024 -SERVER_EXEC=4 +SERVER_EXEC=2 #SERVER_EXEC=1 DEPLOY=cluster #DEPLOY=local @@ -12,16 +12,7 @@ SERVER_PORT=5050 LOGNAME="/home/stvm/log/stvm.log" *LOCAL_RESOURCE -TABLE=15 PERMIT=15 *REMOTE_DOMAIN -GROUP=1 DOMAINID="DBS" WSADDR="192.168.5.20:5010" TIMEOUT=2 MAXTRY=3 KEEPALIVE=30 -GROUP=2 DOMAINID="CTS" WSADDR="192.168.5.20:5011" TIMEOUT=2 MAXTRY=3 KEEPALIVE=30 *REMOTE_TABLE -TABLE=8 TABLENAME="TBL_BRH_INFO" - MTABLE=17 DOMAINID="DBS" - MTABLE=17 DOMAINID="CTS" -TABLE=9 TABLENAME="TBL_ACCT_INFO" - MTABLE=16 DOMAINID="DBS" - MTABLE=16 DOMAINID="CTS" From 95fec9e4a9228aac09338899d0fde8ca9a388a97 Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 12:55:51 +0800 Subject: [PATCH 2/7] fix unload from queue and multithread process --- include/tmain.h | 2 +- src/queue.c | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/include/tmain.h b/include/tmain.h index 9038209..85a16c1 100644 --- a/include/tmain.h +++ b/include/tmain.h @@ -39,7 +39,7 @@ typedef long (*FUNCEXEC)(SATvm *pstSavm, void *arg); #define MAX_CON_EVENTS 65535 #define TVM_PORT_LISTEN 1801 #define TVM_PORT_DOMAIN 1800 -#define MAX_LOCK_TIME 1800 +#define MAX_LOCK_TIME 600 #define TVM_LOCAL_SERV "LIS.tvm" #define TVM_REMOTE_DOM "RDS.tvm" #define LOCAL_HOST_IP "127.0.0.1" diff --git a/src/queue.c b/src/queue.c index 33fbb3e..866c1d4 100644 --- a/src/queue.c +++ b/src/queue.c @@ -332,8 +332,7 @@ long lTimePop(SATvm *pstSavm, void *pvOut, Uenum eWait) return _lPopByRt(pstSavm, pvOut); } -// if(QUE_NORMAL == eWait) tm.tv_sec = MAX_LOCK_TIME; - if(QUE_NORMAL == eWait) tm.tv_sec = 5; + if(QUE_NORMAL == eWait) tm.tv_sec = MAX_LOCK_TIME; lRet = _lPop(pstSavm, pstRun->m_pvAddr, pvOut, &tm); vTblDisconnect(pstSavm, pstSavm->tblName); return lRet; From 249d602251565978bf32933d3d3f213217a98629 Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 13:49:28 +0800 Subject: [PATCH 3/7] fix unload from queue and multithread process --- README.MD | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.MD b/README.MD index c9bac0e..4fb83ca 100644 --- a/README.MD +++ b/README.MD @@ -116,10 +116,6 @@ STVM也提供一个类型sqlpuls类型简单工具。 >* 1、新增queue网络同步异步接口 >* 1、新增将表中长时间无用数据导出备份接口 -====***更新日期:20180610***==== - ->* 1、新增分布式锁 - **下本版本:** * 1、新增多机资源共享方式。 * 2、JAVA接口开发 From ca5e806e4c27166a27d89cce8a93fba7ed360e55 Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 14:30:58 +0800 Subject: [PATCH 4/7] fix unload from queue and multithread process --- src/stvm.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stvm.c b/src/stvm.c index 56c0c2e..d4fdd54 100644 --- a/src/stvm.c +++ b/src/stvm.c @@ -1091,7 +1091,7 @@ long lCreateByFile(char *pszFile) return RC_FAIL; } - if(NULL == (pszCreate = (char *)calloc(stBuf.st_size, 1))) + if(NULL == (pszCreate = (char *)calloc(stBuf.st_size + 1, 1))) { fprintf(stderr, "create memory error, %s\n", strerror(errno)); return RC_FAIL; From cd0ed89559634e997c764d45187b50d0dc82606e Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 19:52:21 +0800 Subject: [PATCH 5/7] add func.c --- src/func.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 src/func.c diff --git a/src/func.c b/src/func.c new file mode 100644 index 0000000..5544365 --- /dev/null +++ b/src/func.c @@ -0,0 +1,106 @@ +/* +* Copyright (c) 2018 Savens Liu +* +* The original has been patented, Open source is not equal to open rights. +* Anyone can clone, download, learn and discuss for free. Without the permission +* of the copyright owner or author, it shall not be merged, published, licensed or sold. +* The copyright owner or author has the right to pursue his responsibility. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ + +#include "tvm.h" + +/************************************************************************************************ + function + ************************************************************************************************/ +extern void vCondInsInit(FdCond *pstCond, TABLE t); + +/************************************************************************************************* + description:dump the unused + parameters: + pstSavm --stvm handle + t --table + return: + RC_SUCC --success + RC_FAIL --failure + *************************************************************************************************/ +long lUnuseDump(SATvm *pstSavm, TABLE t) +{ + FILE *fp = NULL; + char szFile[512]; + RunTime *pstRun = NULL; + SHTruck *pstTruck = NULL; + size_t lRow = 0, lOffset, lDump = 0; + + if(!pstSavm) + { + pstSavm->m_lErrno = CONDIT_IS_NIL; + return RC_FAIL; + } + + memset(szFile, 0, sizeof(szFile)); + if(RC_SUCC != lInitSATvm(pstSavm, t)) + return RC_FAIL; + + vHoldConnect(pstSavm); + if(NULL == (pstRun = (RunTime *)pInitHitTest(pstSavm, t))) + return RC_FAIL; + + if(RES_REMOT_SID == pstRun->m_lLocal) + { + pstSavm->m_lErrno = RMT_NOT_SUPPT; + return RC_FAIL; + } + + snprintf(szFile, sizeof(szFile), "%s/%d.udb", getenv("TVMDBD"), t); + if(NULL == (fp = fopen(szFile, "ab"))) + { + pstSavm->m_lErrno = FILE_NOT_RSET; + return RC_FAIL; + } + + lOffset = lGetTblData(t); + pstRun->m_lCurLine = 0; + pstSavm->lSize = lGetRowSize(t); + pstTruck = (PSHTruck)pGetNode(pstRun->m_pvAddr, lOffset); + for(lRow = 0; (lRow < ((TblDef *)pstRun->m_pvAddr)->m_lValid) && (lOffset < lGetTableSize(t)); + pstTruck = (PSHTruck)pGetNode(pstRun->m_pvAddr, lOffset)) + { + if(IS_TRUCK_NULL(pstTruck) || pstTruck->m_lTimes == 0) + { + lOffset += lGetRowTruck(t); + continue; + } + + fwrite(pstTruck->m_pvData, lGetRowSize(t), 1, fp); + pstSavm->pstVoid = pstTruck->m_pvData; + vCondInsInit(&pstSavm->stCond, t); + lDelete(pstSavm); + + lDump ++; + lOffset += lGetRowTruck(t); + } + fclose(fp); + vForceDisconnect(pstSavm, t); + pstSavm->m_lEffect = lDump; + + fprintf(stdout, "export table:%s valid:%ld, completed successfully !!\n", sGetTableName(t), + pstSavm->m_lEffect); + return RC_SUCC; +} + + +/**************************************************************************************** + code end + ****************************************************************************************/ From 4d2571db73a7a055fc49c66c6f3dd146301f5e2a Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 22:49:22 +0800 Subject: [PATCH 6/7] Release 1.2.4 --- include/tvm.h | 2 +- src/queue.c | 41 +++++++++++++++++++++++++++++++++++++++++ src/tree.c | 18 ++++++++---------- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/include/tvm.h b/include/tvm.h index 10e3b91..de1c420 100644 --- a/include/tvm.h +++ b/include/tvm.h @@ -34,7 +34,7 @@ typedef long CREATE; //#pragma pack(4) #define TVM_VKERNEL "1.2.0.0" -#define TVM_VERSION "1.2.4.0" +#define TVM_VERSION "1.2.5.0" /************************************************************************************************* custom macro *************************************************************************************************/ diff --git a/src/queue.c b/src/queue.c index 866c1d4..a293937 100644 --- a/src/queue.c +++ b/src/queue.c @@ -28,6 +28,47 @@ extern long _lPushByRt(SATvm *pstSavm); extern long _lPopByRt(SATvm *pstSavm, void *psvOut); extern long _lPopupByRt(SATvm *pstSavm, size_t lExp, time_t lTime, size_t *plOut, void **pp); +/************************************************************************************************* + description:Delete the queue that matches conditions + parameters: + pstSavm --stvm handle + pvAddr --memory address + t --table + return: + RC_SUCC --success + RC_FAIL --failure + *************************************************************************************************/ +long _lDeleteQueue(SATvm *pstSavm, void *pvAddr) +{ + SHTruck *pstTruck = NULL; + TblDef *pv = (TblDef *)pvAddr; + size_t lRow, lOffset = pv->m_lListOfs; + + for(pstSavm->m_lEffect = 0, lRow = 0; lRow < pv->m_lMaxRow; lOffset ++, lRow ++) + { + if(0 >= pv->m_lValid) + break; + + pstTruck = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * (lOffset % pv->m_lMaxRow)); + if(IS_TRUCK_NULL(pstTruck)) + continue; + + if(RC_MISMA == lFeildMatch(&pstSavm->stCond, pstTruck->m_pvData, pstSavm->pstVoid)) + continue; + + pstSavm->m_lEffect ++; + if(0 > (int)__sync_sub_and_fetch(&pv->m_lValid, 1)) + { + __sync_fetch_and_add(&pv->m_lValid, 1); + break; + } + + SET_DATA_TRUCK(pstTruck, DATA_TRUCK_NULL); + } + + return RC_SUCC; +} + /************************************************************************************************* description:push data to queue parameters: diff --git a/src/tree.c b/src/tree.c index b1be5f7..f2e6446 100644 --- a/src/tree.c +++ b/src/tree.c @@ -43,6 +43,7 @@ extern long _lQueryByRt(SATvm *pstSavm, size_t *plOut, void **ppsvOut); extern long _lExtremeByRt(SATvm *pstSavm, void *psvOut); extern void _vDropTableByRt(SATvm *pstSavm, TABLE t); extern long _lRenameTableByRt(SATvm *pstSavm, TABLE to, TABLE tn); +extern long _lDeleteQueue(SATvm *pstSavm, void *pvAddr); /************************************************************************************************* macro @@ -4311,16 +4312,20 @@ long _lDeleteGroup(SATvm *pstSavm, void *pvAddr, TABLE t) RC_SUCC --success RC_FAIL --failure *************************************************************************************************/ -long _lDeleteTruck(SATvm *pstSavm, void *pvAddr, TABLE t) +long _lDeleteTruck(SATvm *pstSavm, RunTime *pstRun, TABLE t) { bool bIsIdx = false; SHTree *pstRoot = NULL; SHTruck *pstTruck = NULL; char szIdx[MAX_INDEX_LEN]; + void *pvAddr = pstRun->m_pvAddr; RWLock *prwLock = (RWLock *)pGetRWLock(pvAddr); size_t lData = 0, lOffset = lGetTblData(t), lIdx; long lRow, lValid = ((TblDef *)pvAddr)->m_lValid; + if(TYPE_MQUEUE == pstRun->m_lType) + return _lDeleteQueue(pstSavm, pvAddr); + if(HAVE_INDEX(t)) bIsIdx = true; if(RC_SUCC != pthread_rwlock_wrlock(prwLock)) @@ -4476,13 +4481,6 @@ long lDelete(SATvm *pstSavm) if(NULL == (pstRun = (RunTime *)pInitMemTable(pstSavm, pstSavm->tblName))) return RC_FAIL; - if(TYPE_MQUEUE == pstRun->m_lType) - { - pstSavm->m_lErrno = NOT_SUPPT_OPT; - vTblDisconnect(pstSavm, pstSavm->tblName); - return RC_FAIL; - } - if(RES_REMOT_SID == pstRun->m_lLocal) { Tremohold(pstSavm, pstRun); @@ -4498,7 +4496,7 @@ long lDelete(SATvm *pstSavm) if(!pstSavm->pstVoid) { - lRet = _lDeleteTruck(pstSavm, pstRun->m_pvAddr, pstSavm->tblName); + lRet = _lDeleteTruck(pstSavm, pstRun, pstSavm->tblName); vTblDisconnect(pstSavm, pstSavm->tblName); return lRet; } @@ -4532,7 +4530,7 @@ long lDelete(SATvm *pstSavm) } } - lRet = _lDeleteTruck(pstSavm, pstRun->m_pvAddr, pstSavm->tblName); + lRet = _lDeleteTruck(pstSavm, pstRun, pstSavm->tblName); vTblDisconnect(pstSavm, pstSavm->tblName); return lRet; } From f1ac59756c3ff5ca2e307952b913335303fcb064 Mon Sep 17 00:00:00 2001 From: deffpuzzl Date: Thu, 21 Jun 2018 23:07:05 +0800 Subject: [PATCH 7/7] Release 1.2.4 --- include/tvm.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/tvm.h b/include/tvm.h index de1c420..10e3b91 100644 --- a/include/tvm.h +++ b/include/tvm.h @@ -34,7 +34,7 @@ typedef long CREATE; //#pragma pack(4) #define TVM_VKERNEL "1.2.0.0" -#define TVM_VERSION "1.2.5.0" +#define TVM_VERSION "1.2.4.0" /************************************************************************************************* custom macro *************************************************************************************************/