diff --git a/demo/count b/demo/count index c1076df..20b45d1 100755 Binary files a/demo/count and b/demo/count differ diff --git a/demo/create b/demo/create index f5f38a4..4f0ba81 100755 Binary files a/demo/create and b/demo/create differ diff --git a/demo/delete b/demo/delete index d772f1b..344cd47 100755 Binary files a/demo/delete and b/demo/delete differ diff --git a/demo/drop b/demo/drop index 844ebdb..35e5d75 100755 Binary files a/demo/drop and b/demo/drop differ diff --git a/demo/extreme b/demo/extreme index e5227b4..b7c580c 100755 Binary files a/demo/extreme and b/demo/extreme differ diff --git a/demo/group b/demo/group index 91389a2..b936525 100755 Binary files a/demo/group and b/demo/group differ diff --git a/demo/insert b/demo/insert index d4c42b5..324cfc3 100755 Binary files a/demo/insert and b/demo/insert differ diff --git a/demo/makefile b/demo/makefile index d03c0e0..b9e656c 100755 --- a/demo/makefile +++ b/demo/makefile @@ -1,5 +1,6 @@ INCDIR= -I/usr/include -I$(HOME)/include -I./ -I./include -I../include -LIBDIR= -L$(HOME)/lib -L../lib -lstvm -lm -lc -ldl -lpthread +#LIBDIR= -L$(HOME)/lib -L../lib -lm -lc -ldl -lpthread -static -lstvm +LIBDIR= -L$(HOME)/lib -L../lib -lstvm -lm -lc -ldl -lpthread CC=cc -fPIC -g CO=-c -pg OUTLIB=../lib @@ -7,6 +8,9 @@ OUTBIN=../bin OBJFILE=tree.o sem.o msg.o tcp.o str.o list.o conf.o CREATE=create +QUEUE=queue +PUSH=push +POP=pop INSERT=insert SELECT=select QUERY=query @@ -21,9 +25,13 @@ CLICK=click REPLACE=replace PRESSURE=press_demo -all: $(CREATE) $(INSERT) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(REPLACE) clean +all: $(CREATE) $(INSERT) $(QUEUE) $(PUSH) $(POP) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(REPLACE) clean $(CREATE): create.o $(CC) -o $@ $< $(LIBDIR) +$(PUSH): push.o + $(CC) -o $@ $< $(LIBDIR) +$(POP): pop.o + $(CC) -o $@ $< $(LIBDIR) $(INSERT): insert.o $(CC) -o $@ $< $(LIBDIR) $(SELECT): select.o @@ -48,6 +56,8 @@ $(CLICK): click.o $(CC) -o $@ $< $(LIBDIR) $(REPLACE): replace.o $(CC) -o $@ $< $(LIBDIR) +$(QUEUE): queue.o + $(CC) -o $@ $< $(LIBDIR) $(PRESSURE): press_demo.o $(CC) -o $@ $< $(LIBDIR) diff --git a/demo/press_demo b/demo/press_demo index fad8a68..5c0ebe4 100755 Binary files a/demo/press_demo and b/demo/press_demo differ diff --git a/demo/query b/demo/query index 94f7f89..0909d33 100755 Binary files a/demo/query and b/demo/query differ diff --git a/demo/select b/demo/select index 8fc7fba..0c90610 100755 Binary files a/demo/select and b/demo/select differ diff --git a/demo/truncate b/demo/truncate index 4650dec..26a9b8a 100755 Binary files a/demo/truncate and b/demo/truncate differ diff --git a/demo/update b/demo/update index 04ffe70..8f52533 100755 Binary files a/demo/update and b/demo/update differ diff --git a/include/tstr.h b/include/tstr.h index 344c7a6..998d9de 100644 --- a/include/tstr.h +++ b/include/tstr.h @@ -54,6 +54,7 @@ #include #include #include +#include #include #include #include diff --git a/include/tvm.h b/include/tvm.h index fc86da4..7fe7dda 100644 --- a/include/tvm.h +++ b/include/tvm.h @@ -25,6 +25,7 @@ typedef pthread_rwlock_t RWLock; typedef pthread_rwlockattr_t RWAttr; +typedef struct timespec Timesp; typedef unsigned int TABLE; typedef long long llSEQ; typedef long (*TCREATE)(TABLE t); @@ -154,6 +155,7 @@ typedef long CREATE; #define TYPE_INCORE 0x02 #define TYPE_CLIENT 0x03 // custom #define TYPE_KEYVAL 0x04 +#define TYPE_MQUEUE 0x05 // custom #define TVM_NODE_INFO "localhost" #define TVM_RUNCFG_TAG "\x01\x33\xC8\x48" @@ -189,10 +191,13 @@ typedef long CREATE; #define IS_TRUCK_NRML(p) ((p)->m_chTag == DATA_TRUCK_NRML) #define IS_TRUCK_LOCK(p) ((p)->m_chTag == DATA_TRUCK_LOCK) #define SET_DATA_TRUCK(p, type) ((p)->m_chTag = type) -#define TFree(p) if(p) { free(p); p = NULL; } -#define TFgrp(p) do{vDeleteRowgrp(p);p = NULL;}while(0); -#define TFlst(p) do{vDestroyList(p);p = NULL;}while(0); -#define TClose(f) if(f) { fclose(f); f = NULL; } +#define TFree(p) if(p) { free(p); p = NULL; } +#define TFgrp(p) do{vDeleteRowgrp(p);p = NULL;}while(0); +#define TFlst(p) do{vDestroyList(p);p = NULL;}while(0); +#define TClose(f) if(f) { fclose(f); f = NULL; } + +#define Futex(a,o,v,t) syscall(SYS_futex, a, o, v, t, NULL, 0) +#define Tremohold(p,r) if(p->m_bHold) r->m_lState = RESOURCE_ABLE; /************************************************************************************************* 错误码定义区 @@ -294,6 +299,10 @@ typedef long CREATE; #define EXTRE_SET_ERR 94 // extreme set decorate error #define GROUP_SET_ERR 95 // group set decorate error #define CMM_TABLE_MIS 96 // the table of field is missing +#define MQUE_WAIT_TMO 97 // queue waiting for timeout +#define MQUE_WAIT_ERR 98 // queue waiting for failure +#define MQUE_CRTE_BIG 99 // created queue is too big +#define NOT_SUPPT_OPT 100 // table does not support this operation /************************************************************************************************* 创建表宏函数 @@ -316,6 +325,7 @@ typedef long CREATE; return RC_FAIL; #define FINISH return RC_SUCC; +#define lCreateQueue(p,t,r,s,n) lCircleQueue(p, t, r, s, #t, n) /************************************************************************************************* Field assignment *************************************************************************************************/ @@ -497,7 +507,7 @@ typedef struct __SQL_FIELD typedef struct __SYS_TVM_INDEX { TABLE m_table; // table - long m_lType; // table type + uint m_lType; // table type char m_szTable[MAX_FIELD_LEN]; // table name char m_szPart[MAX_FIELD_LEN]; // partition name char m_szOwner[MAX_FIELD_LEN]; // owner @@ -588,6 +598,7 @@ typedef struct __TVM_RUNTIME void *pstVoid; uint m_lState; uint m_lLocal; + uint m_lType; long m_shmID; // Memory Key long m_semID; // semaphore key long m_lRowSize; // Record block size @@ -725,7 +736,7 @@ extern long lMakeConfig(char *pszFile); extern long lGetQueueNum(SATvm *pstSavm, long lQid); extern long lQueueMaxByte(SATvm *pstSavm, long lQid); extern long lQueueRcvTime(SATvm *pstSavm, long lQid); -extern long lCreateQueue(SATvm *pstSavm, bool bCreate); +extern long lCreateQuemsg(SATvm *pstSavm, bool bCreate); extern long lOperateSems(SATvm *pstSavm, long semID, long lSems, Benum evp); extern long lEventWrite(SATvm *pstSavm, long lQid, void *psvData, long lSize); extern long lCreateSems(SATvm *pstSavm, RunTime *pstRun, long lSems, long lValue); @@ -779,6 +790,7 @@ extern long lSelectSeque(SATvm *pstSavm, char *pszSQName, ulong *pulNumbe extern long lSetSequence(SATvm *pstSavm, char *pszSQName, ulong uStart); extern long lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, TblDef *pstDef); extern long lCreateTable(SATvm *pstSavm, TABLE t, size_t lRow, TCREATE pfCreateFunc); +extern long lCircleQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *p, char *n); extern long lInsertTrans(SATvm *pstSavm, size_t *plOffset, llSEQ *pllSeq); @@ -794,6 +806,11 @@ extern long lExtreme(SATvm *pstSavm, void *psvOut); extern long lGroup(SATvm *pstSavm, size_t *plOut, void **ppsvOut); extern long lQuery(SATvm *pstSavm, size_t *plOut, void **ppsvOut); +extern long lPops(SATvm *pstSavm, size_t lExpect, Timesp *tm, size_t *plOut, void **ppsvOut); +extern long lPop(SATvm *pstSavm, void *pvOut); +extern long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut); +extern long lPush(SATvm *pstSavm); + extern long lTableDeclare(SATvm *pstSavm); extern long lTableFetch(SATvm *pstSavm, void *psvOut); extern long lNextFetch(SATvm *pstSavm, void **ppvOAddr); diff --git a/src/detvm.c b/src/detvm.c index 74180f9..ba4647f 100644 --- a/src/detvm.c +++ b/src/detvm.c @@ -104,7 +104,8 @@ void vDebugTable(TABLE t, long eType) fprintf(stdout, "TABLE:%9u, extern:%10ld, NAME:%s\t\nSHTree:%8ld, SHList:%10ld, " "TblDef:%11ld\nGroup:%9ld, MaxRow:%10ld, Valid:%12ld\nlNodeNil:%6ld, lIType:%10d, " "Table:%12ld\nIdxLen:%8ld, TreePos:%9ld, TreeRoot:%9ld\nGrpLen:%8ld, GroupPos:%8ld, " - "GroupRoot:%8ld\nData:%10ld, ReSize:%10ld, Truck:%12ld\nListPos:%7ld, ListOfs:%9ld\n", + "GroupRoot:%8ld\nData:%10ld, ReSize:%10ld, Truck:%12ld\nListPos:%7ld, ListOfs:%9ld, " + "ExSeQ:%12ld\n", ((TblDef *)pGetTblDef(t))->m_table, ((TblDef *)pGetTblDef(t))->m_lExtern, ((TblDef *)pGetTblDef(t))->m_szTable, sizeof(SHTree), sizeof(SHList), sizeof(TblDef), ((TblDef *)pGetTblDef(t))->m_lGroup, ((TblDef *)pGetTblDef(t))->m_lMaxRow, @@ -115,7 +116,7 @@ void vDebugTable(TABLE t, long eType) ((TblDef *)pGetTblDef(t))->m_lGroupPos, ((TblDef *)pGetTblDef(t))->m_lGroupRoot, ((TblDef *)pGetTblDef(t))->m_lData, ((TblDef *)pGetTblDef(t))->m_lReSize, ((TblDef *)pGetTblDef(t))->m_lTruck, ((TblDef *)pGetTblDef(t))->m_lListPos, - ((TblDef *)pGetTblDef(t))->m_lListOfs); + ((TblDef *)pGetTblDef(t))->m_lListOfs, ((TblDef *)pGetTblDef(t))->m_lExSeQ); fprintf(stdout, "--------------------------------------------------------------------" "----------\n"); diff --git a/src/makefile b/src/makefile index 873758a..ee58f23 100755 --- a/src/makefile +++ b/src/makefile @@ -13,7 +13,7 @@ OUTBIN=../bin OBJFILE=tree.o sem.o msg.o tcp.o str.o list.o conf.o queue.o TARGET=$(OUTLIB)/libstvm.a -TARDLL=$(OUTLIB)/libstvm.so +#TARDLL=$(OUTLIB)/libstvm.so TARVER=$(OUTLIB)/libstvm.so.1.2 STVM=$(OUTBIN)/stvm DETVM=$(OUTBIN)/detvm diff --git a/src/msg.c b/src/msg.c index 2ee4e7f..5a55836 100644 --- a/src/msg.c +++ b/src/msg.c @@ -33,7 +33,7 @@ RC_SUCC --success RC_FAIL --failure *************************************************************************************************/ -long lCreateQueue(SATvm *pstSavm, bool bCreate) +long lCreateQuemsg(SATvm *pstSavm, bool bCreate) { long lQid; diff --git a/src/stvm.c b/src/stvm.c index d93e237..44224cc 100644 --- a/src/stvm.c +++ b/src/stvm.c @@ -3888,7 +3888,7 @@ long lExecuteSQL(SATvm *pstSavm, char *pszSQL) sGetTError(pstSavm->m_lErrno)); return RC_SUCC; } - else if(!strcasecmp(pszSQL, "show tables")) + else if(!strcasecmp(pszSQL, "show table")) return lShowTables(pstSavm); else if(!strcasecmp(pszSQL, "show info")) { diff --git a/src/tree.c b/src/tree.c index 3f34cd4..e90efce 100644 --- a/src/tree.c +++ b/src/tree.c @@ -43,12 +43,11 @@ extern long _lRenameTableByRt(SATvm *pstSavm, TABLE to, TABLE tn); /************************************************************************************************* macro *************************************************************************************************/ -#define Tremohold(p,r) if(p->m_bHold) r->m_lState = RESOURCE_ABLE; /************************************************************************************************* Error message definition *************************************************************************************************/ -static char tvmerr[100][MAX_INDEX_LEN] = { +static char tvmerr[128][MAX_INDEX_LEN] = { "completed successfully", "sever exception", "index field values is null", @@ -146,6 +145,10 @@ static char tvmerr[100][MAX_INDEX_LEN] = { "extreme set decorate error", "group set decorate error", "the table of field is missing", + "queue waiting for timeout", + "queue waiting for failure", + "created queue is too big", + "table does not support this operation", "", }; @@ -1099,7 +1102,11 @@ void vHoldRelease(SATvm *pstSavm) TFree(pstRun->pstVoid); if(pstRun->m_pvAddr) + { + if(TYPE_MQUEUE == pstRun->m_lType && ((TblDef *)pstRun->m_pvAddr)->m_lGroup > 0) + ((TblDef *)pstRun->m_pvAddr)->m_lGroup --; // process exit shmdt(pstRun->m_pvAddr); + } pstRun->m_pvAddr = NULL; pstRun->m_bAttch = false; } @@ -1133,7 +1140,11 @@ void _vTblRelease(SATvm *pstSavm, TABLE t, bool bHold) pstRun->m_pvCurAddr = NULL; if(pstRun->m_pvAddr) + { + if(TYPE_MQUEUE == pstRun->m_lType && ((TblDef *)pstRun->m_pvAddr)->m_lGroup > 0) + ((TblDef *)pstRun->m_pvAddr)->m_lGroup --; // process exit shmdt(pstRun->m_pvAddr); + } pstRun->m_pvAddr = NULL; pstRun->m_bAttch = false; } @@ -1575,7 +1586,9 @@ long lInitSATvm(SATvm *pstSavm, TABLE t) pstRun->m_shmID = stIndex.m_shmID; pstRun->m_semID = stIndex.m_semID; pstRun->m_lLocal = stIndex.m_lLocal; + pstRun->m_lType = stIndex.m_lType; pstRun->m_lRowSize = stIndex.m_lRowSize; + return RC_SUCC; } @@ -1835,7 +1848,8 @@ void* pInitMemTable(SATvm *pstSavm, TABLE t) } pstRun->m_bAttch = true; - + if(TYPE_MQUEUE == pstRun->m_lType) + ((TblDef *)pstRun->m_pvAddr)->m_lGroup ++; // process join memcpy((void *)pGetTblDef(t), pstRun->m_pvAddr, sizeof(TblDef)); if(pstSavm->lSize != lGetRowSize(t)) @@ -8044,7 +8058,8 @@ long lRegisterTable(SATvm *pstSavm, RunTime *pstRun, TABLE t, long lType) TIndex stIndex; TBoot *pstBoot = (TBoot *)pBootInitial(); - if(TYPE_CLIENT != lType) return RC_SUCC; + if(TYPE_SYSTEM == lType || TYPE_INCORE == lType) + return RC_SUCC; if(RC_SUCC != lInitSATvm(pstSavm, SYS_TVM_INDEX)) return RC_FAIL; @@ -8134,6 +8149,67 @@ long _lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, bool bCreate, long l return RC_SUCC; } +/************************************************************************************************* + description:create queue + parameters: + pstSavm --stvm handle + t --table + lRow --table maxrows + bCreate --create type + lType --table type + return: + RC_SUCC --success + RC_FAIL --failure + *************************************************************************************************/ +long _lCreateQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *pszTable, + char *pszNode, bool bCover) +{ + RWAttr attr; + RunTime *pstRun = NULL; + RWLock *prwLock = NULL; + + if(!pstSavm || lRow <= 0) + { + pstSavm->m_lErrno = CONDIT_IS_NIL; + return RC_FAIL; + } + + if((lRow >> (sizeof(int) * 8 - 1)) > 0) + { + pstSavm->m_lErrno = MQUE_CRTE_BIG; + return RC_FAIL; + } + + vInitTblDef(t); + pstSavm->tblName = t; + ((TblDef *)pGetTblDef(t))->m_lIType = bCover; + ((TblDef *)pGetTblDef(t))->m_table = t; + ((TblDef *)pGetTblDef(t))->m_lReSize = lSize; + ((TblDef *)pGetTblDef(t))->m_lTruck = lSize + sizeof(SHTruck); + strncpy(((TblDef *)pGetTblDef(t))->m_szPart, pszNode, MAX_FIELD_LEN); + strncpy(((TblDef *)pGetTblDef(t))->m_szTable, pszTable, MAX_FIELD_LEN); + ((TblDef *)pGetTblDef(t))->m_lTable = lInitialTable(t, lRow); + if(NULL == (pstRun = (RunTime *)pCreateBlock(pstSavm, t, ((TblDef *)pGetTblDef(t))->m_lTable, + false))) + return RC_FAIL; + + memcpy(pstRun->m_pvAddr, (void *)pGetTblDef(t), sizeof(TblDef)); + prwLock = (RWLock *)pGetRWLock(pstRun->m_pvAddr); + pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_rwlock_init(prwLock, &attr); + + memset(pstRun->m_pvAddr + lGetTblData(t), 0, lGetTableSize(t) - lGetTblData(t)); + vTblDisconnect(pstSavm, t); + + if(RC_SUCC != lRegisterTable(pstSavm, pstRun, t, TYPE_MQUEUE)) + { + shmctl(pstRun->m_shmID, IPC_RMID, NULL); + return RC_FAIL; + } + + return RC_SUCC; +} + /************************************************************************************************* description:create table parameters: @@ -8660,6 +8736,22 @@ long lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, TblDef *pstDef) return _lCustomTable(pstSavm, t, lRow, false, TYPE_CLIENT); } +/************************************************************************************************* + description:API - CreateQueue + parameters: + pstSavm --stvm handle + t --table + lRow --table maxrows + pfCreateFunc --table field define + return: + RC_SUCC --success + RC_FAIL --failure + *************************************************************************************************/ +long lCircleQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *pszTable, char *node) +{ + return _lCreateQueue(pstSavm, t, lRow, lSize, pszTable, node, false); +} + /************************************************************************************************* description:API - lDropTable parameters: @@ -8679,7 +8771,6 @@ long lDropTable(SATvm *pstSavm, TABLE t) pstSavm->bSearch = TYPE_SYSTEM; conditinit(pstSavm, stIndex, SYS_TVM_INDEX) conditnum(pstSavm, stIndex, m_table, t) - conditnum(pstSavm, stIndex, m_lType, TYPE_CLIENT) if(RC_SUCC != lSelect(pstSavm, (void *)&stIndex)) return RC_FAIL; @@ -8688,7 +8779,6 @@ long lDropTable(SATvm *pstSavm, TABLE t) { conditinit(pstSavm, stIndex, SYS_TVM_INDEX) conditnum(pstSavm, stIndex, m_table, t) - conditnum(pstSavm, stIndex, m_lType, TYPE_CLIENT) if(RC_SUCC != lDelete(pstSavm)) return RC_FAIL; @@ -8713,9 +8803,15 @@ long lDropTable(SATvm *pstSavm, TABLE t) conditinit(pstSavm, stIndex, SYS_TVM_INDEX) conditnum(pstSavm, stIndex, m_table, t) - conditnum(pstSavm, stIndex, m_lType, TYPE_CLIENT) if(RC_SUCC != lDelete(pstSavm)) return RC_FAIL; + if(TYPE_MQUEUE == pstRun->m_lType) + { + memset(pstRun, 0, sizeof(RunTime)); + pstSavm->m_lEffect = 1; + return RC_SUCC; + } + // Delete the field table if(RC_SUCC != lInitSATvm(pstSavm, SYS_TVM_FIELD)) return RC_FAIL; @@ -9132,7 +9228,14 @@ long lGetTblField(TABLE t, size_t *plOut, TField **ppstField) conditinit(pstSavm, stField, SYS_TVM_FIELD) conditnum(pstSavm, stField, m_table, t) - return lQuery(pstSavm, plOut, (void **)ppstField); + if(RC_SUCC != lQuery(pstSavm, plOut, (void **)ppstField)) + { + if(NO_DATA_FOUND == pstSavm->m_lErrno) + pstSavm->m_lErrno = FIELD_NOT_DEF; + return RC_FAIL; + } + + return RC_SUCC; } /*************************************************************************************************