diff --git a/demo/count b/demo/count index 56539d9..a02c6de 100755 Binary files a/demo/count and b/demo/count differ diff --git a/demo/create b/demo/create deleted file mode 100755 index 9dfb5a7..0000000 Binary files a/demo/create and /dev/null differ diff --git a/demo/create.c b/demo/create.c deleted file mode 100644 index 2718415..0000000 --- a/demo/create.c +++ /dev/null @@ -1,52 +0,0 @@ -#include "tvm.h" -#include "tmain.h" - -#define TBL_USER_INFO 20 - -typedef struct __TBL_USER_INFO -{ - long acct_id; - char user_no[21]; - char user_type[2]; - char user_nm[81]; - char user_addr[161]; - char user_phone[31]; -}dbUser; - -CREATE lCreateUserInfo() -{ - DEFINE(TBL_USER_INFO, "", dbUser) - FIELD(dbUser, acct_id, FIELD_LONG) - FIELD(dbUser, user_no, FIELD_CHAR) - FIELD(dbUser, user_type, FIELD_CHAR) - FIELD(dbUser, user_nm, FIELD_CHAR) - FIELD(dbUser, user_addr, FIELD_CHAR) - FIELD(dbUser, user_phone, FIELD_CHAR) - - CREATE_IDX(NORMAL) // 创建查询索引 - IDX_FIELD(dbUser, acct_id, FIELD_LONG) - - CREATE_IDX(UNQIUE) // 创建唯一索引 - IDX_FIELD(dbUser, user_no, FIELD_CHAR) - IDX_FIELD(dbUser, user_type, FIELD_CHAR) - - FINISH -} - -int main(int argc, char *argv[]) -{ - SATvm *pstSavm = (SATvm *)pGetSATvm(); - - if(RC_SUCC != lCreateTable(pstSavm, TBL_USER_INFO, 1000, lCreateUserInfo)) - { - fprintf(stderr, "create table %d failed, err: %s\n", TBL_USER_INFO, sGetTError(pstSavm->m_lErrno)); - return RC_FAIL; - } - - fprintf(stdout, "初始化表成功, completed successfully!!!\n"); - fflush(stderr); - - return RC_SUCC; -} - - diff --git a/demo/delete b/demo/delete index 9accec5..1e8927a 100755 Binary files a/demo/delete and b/demo/delete differ diff --git a/demo/drop b/demo/drop index 36ac5be..9247447 100755 Binary files a/demo/drop and b/demo/drop differ diff --git a/demo/extreme b/demo/extreme index 7341cc2..9ea0ab6 100755 Binary files a/demo/extreme and b/demo/extreme differ diff --git a/demo/group b/demo/group index e5ecc5b..ac92316 100755 Binary files a/demo/group and b/demo/group differ diff --git a/demo/insert b/demo/insert index 58f0791..5526d3f 100755 Binary files a/demo/insert and b/demo/insert differ diff --git a/demo/makefile b/demo/makefile index b9e656c..57968c2 100755 --- a/demo/makefile +++ b/demo/makefile @@ -7,8 +7,8 @@ OUTLIB=../lib OUTBIN=../bin OBJFILE=tree.o sem.o msg.o tcp.o str.o list.o conf.o -CREATE=create -QUEUE=queue +CREATE=create_table +QUEUE=create_queue PUSH=push POP=pop INSERT=insert @@ -23,10 +23,11 @@ TRUNCATE=truncate DROP=drop CLICK=click REPLACE=replace +POPUP=popup PRESSURE=press_demo -all: $(CREATE) $(INSERT) $(QUEUE) $(PUSH) $(POP) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(REPLACE) clean -$(CREATE): create.o +all: $(CREATE) $(INSERT) $(QUEUE) $(PUSH) $(POP) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(POPUP) $(REPLACE) clean +$(CREATE): create_table.o $(CC) -o $@ $< $(LIBDIR) $(PUSH): push.o $(CC) -o $@ $< $(LIBDIR) @@ -56,7 +57,9 @@ $(CLICK): click.o $(CC) -o $@ $< $(LIBDIR) $(REPLACE): replace.o $(CC) -o $@ $< $(LIBDIR) -$(QUEUE): queue.o +$(QUEUE): create_queue.o + $(CC) -o $@ $< $(LIBDIR) +$(POPUP): popup.o $(CC) -o $@ $< $(LIBDIR) $(PRESSURE): press_demo.o $(CC) -o $@ $< $(LIBDIR) diff --git a/demo/press_demo b/demo/press_demo index c9e8d9f..3a60ddd 100755 Binary files a/demo/press_demo and b/demo/press_demo differ diff --git a/demo/query b/demo/query index f03ee60..201eec1 100755 Binary files a/demo/query and b/demo/query differ diff --git a/demo/select b/demo/select index 277a29f..c7e1b22 100755 Binary files a/demo/select and b/demo/select differ diff --git a/demo/truncate b/demo/truncate index 8781b2c..c584b47 100755 Binary files a/demo/truncate and b/demo/truncate differ diff --git a/demo/update b/demo/update index cff7cf6..bec573c 100755 Binary files a/demo/update and b/demo/update differ diff --git a/include/tmain.h b/include/tmain.h index 7c34bf1..23a2f86 100644 --- a/include/tmain.h +++ b/include/tmain.h @@ -144,7 +144,7 @@ extern long lAsyReplace(SATvm *pstSavm, void *pvData); // queue interface extern long lTvmPush(SATvm *pstSavm); -extern long lTvmPop(SATvm *pstSavm, void *pvOut); +extern long lTvmPop(SATvm *pstSavm, void *pvOut, Uenum eWait); extern long lTvmPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut); #ifdef __cplusplus } diff --git a/include/tvm.h b/include/tvm.h index 871610c..ce0a503 100644 --- a/include/tvm.h +++ b/include/tvm.h @@ -51,6 +51,10 @@ typedef long CREATE; #define FIELD_LONG 2 #define FIELD_CHAR 1 +#define QUE_NOWAIT 1 +#define QUE_NORMAL 0 + + // execution plan #define EXE_PLAN_ALL 0 #define EXE_PLAN_IDX 1 @@ -158,7 +162,7 @@ typedef long CREATE; #define TYPE_INCORE 0x02 #define TYPE_CLIENT 0x03 // custom #define TYPE_KEYVAL 0x04 -#define TYPE_MQUEUE 0x05 // custom +#define TYPE_MQUEUE 0x05 #define TVM_NODE_INFO "localhost" #define TVM_RUNCFG_TAG "\x01\x33\xC8\x48" @@ -194,16 +198,16 @@ typedef long CREATE; #define IS_TRUCK_NRML(p) ((p)->m_chTag == DATA_TRUCK_NRML) #define IS_TRUCK_LOCK(p) ((p)->m_chTag == DATA_TRUCK_LOCK) #define SET_DATA_TRUCK(p, type) ((p)->m_chTag = type) -#define TFree(p) if(p) { free(p); p = NULL; } -#define TFgrp(p) do{vDeleteRowgrp(p);p = NULL;}while(0); -#define TFlst(p) do{vDestroyList(p);p = NULL;}while(0); -#define TClose(f) if(f) { fclose(f); f = NULL; } +#define TFree(p) if(p) { free(p); p = NULL; } +#define TFgrp(p) do{vDeleteRowgrp(p);p = NULL;}while(0); +#define TFlst(p) do{vDestroyList(p);p = NULL;}while(0); +#define TClose(f) if(f) { fclose(f); f = NULL; } #define Futex(a,o,v,t) syscall(SYS_futex, a, o, v, t, NULL, 0) #define Tremohold(p,r) if(p->m_bHold) r->m_lState = RESOURCE_ABLE; /************************************************************************************************* - 错误码定义区 + errno *************************************************************************************************/ #define TVM_DONE_SUCC 0 // completed successfully #define SVR_EXCEPTION 1 // sever exception @@ -305,7 +309,7 @@ typedef long CREATE; #define MQUE_WAIT_TMO 97 // queue waiting for timeout #define MQUE_WAIT_ERR 98 // queue waiting for failure #define MQUE_CRTE_BIG 99 // created queue is too big -#define NOT_SUPPT_OPT 100 // table does not support this operation +#define NOT_SUPPT_OPT 100 // queue does not support this operation /************************************************************************************************* 创建表宏函数 @@ -332,13 +336,13 @@ typedef long CREATE; /************************************************************************************************* Field assignment *************************************************************************************************/ -#define defineinit(p,s,t) do{ \ +#define conditbind(p,v,t) do{ \ p->stCond.uFldcmp = 0; \ p->stUpdt.uFldcmp = 0; \ p->lFind = 0; \ p->tblName = t; \ - p->lSize = sizeof(s); \ - p->pstVoid = (void *)&(s); \ + p->lSize = sizeof(v); \ + p->pstVoid = (void *)&(v); \ }while(0); #define conditinit(p,s,t) do{ \ @@ -367,14 +371,11 @@ typedef long CREATE; p->pstVoid = (void *)&v; \ }while(0); -#define queuerset(p,v,l,t) do{ \ +#define queuerbind(p,v,l,t) do{ \ p->lSize = l; \ p->tblName = t; \ p->pstVoid = (void *)v; \ }while(0); - -#define queuebind(p,v,l,t) queuerset(p,v,sizeof(l),t) - #define stringsetv(p,s,f,...) vSetCodField(&p->stCond, sizeof((s).f), (char *)(s).f - (char *)&(s)); \ snprintf((s).f, sizeof((s).f), __VA_ARGS__); @@ -396,7 +397,6 @@ typedef long CREATE; #define numberreset(s,f,v) (s).f = v; #define conditset(p,s,f) vSetCodField(&p->stCond, sizeof((s).f), (char *)&(s).f - (char *)&(s)); -#define conditbind defineinit #define conditfld conditset #define conditnum numberset #define conditstr stringset @@ -521,7 +521,7 @@ typedef struct __SQL_FIELD }SQLFld; /************************************************************************************************* - TVM engine starts the required table (do not move) + TVM engine starts the required table (Warn: do not modify) *************************************************************************************************/ typedef struct __SYS_TVM_INDEX { @@ -821,6 +821,7 @@ extern long lSelectSeque(SATvm *pstSavm, char *pszSQName, ulong *pulNumbe extern long lSetSequence(SATvm *pstSavm, char *pszSQName, ulong uStart); extern long lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, TblDef *pstDef); extern long lCreateTable(SATvm *pstSavm, TABLE t, size_t lRow, TCREATE pfCreateFunc); +extern long lTableQueue(SATvm *pstSavm, TABLE t, size_t lRow, TCREATE pfCreateFunc); extern long lCircleQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *p, char *n); extern long lInsertTrans(SATvm *pstSavm, size_t *plOffset, llSEQ *pllSeq); @@ -839,7 +840,7 @@ extern long lQuery(SATvm *pstSavm, size_t *plOut, void **ppsvOut); // queue interface extern long lPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut); -extern long lPop(SATvm *pstSavm, void *pvOut); +extern long lPop(SATvm *pstSavm, void *pvOut, Uenum eWait); extern long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut); extern long lPush(SATvm *pstSavm); diff --git a/src/queue.c b/src/queue.c index 40ae02e..2c89db7 100644 --- a/src/queue.c +++ b/src/queue.c @@ -80,16 +80,16 @@ long _lPush(SATvm *pstSavm, void *pvAddr) RC_SUCC --success RC_FAIL --failure *************************************************************************************************/ -long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut) +long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut, Timesp *tm) { - int nPos, lRet; + int nPos, lRet; SHTruck *ps = NULL; TblDef *pv = (TblDef *)pvAddr; errno = 0; while(1) { - Futex(&pv->m_lValid, FUTEX_WAIT, 0, NULL); + Futex(&pv->m_lValid, FUTEX_WAIT, 0, tm); if(EWOULDBLOCK != errno && 0 != errno) { pstSavm->m_lErrno = MQUE_WAIT_ERR; @@ -109,10 +109,10 @@ long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut) } /* at least cost one vaild */ - if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lExSeQ, 1))) + if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lListOfs, 1))) ; else if(0 == (nPos = nPos % pv->m_lMaxRow)) - __sync_sub_and_fetch(&pv->m_lExSeQ, pv->m_lMaxRow); + __sync_sub_and_fetch(&pv->m_lListOfs, pv->m_lMaxRow); ps = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * nPos); if(IS_TRUCK_NULL(ps)) @@ -184,10 +184,10 @@ long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, Timesp *tm, size_t } /* at least cost one vaild */ - if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lExSeQ, 1))) + if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lListOfs, 1))) ; else if(0 == (nPos = nPos % pv->m_lMaxRow)) - __sync_sub_and_fetch(&pv->m_lExSeQ, pv->m_lMaxRow); + __sync_sub_and_fetch(&pv->m_lListOfs, pv->m_lMaxRow); ps = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * nPos); if(IS_TRUCK_NULL(ps)) @@ -217,10 +217,11 @@ long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, Timesp *tm, size_t RC_SUCC --success RC_FAIL --failure *************************************************************************************************/ -long lPop(SATvm *pstSavm, void *pvOut) +long lPop(SATvm *pstSavm, void *pvOut, Uenum eWait) { long lRet; RunTime *pstRun = NULL; + static Timesp tm = {0, 1}; if(!pstSavm) { @@ -244,7 +245,10 @@ long lPop(SATvm *pstSavm, void *pvOut) return _lPopByRt(pstSavm, pvOut); } - lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut); + if(QUE_NOWAIT == eWait) + lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut, &tm); + else + lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut, NULL); vTblDisconnect(pstSavm, pstSavm->tblName); return lRet; } diff --git a/src/stvm.c b/src/stvm.c index b04dc2b..1fae8aa 100644 --- a/src/stvm.c +++ b/src/stvm.c @@ -4251,6 +4251,7 @@ void vSetHistory() void vCustomization(SATvm *pstSavm, char *s) { sltrim(s); + srtrim(s); if(!strcasecmp(s, "debug on")) g_stCustom.m_eDebug = 1; diff --git a/src/tcp.c b/src/tcp.c index 018a3e4..fd9295d 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -1445,7 +1445,7 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat TFree(pvOut); return RC_SUCC; case OPERATE_QUEPOP: - if(RC_SUCC != lPop(pstSavm, (void *)pvData)) + if(RC_SUCC != lPop(pstSavm, (void *)pvData, pstFace->m_lFind)) { pstFace->m_lErrno = pstSavm->m_lErrno; lData = sizeof(TFace); @@ -3187,7 +3187,7 @@ long _lPopupByRt(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, RC_SUCC --success RC_FAIL --failure *************************************************************************************************/ -long _lPopByRt(SATvm *pstSavm, void *psvOut) +long _lPopByRt(SATvm *pstSavm, void *psvOut, Uenum eWait) { long lRet; TDomain *pvm, *pnoe; @@ -3218,7 +3218,7 @@ long _lPopByRt(SATvm *pstSavm, void *psvOut) pstSavm->m_skSock = pvm->m_skSock; pstSavm->tblName = pnoe->m_mtable; pthread_mutex_lock(&list->pstFset->lock); - lRet = lTvmPop(pstSavm, psvOut); + lRet = lTvmPop(pstSavm, psvOut, eWait); if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno) { pvm->m_lTryTimes = 0; @@ -3242,7 +3242,7 @@ long _lPopByRt(SATvm *pstSavm, void *psvOut) continue; pstSavm->tblName = pvm->m_mtable; - lRet = lTvmPop(pstSavm, psvOut); + lRet = lTvmPop(pstSavm, psvOut, eWait); if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno) { close(pstSavm->m_skSock); @@ -4443,7 +4443,7 @@ long lTvmPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, v RC_SUCC --success RC_FAIL --failure *************************************************************************************************/ -long lTvmPop(SATvm *pstSavm, void *pvOut) +long lTvmPop(SATvm *pstSavm, void *pvOut, Uenum eWait) { RunTime *pstRun; TFace *pstFace; @@ -4463,6 +4463,7 @@ long lTvmPop(SATvm *pstSavm, void *pvOut) pstFace = (TFace *)pstRun->pstVoid; pstFace->m_lRows = 0; + pstFace->m_lFind = eWait; pstFace->m_lDLen = pstSavm->lSize; pstFace->m_lErrno = TVM_DONE_SUCC; pstFace->m_enum = OPERATE_QUEPOP; diff --git a/src/tree.c b/src/tree.c index 62344d6..c4061ff 100644 --- a/src/tree.c +++ b/src/tree.c @@ -28,6 +28,8 @@ SATvm g_stSavm = {0}; TblDef g_stTblDef[TVM_MAX_TABLE] = {0}; +extern long _lPush(SATvm *pstSavm, void *pvAddr); +extern long _lPopByRt(SATvm *pstSavm, void *psvOut, Uenum eWait); extern long _lInsertByRt(SATvm *pstSavm); extern long _lGroupByRt(SATvm *pstSavm, size_t *plOut, void **ppvOut); extern long _lSelectByRt(SATvm *pstSavm, void *psvOut); @@ -150,7 +152,7 @@ static char tvmerr[128][MAX_INDEX_LEN] = { "queue waiting for timeout", "queue waiting for failure", "created queue is too big", - "table does not support this operation", + "queue does not support this operation", "", }; @@ -6559,12 +6561,12 @@ long __lInsert(SATvm *pstSavm, RunTime *pstRun, TABLE t, ulong uTimes) return RC_FAIL; } - if(TYPE_MQUEUE == pstRun->m_lType) - return _lPush(pstSavm, pstRun->m_pvAddr); - if(FIELD_INCR & pstSavm->lFind) vIncrease(&pstSavm->stUpdt, (char *)pstSavm->pstVoid, (TblDef *)pstRun->m_pvAddr); + if(TYPE_MQUEUE == pstRun->m_lType) + return _lPush(pstSavm, pstRun->m_pvAddr); + if(HAVE_UNIQ_IDX(t)) { if(RC_SUCC != _lInsertIndex(pstSavm, pstRun->m_pvAddr, t, &pstTruck)) @@ -8066,6 +8068,15 @@ long lUpdate(SATvm *pstSavm, void *pvUpdate) if(NULL == (pstRun = (RunTime *)pInitMemTable(pstSavm, pstSavm->tblName))) return RC_FAIL; +/* + if(TYPE_MQUEUE == pstRun->m_lType) + { + pstSavm->m_lErrno = NOT_SUPPT_OPT; + vTblDisconnect(pstSavm, pstSavm->tblName); + return RC_FAIL; + } +*/ + if(RES_REMOT_SID == pstRun->m_lLocal) { Tremohold(pstSavm, pstRun); @@ -8897,13 +8908,6 @@ long lDropTable(SATvm *pstSavm, TABLE t) conditnum(pstSavm, stIndex, m_table, t) if(RC_SUCC != lDelete(pstSavm)) return RC_FAIL; - if(TYPE_MQUEUE == pstRun->m_lType) - { - memset(pstRun, 0, sizeof(RunTime)); - pstSavm->m_lEffect = 1; - return RC_SUCC; - } - // Delete the field table if(RC_SUCC != lInitSATvm(pstSavm, SYS_TVM_FIELD)) return RC_FAIL;