diff --git a/.gitignore b/.gitignore index 4342047..e515e0a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,8 @@ vendor logs go.sum +manifest +cmd/.DS_Store +cmd/hummingbird-ui/.DS_Store +hummingbird +mqtt-broker \ No newline at end of file diff --git a/cmd/.DS_Store b/cmd/.DS_Store deleted file mode 100644 index 81794d4..0000000 Binary files a/cmd/.DS_Store and /dev/null differ diff --git a/internal/dtos/device.go b/internal/dtos/device.go index b0f1010..2b61a26 100644 --- a/internal/dtos/device.go +++ b/internal/dtos/device.go @@ -322,3 +322,18 @@ type InvokeDeviceServiceReq struct { Code string `json:"code"` Items map[string]interface{} `json:"inputParams"` } + +type DeviceEffectivePropertyDataReq struct { + DeviceId string `json:"deviceId"` + Codes []string `json:"codes"` +} + +type DeviceEffectivePropertyDataResponse struct { + Data []EffectivePropertyData `json:"propertyInfo"` +} + +type EffectivePropertyData struct { + Code string `json:"code"` + Value interface{} `json:"value"` + Time int64 `json:"time"` +} diff --git a/internal/dtos/thinkmodelmessage.go b/internal/dtos/thinkmodelmessage.go index a2a2c10..c7bc907 100644 --- a/internal/dtos/thinkmodelmessage.go +++ b/internal/dtos/thinkmodelmessage.go @@ -67,8 +67,20 @@ func (m *ThingModelMessage) IsPersistent() bool { return isPersistent } -func (m *ThingModelMessage) TransformMessageDataByProperty() (EdgeXDevicePropertyReport, error) { - var dataMsg EdgeXDevicePropertyReport +func (m *ThingModelMessage) TransformMessageDataBySetProperty() (DevicePropertySetResponse, error) { + var dataMsg DevicePropertySetResponse + err := json.Unmarshal([]byte(m.Data), &dataMsg) + return dataMsg, err +} + +func (m *ThingModelMessage) TransformMessageDataByGetProperty() (DeviceGetPropertyResponse, error) { + var dataMsg DeviceGetPropertyResponse + err := json.Unmarshal([]byte(m.Data), &dataMsg) + return dataMsg, err +} + +func (m *ThingModelMessage) TransformMessageDataByProperty() (DevicePropertyReport, error) { + var dataMsg DevicePropertyReport err := json.Unmarshal([]byte(m.Data), &dataMsg) return dataMsg, err } @@ -106,7 +118,7 @@ func ThingModelMessageFromThingModelMsg(msg *thingmodel.ThingModelMsg) ThingMode } } -type EdgeXDevicePropertyReport struct { +type DevicePropertyReport struct { MsgId string `json:"msgId"` Version string `json:"version"` //Time int64 `json:"time"` @@ -116,6 +128,22 @@ type EdgeXDevicePropertyReport struct { Data map[string]ReportData `json:"data"` } +type DeviceGetPropertyResponse struct { + MsgId string `json:"msgId"` + Data []EffectivePropertyData `json:"data"` +} + +type DevicePropertySetResponse struct { + MsgId string `json:"msgId"` + Data DevicePropertySetData `json:"data"` +} + +type DevicePropertySetData struct { + ErrorMessage string `json:"errorMessage"` + Code uint32 `json:"code"` + Success bool `json:"success"` +} + type DeviceBatchReport struct { MsgId string `json:"msgId"` Version string `json:"version"` @@ -191,6 +219,18 @@ func (r *InvokeDeviceService) ToString() string { return string(s) } +type DeviceGetPropertyData struct { + MsgId string `json:"msgId"` + Version string `json:"version"` + Time int64 `json:"time"` + Data []string `json:"data"` +} + +func (r *DeviceGetPropertyData) ToString() string { + s, _ := json.Marshal(r) + return string(s) +} + type SaveServiceIssueData struct { MsgId string `json:"msgId"` Code string `json:"code"` diff --git a/internal/hummingbird/core/application/deviceapp/deviceaction.go b/internal/hummingbird/core/application/deviceapp/deviceaction.go index 8a81a9d..7e27ae9 100644 --- a/internal/hummingbird/core/application/deviceapp/deviceaction.go +++ b/internal/hummingbird/core/application/deviceapp/deviceaction.go @@ -17,12 +17,14 @@ package deviceapp import ( "context" "encoding/json" + "fmt" "github.com/docker/distribution/uuid" "github.com/winc-link/edge-driver-proto/thingmodel" "github.com/winc-link/hummingbird/internal/dtos" "github.com/winc-link/hummingbird/internal/hummingbird/core/container" "github.com/winc-link/hummingbird/internal/pkg/constants" "github.com/winc-link/hummingbird/internal/pkg/di" + "github.com/winc-link/hummingbird/internal/pkg/errort" "github.com/winc-link/hummingbird/internal/tools/rpcclient" "time" ) @@ -36,7 +38,7 @@ const ( func (p *deviceApp) DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes { defer func() { if err := recover(); err != nil { - p.lc.Error("CreateDeviceCallBack Panic:", err) + p.lc.Error("Panic:", err) } }() @@ -80,10 +82,31 @@ func (p *deviceApp) DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest) + if err == nil { - return dtos.DeviceExecRes{ - Result: true, - Message: ResultSuccess, + messageStore := container.MessageStoreItfFrom(p.dic.Get) + ch := messageStore.GenAckChan(data.MsgId) + + select { + case <-time.After(10 * time.Second): + ch.TryCloseChan() + return dtos.DeviceExecRes{ + Result: false, + Message: "wait response timeout", + } + case <-ctx.Done(): + return dtos.DeviceExecRes{ + Result: false, + Message: "wait response timeout", + } + case resp := <-ch.DataChan: + if v, ok := resp.(dtos.DevicePropertySetData); ok { + message, _ := json.Marshal(v) + return dtos.DeviceExecRes{ + Result: true, + Message: string(message), + } + } } } else { return dtos.DeviceExecRes{ @@ -98,50 +121,20 @@ func (p *deviceApp) DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes { } } -func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) dtos.DeviceExecRes { +func (p *deviceApp) SetDeviceProperty(req dtos.OpenApiSetDeviceThingModel) error { defer func() { if err := recover(); err != nil { - p.lc.Error("CreateDeviceCallBack Panic:", err) + p.lc.Error("Panic:", err) } }() - device, err := p.dbClient.DeviceById(invokeDeviceServiceReq.DeviceId) + device, err := p.dbClient.DeviceById(req.DeviceId) if err != nil { - return dtos.DeviceExecRes{ - Result: false, - Message: "device not found", - } + return nil } - - product, err := p.dbClient.ProductById(device.ProductId) - if err != nil { - return dtos.DeviceExecRes{ - Result: false, - Message: "product not found", - } - } - var find bool - var callType constants.CallType - for _, action := range product.Actions { - if action.Code == invokeDeviceServiceReq.Code { - find = true - callType = action.CallType - } - } - - if !find { - return dtos.DeviceExecRes{ - Result: false, - Message: "code not found", - } - } - deviceService, err := p.dbClient.DeviceServiceById(device.DriveInstanceId) if err != nil { - return dtos.DeviceExecRes{ - Result: false, - Message: "driver not found", - } + return err } driverService := container.DriverServiceAppFrom(di.GContainer.Get) @@ -149,11 +142,151 @@ func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeD if status == constants.RunStatusStarted { client, errX := rpcclient.NewDriverRpcClient(deviceService.BaseAddress, false, "", deviceService.Id, p.lc) if errX != nil { - return dtos.DeviceExecRes{ - Result: false, - Message: errX.Error(), + return err + } + defer client.Close() + + var rpcRequest thingmodel.ThingModelIssueMsg + rpcRequest.DeviceId = req.DeviceId + rpcRequest.OperationType = thingmodel.OperationType_PROPERTY_SET + var data dtos.PropertySet + data.Version = "v1.0" + data.MsgId = uuid.Generate().String() + data.Time = time.Now().UnixMilli() + data.Params = req.Item + rpcRequest.Data = data.ToString() + + messageStore := container.MessageStoreItfFrom(p.dic.Get) + ch := messageStore.GenAckChan(data.MsgId) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + _, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest) + + if err != nil { + ch.TryCloseChan() + return errort.NewCommonErr(errort.DefaultSystemError, fmt.Errorf(err.Error())) + } + + select { + case <-time.After(10 * time.Second): + ch.TryCloseChan() + return errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id)) + case <-ctx.Done(): + return errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id)) + case resp := <-ch.DataChan: + if v, ok := resp.(dtos.DevicePropertySetData); ok { + if v.Success { + return nil + } else { + return errort.NewCommonErr(v.Code, fmt.Errorf(v.ErrorMessage)) + } } } + + } + return errort.NewCommonErr(errort.DeviceServiceNotStarted, fmt.Errorf("driver id(%s) not start", deviceService.Id)) +} + +func (p *deviceApp) DeviceEffectivePropertyData(deviceEffectivePropertyDataReq dtos.DeviceEffectivePropertyDataReq) (dtos.DeviceEffectivePropertyDataResponse, error) { + defer func() { + if err := recover(); err != nil { + p.lc.Error("Panic:", err) + } + }() + + device, err := p.dbClient.DeviceById(deviceEffectivePropertyDataReq.DeviceId) + if err != nil { + return dtos.DeviceEffectivePropertyDataResponse{}, err + } + + _, err = p.dbClient.ProductById(device.ProductId) + if err != nil { + return dtos.DeviceEffectivePropertyDataResponse{}, err + } + + deviceService, err := p.dbClient.DeviceServiceById(device.DriveInstanceId) + if err != nil { + return dtos.DeviceEffectivePropertyDataResponse{}, err + } + + driverService := container.DriverServiceAppFrom(di.GContainer.Get) + status := driverService.GetState(deviceService.Id) + if status == constants.RunStatusStarted { + + client, errX := rpcclient.NewDriverRpcClient(deviceService.BaseAddress, false, "", deviceService.Id, p.lc) + if errX != nil { + return dtos.DeviceEffectivePropertyDataResponse{}, err + } + defer client.Close() + var rpcRequest thingmodel.ThingModelIssueMsg + rpcRequest.DeviceId = deviceEffectivePropertyDataReq.DeviceId + rpcRequest.OperationType = thingmodel.OperationType_PROPERTY_GET + var data dtos.DeviceGetPropertyData + data.Version = "v1.0" + data.MsgId = uuid.Generate().String() + data.Time = time.Now().UnixMilli() + data.Data = deviceEffectivePropertyDataReq.Codes + rpcRequest.Data = data.ToString() + + messageStore := container.MessageStoreItfFrom(p.dic.Get) + ch := messageStore.GenAckChan(data.MsgId) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + _, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest) + + if err != nil { + ch.TryCloseChan() + return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DefaultSystemError, fmt.Errorf("system error")) + } + select { + case <-time.After(10 * time.Second): + ch.TryCloseChan() + return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id)) + case <-ctx.Done(): + return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id)) + case resp := <-ch.DataChan: + if v, ok := resp.([]dtos.EffectivePropertyData); ok { + return dtos.DeviceEffectivePropertyDataResponse{ + Data: v, + }, nil + } + } + } + return dtos.DeviceEffectivePropertyDataResponse{}, errort.NewCommonErr(errort.DeviceServiceNotStarted, fmt.Errorf("driver id(%s) not start", deviceService.Id)) +} + +func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) (map[string]interface{}, error) { + defer func() { + if err := recover(); err != nil { + p.lc.Error("Panic:", err) + } + }() + + device, err := p.dbClient.DeviceById(invokeDeviceServiceReq.DeviceId) + if err != nil { + return nil, err + } + + _, err = p.dbClient.ProductById(device.ProductId) + if err != nil { + return nil, err + + } + deviceService, err := p.dbClient.DeviceServiceById(device.DriveInstanceId) + if err != nil { + return nil, err + + } + + driverService := container.DriverServiceAppFrom(di.GContainer.Get) + status := driverService.GetState(deviceService.Id) + if status == constants.RunStatusStarted { + client, errX := rpcclient.NewDriverRpcClient(deviceService.BaseAddress, false, "", deviceService.Id, p.lc) + if errX != nil { + return nil, err + } defer client.Close() var rpcRequest thingmodel.ThingModelIssueMsg rpcRequest.DeviceId = invokeDeviceServiceReq.DeviceId @@ -165,106 +298,59 @@ func (p *deviceApp) DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeD data.Data.Code = invokeDeviceServiceReq.Code data.Data.InputParams = invokeDeviceServiceReq.Items rpcRequest.Data = data.ToString() + messageStore := container.MessageStoreItfFrom(p.dic.Get) + ch := messageStore.GenAckChan(data.MsgId) - if callType == constants.CallTypeAsync { - //saveServiceInfo := genSaveServiceInfo(data.MsgId, data.Time, invokeDeviceServiceReq) - var saveServiceInfo dtos.ThingModelMessage - saveServiceInfo.OpType = int32(thingmodel.OperationType_SERVICE_EXECUTE) - saveServiceInfo.Cid = device.Id - var saveData dtos.SaveServiceIssueData - saveData.MsgId = data.MsgId - saveData.Code = invokeDeviceServiceReq.Code - saveData.Time = data.Time - saveData.InputParams = invokeDeviceServiceReq.Items + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + _, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest) + if err != nil { + ch.TryCloseChan() + return nil, errort.NewCommonErr(errort.DefaultSystemError, fmt.Errorf("system error")) + } + //****** + persistItf := container.PersistItfFrom(p.dic.Get) + var saveServiceInfo dtos.ThingModelMessage + saveServiceInfo.OpType = int32(thingmodel.OperationType_SERVICE_EXECUTE) + saveServiceInfo.Cid = device.Id + var saveData dtos.SaveServiceIssueData + saveData.MsgId = data.MsgId + saveData.Code = invokeDeviceServiceReq.Code + saveData.Time = data.Time + saveData.InputParams = invokeDeviceServiceReq.Items + //****** + + select { + case <-time.After(10 * time.Second): + ch.TryCloseChan() saveData.OutputParams = map[string]interface{}{ - "result": true, - "message": "success", + "result": false, + "message": "wait response timeout", } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - _, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest) - if err == nil { - persistItf := container.PersistItfFrom(p.dic.Get) - _ = persistItf.SaveDeviceThingModelData(saveServiceInfo) - return dtos.DeviceExecRes{ - Result: true, - Message: ResultSuccess, - } - } else { - return dtos.DeviceExecRes{ - Result: false, - Message: err.Error(), - } + s, _ := json.Marshal(saveData) + saveServiceInfo.Data = string(s) + _ = persistItf.SaveDeviceThingModelData(saveServiceInfo) + return nil, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id)) + case <-ctx.Done(): + saveData.OutputParams = map[string]interface{}{ + "result": false, + "message": "wait response timeout", } - } else if callType == constants.CallTypeSync { - var saveServiceInfo dtos.ThingModelMessage - saveServiceInfo.OpType = int32(thingmodel.OperationType_SERVICE_EXECUTE) - saveServiceInfo.Cid = device.Id - var saveData dtos.SaveServiceIssueData - saveData.MsgId = data.MsgId - saveData.Code = invokeDeviceServiceReq.Code - saveData.Time = data.Time - saveData.InputParams = invokeDeviceServiceReq.Items - persistItf := container.PersistItfFrom(p.dic.Get) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - _, err = client.ThingModelDownServiceClient.ThingModelMsgIssue(ctx, &rpcRequest) - - if err != nil { - return dtos.DeviceExecRes{ - Result: false, - Message: err.Error(), - } - } - - messageStore := container.MessageStoreItfFrom(p.dic.Get) - ch := messageStore.GenAckChan(data.MsgId) - - select { - case <-time.After(5 * time.Second): - ch.TryCloseChan() - saveData.OutputParams = map[string]interface{}{ - "result": false, - "message": "wait response timeout", - } + s, _ := json.Marshal(saveData) + saveServiceInfo.Data = string(s) + _ = persistItf.SaveDeviceThingModelData(saveServiceInfo) + return nil, errort.NewCommonErr(errort.DeviceLibraryResponseTimeOut, fmt.Errorf("driver id(%s) time out", deviceService.Id)) + case resp := <-ch.DataChan: + if v, ok := resp.(map[string]interface{}); ok { + saveData.OutputParams = v s, _ := json.Marshal(saveData) saveServiceInfo.Data = string(s) _ = persistItf.SaveDeviceThingModelData(saveServiceInfo) - return dtos.DeviceExecRes{ - Result: false, - Message: "wait response timeout", - } - case <-ctx.Done(): - saveData.OutputParams = map[string]interface{}{ - "result": false, - "message": "wait response timeout", - } - s, _ := json.Marshal(saveData) - saveServiceInfo.Data = string(s) - _ = persistItf.SaveDeviceThingModelData(saveServiceInfo) - return dtos.DeviceExecRes{ - Result: false, - Message: "wait response timeout", - } - case resp := <-ch.DataChan: - if v, ok := resp.(map[string]interface{}); ok { - saveData.OutputParams = v - s, _ := json.Marshal(saveData) - saveServiceInfo.Data = string(s) - _ = persistItf.SaveDeviceThingModelData(saveServiceInfo) - message, _ := json.Marshal(v) - return dtos.DeviceExecRes{ - Result: true, - Message: string(message), - } - } - } + return v, nil + } } } - return dtos.DeviceExecRes{ - Result: false, - Message: "driver status stop", - } + return nil, errort.NewCommonErr(errort.DeviceServiceNotStarted, fmt.Errorf("driver id(%s) not start", deviceService.Id)) } diff --git a/internal/hummingbird/core/application/messagestore/messagestore.go b/internal/hummingbird/core/application/messagestore/messagestore.go index 3ec548f..36ba687 100644 --- a/internal/hummingbird/core/application/messagestore/messagestore.go +++ b/internal/hummingbird/core/application/messagestore/messagestore.go @@ -46,6 +46,12 @@ func NewMessageStore(dic *di.Container) *MessageStore { } } +func (wp *MessageStore) StoreRange() { + wp.ackMap.Range(func(key, value any) bool { + return true + }) +} + func (wp *MessageStore) StoreMsgId(id string, ch string) { wp.ackMap.Store(id, ch) } diff --git a/internal/hummingbird/core/application/persistence/thinkmodel.go b/internal/hummingbird/core/application/persistence/thinkmodel.go index 5cb5faf..688eec1 100644 --- a/internal/hummingbird/core/application/persistence/thinkmodel.go +++ b/internal/hummingbird/core/application/persistence/thinkmodel.go @@ -17,7 +17,6 @@ package persistence import ( "context" "encoding/json" - "errors" "github.com/winc-link/edge-driver-proto/thingmodel" "github.com/winc-link/hummingbird/internal/dtos" "github.com/winc-link/hummingbird/internal/hummingbird/core/application/messagestore" @@ -112,6 +111,38 @@ func (pst *persistApp) saveDeviceThingModelToLevelDB(req dtos.ThingModelMessage) if err != nil { return err } + + case thingmodel.OperationType_PROPERTY_GET_RESPONSE: + msg, err := req.TransformMessageDataByGetProperty() + if err != nil { + return err + } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(msg.MsgId) + if !ok { + //超时了。 + return nil + } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(msg.Data) + messageStore.DeleteMsgId(msg.MsgId) + } + + case thingmodel.OperationType_PROPERTY_SET_RESPONSE: + msg, err := req.TransformMessageDataBySetProperty() + if err != nil { + return err + } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(msg.MsgId) + if !ok { + //超时了。 + return nil + } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(msg.Data) + messageStore.DeleteMsgId(msg.MsgId) + } case thingmodel.OperationType_SERVICE_EXECUTE_RESPONSE: serviceMsg, err := req.TransformMessageDataByServiceExec() if err != nil { @@ -123,51 +154,61 @@ func (pst *persistApp) saveDeviceThingModelToLevelDB(req dtos.ThingModelMessage) return err } - product, err := pst.dbClient.ProductById(device.ProductId) + _, err = pst.dbClient.ProductById(device.ProductId) if err != nil { return err } - var find bool - var callType constants.CallType - - for _, action := range product.Actions { - if action.Code == serviceMsg.Code { - find = true - callType = action.CallType - break - } + //var find bool + //var callType constants.CallType + // + //for _, action := range product.Actions { + // if action.Code == serviceMsg.Code { + // find = true + // callType = action.CallType + // break + // } + //} + // + //if !find { + // return errors.New("") + //} + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) + if !ok { + //可能是超时了。 + return nil } - if !find { - return errors.New("") - } - - if callType == constants.CallTypeSync { - messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) - ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) - if !ok { - //可能是超时了。 - return nil - } - - if v, ok := ack.(*messagestore.MsgAckChan); ok { - v.TrySendDataAndCloseChan(serviceMsg.OutputParams) - messageStore.DeleteMsgId(serviceMsg.MsgId) - } - - } else if callType == constants.CallTypeAsync { - kvs := make(map[string]interface{}) - var key string - key = generateActionLeveldbKey(req.Cid, serviceMsg.Code, serviceMsg.Time) - value, _ := serviceMsg.Marshal() - kvs[key] = value - err = pst.dataDbClient.Insert(context.Background(), "", kvs) - - if err != nil { - return err - } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(serviceMsg.OutputParams) + messageStore.DeleteMsgId(serviceMsg.MsgId) } + //if callType == constants.CallTypeSync { + // messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + // ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) + // if !ok { + // //可能是超时了。 + // return nil + // } + // + // if v, ok := ack.(*messagestore.MsgAckChan); ok { + // v.TrySendDataAndCloseChan(serviceMsg.OutputParams) + // messageStore.DeleteMsgId(serviceMsg.MsgId) + // } + // + //} else if callType == constants.CallTypeAsync { + // kvs := make(map[string]interface{}) + // var key string + // key = generateActionLeveldbKey(req.Cid, serviceMsg.Code, serviceMsg.Time) + // value, _ := serviceMsg.Marshal() + // kvs[key] = value + // err = pst.dataDbClient.Insert(context.Background(), "", kvs) + // + // if err != nil { + // return err + // } + //} case thingmodel.OperationType_DATA_BATCH_REPORT: msg, err := req.TransformMessageDataByBatchReport() if err != nil { @@ -249,8 +290,63 @@ func (pst *persistApp) saveDeviceThingModelToTdengine(req dtos.ThingModelMessage if err != nil { return err } + + case thingmodel.OperationType_PROPERTY_GET_RESPONSE: + msg, err := req.TransformMessageDataByGetProperty() + if err != nil { + return err + } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(msg.MsgId) + if !ok { + //超时了。 + return nil + } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(msg.Data) + messageStore.DeleteMsgId(msg.MsgId) + } + case thingmodel.OperationType_PROPERTY_SET_RESPONSE: + msg, err := req.TransformMessageDataBySetProperty() + if err != nil { + return err + } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(msg.MsgId) + if !ok { + //超时了。 + return nil + } + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(msg.Data) + messageStore.DeleteMsgId(msg.MsgId) + } case thingmodel.OperationType_DATA_BATCH_REPORT: + msg, err := req.TransformMessageDataByBatchReport() + if err != nil { + return err + } + data := make(map[string]interface{}) + + for code, property := range msg.Data.Properties { + data[code] = property.Value + } + for code, event := range msg.Data.Events { + var eventData dtos.EventData + eventData.OutputParams = event.OutputParams + eventData.EventCode = code + eventData.EventTime = msg.Time + data[code] = eventData + + } + //批量写。 + err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + + if err != nil { + return err + } + return nil case thingmodel.OperationType_SERVICE_EXECUTE_RESPONSE: serviceMsg, err := req.TransformMessageDataByServiceExec() if err != nil { @@ -262,48 +358,60 @@ func (pst *persistApp) saveDeviceThingModelToTdengine(req dtos.ThingModelMessage return err } - product, err := pst.dbClient.ProductById(device.ProductId) + _, err = pst.dbClient.ProductById(device.ProductId) if err != nil { return err } - var find bool - var callType constants.CallType + //var find bool + //var callType constants.CallType + // + //for _, action := range product.Actions { + // if action.Code == serviceMsg.Code { + // find = true + // callType = action.CallType + // break + // } + //} + // + //if !find { + // return errors.New("") + //} - for _, action := range product.Actions { - if action.Code == serviceMsg.Code { - find = true - callType = action.CallType - break - } + messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) + if !ok { + //可能是超时了。 + return nil } - if !find { - return errors.New("") + if v, ok := ack.(*messagestore.MsgAckChan); ok { + v.TrySendDataAndCloseChan(serviceMsg.OutputParams) + messageStore.DeleteMsgId(serviceMsg.MsgId) } - if callType == constants.CallTypeSync { - messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) - ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) - if !ok { - //可能是超时了。 - return nil - } - - if v, ok := ack.(*messagestore.MsgAckChan); ok { - v.TrySendDataAndCloseChan(serviceMsg.OutputParams) - messageStore.DeleteMsgId(serviceMsg.MsgId) - } - - } else if callType == constants.CallTypeAsync { - v, _ := serviceMsg.Marshal() - data := make(map[string]interface{}) - data[serviceMsg.Code] = string(v) - err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) - if err != nil { - return err - } - } + //if callType == constants.CallTypeSync { + // messageStore := resourceContainer.MessageStoreItfFrom(pst.dic.Get) + // ack, ok := messageStore.LoadMsgChan(serviceMsg.MsgId) + // if !ok { + // //可能是超时了。 + // return nil + // } + // + // if v, ok := ack.(*messagestore.MsgAckChan); ok { + // v.TrySendDataAndCloseChan(serviceMsg.OutputParams) + // messageStore.DeleteMsgId(serviceMsg.MsgId) + // } + // + //} else if callType == constants.CallTypeAsync { + // v, _ := serviceMsg.Marshal() + // data := make(map[string]interface{}) + // data[serviceMsg.Code] = string(v) + // err = pst.dataDbClient.Insert(context.Background(), constants.DB_PREFIX+req.Cid, data) + // if err != nil { + // return err + // } + //} } return nil diff --git a/internal/hummingbird/core/controller/http/openapi/thingmodel.go b/internal/hummingbird/core/controller/http/openapi/thingmodel.go index 024c6f9..bfad056 100644 --- a/internal/hummingbird/core/controller/http/openapi/thingmodel.go +++ b/internal/hummingbird/core/controller/http/openapi/thingmodel.go @@ -63,6 +63,20 @@ func (ctl *controller) OpenApiDeleteThingModel(c *gin.Context) { httphelper.ResultSuccess(nil, c.Writer, lc) } +// OpenApiQueryDeviceEffectivePropertyData 查询设备实时属性 +func (ctl *controller) OpenApiQueryDeviceEffectivePropertyData(c *gin.Context) { + lc := ctl.lc + var req dtos.DeviceEffectivePropertyDataReq + urlDecodeParam(&req, c.Request, lc) + data, err := ctl.getDeviceApp().DeviceEffectivePropertyData(req) + if err != nil { + httphelper.RenderFail(c, err, c.Writer, lc) + return + } + httphelper.ResultSuccess(data, c.Writer, lc) +} + +// OpenApiSetDeviceProperty 设备设备属性 func (ctl *controller) OpenApiSetDeviceProperty(c *gin.Context) { lc := ctl.lc var req dtos.OpenApiSetDeviceThingModel @@ -70,19 +84,12 @@ func (ctl *controller) OpenApiSetDeviceProperty(c *gin.Context) { httphelper.RenderFail(c, errort.NewCommonErr(errort.DefaultReqParamsError, err), c.Writer, lc) return } - - var code string - var value interface{} - for s, i := range req.Item { - code = s - value = i + err := ctl.getDeviceApp().SetDeviceProperty(req) + if err != nil { + httphelper.RenderFail(c, err, c.Writer, lc) + return } - execRes := ctl.getDeviceApp().DeviceAction(dtos.JobAction{ - DeviceId: req.DeviceId, - Code: code, - Value: value, - }) - httphelper.ResultSuccess(execRes, c.Writer, lc) + httphelper.ResultSuccess(nil, c.Writer, lc) } func (ctl *controller) OpenApiInvokeThingService(c *gin.Context) { @@ -92,9 +99,12 @@ func (ctl *controller) OpenApiInvokeThingService(c *gin.Context) { httphelper.RenderFail(c, errort.NewCommonErr(errort.DefaultReqParamsError, err), c.Writer, lc) return } - - execRes := ctl.getDeviceApp().DeviceInvokeThingService(req) - httphelper.ResultSuccess(execRes, c.Writer, lc) + data, err := ctl.getDeviceApp().DeviceInvokeThingService(req) + if err != nil { + httphelper.RenderFail(c, err, c.Writer, lc) + return + } + httphelper.ResultSuccess(data, c.Writer, lc) } func (ctl *controller) OpenApiQueryDevicePropertyData(c *gin.Context) { diff --git a/internal/hummingbird/core/interface/device.go b/internal/hummingbird/core/interface/device.go index 34d7ae6..d51ec74 100644 --- a/internal/hummingbird/core/interface/device.go +++ b/internal/hummingbird/core/interface/device.go @@ -58,7 +58,11 @@ type DeviceCtlItf interface { DeviceAction(jobAction dtos.JobAction) dtos.DeviceExecRes - DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) dtos.DeviceExecRes + DeviceInvokeThingService(invokeDeviceServiceReq dtos.InvokeDeviceServiceReq) (map[string]interface{}, error) + + SetDeviceProperty(req dtos.OpenApiSetDeviceThingModel) error + + DeviceEffectivePropertyData(deviceEffectivePropertyDataReq dtos.DeviceEffectivePropertyDataReq) (dtos.DeviceEffectivePropertyDataResponse, error) } type OpenApiDeviceItf interface { diff --git a/internal/hummingbird/core/interface/message.go b/internal/hummingbird/core/interface/message.go index aec564e..41050f0 100644 --- a/internal/hummingbird/core/interface/message.go +++ b/internal/hummingbird/core/interface/message.go @@ -17,6 +17,7 @@ import ( ) type MessageStores interface { + StoreRange() StoreMsgId(id string, ch string) LoadMsgChan(id string) (interface{}, bool) DeleteMsgId(id string) diff --git a/internal/hummingbird/core/route/openapi.go b/internal/hummingbird/core/route/openapi.go index c0d332d..7058175 100644 --- a/internal/hummingbird/core/route/openapi.go +++ b/internal/hummingbird/core/route/openapi.go @@ -82,6 +82,8 @@ func RegisterOpenApi(engine *gin.Engine, dic *di.Container) { } //物模型使用的API { + //查询设备实时属性数据。 + v1.GET("/queryDeviceEffectivePropertyData", ctl.OpenApiQueryDeviceEffectivePropertyData) //设置设备的属性。 v1.POST("/setDeviceProperty", ctl.OpenApiSetDeviceProperty) //调用设备的服务。 diff --git a/internal/pkg/errort/code.go b/internal/pkg/errort/code.go index 1c0d3f8..10e4327 100644 --- a/internal/pkg/errort/code.go +++ b/internal/pkg/errort/code.go @@ -27,11 +27,11 @@ const ( DeviceLibraryDockerAuthInvalid uint32 = 20205 DeviceLibraryDockerImagesNotFound uint32 = 20206 DeviceLibraryNotExist uint32 = 20211 - DeviceLibraryAtopDefaultConfig uint32 = 20212 DeviceLibraryImageDownloadFail uint32 = 20213 DeviceLibraryImageNotFound uint32 = 20214 DeviceLibraryNotAllowDelete uint32 = 20215 DockerImageRepositoryNotFound uint32 = 20216 + DeviceLibraryResponseTimeOut uint32 = 20217 DeviceServiceMustDeleteDevice uint32 = 20301 DeviceServiceMustStopService uint32 = 20302 diff --git a/internal/pkg/i18n/locales/en.go b/internal/pkg/i18n/locales/en.go index be8161a..4774eb5 100644 --- a/internal/pkg/i18n/locales/en.go +++ b/internal/pkg/i18n/locales/en.go @@ -124,6 +124,10 @@ func GetEnMessages() []*i18n.Message { ID: "20216", Other: `The docker image repository does not exist.`, }, + { + ID: "20217", + Other: `Time out.`, + }, // 驱动实例 { diff --git a/internal/pkg/i18n/locales/zh.go b/internal/pkg/i18n/locales/zh.go index 7fc9063..031e1e9 100644 --- a/internal/pkg/i18n/locales/zh.go +++ b/internal/pkg/i18n/locales/zh.go @@ -124,6 +124,10 @@ func GetZhMessages() []*i18n.Message { ID: "20216", Other: `docker镜像仓库不存在`, }, + { + ID: "20217", + Other: `响应超时`, + }, // 驱动实例 {