add the queue network interface
parent
644cad7ef1
commit
21cf72b74a
|
@ -141,6 +141,11 @@ extern long lAsyInsert(SATvm *pstSavm);
|
||||||
extern long lAsyDelete(SATvm *pstSavm);
|
extern long lAsyDelete(SATvm *pstSavm);
|
||||||
extern long lAsyUpdate(SATvm *pstSavm, void *pvData);
|
extern long lAsyUpdate(SATvm *pstSavm, void *pvData);
|
||||||
extern long lAsyReplace(SATvm *pstSavm, void *pvData);
|
extern long lAsyReplace(SATvm *pstSavm, void *pvData);
|
||||||
|
|
||||||
|
// queue interface
|
||||||
|
extern long lTvmPush(SATvm *pstSavm);
|
||||||
|
extern long lTvmPop(SATvm *pstSavm, void *pvOut);
|
||||||
|
extern long lTvmPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -97,7 +97,10 @@ typedef long CREATE;
|
||||||
#define OPERATE_ROLWORK 34
|
#define OPERATE_ROLWORK 34
|
||||||
#define OPERATE_CMTWORK 35
|
#define OPERATE_CMTWORK 35
|
||||||
#define OPERATE_ENDWORK 36
|
#define OPERATE_ENDWORK 36
|
||||||
#define OPERATE_CLICK 37
|
#define OPERATE_CLICK 45
|
||||||
|
#define OPERATE_QUEPUSH 46
|
||||||
|
#define OPERATE_QUEPOP 47
|
||||||
|
#define OPERATE_QUEPOPS 48
|
||||||
#define OPERATE_EXEEXIT 99
|
#define OPERATE_EXEEXIT 99
|
||||||
|
|
||||||
#define OPERATE_DEFAULT (OPERATE_SELECT | OPERATE_UPDATE | OPERATE_DELETE | OPERATE_INSERT)
|
#define OPERATE_DEFAULT (OPERATE_SELECT | OPERATE_UPDATE | OPERATE_DELETE | OPERATE_INSERT)
|
||||||
|
@ -829,7 +832,8 @@ extern long lExtreme(SATvm *pstSavm, void *psvOut);
|
||||||
extern long lGroup(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
|
extern long lGroup(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
|
||||||
extern long lQuery(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
|
extern long lQuery(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
|
||||||
|
|
||||||
extern long lPops(SATvm *pstSavm, size_t lExpect, Timesp *tm, size_t *plOut, void **ppsvOut);
|
// queue interface
|
||||||
|
extern long lPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut);
|
||||||
extern long lPop(SATvm *pstSavm, void *pvOut);
|
extern long lPop(SATvm *pstSavm, void *pvOut);
|
||||||
extern long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
|
extern long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut);
|
||||||
extern long lPush(SATvm *pstSavm);
|
extern long lPush(SATvm *pstSavm);
|
||||||
|
|
33
src/queue.c
33
src/queue.c
|
@ -24,6 +24,10 @@
|
||||||
/************************************************************************************************
|
/************************************************************************************************
|
||||||
function
|
function
|
||||||
************************************************************************************************/
|
************************************************************************************************/
|
||||||
|
extern long _lPushByRt(SATvm *pstSavm);
|
||||||
|
extern long _lPopByRt(SATvm *pstSavm, void *psvOut);
|
||||||
|
extern long _lPopupByRt(SATvm *pstSavm, size_t lExp, time_t lTime, size_t *plOut, void **pp);
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
description:push data to queue
|
description:push data to queue
|
||||||
parameters:
|
parameters:
|
||||||
|
@ -60,6 +64,7 @@ long _lPush(SATvm *pstSavm, void *pvAddr)
|
||||||
memcpy(ps->m_pvData, pstSavm->pstVoid, pv->m_lReSize);
|
memcpy(ps->m_pvData, pstSavm->pstVoid, pv->m_lReSize);
|
||||||
SET_DATA_TRUCK(ps, DATA_TRUCK_NRML);
|
SET_DATA_TRUCK(ps, DATA_TRUCK_NRML);
|
||||||
__sync_add_and_fetch(&pv->m_lValid, 1);
|
__sync_add_and_fetch(&pv->m_lValid, 1);
|
||||||
|
pstSavm->m_lEffect = 1;
|
||||||
|
|
||||||
Futex(&pv->m_lValid, FUTEX_WAKE, pv->m_lGroup, NULL);
|
Futex(&pv->m_lValid, FUTEX_WAKE, pv->m_lGroup, NULL);
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
|
@ -94,7 +99,7 @@ long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut)
|
||||||
if(0 == pv->m_lValid)
|
if(0 == pv->m_lValid)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if(0 > (llong)__sync_sub_and_fetch(&pv->m_lValid, 1))
|
if(0 > (int)__sync_sub_and_fetch(&pv->m_lValid, 1))
|
||||||
{
|
{
|
||||||
__sync_fetch_and_add(&pv->m_lValid, 1);
|
__sync_fetch_and_add(&pv->m_lValid, 1);
|
||||||
continue;
|
continue;
|
||||||
|
@ -118,6 +123,7 @@ long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut)
|
||||||
|
|
||||||
memcpy(pvOut, ps->m_pvData, pv->m_lReSize);
|
memcpy(pvOut, ps->m_pvData, pv->m_lReSize);
|
||||||
SET_DATA_TRUCK(ps, DATA_TRUCK_NULL);
|
SET_DATA_TRUCK(ps, DATA_TRUCK_NULL);
|
||||||
|
pstSavm->m_lEffect = 1;
|
||||||
|
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
}
|
}
|
||||||
|
@ -135,8 +141,8 @@ long _lPopup(SATvm *pstSavm, void *pvAddr, void *pvOut)
|
||||||
RC_SUCC --success
|
RC_SUCC --success
|
||||||
RC_FAIL --failure
|
RC_FAIL --failure
|
||||||
*************************************************************************************************/
|
*************************************************************************************************/
|
||||||
long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, struct timespec *tm,
|
long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, Timesp *tm, size_t *plOut,
|
||||||
size_t *plOut, void **ppsvOut)
|
void **ppsvOut)
|
||||||
{
|
{
|
||||||
int nPos;
|
int nPos;
|
||||||
SHTruck *ps = NULL;
|
SHTruck *ps = NULL;
|
||||||
|
@ -171,7 +177,7 @@ long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, struct timespec *tm
|
||||||
if(0 == pv->m_lValid)
|
if(0 == pv->m_lValid)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if(0 > (llong)__sync_sub_and_fetch(&pv->m_lValid, 1))
|
if(0 > (int)__sync_sub_and_fetch(&pv->m_lValid, 1))
|
||||||
{
|
{
|
||||||
__sync_fetch_and_add(&pv->m_lValid, 1);
|
__sync_fetch_and_add(&pv->m_lValid, 1);
|
||||||
continue;
|
continue;
|
||||||
|
@ -197,6 +203,8 @@ long _lPops(SATvm *pstSavm, void *pvAddr, size_t lExpect, struct timespec *tm
|
||||||
++ (*plOut);
|
++ (*plOut);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lEffect = *plOut;
|
||||||
|
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,8 +241,7 @@ long lPop(SATvm *pstSavm, void *pvOut)
|
||||||
if(RES_REMOT_SID == pstRun->m_lLocal)
|
if(RES_REMOT_SID == pstRun->m_lLocal)
|
||||||
{
|
{
|
||||||
Tremohold(pstSavm, pstRun);
|
Tremohold(pstSavm, pstRun);
|
||||||
return RC_FAIL;
|
return _lPopByRt(pstSavm, pvOut);
|
||||||
// return _lPopupByRt(pstSavm, psvOut);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut);
|
lRet = _lPopup(pstSavm, pstRun->m_pvAddr, pvOut);
|
||||||
|
@ -254,9 +261,10 @@ long lPop(SATvm *pstSavm, void *pvOut)
|
||||||
RC_SUCC --success
|
RC_SUCC --success
|
||||||
RC_FAIL --failure
|
RC_FAIL --failure
|
||||||
*************************************************************************************************/
|
*************************************************************************************************/
|
||||||
long lPops(SATvm *pstSavm, size_t lExpect, Timesp *tm, size_t *plOut, void **ppsvOut)
|
long lPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut)
|
||||||
{
|
{
|
||||||
long lRet;
|
long lRet;
|
||||||
|
Timesp tm = {0};
|
||||||
RunTime *pstRun = NULL;
|
RunTime *pstRun = NULL;
|
||||||
|
|
||||||
if(!pstSavm)
|
if(!pstSavm)
|
||||||
|
@ -278,11 +286,11 @@ long lPops(SATvm *pstSavm, size_t lExpect, Timesp *tm, size_t *plOut, void **
|
||||||
if(RES_REMOT_SID == pstRun->m_lLocal)
|
if(RES_REMOT_SID == pstRun->m_lLocal)
|
||||||
{
|
{
|
||||||
Tremohold(pstSavm, pstRun);
|
Tremohold(pstSavm, pstRun);
|
||||||
return RC_FAIL;
|
return _lPopupByRt(pstSavm, lExpect, lTime, plOut, ppsvOut);
|
||||||
// return _lPopupByRt(pstSavm, psvOut);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lRet = _lPops(pstSavm, pstRun->m_pvAddr, lExpect, tm, plOut, ppsvOut);
|
tm.tv_sec = lTime;
|
||||||
|
lRet = _lPops(pstSavm, pstRun->m_pvAddr, lExpect, &tm, plOut, ppsvOut);
|
||||||
vTblDisconnect(pstSavm, pstSavm->tblName);
|
vTblDisconnect(pstSavm, pstSavm->tblName);
|
||||||
return lRet;
|
return lRet;
|
||||||
}
|
}
|
||||||
|
@ -323,8 +331,7 @@ long lPushs(SATvm *pstSavm, size_t *plOut, void **ppsvOut)
|
||||||
if(RES_REMOT_SID == pstRun->m_lLocal)
|
if(RES_REMOT_SID == pstRun->m_lLocal)
|
||||||
{
|
{
|
||||||
Tremohold(pstSavm, pstRun);
|
Tremohold(pstSavm, pstRun);
|
||||||
return RC_FAIL;
|
return _lPushByRt(pstSavm);
|
||||||
// return _lInsertByRt(pstSavm, );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(RC_SUCC != _lPush(pstSavm, pstRun->m_pvAddr))
|
if(RC_SUCC != _lPush(pstSavm, pstRun->m_pvAddr))
|
||||||
|
@ -367,7 +374,7 @@ long lPush(SATvm *pstSavm)
|
||||||
if(RES_REMOT_SID == pstRun->m_lLocal)
|
if(RES_REMOT_SID == pstRun->m_lLocal)
|
||||||
{
|
{
|
||||||
Tremohold(pstSavm, pstRun);
|
Tremohold(pstSavm, pstRun);
|
||||||
return _lInsertByRt(pstSavm);
|
return _lPushByRt(pstSavm);
|
||||||
}
|
}
|
||||||
|
|
||||||
lRet = _lPush(pstSavm, pstRun->m_pvAddr);
|
lRet = _lPush(pstSavm, pstRun->m_pvAddr);
|
||||||
|
|
472
src/tcp.c
472
src/tcp.c
|
@ -1426,6 +1426,42 @@ long lEventOperate(SATvm *pstSavm, SKCon *pstCon, TFace *pstFace, char *pvDat
|
||||||
if(RC_SUCC == lInsert(pstSavm))
|
if(RC_SUCC == lInsert(pstSavm))
|
||||||
pstCon->m_pstWork = pstSavm->m_pstWork;
|
pstCon->m_pstWork = pstSavm->m_pstWork;
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
|
case OPERATE_QUEPOPS:
|
||||||
|
if(RC_SUCC == lPopup(pstSavm, pstFace->m_lFind, pstFace->m_lErrno,
|
||||||
|
(size_t *)&pstFace->m_lRows, (void *)&pvOut))
|
||||||
|
lData = pstFace->m_lDLen * pstFace->m_lRows;
|
||||||
|
|
||||||
|
pstFace->m_lErrno = pstSavm->m_lErrno;
|
||||||
|
if(sizeof(TFace) != lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace)))
|
||||||
|
{
|
||||||
|
TFree(pvOut);
|
||||||
|
return RC_SUCC;
|
||||||
|
}
|
||||||
|
|
||||||
|
lSendBuffer(pstCon->m_skSock, pvOut, lData);
|
||||||
|
TFree(pvOut);
|
||||||
|
return RC_SUCC;
|
||||||
|
case OPERATE_QUEPOP:
|
||||||
|
if(RC_SUCC != lPop(pstSavm, (void *)pvData))
|
||||||
|
{
|
||||||
|
pstFace->m_lErrno = pstSavm->m_lErrno;
|
||||||
|
lData = sizeof(TFace);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lData = pstFace->m_lDLen + sizeof(TFace);
|
||||||
|
pstFace->m_lRows = pstSavm->m_lEffect;
|
||||||
|
}
|
||||||
|
|
||||||
|
lSendBuffer(pstCon->m_skSock, (void *)pstFace, lData);
|
||||||
|
return RC_SUCC;
|
||||||
|
case OPERATE_QUEPUSH:
|
||||||
|
if(RC_SUCC != lPush(pstSavm))
|
||||||
|
pstFace->m_lErrno = pstSavm->m_lErrno;
|
||||||
|
else
|
||||||
|
pstFace->m_lRows = pstSavm->m_lEffect;
|
||||||
|
lSendBuffer(pstCon->m_skSock, (void *)pstFace, sizeof(TFace));
|
||||||
|
return RC_SUCC;
|
||||||
case OPERATE_INSERT:
|
case OPERATE_INSERT:
|
||||||
pstSavm->m_bWork = pstCon->m_bWork;
|
pstSavm->m_bWork = pstCon->m_bWork;
|
||||||
pstSavm->m_pstWork = pstCon->m_pstWork;
|
pstSavm->m_pstWork = pstCon->m_pstWork;
|
||||||
|
@ -3059,6 +3095,250 @@ long _lTruncateByRt(SATvm *pstSavm, TABLE t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:remote - pop
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long _lPopupByRt(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut)
|
||||||
|
{
|
||||||
|
long lRet;
|
||||||
|
TDomain *pvm, *pnoe;
|
||||||
|
Rowgrp *list = NULL, *node = NULL;
|
||||||
|
|
||||||
|
if(NULL == (node = pGetTblNode(pstSavm->tblName)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = DOM_NOT_INITL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lErrno = RESOU_DISABLE;
|
||||||
|
switch(lGetBootType())
|
||||||
|
{
|
||||||
|
case TVM_BOOT_CLUSTER:
|
||||||
|
for(list = node->pstSSet; list; list = list->pstNext)
|
||||||
|
{
|
||||||
|
if(!list->pstFset)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(NULL == (pvm = (TDomain *)(list->pstFset->psvData)))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pnoe = (TDomain *)list->psvData;
|
||||||
|
if(RESOURCE_ABLE != pvm->m_lStatus || pnoe->m_lRelia < 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pstSavm->m_skSock = pvm->m_skSock;
|
||||||
|
pstSavm->tblName = pnoe->m_mtable;
|
||||||
|
pthread_mutex_lock(&list->pstFset->lock);
|
||||||
|
lRet = lTvmPopup(pstSavm, lExpect, lTime, plOut, ppsvOut);
|
||||||
|
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
|
||||||
|
{
|
||||||
|
pvm->m_lTryTimes = 0;
|
||||||
|
pvm->m_lLastTime = time(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&list->pstFset->lock);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
return RC_FAIL;
|
||||||
|
default:
|
||||||
|
for(list = node->pstSSet; list; list = list->pstNext)
|
||||||
|
{
|
||||||
|
if(NULL == (pvm = (TDomain *)(list->psvData)))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(0 == (OPERATE_SELECT & pvm->m_lPers))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(RC_SUCC != lTvmConnect(pstSavm, pvm->m_szIp, pvm->m_lPort, pvm->m_lTimeOut))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pstSavm->tblName = pvm->m_mtable;
|
||||||
|
lRet = lTvmPopup(pstSavm, lExpect, lTime, plOut, ppsvOut);
|
||||||
|
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
|
||||||
|
{
|
||||||
|
close(pstSavm->m_skSock);
|
||||||
|
((RunTime *)pGetRunTime(pstSavm, 0))->m_lRowSize = 0;
|
||||||
|
TFree(((RunTime *)pGetRunTime(pstSavm, 0))->pstVoid);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(pstSavm->m_skSock);
|
||||||
|
}
|
||||||
|
|
||||||
|
((RunTime *)pGetRunTime(pstSavm, 0))->m_lRowSize = 0;
|
||||||
|
TFree(((RunTime *)pGetRunTime(pstSavm, 0))->pstVoid);
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:remote - pop
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long _lPopByRt(SATvm *pstSavm, void *psvOut)
|
||||||
|
{
|
||||||
|
long lRet;
|
||||||
|
TDomain *pvm, *pnoe;
|
||||||
|
Rowgrp *list = NULL, *node = NULL;
|
||||||
|
|
||||||
|
if(NULL == (node = pGetTblNode(pstSavm->tblName)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = DOM_NOT_INITL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lErrno = RESOU_DISABLE;
|
||||||
|
switch(lGetBootType())
|
||||||
|
{
|
||||||
|
case TVM_BOOT_CLUSTER:
|
||||||
|
for(list = node->pstSSet; list; list = list->pstNext)
|
||||||
|
{
|
||||||
|
if(!list->pstFset)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(NULL == (pvm = (TDomain *)(list->pstFset->psvData)))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pnoe = (TDomain *)list->psvData;
|
||||||
|
if(RESOURCE_ABLE != pvm->m_lStatus || pnoe->m_lRelia < 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pstSavm->m_skSock = pvm->m_skSock;
|
||||||
|
pstSavm->tblName = pnoe->m_mtable;
|
||||||
|
pthread_mutex_lock(&list->pstFset->lock);
|
||||||
|
lRet = lTvmPop(pstSavm, psvOut);
|
||||||
|
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
|
||||||
|
{
|
||||||
|
pvm->m_lTryTimes = 0;
|
||||||
|
pvm->m_lLastTime = time(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&list->pstFset->lock);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
return RC_FAIL;
|
||||||
|
default:
|
||||||
|
for(list = node->pstSSet; list; list = list->pstNext)
|
||||||
|
{
|
||||||
|
if(NULL == (pvm = (TDomain *)(list->psvData)))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(0 == (OPERATE_SELECT & pvm->m_lPers))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(RC_SUCC != lTvmConnect(pstSavm, pvm->m_szIp, pvm->m_lPort, pvm->m_lTimeOut))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pstSavm->tblName = pvm->m_mtable;
|
||||||
|
lRet = lTvmPop(pstSavm, psvOut);
|
||||||
|
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
|
||||||
|
{
|
||||||
|
close(pstSavm->m_skSock);
|
||||||
|
((RunTime *)pGetRunTime(pstSavm, 0))->m_lRowSize = 0;
|
||||||
|
TFree(((RunTime *)pGetRunTime(pstSavm, 0))->pstVoid);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(pstSavm->m_skSock);
|
||||||
|
}
|
||||||
|
|
||||||
|
((RunTime *)pGetRunTime(pstSavm, 0))->m_lRowSize = 0;
|
||||||
|
TFree(((RunTime *)pGetRunTime(pstSavm, 0))->pstVoid);
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:remote - push
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long _lPushByRt(SATvm *pstSavm)
|
||||||
|
{
|
||||||
|
TDomain *pvm, *pnoe;
|
||||||
|
long lRet = RC_FAIL;
|
||||||
|
Rowgrp *list = NULL, *node = NULL;
|
||||||
|
|
||||||
|
if(NULL == (node = pGetTblNode(pstSavm->tblName)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = DOM_NOT_INITL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lErrno = RESOU_DISABLE;
|
||||||
|
switch(lGetBootType())
|
||||||
|
{
|
||||||
|
case TVM_BOOT_CLUSTER:
|
||||||
|
for(list = node->pstSSet; list; list = list->pstNext)
|
||||||
|
{
|
||||||
|
if(!list->pstFset)
|
||||||
|
continue;
|
||||||
|
if(NULL == (pvm = (TDomain *)(list->pstFset->psvData)))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pnoe = (TDomain *)list->psvData;
|
||||||
|
if(RESOURCE_ABLE != pvm->m_lStatus)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pstSavm->m_skSock = pvm->m_skSock;
|
||||||
|
pstSavm->tblName = pnoe->m_mtable;
|
||||||
|
pthread_mutex_lock(&list->pstFset->lock);
|
||||||
|
if(RC_SUCC == lTvmPush(pstSavm))
|
||||||
|
{
|
||||||
|
lRet = RC_SUCC;
|
||||||
|
pvm->m_lTryTimes = 0;
|
||||||
|
pvm->m_lLastTime = time(NULL);
|
||||||
|
}
|
||||||
|
else if(SOCK_COM_EXCP == pstSavm->m_lErrno)
|
||||||
|
{
|
||||||
|
pnoe->m_lRelia --;
|
||||||
|
Tlog("Insert err: %s, T(%d), F(%s:%d), R(%d)", sGetTError(pstSavm->m_lErrno),
|
||||||
|
pstSavm->tblName, pvm->m_szIp, pvm->m_lPort, pnoe->m_lRelia);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&list->pstFset->lock);
|
||||||
|
}
|
||||||
|
return lRet;
|
||||||
|
default:
|
||||||
|
for(list = node->pstSSet; list; list = list->pstNext)
|
||||||
|
{
|
||||||
|
if(NULL == (pvm = (TDomain *)(list->psvData)))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if(RC_SUCC != lTvmConnect(pstSavm, pvm->m_szIp, pvm->m_lPort, pvm->m_lTimeOut))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pstSavm->tblName = pvm->m_mtable;
|
||||||
|
lRet = lTvmPush(pstSavm);
|
||||||
|
if(RC_SUCC == lRet || SOCK_COM_EXCP != pstSavm->m_lErrno)
|
||||||
|
{
|
||||||
|
close(pstSavm->m_skSock);
|
||||||
|
((RunTime *)pGetRunTime(pstSavm, 0))->m_lRowSize = 0;
|
||||||
|
TFree(((RunTime *)pGetRunTime(pstSavm, 0))->pstVoid);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(pstSavm->m_skSock);
|
||||||
|
}
|
||||||
|
|
||||||
|
((RunTime *)pGetRunTime(pstSavm, 0))->m_lRowSize = 0;
|
||||||
|
TFree(((RunTime *)pGetRunTime(pstSavm, 0))->pstVoid);
|
||||||
|
return lRet;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
description:remote - Insert
|
description:remote - Insert
|
||||||
parameters:
|
parameters:
|
||||||
|
@ -3734,6 +4014,7 @@ void* pProtocaJava(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuf
|
||||||
|
|
||||||
switch(pstFace->m_enum)
|
switch(pstFace->m_enum)
|
||||||
{
|
{
|
||||||
|
case OPERATE_QUEPUSH:
|
||||||
case OPERATE_INSERT:
|
case OPERATE_INSERT:
|
||||||
return pvBuffer;
|
return pvBuffer;
|
||||||
case OPERATE_REPLACE:
|
case OPERATE_REPLACE:
|
||||||
|
@ -3868,6 +4149,7 @@ void* pParsePacket(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuf
|
||||||
{
|
{
|
||||||
case OPERAYS_INSERT:
|
case OPERAYS_INSERT:
|
||||||
case OPERATE_INSERT:
|
case OPERATE_INSERT:
|
||||||
|
case OPERATE_QUEPUSH:
|
||||||
return memcpy(pstVoid, pvData, pstFace->m_lDLen);
|
return memcpy(pstVoid, pvData, pstFace->m_lDLen);
|
||||||
case OPERAYS_UPDATE:
|
case OPERAYS_UPDATE:
|
||||||
case OPERATE_UPDATE:
|
case OPERATE_UPDATE:
|
||||||
|
@ -3924,6 +4206,8 @@ void* pParsePacket(SATvm *pstSavm, void *pstVoid, TFace *pstFace, void *pvBuf
|
||||||
memcpy(pstCond->stFdKey, pvData, sizeof(FdKey) * pstCond->uFldcmp);
|
memcpy(pstCond->stFdKey, pvData, sizeof(FdKey) * pstCond->uFldcmp);
|
||||||
return pvBuffer;
|
return pvBuffer;
|
||||||
/*
|
/*
|
||||||
|
case OPERATE_QUEPOP:
|
||||||
|
case OPERATE_QUEPOPS:
|
||||||
case OPERATE_TBDROP:
|
case OPERATE_TBDROP:
|
||||||
case OPERATE_RENAME:
|
case OPERATE_RENAME:
|
||||||
case OPERATE_SELSEQ:
|
case OPERATE_SELSEQ:
|
||||||
|
@ -3995,6 +4279,135 @@ void vAppendCond(void *pvData, FdCond *pstCond, uint *plRows)
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:API - popup
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
pvOut --out data
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long lTvmPopup(SATvm *pstSavm, size_t lExpect, time_t lTime, size_t *plOut, void **ppsvOut)
|
||||||
|
{
|
||||||
|
RunTime *pstRun;
|
||||||
|
TFace *pstFace;
|
||||||
|
|
||||||
|
if(!pstSavm)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = CONDIT_IS_NIL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstRun = (RunTime *)pGetRunTime(pstSavm, 0);
|
||||||
|
if(!pstRun->pstVoid)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = DOM_NOT_INITL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstFace = (TFace *)pstRun->pstVoid;
|
||||||
|
pstFace->m_lRows = 0;
|
||||||
|
pstFace->m_lErrno = lTime;
|
||||||
|
pstFace->m_lFind = lExpect;
|
||||||
|
pstFace->m_lDLen = pstSavm->lSize;
|
||||||
|
pstFace->m_enum = OPERATE_QUEPOPS;
|
||||||
|
pstFace->m_table = pstSavm->tblName;
|
||||||
|
|
||||||
|
if(sizeof(TFace) != lSendBuffer(pstSavm->m_skSock, (void *)pstRun->pstVoid, sizeof(TFace)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(sizeof(TFace) != lRecvBuffer(pstSavm->m_skSock, (char *)pstRun->pstVoid, sizeof(TFace)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lErrno = pstFace->m_lErrno;
|
||||||
|
if(0 != pstSavm->m_lErrno)
|
||||||
|
return RC_FAIL;
|
||||||
|
|
||||||
|
pstRun->m_lRowSize = pstSavm->lSize * pstFace->m_lRows;
|
||||||
|
if(NULL == (*ppsvOut = (void *)malloc(pstRun->m_lRowSize)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = MALLC_MEM_ERR;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(pstRun->m_lRowSize != lRecvBuffer(pstSavm->m_skSock, (char *)*ppsvOut, pstRun->m_lRowSize))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*plOut = pstFace->m_lRows;
|
||||||
|
pstSavm->m_lEffect = pstFace->m_lRows;
|
||||||
|
return RC_SUCC;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:API - pop
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
pvOut --out data
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long lTvmPop(SATvm *pstSavm, void *pvOut)
|
||||||
|
{
|
||||||
|
RunTime *pstRun;
|
||||||
|
TFace *pstFace;
|
||||||
|
|
||||||
|
if(!pstSavm)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = CONDIT_IS_NIL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstRun = (RunTime *)pGetRunTime(pstSavm, 0);
|
||||||
|
if(!pstRun->pstVoid)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = DOM_NOT_INITL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstFace = (TFace *)pstRun->pstVoid;
|
||||||
|
pstFace->m_lRows = 0;
|
||||||
|
pstFace->m_lDLen = pstSavm->lSize;
|
||||||
|
pstFace->m_lErrno = TVM_DONE_SUCC;
|
||||||
|
pstFace->m_enum = OPERATE_QUEPOP;
|
||||||
|
pstFace->m_table = pstSavm->tblName;
|
||||||
|
|
||||||
|
if(sizeof(TFace) != lSendBuffer(pstSavm->m_skSock, (void *)pstRun->pstVoid, sizeof(TFace)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(sizeof(TFace) != lRecvBuffer(pstSavm->m_skSock, (char *)pstRun->pstVoid, sizeof(TFace)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lErrno = pstFace->m_lErrno;
|
||||||
|
if(0 != pstSavm->m_lErrno)
|
||||||
|
return RC_FAIL;
|
||||||
|
|
||||||
|
pstSavm->m_lEffect = pstFace->m_lRows;
|
||||||
|
if(pstSavm->lSize != lRecvBuffer(pstSavm->m_skSock, (char *)pvOut, pstSavm->lSize))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return RC_SUCC;
|
||||||
|
}
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
description:API - select
|
description:API - select
|
||||||
parameters:
|
parameters:
|
||||||
|
@ -4348,6 +4761,65 @@ long lAsyInsert(SATvm *pstSavm)
|
||||||
return RC_SUCC;
|
return RC_SUCC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*************************************************************************************************
|
||||||
|
description:API - push
|
||||||
|
parameters:
|
||||||
|
pstSavm --stvm handle
|
||||||
|
return:
|
||||||
|
RC_SUCC --success
|
||||||
|
RC_FAIL --failure
|
||||||
|
*************************************************************************************************/
|
||||||
|
long lTvmPush(SATvm *pstSavm)
|
||||||
|
{
|
||||||
|
RunTime *pstRun;
|
||||||
|
TFace *pstFace;
|
||||||
|
uint lWrite = sizeof(TFace);
|
||||||
|
|
||||||
|
if(!pstSavm || !pstSavm->pstVoid)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = CONDIT_IS_NIL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstRun = (RunTime *)pGetRunTime(pstSavm, 0);
|
||||||
|
if(!pstRun->pstVoid)
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = DOM_NOT_INITL;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstFace = (TFace *)pstRun->pstVoid;
|
||||||
|
pstFace->m_lFind = pstSavm->lFind;
|
||||||
|
pstFace->m_lDLen = pstSavm->lSize;
|
||||||
|
pstFace->m_lErrno = TVM_DONE_SUCC;
|
||||||
|
pstFace->m_enum = OPERATE_QUEPUSH;
|
||||||
|
pstFace->m_table = pstSavm->tblName;
|
||||||
|
|
||||||
|
pstFace->m_lRows = pstSavm->lSize;
|
||||||
|
lWrite = pstFace->m_lRows + sizeof(TFace);
|
||||||
|
|
||||||
|
checkbuffer(pstSavm, pstRun, 1);
|
||||||
|
memcpy(pstRun->pstVoid + sizeof(TFace), pstSavm->pstVoid, pstSavm->lSize);
|
||||||
|
if(lWrite != lSendBuffer(pstSavm->m_skSock, (void *)pstRun->pstVoid, lWrite))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(sizeof(TFace) != lRecvBuffer(pstSavm->m_skSock, (char *)pstRun->pstVoid, sizeof(TFace)))
|
||||||
|
{
|
||||||
|
pstSavm->m_lErrno = SOCK_COM_EXCP;
|
||||||
|
return RC_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstSavm->m_lErrno = pstFace->m_lErrno;
|
||||||
|
if(0 != pstSavm->m_lErrno)
|
||||||
|
return RC_FAIL;
|
||||||
|
|
||||||
|
pstSavm->m_lEffect = pstFace->m_lRows;
|
||||||
|
return RC_SUCC;
|
||||||
|
}
|
||||||
|
|
||||||
/*************************************************************************************************
|
/*************************************************************************************************
|
||||||
description:API - insert
|
description:API - insert
|
||||||
parameters:
|
parameters:
|
||||||
|
|
Loading…
Reference in New Issue