add open api

master
winc-link 2023-08-31 20:28:49 +08:00
parent d36f97a6a6
commit 45cff7df24
14 changed files with 511 additions and 226 deletions

5
.gitignore vendored
View File

@ -2,3 +2,8 @@
vendor
logs
go.sum
manifest
cmd/.DS_Store
cmd/hummingbird-ui/.DS_Store
hummingbird
mqtt-broker

BIN
cmd/.DS_Store vendored

Binary file not shown.

View File

@ -322,3 +322,18 @@ type InvokeDeviceServiceReq struct {
Code string `json:"code"`
Items map[string]interface{} `json:"inputParams"`
}
type DeviceEffectivePropertyDataReq struct {
DeviceId string `json:"deviceId"`
Codes []string `json:"codes"`
}
type DeviceEffectivePropertyDataResponse struct {
Data []EffectivePropertyData `json:"propertyInfo"`
}
type EffectivePropertyData struct {
Code string `json:"code"`
Value interface{} `json:"value"`
Time int64 `json:"time"`
}

View File

@ -67,8 +67,20 @@ func (m *ThingModelMessage) IsPersistent() bool {
return isPersistent
}
func (m *ThingModelMessage) TransformMessageDataByProperty() (EdgeXDevicePropertyReport, error) {
var dataMsg EdgeXDevicePropertyReport
func (m *ThingModelMessage) TransformMessageDataBySetProperty() (DevicePropertySetResponse, error) {
var dataMsg DevicePropertySetResponse
err := json.Unmarshal([]byte(m.Data), &dataMsg)
return dataMsg, err
}
func (m *ThingModelMessage) TransformMessageDataByGetProperty() (DeviceGetPropertyResponse, error) {
var dataMsg DeviceGetPropertyResponse
err := json.Unmarshal([]byte(m.Data), &dataMsg)
return dataMsg, err
}
func (m *ThingModelMessage) TransformMessageDataByProperty() (DevicePropertyReport, error) {
var dataMsg DevicePropertyReport
err := json.Unmarshal([]byte(m.Data), &dataMsg)
return dataMsg, err
}
@ -106,7 +118,7 @@ func ThingModelMessageFromThingModelMsg(msg *thingmodel.ThingModelMsg) ThingMode
}
}
type EdgeXDevicePropertyReport struct {
type DevicePropertyReport struct {
MsgId string `json:"msgId"`
Version string `json:"version"`
//Time int64 `json:"time"`
@ -116,6 +128,22 @@ type EdgeXDevicePropertyReport struct {
Data map[string]ReportData `json:"data"`
}
type DeviceGetPropertyResponse struct {
MsgId string `json:"msgId"`
Data []EffectivePropertyData `json:"data"`
}
type DevicePropertySetResponse struct {
MsgId string `json:"msgId"`
Data DevicePropertySetData `json:"data"`
}
type DevicePropertySetData struct {
ErrorMessage string `json:"errorMessage"`
Code uint32 `json:"code"`
Success bool `json:"success"`
}
type DeviceBatchReport struct {
MsgId string `json:"msgId"`
Version string `json:"version"`
@ -191,6 +219,18 @@ func (r *InvokeDeviceService) ToString() string {
return string(s)
}
type DeviceGetPropertyData struct {
MsgId string `json:"msgId"`
Version string `json:"version"`
Time int64 `json:"time"`
Data []string `json:"data"`
}
func (r *DeviceGetPropertyData) ToString() string {
s, _ := json.Marshal(r)
return string(s)
}
type SaveServiceIssueData struct {
MsgId string `json:"msgId"`
Code string `json:"code"`

View File

@ -17,12 +17,14 @@ package deviceapp
import (
"context"
"encoding/json"
"fmt"
"github.com/docker/distribution/uuid"
"github.com/winc-link/edge-driver-proto/thingmodel"
"github.com/winc-link/hummingbird/internal/dtos"
"github.com/winc-link/hummingbird/internal/hummingbird/core/container"
"github.com/winc-link/hummingbird/internal/pkg/constants"
"github.com/winc-link/hummingbird/internal/pkg/di"
"github.com/winc-link/hummingbird/internal/pkg/errort"
"github.com/winc-link/hummingbird/internal/tools/rpcclient"
"time"
)
@ -36,7 +38,7 @@ const (
func (p *deviceApp) DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes {
defer func() {
if err := recover(); err != nil {
p.lc.Error("CreateDeviceCallBack Panic:", err)
p.lc.Error("Panic:", err)
}
}()
@ -80,10 +82,31 @@ func (p *deviceApp) DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest)
if err == nil {
return dtos.DeviceExecRes{
Result: true,
Message: ResultSuccess,
messageStore := container.MessageStoreItfFrom(p.dic.Get)
ch := messageStore.GenAckChan(data.MsgId)
select {
case <-time.After(10 * time.Second):
ch.TryCloseChan()
return dtos.DeviceExecRes{
Result: false,
Message: "wait response timeout",
}
case <-ctx.Done():
return dtos.DeviceExecRes{
Result: false,
Message: "wait response timeout",
}
case resp := <-ch.DataChan:
if v, ok := resp.(dtos.DevicePropertySetData); ok {
message, _ := json.Marshal(v)
return dtos.DeviceExecRes{
Result: true,
Message: string(message),
}
}
}
} else {
return dtos.DeviceExecRes{
@ -98,50 +121,20 @@ func (p *deviceApp) DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes {
}
}
func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) dtos.DeviceExecRes {
func (p *deviceApp) SetDeviceProperty(req dtos.OpenApiSetDeviceThingModel) error {
defer func() {
if err := recover(); err != nil {
p.lc.Error("CreateDeviceCallBack Panic:", err)
p.lc.Error("Panic:", err)
}
}()
device, err := p.dbClient.DeviceById(invokeDeviceServiceReq.DeviceId)
device, err := p.dbClient.DeviceById(req.DeviceId)
if err != nil {
return dtos.DeviceExecRes{
Result: false,
Message: "device not found",
}
return nil
}
product, err := p.dbClient.ProductById(device.ProductId)
if err != nil {
return dtos.DeviceExecRes{
Result: false,
Message: "product not found",
}
}
var find bool
var callType constants.CallType
for _, action := range product.Actions {
if action.Code == invokeDeviceServiceReq.Code {
find = true
callType = action.CallType
}
}
if !find {
return dtos.DeviceExecRes{
Result: false,
Message: "code not found",
}
}
deviceService, err := p.dbClient.DeviceServiceById(device.DriveInstanceId)
if err != nil {
return dtos.DeviceExecRes{
Result: false,
Message: "driver not found",
}
return err
}
driverService := container.DriverServiceAppFrom(di.GContainer.Get)
@ -149,11 +142,151 @@ func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeD
if status == constants.RunStatusStarted {
client, errX := rpcclient.NewDriverRpcClient(deviceService.BaseAddress, false, "", deviceService.Id, p.lc)
if errX != nil {
return dtos.DeviceExecRes{
Result: false,
Message: errX.Error(),
return err
}
defer client.Close()
var rpcRequest thingmodel.ThingModelIssueMsg
rpcRequest.DeviceId = req.DeviceId
rpcRequest.OperationType = thingmodel.OperationType_PROPERTY_SET
var data dtos.PropertySet
data.Version = "v1.0"
data.MsgId = uuid.Generate().String()
data.Time = time.Now().UnixMilli()
data.Params = req.Item
rpcRequest.Data = data.ToString()
messageStore := container.MessageStoreItfFrom(p.dic.Get)
ch := messageStore.GenAckChan(data.MsgId)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
_, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest)
if err != nil {
ch.TryCloseChan()
return errort.NewCommonErr(errort.DefaultSystemError, fmt.Errorf(err.Error()))
}
select {
case <-time.After(10 * time.Second):
ch.TryCloseChan()
return errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id))
case <-ctx.Done():
return errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id))
case resp := <-ch.DataChan:
if v, ok := resp.(dtos.DevicePropertySetData); ok {
if v.Success {
return nil
} else {
return errort.NewCommonErr(v.Code, fmt.Errorf(v.ErrorMessage))
}
}
}
}
return errort.NewCommonErr(errort.DeviceServiceNotStarted, fmt.Errorf("driver id(%s) not start", deviceService.Id))
}
func (p *deviceApp) DeviceEffectivePropertyData(deviceEffectivePropertyDataReq dtos.DeviceEffectivePropertyDataReq) (dtos.DeviceEffectivePropertyDataResponse, error) {
defer func() {
if err := recover(); err != nil {
p.lc.Error("Panic:", err)
}
}()
device, err := p.dbClient.DeviceById(deviceEffectivePropertyDataReq.DeviceId)
if err != nil {
return dtos.DeviceEffectivePropertyDataResponse{}, err
}
_, err = p.dbClient.ProductById(device.ProductId)
if err != nil {
return dtos.DeviceEffectivePropertyDataResponse{}, err
}
deviceService, err := p.dbClient.DeviceServiceById(device.DriveInstanceId)
if err != nil {
return dtos.DeviceEffectivePropertyDataResponse{}, err
}
driverService := container.DriverServiceAppFrom(di.GContainer.Get)
status := driverService.GetState(deviceService.Id)
if status == constants.RunStatusStarted {
client, errX := rpcclient.NewDriverRpcClient(deviceService.BaseAddress, false, "", deviceService.Id, p.lc)
if errX != nil {
return dtos.DeviceEffectivePropertyDataResponse{}, err
}
defer client.Close()
var rpcRequest thingmodel.ThingModelIssueMsg
rpcRequest.DeviceId = deviceEffectivePropertyDataReq.DeviceId
rpcRequest.OperationType = thingmodel.OperationType_PROPERTY_GET
var data dtos.DeviceGetPropertyData
data.Version = "v1.0"
data.MsgId = uuid.Generate().String()
data.Time = time.Now().UnixMilli()
data.Data = deviceEffectivePropertyDataReq.Codes
rpcRequest.Data = data.ToString()
messageStore := container.MessageStoreItfFrom(p.dic.Get)
ch := messageStore.GenAckChan(data.MsgId)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
_, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest)
if err != nil {
ch.TryCloseChan()
return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DefaultSystemError, fmt.Errorf("system error"))
}
select {
case <-time.After(10 * time.Second):
ch.TryCloseChan()
return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id))
case <-ctx.Done():
return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id))
case resp := <-ch.DataChan:
if v, ok := resp.([]dtos.EffectivePropertyData); ok {
return dtos.DeviceEffectivePropertyDataResponse{
Data: v,
}, nil
}
}
}
return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DeviceServiceNotStarted, fmt.Errorf("driver id(%s) not start", deviceService.Id))
}
func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) (map[string]interface{}, error) {
defer func() {
if err := recover(); err != nil {
p.lc.Error("Panic:", err)
}
}()
device, err := p.dbClient.DeviceById(invokeDeviceServiceReq.DeviceId)
if err != nil {
return nil, err
}
_, err = p.dbClient.ProductById(device.ProductId)
if err != nil {
return nil, err
}
deviceService, err := p.dbClient.DeviceServiceById(device.DriveInstanceId)
if err != nil {
return nil, err
}
driverService := container.DriverServiceAppFrom(di.GContainer.Get)
status := driverService.GetState(deviceService.Id)
if status == constants.RunStatusStarted {
client, errX := rpcclient.NewDriverRpcClient(deviceService.BaseAddress, false, "", deviceService.Id, p.lc)
if errX != nil {
return nil, err
}
defer client.Close()
var rpcRequest thingmodel.ThingModelIssueMsg
rpcRequest.DeviceId = invokeDeviceServiceReq.DeviceId
@ -165,106 +298,59 @@ func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeD
data.Data.Code = invokeDeviceServiceReq.Code
data.Data.InputParams = invokeDeviceServiceReq.Items
rpcRequest.Data = data.ToString()
messageStore := container.MessageStoreItfFrom(p.dic.Get)
ch := messageStore.GenAckChan(data.MsgId)
if callType == constants.CallTypeAsync {
//saveServiceInfo := genSaveServiceInfo(data.MsgId, data.Time, invokeDeviceServiceReq)
var saveServiceInfo dtos.ThingModelMessage
saveServiceInfo.OpType = int32(thingmodel.OperationType_SERVICE_EXECUTE)
saveServiceInfo.Cid = device.Id
var saveData dtos.SaveServiceIssueData
saveData.MsgId = data.MsgId
saveData.Code = invokeDeviceServiceReq.Code
saveData.Time = data.Time
saveData.InputParams = invokeDeviceServiceReq.Items
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
_, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest)
if err != nil {
ch.TryCloseChan()
return nil, errort.NewCommonErr(errort.DefaultSystemError, fmt.Errorf("system error"))
}
//******
persistItf := container.PersistItfFrom(p.dic.Get)
var saveServiceInfo dtos.ThingModelMessage
saveServiceInfo.OpType = int32(thingmodel.OperationType_SERVICE_EXECUTE)
saveServiceInfo.Cid = device.Id
var saveData dtos.SaveServiceIssueData
saveData.MsgId = data.MsgId
saveData.Code = invokeDeviceServiceReq.Code
saveData.Time = data.Time
saveData.InputParams = invokeDeviceServiceReq.Items
//******
select {
case <-time.After(10 * time.Second):
ch.TryCloseChan()
saveData.OutputParams = map[string]interface{}{
"result": true,
"message": "success",
"result": false,
"message": "wait response timeout",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest)
if err == nil {
persistItf := container.PersistItfFrom(p.dic.Get)
_ = persistItf.SaveDeviceThingModelData(saveServiceInfo)
return dtos.DeviceExecRes{
Result: true,
Message: ResultSuccess,
}
} else {
return dtos.DeviceExecRes{
Result: false,
Message: err.Error(),
}
s, _ := json.Marshal(saveData)
saveServiceInfo.Data = string(s)
_ = persistItf.SaveDeviceThingModelData(saveServiceInfo)
return nil, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id))
case <-ctx.Done():
saveData.OutputParams = map[string]interface{}{
"result": false,
"message": "wait response timeout",
}
} else if callType == constants.CallTypeSync {
var saveServiceInfo dtos.ThingModelMessage
saveServiceInfo.OpType = int32(thingmodel.OperationType_SERVICE_EXECUTE)
saveServiceInfo.Cid = device.Id
var saveData dtos.SaveServiceIssueData
saveData.MsgId = data.MsgId
saveData.Code = invokeDeviceServiceReq.Code
saveData.Time = data.Time
saveData.InputParams = invokeDeviceServiceReq.Items
persistItf := container.PersistItfFrom(p.dic.Get)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest)
if err != nil {
return dtos.DeviceExecRes{
Result: false,
Message: err.Error(),
}
}
messageStore := container.MessageStoreItfFrom(p.dic.Get)
ch := messageStore.GenAckChan(data.MsgId)
select {
case <-time.After(5 * time.Second):
ch.TryCloseChan()
saveData.OutputParams = map[string]interface{}{
"result": false,
"message": "wait response timeout",
}
s, _ := json.Marshal(saveData)
saveServiceInfo.Data = string(s)
_ = persistItf.SaveDeviceThingModelData(saveServiceInfo)
return nil, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id))
case resp := <-ch.DataChan:
if v, ok := resp.(map[string]interface{}); ok {
saveData.OutputParams = v
s, _ := json.Marshal(saveData)
saveServiceInfo.Data = string(s)
_ = persistItf.SaveDeviceThingModelData(saveServiceInfo)
return dtos.DeviceExecRes{
Result: false,
Message: "wait response timeout",
}
case <-ctx.Done():
saveData.OutputParams = map[string]interface{}{
"result": false,
"message": "wait response timeout",
}
s, _ := json.Marshal(saveData)
saveServiceInfo.Data = string(s)
_ = persistItf.SaveDeviceThingModelData(saveServiceInfo)
return dtos.DeviceExecRes{
Result: false,
Message: "wait response timeout",
}
case resp := <-ch.DataChan:
if v, ok := resp.(map[string]interface{}); ok {
saveData.OutputParams = v
s, _ := json.Marshal(saveData)
saveServiceInfo.Data = string(s)
_ = persistItf.SaveDeviceThingModelData(saveServiceInfo)
message, _ := json.Marshal(v)
return dtos.DeviceExecRes{
Result: true,
Message: string(message),
}
}
}
return v, nil
}
}
}
return dtos.DeviceExecRes{
Result: false,
Message: "driver status stop",
}
return nil, errort.NewCommonErr(errort.DeviceServiceNotStarted, fmt.Errorf("driver id(%s) not start", deviceService.Id))
}

View File

@ -46,6 +46,12 @@ func NewMessageStore(dic *di.Container) *MessageStore {
}
}
func (wp *MessageStore) StoreRange() {
wp.ackMap.Range(func(key, value any) bool {
return true
})
}
func (wp *MessageStore) StoreMsgId(id string, ch string) {
wp.ackMap.Store(id, ch)
}

View File

@ -17,7 +17,6 @@ package persistence
import (
"context"
"encoding/json"
"errors"
"github.com/winc-link/edge-driver-proto/thingmodel"
"github.com/winc-link/hummingbird/internal/dtos"
"github.com/winc-link/hummingbird/internal/hummingbird/core/application/messagestore"
@ -112,6 +111,38 @@ func (pst *persistApp) saveDeviceThingModelToLevelDB(req dtos.ThingModelMessage)
if err != nil {
return err
}
case thingmodel.OperationType_PROPERTY_GET_RESPONSE:
msg, err := req.TransformMessageDataByGetProperty()
if err != nil {
return err
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(msg.MsgId)
if !ok {
//超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(msg.Data)
messageStore.DeleteMsgId(msg.MsgId)
}
case thingmodel.OperationType_PROPERTY_SET_RESPONSE:
msg, err := req.TransformMessageDataBySetProperty()
if err != nil {
return err
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(msg.MsgId)
if !ok {
//超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(msg.Data)
messageStore.DeleteMsgId(msg.MsgId)
}
case thingmodel.OperationType_SERVICE_EXECUTE_RESPONSE:
serviceMsg, err := req.TransformMessageDataByServiceExec()
if err != nil {
@ -123,51 +154,61 @@ func (pst *persistApp) saveDeviceThingModelToLevelDB(req dtos.ThingModelMessage)
return err
}
product, err := pst.dbClient.ProductById(device.ProductId)
_, err = pst.dbClient.ProductById(device.ProductId)
if err != nil {
return err
}
var find bool
var callType constants.CallType
for _, action := range product.Actions {
if action.Code == serviceMsg.Code {
find = true
callType = action.CallType
break
}
//var find bool
//var callType constants.CallType
//
//for _, action := range product.Actions {
// if action.Code == serviceMsg.Code {
// find = true
// callType = action.CallType
// break
// }
//}
//
//if !find {
// return errors.New("")
//}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
if !ok {
//可能是超时了。
return nil
}
if !find {
return errors.New("")
}
if callType == constants.CallTypeSync {
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
if !ok {
//可能是超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
messageStore.DeleteMsgId(serviceMsg.MsgId)
}
} else if callType == constants.CallTypeAsync {
kvs := make(map[string]interface{})
var key string
key = generateActionLeveldbKey(req.Cid, serviceMsg.Code, serviceMsg.Time)
value, _ := serviceMsg.Marshal()
kvs[key] = value
err = pst.dataDbClient.Insert(context.Background(), "", kvs)
if err != nil {
return err
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
messageStore.DeleteMsgId(serviceMsg.MsgId)
}
//if callType == constants.CallTypeSync {
// messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
// ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
// if !ok {
// //可能是超时了。
// return nil
// }
//
// if v, ok := ack.(*messagestore.MsgAckChan); ok {
// v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
// messageStore.DeleteMsgId(serviceMsg.MsgId)
// }
//
//} else if callType == constants.CallTypeAsync {
// kvs := make(map[string]interface{})
// var key string
// key = generateActionLeveldbKey(req.Cid, serviceMsg.Code, serviceMsg.Time)
// value, _ := serviceMsg.Marshal()
// kvs[key] = value
// err = pst.dataDbClient.Insert(context.Background(), "", kvs)
//
// if err != nil {
// return err
// }
//}
case thingmodel.OperationType_DATA_BATCH_REPORT:
msg, err := req.TransformMessageDataByBatchReport()
if err != nil {
@ -249,8 +290,63 @@ func (pst *persistApp) saveDeviceThingModelToTdengine(req dtos.ThingModelMessage
if err != nil {
return err
}
case thingmodel.OperationType_PROPERTY_GET_RESPONSE:
msg, err := req.TransformMessageDataByGetProperty()
if err != nil {
return err
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(msg.MsgId)
if !ok {
//超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(msg.Data)
messageStore.DeleteMsgId(msg.MsgId)
}
case thingmodel.OperationType_PROPERTY_SET_RESPONSE:
msg, err := req.TransformMessageDataBySetProperty()
if err != nil {
return err
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(msg.MsgId)
if !ok {
//超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(msg.Data)
messageStore.DeleteMsgId(msg.MsgId)
}
case thingmodel.OperationType_DATA_BATCH_REPORT:
msg, err := req.TransformMessageDataByBatchReport()
if err != nil {
return err
}
data := make(map[string]interface{})
for code, property := range msg.Data.Properties {
data[code] = property.Value
}
for code, event := range msg.Data.Events {
var eventData dtos.EventData
eventData.OutputParams = event.OutputParams
eventData.EventCode = code
eventData.EventTime = msg.Time
data[code] = eventData
}
//批量写。
err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
if err != nil {
return err
}
return nil
case thingmodel.OperationType_SERVICE_EXECUTE_RESPONSE:
serviceMsg, err := req.TransformMessageDataByServiceExec()
if err != nil {
@ -262,48 +358,60 @@ func (pst *persistApp) saveDeviceThingModelToTdengine(req dtos.ThingModelMessage
return err
}
product, err := pst.dbClient.ProductById(device.ProductId)
_, err = pst.dbClient.ProductById(device.ProductId)
if err != nil {
return err
}
var find bool
var callType constants.CallType
//var find bool
//var callType constants.CallType
//
//for _, action := range product.Actions {
// if action.Code == serviceMsg.Code {
// find = true
// callType = action.CallType
// break
// }
//}
//
//if !find {
// return errors.New("")
//}
for _, action := range product.Actions {
if action.Code == serviceMsg.Code {
find = true
callType = action.CallType
break
}
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
if !ok {
//可能是超时了。
return nil
}
if !find {
return errors.New("")
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
messageStore.DeleteMsgId(serviceMsg.MsgId)
}
if callType == constants.CallTypeSync {
messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
if !ok {
//可能是超时了。
return nil
}
if v, ok := ack.(*messagestore.MsgAckChan); ok {
v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
messageStore.DeleteMsgId(serviceMsg.MsgId)
}
} else if callType == constants.CallTypeAsync {
v, _ := serviceMsg.Marshal()
data := make(map[string]interface{})
data[serviceMsg.Code] = string(v)
err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
if err != nil {
return err
}
}
//if callType == constants.CallTypeSync {
// messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get)
// ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId)
// if !ok {
// //可能是超时了。
// return nil
// }
//
// if v, ok := ack.(*messagestore.MsgAckChan); ok {
// v.TrySendDataAndCloseChan(serviceMsg.OutputParams)
// messageStore.DeleteMsgId(serviceMsg.MsgId)
// }
//
//} else if callType == constants.CallTypeAsync {
// v, _ := serviceMsg.Marshal()
// data := make(map[string]interface{})
// data[serviceMsg.Code] = string(v)
// err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data)
// if err != nil {
// return err
// }
//}
}
return nil

View File

@ -63,6 +63,20 @@ func (ctl *controller) OpenApiDeleteThingModel(c *gin.Context) {
httphelper.ResultSuccess(nil, c.Writer, lc)
}
// OpenApiQueryDeviceEffectivePropertyData 查询设备实时属性
func (ctl *controller) OpenApiQueryDeviceEffectivePropertyData(c *gin.Context) {
lc := ctl.lc
var req dtos.DeviceEffectivePropertyDataReq
urlDecodeParam(&req, c.Request, lc)
data, err := ctl.getDeviceApp().DeviceEffectivePropertyData(req)
if err != nil {
httphelper.RenderFail(c, err, c.Writer, lc)
return
}
httphelper.ResultSuccess(data, c.Writer, lc)
}
// OpenApiSetDeviceProperty 设备设备属性
func (ctl *controller) OpenApiSetDeviceProperty(c *gin.Context) {
lc := ctl.lc
var req dtos.OpenApiSetDeviceThingModel
@ -70,19 +84,12 @@ func (ctl *controller) OpenApiSetDeviceProperty(c *gin.Context) {
httphelper.RenderFail(c, errort.NewCommonErr(errort.DefaultReqParamsError, err), c.Writer, lc)
return
}
var code string
var value interface{}
for s, i := range req.Item {
code = s
value = i
err := ctl.getDeviceApp().SetDeviceProperty(req)
if err != nil {
httphelper.RenderFail(c, err, c.Writer, lc)
return
}
execRes := ctl.getDeviceApp().DeviceAction(dtos.JobAction{
DeviceId: req.DeviceId,
Code: code,
Value: value,
})
httphelper.ResultSuccess(execRes, c.Writer, lc)
httphelper.ResultSuccess(nil, c.Writer, lc)
}
func (ctl *controller) OpenApiInvokeThingService(c *gin.Context) {
@ -92,9 +99,12 @@ func (ctl *controller) OpenApiInvokeThingService(c *gin.Context) {
httphelper.RenderFail(c, errort.NewCommonErr(errort.DefaultReqParamsError, err), c.Writer, lc)
return
}
execRes := ctl.getDeviceApp().DeviceInvokeThingService(req)
httphelper.ResultSuccess(execRes, c.Writer, lc)
data, err := ctl.getDeviceApp().DeviceInvokeThingService(req)
if err != nil {
httphelper.RenderFail(c, err, c.Writer, lc)
return
}
httphelper.ResultSuccess(data, c.Writer, lc)
}
func (ctl *controller) OpenApiQueryDevicePropertyData(c *gin.Context) {

View File

@ -58,7 +58,11 @@ type DeviceCtlItf interface {
DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes
DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) dtos.DeviceExecRes
DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) (map[string]interface{}, error)
SetDeviceProperty(req dtos.OpenApiSetDeviceThingModel) error
DeviceEffectivePropertyData(deviceEffectivePropertyDataReq dtos.DeviceEffectivePropertyDataReq) (dtos.DeviceEffectivePropertyDataResponse, error)
}
type OpenApiDeviceItf interface {

View File

@ -17,6 +17,7 @@ import (
)
type MessageStores interface {
StoreRange()
StoreMsgId(id string, ch string)
LoadMsgChan(id string) (interface{}, bool)
DeleteMsgId(id string)

View File

@ -82,6 +82,8 @@ func RegisterOpenApi(engine *gin.Engine, dic *di.Container) {
}
//物模型使用的API
{
//查询设备实时属性数据。
v1.GET("/queryDeviceEffectivePropertyData", ctl.OpenApiQueryDeviceEffectivePropertyData)
//设置设备的属性。
v1.POST("/setDeviceProperty", ctl.OpenApiSetDeviceProperty)
//调用设备的服务。

View File

@ -27,11 +27,11 @@ const (
DeviceLibraryDockerAuthInvalid uint32 = 20205
DeviceLibraryDockerImagesNotFound uint32 = 20206
DeviceLibraryNotExist uint32 = 20211
DeviceLibraryAtopDefaultConfig uint32 = 20212
DeviceLibraryImageDownloadFail uint32 = 20213
DeviceLibraryImageNotFound uint32 = 20214
DeviceLibraryNotAllowDelete uint32 = 20215
DockerImageRepositoryNotFound uint32 = 20216
DeviceLibraryResponseTimeOut uint32 = 20217
DeviceServiceMustDeleteDevice uint32 = 20301
DeviceServiceMustStopService uint32 = 20302

View File

@ -124,6 +124,10 @@ func GetEnMessages() []*i18n.Message {
ID: "20216",
Other: `The docker image repository does not exist.`,
},
{
ID: "20217",
Other: `Time out.`,
},
// 驱动实例
{

View File

@ -124,6 +124,10 @@ func GetZhMessages() []*i18n.Message {
ID: "20216",
Other: `docker镜像仓库不存在`,
},
{
ID: "20217",
Other: `响应超时`,
},
// 驱动实例
{