fix queue

pull/1/head
deffpuzzl 2018-06-07 15:20:53 +08:00
parent 382af22d1f
commit 7332f1593e
20 changed files with 62 additions and 100 deletions

Binary file not shown.

Binary file not shown.

View File

@ -1,52 +0,0 @@
#include "tvm.h"
#include "tmain.h"
#define TBL_USER_INFO 20
typedef struct __TBL_USER_INFO
{
long acct_id;
char user_no[21];
char user_type[2];
char user_nm[81];
char user_addr[161];
char user_phone[31];
}dbUser;
CREATE lCreateUserInfo()
{
DEFINE(TBL_USER_INFO, "", dbUser)
FIELD(dbUser, acct_id, FIELD_LONG)
FIELD(dbUser, user_no, FIELD_CHAR)
FIELD(dbUser, user_type, FIELD_CHAR)
FIELD(dbUser, user_nm, FIELD_CHAR)
FIELD(dbUser, user_addr, FIELD_CHAR)
FIELD(dbUser, user_phone, FIELD_CHAR)
CREATE_IDX(NORMAL) // 创建查询索引
IDX_FIELD(dbUser, acct_id, FIELD_LONG)
CREATE_IDX(UNQIUE) // 创建唯一索引
IDX_FIELD(dbUser, user_no, FIELD_CHAR)
IDX_FIELD(dbUser, user_type, FIELD_CHAR)
FINISH
}
int main(int argc, char *argv[])
{
SATvm *pstSavm = (SATvm *)pGetSATvm();
if(RC_SUCC != lCreateTable(pstSavm, TBL_USER_INFO, 1000, lCreateUserInfo))
{
fprintf(stderr, "create table %d failed, err: %s\n", TBL_USER_INFO, sGetTError(pstSavm->m_lErrno));
return RC_FAIL;
}
fprintf(stdout, "初始化表成功, completed successfully!!!\n");
fflush(stderr);
return RC_SUCC;
}

Binary file not shown.

BIN
demo/drop

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -7,8 +7,8 @@ OUTLIB=../lib
OUTBIN=../bin
OBJFILE=tree.o sem.o msg.o tcp.o str.o list.o conf.o
CREATE=create
QUEUE=queue
CREATE=create_table
QUEUE=create_queue
PUSH=push
POP=pop
INSERT=insert
@ -23,10 +23,11 @@ TRUNCATE=truncate
DROP=drop
CLICK=click
REPLACE=replace
POPUP=popup
PRESSURE=press_demo
all: $(CREATE) $(INSERT) $(QUEUE) $(PUSH) $(POP) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(REPLACE) clean
$(CREATE): create.o
all: $(CREATE) $(INSERT) $(QUEUE) $(PUSH) $(POP) $(SELECT) $(QUERY) $(DELETE) $(UPDATE) $(COUNT) $(GROUP) $(EXTREME) $(TRUNCATE) $(DROP) $(PRESSURE) $(CLICK) $(POPUP) $(REPLACE) clean
$(CREATE): create_table.o
$(CC) -o $@ $< $(LIBDIR)
$(PUSH): push.o
$(CC) -o $@ $< $(LIBDIR)
@ -56,7 +57,9 @@ $(CLICK): click.o
$(CC) -o $@ $< $(LIBDIR)
$(REPLACE): replace.o
$(CC) -o $@ $< $(LIBDIR)
$(QUEUE): queue.o
$(QUEUE): create_queue.o
$(CC) -o $@ $< $(LIBDIR)
$(POPUP): popup.o
$(CC) -o $@ $< $(LIBDIR)
$(PRESSURE): press_demo.o
$(CC) -o $@ $< $(LIBDIR)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -144,7 +144,7 @@ extern long lAsyReplace(SATvm *pstSavm, void *pvData);
// queue interface
extern long lTvmPush(SATvm *pstSavm);
extern long lTvmPop(SATvm *pstSavm, void *pvOut);
extern long lTvmPop(SATvm *pstSavm, void *pvOut, Uenum eWait);
extern long lTvmPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut);
#ifdef __cplusplus
}

View File

@ -51,6 +51,10 @@ typedef long CREATE;
#define FIELD_LONG 2
#define FIELD_CHAR 1
#define QUE_NOWAIT 1
#define QUE_NORMAL 0
// execution plan
#define EXE_PLAN_ALL 0
#define EXE_PLAN_IDX 1
@ -158,7 +162,7 @@ typedef long CREATE;
#define TYPE_INCORE 0x02
#define TYPE_CLIENT 0x03 // custom
#define TYPE_KEYVAL 0x04
#define TYPE_MQUEUE 0x05 // custom
#define TYPE_MQUEUE 0x05
#define TVM_NODE_INFO "localhost"
#define TVM_RUNCFG_TAG "\x01\x33\xC8\x48"
@ -203,7 +207,7 @@ typedef long CREATE;
#define Tremohold(p,r) if(p->m_bHold) r->m_lState = RESOURCE_ABLE;
/*************************************************************************************************
errno
*************************************************************************************************/
#define TVM_DONE_SUCC 0 // completed successfully
#define SVR_EXCEPTION 1 // sever exception
@ -305,7 +309,7 @@ typedef long CREATE;
#define MQUE_WAIT_TMO 97 // queue waiting for timeout
#define MQUE_WAIT_ERR 98 // queue waiting for failure
#define MQUE_CRTE_BIG 99 // created queue is too big
#define NOT_SUPPT_OPT 100 // table does not support this operation
#define NOT_SUPPT_OPT 100 // queue does not support this operation
/*************************************************************************************************
@ -332,13 +336,13 @@ typedef long CREATE;
/*************************************************************************************************
Field assignment
*************************************************************************************************/
#define defineinit(p,s,t) do{ \
#define conditbind(p,v,t) do{ \
p->stCond.uFldcmp = 0; \
p->stUpdt.uFldcmp = 0; \
p->lFind = 0; \
p->tblName = t; \
p->lSize = sizeof(s); \
p->pstVoid = (void *)&(s); \
p->lSize = sizeof(v); \
p->pstVoid = (void *)&(v); \
}while(0);
#define conditinit(p,s,t) do{ \
@ -367,14 +371,11 @@ typedef long CREATE;
p->pstVoid = (void *)&v; \
}while(0);
#define queuerset(p,v,l,t) do{ \
#define queuerbind(p,v,l,t) do{ \
p->lSize = l; \
p->tblName = t; \
p->pstVoid = (void *)v; \
}while(0);
#define queuebind(p,v,l,t) queuerset(p,v,sizeof(l),t)
#define stringsetv(p,s,f,...) vSetCodField(&p->stCond, sizeof((s).f), (char *)(s).f - (char *)&(s)); \
snprintf((s).f, sizeof((s).f), __VA_ARGS__);
@ -396,7 +397,6 @@ typedef long CREATE;
#define numberreset(s,f,v) (s).f = v;
#define conditset(p,s,f) vSetCodField(&p->stCond, sizeof((s).f), (char *)&(s).f - (char *)&(s));
#define conditbind defineinit
#define conditfld conditset
#define conditnum numberset
#define conditstr stringset
@ -521,7 +521,7 @@ typedef struct __SQL_FIELD
}SQLFld;
/*************************************************************************************************
TVM engine starts the required table (do not move)
TVM engine starts the required table (Warn: do not modify)
*************************************************************************************************/
typedef struct __SYS_TVM_INDEX
{
@ -821,6 +821,7 @@ extern long lSelectSeque(SATvm *pstSavm, char *pszSQName, ulong *pulNumbe
extern long lSetSequence(SATvm *pstSavm, char *pszSQName, ulong uStart);
extern long lCustomTable(SATvm *pstSavm, TABLE t, size_t lRow, TblDef *pstDef);
extern long lCreateTable(SATvm *pstSavm, TABLE t, size_t lRow, TCREATE pfCreateFunc);
extern long lTableQueue(SATvm *pstSavm, TABLE t, size_t lRow, TCREATE pfCreateFunc);
extern long lCircleQueue(SATvm *pstSavm, TABLE t, size_t lRow, size_t lSize, char *p, char *n);
extern long lInsertTrans(SATvm *pstSavm, size_t *plOffset, llSEQ *pllSeq);
@ -839,7 +840,7 @@ extern long lQuery(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
// queue interface
extern long lPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut);
extern long lPop(SATvm *pstSavm, void *pvOut);
extern long lPop(SATvm *pstSavm, void *pvOut, Uenum eWait);
extern long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
extern long lPush(SATvm *pstSavm);

View File

@ -80,7 +80,7 @@ long _lPush(SATvm *pstSavm, void *pvAddr)
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut)
long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut, Timesp *tm)
{
int nPos, lRet;
SHTruck *ps = NULL;
@ -89,7 +89,7 @@ long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut)
errno = 0;
while(1)
{
Futex(&pv->m_lValid, FUTEX_WAIT, 0, NULL);
Futex(&pv->m_lValid, FUTEX_WAIT, 0, tm);
if(EWOULDBLOCK != errno && 0 != errno)
{
pstSavm->m_lErrno = MQUE_WAIT_ERR;
@ -109,10 +109,10 @@ long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut)
}
/* at least cost one vaild */
if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lExSeQ, 1)))
if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lListOfs, 1)))
;
else if(0 == (nPos = nPos % pv->m_lMaxRow))
__sync_sub_and_fetch(&pv->m_lExSeQ, pv->m_lMaxRow);
__sync_sub_and_fetch(&pv->m_lListOfs, pv->m_lMaxRow);
ps = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * nPos);
if(IS_TRUCK_NULL(ps))
@ -184,10 +184,10 @@ long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, Timesp *tm, size_t
}
/* at least cost one vaild */
if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lExSeQ, 1)))
if(pv->m_lMaxRow > (nPos = __sync_add_and_fetch(&pv->m_lListOfs, 1)))
;
else if(0 == (nPos = nPos % pv->m_lMaxRow))
__sync_sub_and_fetch(&pv->m_lExSeQ, pv->m_lMaxRow);
__sync_sub_and_fetch(&pv->m_lListOfs, pv->m_lMaxRow);
ps = (PSHTruck)pGetNode(pvAddr, pv->m_lData + pv->m_lTruck * nPos);
if(IS_TRUCK_NULL(ps))
@ -217,10 +217,11 @@ long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, Timesp *tm, size_t
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long lPop(SATvm *pstSavm, void *pvOut)
long lPop(SATvm *pstSavm, void *pvOut, Uenum eWait)
{
long lRet;
RunTime *pstRun = NULL;
static Timesp tm = {0, 1};
if(!pstSavm)
{
@ -244,7 +245,10 @@ long lPop(SATvm *pstSavm, void *pvOut)
return _lPopByRt(pstSavm, pvOut);
}
lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut);
if(QUE_NOWAIT == eWait)
lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut, &tm);
else
lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut, NULL);
vTblDisconnect(pstSavm, pstSavm->tblName);
return lRet;
}

View File

@ -4251,6 +4251,7 @@ void vSetHistory()
void vCustomization(SATvm *pstSavm, char *s)
{
sltrim(s);
srtrim(s);
if(!strcasecmp(s, "debug on"))
g_stCustom.m_eDebug = 1;

View File

@ -1445,7 +1445,7 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
TFree(pvOut);
return RC_SUCC;
case OPERATE_QUEPOP:
if(RC_SUCC != lPop(pstSavm, (void *)pvData))
if(RC_SUCC != lPop(pstSavm, (void *)pvData, pstFace->m_lFind))
{
pstFace->m_lErrno = pstSavm->m_lErrno;
lData = sizeof(TFace);
@ -3187,7 +3187,7 @@ long _lPopupByRt(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut,
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long _lPopByRt(SATvm *pstSavm, void *psvOut)
long _lPopByRt(SATvm *pstSavm, void *psvOut, Uenum eWait)
{
long lRet;
TDomain *pvm, *pnoe;
@ -3218,7 +3218,7 @@ long _lPopByRt(SATvm *pstSavm, void *psvOut)
pstSavm->m_skSock = pvm->m_skSock;
pstSavm->tblName = pnoe->m_mtable;
pthread_mutex_lock(&list->pstFset->lock);
lRet = lTvmPop(pstSavm, psvOut);
lRet = lTvmPop(pstSavm, psvOut, eWait);
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
{
pvm->m_lTryTimes = 0;
@ -3242,7 +3242,7 @@ long _lPopByRt(SATvm *pstSavm, void *psvOut)
continue;
pstSavm->tblName = pvm->m_mtable;
lRet = lTvmPop(pstSavm, psvOut);
lRet = lTvmPop(pstSavm, psvOut, eWait);
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
{
close(pstSavm->m_skSock);
@ -4443,7 +4443,7 @@ long lTvmPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, v
RC_SUCC --success
RC_FAIL --failure
*************************************************************************************************/
long lTvmPop(SATvm *pstSavm, void *pvOut)
long lTvmPop(SATvm *pstSavm, void *pvOut, Uenum eWait)
{
RunTime *pstRun;
TFace *pstFace;
@ -4463,6 +4463,7 @@ long lTvmPop(SATvm *pstSavm, void *pvOut)
pstFace = (TFace *)pstRun->pstVoid;
pstFace->m_lRows = 0;
pstFace->m_lFind = eWait;
pstFace->m_lDLen = pstSavm->lSize;
pstFace->m_lErrno = TVM_DONE_SUCC;
pstFace->m_enum = OPERATE_QUEPOP;

View File

@ -28,6 +28,8 @@
SATvm g_stSavm = {0};
TblDef g_stTblDef[TVM_MAX_TABLE] = {0};
extern long _lPush(SATvm *pstSavm, void *pvAddr);
extern long _lPopByRt(SATvm *pstSavm, void *psvOut, Uenum eWait);
extern long _lInsertByRt(SATvm *pstSavm);
extern long _lGroupByRt(SATvm *pstSavm, size_t *plOut, void **ppvOut);
extern long _lSelectByRt(SATvm *pstSavm, void *psvOut);
@ -150,7 +152,7 @@ static char tvmerr[128][MAX_INDEX_LEN] = {
"queue waiting for timeout",
"queue waiting for failure",
"created queue is too big",
"table does not support this operation",
"queue does not support this operation",
"",
};
@ -6559,12 +6561,12 @@ long __lInsert(SATvm *pstSavm, RunTime *pstRun, TABLE t, ulong uTimes)
return RC_FAIL;
}
if(TYPE_MQUEUE == pstRun->m_lType)
return _lPush(pstSavm, pstRun->m_pvAddr);
if(FIELD_INCR & pstSavm->lFind)
vIncrease(&pstSavm->stUpdt, (char *)pstSavm->pstVoid, (TblDef *)pstRun->m_pvAddr);
if(TYPE_MQUEUE == pstRun->m_lType)
return _lPush(pstSavm, pstRun->m_pvAddr);
if(HAVE_UNIQ_IDX(t))
{
if(RC_SUCC != _lInsertIndex(pstSavm, pstRun->m_pvAddr, t, &pstTruck))
@ -8066,6 +8068,15 @@ long lUpdate(SATvm *pstSavm, void *pvUpdate)
if(NULL == (pstRun = (RunTime *)pInitMemTable(pstSavm, pstSavm->tblName)))
return RC_FAIL;
/*
if(TYPE_MQUEUE == pstRun->m_lType)
{
pstSavm->m_lErrno = NOT_SUPPT_OPT;
vTblDisconnect(pstSavm, pstSavm->tblName);
return RC_FAIL;
}
*/
if(RES_REMOT_SID == pstRun->m_lLocal)
{
Tremohold(pstSavm, pstRun);
@ -8897,13 +8908,6 @@ long lDropTable(SATvm *pstSavm, TABLE t)
conditnum(pstSavm, stIndex, m_table, t)
if(RC_SUCC != lDelete(pstSavm)) return RC_FAIL;
if(TYPE_MQUEUE == pstRun->m_lType)
{
memset(pstRun, 0, sizeof(RunTime));
pstSavm->m_lEffect = 1;
return RC_SUCC;
}
// Delete the field table
if(RC_SUCC != lInitSATvm(pstSavm, SYS_TVM_FIELD))
return RC_FAIL;