diff --git a/api/go/integration/integration.pb.go b/api/go/integration/integration.pb.go index c6c2b9e4..3c996572 100644 --- a/api/go/integration/integration.pb.go +++ b/api/go/integration/integration.pb.go @@ -1088,6 +1088,104 @@ func (x *IntegrationEvent) GetObject() *structpb.Struct { return nil } +// DownlinkCommand is the command to enqueue a downlink payload for the given +// device. +type DownlinkCommand struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // ID (UUID). + // If left blank, a random UUID will be generated. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // Device EUI (EUI64). + DevEui string `protobuf:"bytes,2,opt,name=dev_eui,json=devEui,proto3" json:"dev_eui,omitempty"` + // Confirmed. + Confirmed bool `protobuf:"varint,3,opt,name=confirmed,proto3" json:"confirmed,omitempty"` + // FPort (must be > 0). + FPort uint32 `protobuf:"varint,4,opt,name=f_port,json=fPort,proto3" json:"f_port,omitempty"` + // Data. + // Or use the json_object field when a codec has been configured. + Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` + // Only use this when a codec has been configured that can encode this + // object to bytes. + Object *structpb.Struct `protobuf:"bytes,6,opt,name=object,proto3" json:"object,omitempty"` +} + +func (x *DownlinkCommand) Reset() { + *x = DownlinkCommand{} + if protoimpl.UnsafeEnabled { + mi := &file_integration_integration_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DownlinkCommand) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DownlinkCommand) ProtoMessage() {} + +func (x *DownlinkCommand) ProtoReflect() protoreflect.Message { + mi := &file_integration_integration_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DownlinkCommand.ProtoReflect.Descriptor instead. +func (*DownlinkCommand) Descriptor() ([]byte, []int) { + return file_integration_integration_proto_rawDescGZIP(), []int{9} +} + +func (x *DownlinkCommand) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *DownlinkCommand) GetDevEui() string { + if x != nil { + return x.DevEui + } + return "" +} + +func (x *DownlinkCommand) GetConfirmed() bool { + if x != nil { + return x.Confirmed + } + return false +} + +func (x *DownlinkCommand) GetFPort() uint32 { + if x != nil { + return x.FPort + } + return 0 +} + +func (x *DownlinkCommand) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *DownlinkCommand) GetObject() *structpb.Struct { + if x != nil { + return x.Object + } + return nil +} + var File_integration_integration_proto protoreflect.FileDescriptor var file_integration_integration_proto_rawDesc = []byte{ @@ -1279,30 +1377,41 @@ var file_integration_integration_proto_rawDesc = []byte{ 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2f, 0x0a, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, - 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x2a, 0x2c, 0x0a, - 0x08, 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x4e, 0x46, - 0x4f, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x57, 0x41, 0x52, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, - 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x2a, 0xc0, 0x01, 0x0a, 0x07, - 0x4c, 0x6f, 0x67, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, - 0x57, 0x4e, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x44, 0x4f, 0x57, 0x4e, 0x4c, 0x49, 0x4e, 0x4b, - 0x5f, 0x50, 0x41, 0x59, 0x4c, 0x4f, 0x41, 0x44, 0x5f, 0x53, 0x49, 0x5a, 0x45, 0x10, 0x01, 0x12, - 0x10, 0x0a, 0x0c, 0x55, 0x50, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x43, 0x4f, 0x44, 0x45, 0x43, 0x10, - 0x02, 0x12, 0x12, 0x0a, 0x0e, 0x44, 0x4f, 0x57, 0x4e, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x43, 0x4f, - 0x44, 0x45, 0x43, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04, 0x4f, 0x54, 0x41, 0x41, 0x10, 0x04, 0x12, - 0x16, 0x0a, 0x12, 0x55, 0x50, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x46, 0x5f, 0x43, 0x4e, 0x54, 0x5f, - 0x52, 0x45, 0x53, 0x45, 0x54, 0x10, 0x05, 0x12, 0x0e, 0x0a, 0x0a, 0x55, 0x50, 0x4c, 0x49, 0x4e, - 0x4b, 0x5f, 0x4d, 0x49, 0x43, 0x10, 0x06, 0x12, 0x1f, 0x0a, 0x1b, 0x55, 0x50, 0x4c, 0x49, 0x4e, - 0x4b, 0x5f, 0x46, 0x5f, 0x43, 0x4e, 0x54, 0x5f, 0x52, 0x45, 0x54, 0x52, 0x41, 0x4e, 0x53, 0x4d, - 0x49, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x07, 0x12, 0x14, 0x0a, 0x10, 0x44, 0x4f, 0x57, 0x4e, - 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x47, 0x41, 0x54, 0x45, 0x57, 0x41, 0x59, 0x10, 0x08, 0x42, 0x6b, - 0x0a, 0x20, 0x69, 0x6f, 0x2e, 0x63, 0x68, 0x69, 0x72, 0x70, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x2e, - 0x61, 0x70, 0x69, 0x2e, 0x61, 0x73, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x42, 0x10, 0x49, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, - 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x62, 0x72, 0x6f, 0x63, 0x61, 0x61, 0x72, 0x2f, 0x63, 0x68, 0x69, 0x72, 0x70, - 0x73, 0x74, 0x61, 0x63, 0x6b, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x34, 0x2f, - 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x22, 0xb4, 0x01, + 0x0a, 0x0f, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x65, 0x76, 0x5f, 0x65, 0x75, 0x69, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x64, 0x65, 0x76, 0x45, 0x75, 0x69, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, + 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x63, + 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x65, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x66, 0x5f, 0x70, 0x6f, + 0x72, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x66, 0x50, 0x6f, 0x72, 0x74, 0x12, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x12, 0x2f, 0x0a, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x06, 0x6f, 0x62, + 0x6a, 0x65, 0x63, 0x74, 0x2a, 0x2c, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, + 0x12, 0x08, 0x0a, 0x04, 0x49, 0x4e, 0x46, 0x4f, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x57, 0x41, + 0x52, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, + 0x10, 0x02, 0x2a, 0xc0, 0x01, 0x0a, 0x07, 0x4c, 0x6f, 0x67, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x0b, + 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x44, + 0x4f, 0x57, 0x4e, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x50, 0x41, 0x59, 0x4c, 0x4f, 0x41, 0x44, 0x5f, + 0x53, 0x49, 0x5a, 0x45, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x55, 0x50, 0x4c, 0x49, 0x4e, 0x4b, + 0x5f, 0x43, 0x4f, 0x44, 0x45, 0x43, 0x10, 0x02, 0x12, 0x12, 0x0a, 0x0e, 0x44, 0x4f, 0x57, 0x4e, + 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x43, 0x4f, 0x44, 0x45, 0x43, 0x10, 0x03, 0x12, 0x08, 0x0a, 0x04, + 0x4f, 0x54, 0x41, 0x41, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x55, 0x50, 0x4c, 0x49, 0x4e, 0x4b, + 0x5f, 0x46, 0x5f, 0x43, 0x4e, 0x54, 0x5f, 0x52, 0x45, 0x53, 0x45, 0x54, 0x10, 0x05, 0x12, 0x0e, + 0x0a, 0x0a, 0x55, 0x50, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x4d, 0x49, 0x43, 0x10, 0x06, 0x12, 0x1f, + 0x0a, 0x1b, 0x55, 0x50, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x46, 0x5f, 0x43, 0x4e, 0x54, 0x5f, 0x52, + 0x45, 0x54, 0x52, 0x41, 0x4e, 0x53, 0x4d, 0x49, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x07, 0x12, + 0x14, 0x0a, 0x10, 0x44, 0x4f, 0x57, 0x4e, 0x4c, 0x49, 0x4e, 0x4b, 0x5f, 0x47, 0x41, 0x54, 0x45, + 0x57, 0x41, 0x59, 0x10, 0x08, 0x42, 0x6b, 0x0a, 0x20, 0x69, 0x6f, 0x2e, 0x63, 0x68, 0x69, 0x72, + 0x70, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x61, 0x73, 0x2e, 0x69, 0x6e, + 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x10, 0x49, 0x6e, 0x74, 0x65, 0x67, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x33, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x72, 0x6f, 0x63, 0x61, 0x61, + 0x72, 0x2f, 0x63, 0x68, 0x69, 0x72, 0x70, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x34, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1318,7 +1427,7 @@ func file_integration_integration_proto_rawDescGZIP() []byte { } var file_integration_integration_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_integration_integration_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_integration_integration_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_integration_integration_proto_goTypes = []interface{}{ (LogLevel)(0), // 0: integration.LogLevel (LogCode)(0), // 1: integration.LogCode @@ -1331,47 +1440,49 @@ var file_integration_integration_proto_goTypes = []interface{}{ (*StatusEvent)(nil), // 8: integration.StatusEvent (*LocationEvent)(nil), // 9: integration.LocationEvent (*IntegrationEvent)(nil), // 10: integration.IntegrationEvent - nil, // 11: integration.DeviceInfo.TagsEntry - nil, // 12: integration.LogEvent.ContextEntry - (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp - (*structpb.Struct)(nil), // 14: google.protobuf.Struct - (*gw.UplinkRxInfo)(nil), // 15: gw.UplinkRxInfo - (*gw.UplinkTxInfo)(nil), // 16: gw.UplinkTxInfo - (*gw.DownlinkTxInfo)(nil), // 17: gw.DownlinkTxInfo - (*common.Location)(nil), // 18: common.Location + (*DownlinkCommand)(nil), // 11: integration.DownlinkCommand + nil, // 12: integration.DeviceInfo.TagsEntry + nil, // 13: integration.LogEvent.ContextEntry + (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp + (*structpb.Struct)(nil), // 15: google.protobuf.Struct + (*gw.UplinkRxInfo)(nil), // 16: gw.UplinkRxInfo + (*gw.UplinkTxInfo)(nil), // 17: gw.UplinkTxInfo + (*gw.DownlinkTxInfo)(nil), // 18: gw.DownlinkTxInfo + (*common.Location)(nil), // 19: common.Location } var file_integration_integration_proto_depIdxs = []int32{ - 11, // 0: integration.DeviceInfo.tags:type_name -> integration.DeviceInfo.TagsEntry - 13, // 1: integration.UplinkEvent.time:type_name -> google.protobuf.Timestamp + 12, // 0: integration.DeviceInfo.tags:type_name -> integration.DeviceInfo.TagsEntry + 14, // 1: integration.UplinkEvent.time:type_name -> google.protobuf.Timestamp 2, // 2: integration.UplinkEvent.device_info:type_name -> integration.DeviceInfo - 14, // 3: integration.UplinkEvent.object:type_name -> google.protobuf.Struct - 15, // 4: integration.UplinkEvent.rx_info:type_name -> gw.UplinkRxInfo - 16, // 5: integration.UplinkEvent.tx_info:type_name -> gw.UplinkTxInfo - 13, // 6: integration.JoinEvent.time:type_name -> google.protobuf.Timestamp + 15, // 3: integration.UplinkEvent.object:type_name -> google.protobuf.Struct + 16, // 4: integration.UplinkEvent.rx_info:type_name -> gw.UplinkRxInfo + 17, // 5: integration.UplinkEvent.tx_info:type_name -> gw.UplinkTxInfo + 14, // 6: integration.JoinEvent.time:type_name -> google.protobuf.Timestamp 2, // 7: integration.JoinEvent.device_info:type_name -> integration.DeviceInfo - 13, // 8: integration.AckEvent.time:type_name -> google.protobuf.Timestamp + 14, // 8: integration.AckEvent.time:type_name -> google.protobuf.Timestamp 2, // 9: integration.AckEvent.device_info:type_name -> integration.DeviceInfo - 13, // 10: integration.TxAckEvent.time:type_name -> google.protobuf.Timestamp + 14, // 10: integration.TxAckEvent.time:type_name -> google.protobuf.Timestamp 2, // 11: integration.TxAckEvent.device_info:type_name -> integration.DeviceInfo - 17, // 12: integration.TxAckEvent.tx_info:type_name -> gw.DownlinkTxInfo - 13, // 13: integration.LogEvent.time:type_name -> google.protobuf.Timestamp + 18, // 12: integration.TxAckEvent.tx_info:type_name -> gw.DownlinkTxInfo + 14, // 13: integration.LogEvent.time:type_name -> google.protobuf.Timestamp 2, // 14: integration.LogEvent.device_info:type_name -> integration.DeviceInfo 0, // 15: integration.LogEvent.level:type_name -> integration.LogLevel 1, // 16: integration.LogEvent.code:type_name -> integration.LogCode - 12, // 17: integration.LogEvent.context:type_name -> integration.LogEvent.ContextEntry - 13, // 18: integration.StatusEvent.time:type_name -> google.protobuf.Timestamp + 13, // 17: integration.LogEvent.context:type_name -> integration.LogEvent.ContextEntry + 14, // 18: integration.StatusEvent.time:type_name -> google.protobuf.Timestamp 2, // 19: integration.StatusEvent.device_info:type_name -> integration.DeviceInfo - 13, // 20: integration.LocationEvent.time:type_name -> google.protobuf.Timestamp + 14, // 20: integration.LocationEvent.time:type_name -> google.protobuf.Timestamp 2, // 21: integration.LocationEvent.device_info:type_name -> integration.DeviceInfo - 18, // 22: integration.LocationEvent.location:type_name -> common.Location - 13, // 23: integration.IntegrationEvent.time:type_name -> google.protobuf.Timestamp + 19, // 22: integration.LocationEvent.location:type_name -> common.Location + 14, // 23: integration.IntegrationEvent.time:type_name -> google.protobuf.Timestamp 2, // 24: integration.IntegrationEvent.device_info:type_name -> integration.DeviceInfo - 14, // 25: integration.IntegrationEvent.object:type_name -> google.protobuf.Struct - 26, // [26:26] is the sub-list for method output_type - 26, // [26:26] is the sub-list for method input_type - 26, // [26:26] is the sub-list for extension type_name - 26, // [26:26] is the sub-list for extension extendee - 0, // [0:26] is the sub-list for field type_name + 15, // 25: integration.IntegrationEvent.object:type_name -> google.protobuf.Struct + 15, // 26: integration.DownlinkCommand.object:type_name -> google.protobuf.Struct + 27, // [27:27] is the sub-list for method output_type + 27, // [27:27] is the sub-list for method input_type + 27, // [27:27] is the sub-list for extension type_name + 27, // [27:27] is the sub-list for extension extendee + 0, // [0:27] is the sub-list for field type_name } func init() { file_integration_integration_proto_init() } @@ -1488,6 +1599,18 @@ func file_integration_integration_proto_init() { return nil } } + file_integration_integration_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DownlinkCommand); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1495,7 +1618,7 @@ func file_integration_integration_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_integration_integration_proto_rawDesc, NumEnums: 2, - NumMessages: 11, + NumMessages: 12, NumExtensions: 0, NumServices: 0, }, diff --git a/api/js/integration/integration_pb.d.ts b/api/js/integration/integration_pb.d.ts index 195e56b4..5d62c9c3 100644 --- a/api/js/integration/integration_pb.d.ts +++ b/api/js/integration/integration_pb.d.ts @@ -447,6 +447,50 @@ export namespace IntegrationEvent { } } +export class DownlinkCommand extends jspb.Message { + getId(): string; + setId(value: string): void; + + getDevEui(): string; + setDevEui(value: string): void; + + getConfirmed(): boolean; + setConfirmed(value: boolean): void; + + getFPort(): number; + setFPort(value: number): void; + + getData(): Uint8Array | string; + getData_asU8(): Uint8Array; + getData_asB64(): string; + setData(value: Uint8Array | string): void; + + hasObject(): boolean; + clearObject(): void; + getObject(): google_protobuf_struct_pb.Struct | undefined; + setObject(value?: google_protobuf_struct_pb.Struct): void; + + serializeBinary(): Uint8Array; + toObject(includeInstance?: boolean): DownlinkCommand.AsObject; + static toObject(includeInstance: boolean, msg: DownlinkCommand): DownlinkCommand.AsObject; + static extensions: {[key: number]: jspb.ExtensionFieldInfo}; + static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo}; + static serializeBinaryToWriter(message: DownlinkCommand, writer: jspb.BinaryWriter): void; + static deserializeBinary(bytes: Uint8Array): DownlinkCommand; + static deserializeBinaryFromReader(message: DownlinkCommand, reader: jspb.BinaryReader): DownlinkCommand; +} + +export namespace DownlinkCommand { + export type AsObject = { + id: string, + devEui: string, + confirmed: boolean, + fPort: number, + data: Uint8Array | string, + object?: google_protobuf_struct_pb.Struct.AsObject, + } +} + export interface LogLevelMap { INFO: 0; WARNING: 1; diff --git a/api/js/integration/integration_pb.js b/api/js/integration/integration_pb.js index 82f34dab..59da3ed4 100644 --- a/api/js/integration/integration_pb.js +++ b/api/js/integration/integration_pb.js @@ -15,6 +15,7 @@ var google_protobuf_timestamp_pb = require('google-protobuf/google/protobuf/time var google_protobuf_struct_pb = require('google-protobuf/google/protobuf/struct_pb.js'); goog.exportSymbol('proto.integration.AckEvent', null, global); goog.exportSymbol('proto.integration.DeviceInfo', null, global); +goog.exportSymbol('proto.integration.DownlinkCommand', null, global); goog.exportSymbol('proto.integration.IntegrationEvent', null, global); goog.exportSymbol('proto.integration.JoinEvent', null, global); goog.exportSymbol('proto.integration.LocationEvent', null, global); @@ -3317,6 +3318,343 @@ proto.integration.IntegrationEvent.prototype.hasObject = function() { }; + +/** + * Generated by JsPbCodeGenerator. + * @param {Array=} opt_data Optional initial data array, typically from a + * server response, or constructed directly in Javascript. The array is used + * in place and becomes part of the constructed object. It is not cloned. + * If no data is provided, the constructed object will be empty, but still + * valid. + * @extends {jspb.Message} + * @constructor + */ +proto.integration.DownlinkCommand = function(opt_data) { + jspb.Message.initialize(this, opt_data, 0, -1, null, null); +}; +goog.inherits(proto.integration.DownlinkCommand, jspb.Message); +if (goog.DEBUG && !COMPILED) { + proto.integration.DownlinkCommand.displayName = 'proto.integration.DownlinkCommand'; +} + + +if (jspb.Message.GENERATE_TO_OBJECT) { +/** + * Creates an object representation of this proto suitable for use in Soy templates. + * Field names that are reserved in JavaScript and will be renamed to pb_name. + * To access a reserved field use, foo.pb_, eg, foo.pb_default. + * For the list of reserved names please see: + * com.google.apps.jspb.JsClassTemplate.JS_RESERVED_WORDS. + * @param {boolean=} opt_includeInstance Whether to include the JSPB instance + * for transitional soy proto support: http://goto/soy-param-migration + * @return {!Object} + */ +proto.integration.DownlinkCommand.prototype.toObject = function(opt_includeInstance) { + return proto.integration.DownlinkCommand.toObject(opt_includeInstance, this); +}; + + +/** + * Static version of the {@see toObject} method. + * @param {boolean|undefined} includeInstance Whether to include the JSPB + * instance for transitional soy proto support: + * http://goto/soy-param-migration + * @param {!proto.integration.DownlinkCommand} msg The msg instance to transform. + * @return {!Object} + */ +proto.integration.DownlinkCommand.toObject = function(includeInstance, msg) { + var f, obj = { + id: msg.getId(), + devEui: msg.getDevEui(), + confirmed: msg.getConfirmed(), + fPort: msg.getFPort(), + data: msg.getData_asB64(), + object: (f = msg.getObject()) && google_protobuf_struct_pb.Struct.toObject(includeInstance, f) + }; + + if (includeInstance) { + obj.$jspbMessageInstance = msg; + } + return obj; +}; +} + + +/** + * Deserializes binary data (in protobuf wire format). + * @param {jspb.ByteSource} bytes The bytes to deserialize. + * @return {!proto.integration.DownlinkCommand} + */ +proto.integration.DownlinkCommand.deserializeBinary = function(bytes) { + var reader = new jspb.BinaryReader(bytes); + var msg = new proto.integration.DownlinkCommand; + return proto.integration.DownlinkCommand.deserializeBinaryFromReader(msg, reader); +}; + + +/** + * Deserializes binary data (in protobuf wire format) from the + * given reader into the given message object. + * @param {!proto.integration.DownlinkCommand} msg The message object to deserialize into. + * @param {!jspb.BinaryReader} reader The BinaryReader to use. + * @return {!proto.integration.DownlinkCommand} + */ +proto.integration.DownlinkCommand.deserializeBinaryFromReader = function(msg, reader) { + while (reader.nextField()) { + if (reader.isEndGroup()) { + break; + } + var field = reader.getFieldNumber(); + switch (field) { + case 1: + var value = /** @type {string} */ (reader.readString()); + msg.setId(value); + break; + case 2: + var value = /** @type {string} */ (reader.readString()); + msg.setDevEui(value); + break; + case 3: + var value = /** @type {boolean} */ (reader.readBool()); + msg.setConfirmed(value); + break; + case 4: + var value = /** @type {number} */ (reader.readUint32()); + msg.setFPort(value); + break; + case 5: + var value = /** @type {!Uint8Array} */ (reader.readBytes()); + msg.setData(value); + break; + case 6: + var value = new google_protobuf_struct_pb.Struct; + reader.readMessage(value,google_protobuf_struct_pb.Struct.deserializeBinaryFromReader); + msg.setObject(value); + break; + default: + reader.skipField(); + break; + } + } + return msg; +}; + + +/** + * Class method variant: serializes the given message to binary data + * (in protobuf wire format), writing to the given BinaryWriter. + * @param {!proto.integration.DownlinkCommand} message + * @param {!jspb.BinaryWriter} writer + */ +proto.integration.DownlinkCommand.serializeBinaryToWriter = function(message, writer) { + message.serializeBinaryToWriter(writer); +}; + + +/** + * Serializes the message to binary data (in protobuf wire format). + * @return {!Uint8Array} + */ +proto.integration.DownlinkCommand.prototype.serializeBinary = function() { + var writer = new jspb.BinaryWriter(); + this.serializeBinaryToWriter(writer); + return writer.getResultBuffer(); +}; + + +/** + * Serializes the message to binary data (in protobuf wire format), + * writing to the given BinaryWriter. + * @param {!jspb.BinaryWriter} writer + */ +proto.integration.DownlinkCommand.prototype.serializeBinaryToWriter = function (writer) { + var f = undefined; + f = this.getId(); + if (f.length > 0) { + writer.writeString( + 1, + f + ); + } + f = this.getDevEui(); + if (f.length > 0) { + writer.writeString( + 2, + f + ); + } + f = this.getConfirmed(); + if (f) { + writer.writeBool( + 3, + f + ); + } + f = this.getFPort(); + if (f !== 0) { + writer.writeUint32( + 4, + f + ); + } + f = this.getData_asU8(); + if (f.length > 0) { + writer.writeBytes( + 5, + f + ); + } + f = this.getObject(); + if (f != null) { + writer.writeMessage( + 6, + f, + google_protobuf_struct_pb.Struct.serializeBinaryToWriter + ); + } +}; + + +/** + * Creates a deep clone of this proto. No data is shared with the original. + * @return {!proto.integration.DownlinkCommand} The clone. + */ +proto.integration.DownlinkCommand.prototype.cloneMessage = function() { + return /** @type {!proto.integration.DownlinkCommand} */ (jspb.Message.cloneMessage(this)); +}; + + +/** + * optional string id = 1; + * @return {string} + */ +proto.integration.DownlinkCommand.prototype.getId = function() { + return /** @type {string} */ (jspb.Message.getFieldProto3(this, 1, "")); +}; + + +/** @param {string} value */ +proto.integration.DownlinkCommand.prototype.setId = function(value) { + jspb.Message.setField(this, 1, value); +}; + + +/** + * optional string dev_eui = 2; + * @return {string} + */ +proto.integration.DownlinkCommand.prototype.getDevEui = function() { + return /** @type {string} */ (jspb.Message.getFieldProto3(this, 2, "")); +}; + + +/** @param {string} value */ +proto.integration.DownlinkCommand.prototype.setDevEui = function(value) { + jspb.Message.setField(this, 2, value); +}; + + +/** + * optional bool confirmed = 3; + * Note that Boolean fields may be set to 0/1 when serialized from a Java server. + * You should avoid comparisons like {@code val === true/false} in those cases. + * @return {boolean} + */ +proto.integration.DownlinkCommand.prototype.getConfirmed = function() { + return /** @type {boolean} */ (jspb.Message.getFieldProto3(this, 3, false)); +}; + + +/** @param {boolean} value */ +proto.integration.DownlinkCommand.prototype.setConfirmed = function(value) { + jspb.Message.setField(this, 3, value); +}; + + +/** + * optional uint32 f_port = 4; + * @return {number} + */ +proto.integration.DownlinkCommand.prototype.getFPort = function() { + return /** @type {number} */ (jspb.Message.getFieldProto3(this, 4, 0)); +}; + + +/** @param {number} value */ +proto.integration.DownlinkCommand.prototype.setFPort = function(value) { + jspb.Message.setField(this, 4, value); +}; + + +/** + * optional bytes data = 5; + * @return {!(string|Uint8Array)} + */ +proto.integration.DownlinkCommand.prototype.getData = function() { + return /** @type {!(string|Uint8Array)} */ (jspb.Message.getFieldProto3(this, 5, "")); +}; + + +/** + * optional bytes data = 5; + * This is a type-conversion wrapper around `getData()` + * @return {string} + */ +proto.integration.DownlinkCommand.prototype.getData_asB64 = function() { + return /** @type {string} */ (jspb.Message.bytesAsB64( + this.getData())); +}; + + +/** + * optional bytes data = 5; + * Note that Uint8Array is not supported on all browsers. + * @see http://caniuse.com/Uint8Array + * This is a type-conversion wrapper around `getData()` + * @return {!Uint8Array} + */ +proto.integration.DownlinkCommand.prototype.getData_asU8 = function() { + return /** @type {!Uint8Array} */ (jspb.Message.bytesAsU8( + this.getData())); +}; + + +/** @param {!(string|Uint8Array)} value */ +proto.integration.DownlinkCommand.prototype.setData = function(value) { + jspb.Message.setField(this, 5, value); +}; + + +/** + * optional google.protobuf.Struct object = 6; + * @return {proto.google.protobuf.Struct} + */ +proto.integration.DownlinkCommand.prototype.getObject = function() { + return /** @type{proto.google.protobuf.Struct} */ ( + jspb.Message.getWrapperField(this, google_protobuf_struct_pb.Struct, 6)); +}; + + +/** @param {proto.google.protobuf.Struct|undefined} value */ +proto.integration.DownlinkCommand.prototype.setObject = function(value) { + jspb.Message.setWrapperField(this, 6, value); +}; + + +proto.integration.DownlinkCommand.prototype.clearObject = function() { + this.setObject(undefined); +}; + + +/** + * Returns whether this field is set. + * @return{!boolean} + */ +proto.integration.DownlinkCommand.prototype.hasObject = function() { + return jspb.Message.getField(this, 6) != null; +}; + + /** * @enum {number} */ diff --git a/api/proto/integration/integration.proto b/api/proto/integration/integration.proto index 458006ac..e639d488 100644 --- a/api/proto/integration/integration.proto +++ b/api/proto/integration/integration.proto @@ -276,3 +276,28 @@ message IntegrationEvent { // Struct containing the event object. google.protobuf.Struct object = 6; } + +// DownlinkCommand is the command to enqueue a downlink payload for the given +// device. +message DownlinkCommand { + // ID (UUID). + // If left blank, a random UUID will be generated. + string id = 1; + + // Device EUI (EUI64). + string dev_eui = 2; + + // Confirmed. + bool confirmed = 3; + + // FPort (must be > 0). + uint32 f_port = 4; + + // Data. + // Or use the json_object field when a codec has been configured. + bytes data = 5; + + // Only use this when a codec has been configured that can encode this + // object to bytes. + google.protobuf.Struct object = 6; +} diff --git a/api/rust/proto/chirpstack/integration/integration.proto b/api/rust/proto/chirpstack/integration/integration.proto index 458006ac..e639d488 100644 --- a/api/rust/proto/chirpstack/integration/integration.proto +++ b/api/rust/proto/chirpstack/integration/integration.proto @@ -276,3 +276,28 @@ message IntegrationEvent { // Struct containing the event object. google.protobuf.Struct object = 6; } + +// DownlinkCommand is the command to enqueue a downlink payload for the given +// device. +message DownlinkCommand { + // ID (UUID). + // If left blank, a random UUID will be generated. + string id = 1; + + // Device EUI (EUI64). + string dev_eui = 2; + + // Confirmed. + bool confirmed = 3; + + // FPort (must be > 0). + uint32 f_port = 4; + + // Data. + // Or use the json_object field when a codec has been configured. + bytes data = 5; + + // Only use this when a codec has been configured that can encode this + // object to bytes. + google.protobuf.Struct object = 6; +} diff --git a/chirpstack/src/cmd/configfile.rs b/chirpstack/src/cmd/configfile.rs index f08323a7..3b3c5a34 100644 --- a/chirpstack/src/cmd/configfile.rs +++ b/chirpstack/src/cmd/configfile.rs @@ -248,6 +248,11 @@ pub fn run() { # to the state topic. state_topic="{{ integration.mqtt.state_topic }}" + # Command topic. + # + # This is the topic on which the MQTT subscribes for receiving (enqueue) commands. + command_topic="{{ integration.mqtt.command_topic }}" + # Use JSON encoding instead of Protobuf (binary). json={{ integration.mqtt.json }} diff --git a/chirpstack/src/codec/convert.rs b/chirpstack/src/codec/convert.rs index f929ffa3..67de1c62 100644 --- a/chirpstack/src/codec/convert.rs +++ b/chirpstack/src/codec/convert.rs @@ -98,3 +98,44 @@ fn _struct_to_rquickjs<'js>( }, } } + +pub fn pb_json_to_prost(obj: &pbjson_types::Struct) -> prost_types::Struct { + let mut out = prost_types::Struct::default(); + for (k, v) in &obj.fields { + out.fields.insert(k.to_string(), _pb_json_to_prost(v)); + } + + out +} + +fn _pb_json_to_prost(v: &pbjson_types::Value) -> prost_types::Value { + prost_types::Value { + kind: match &v.kind { + None => None, + Some(v) => Some(match v { + pbjson_types::value::Kind::NullValue(v) => prost_types::value::Kind::NullValue(*v), + pbjson_types::value::Kind::NumberValue(v) => { + prost_types::value::Kind::NumberValue(*v) + } + pbjson_types::value::Kind::StringValue(v) => { + prost_types::value::Kind::StringValue(v.to_string()) + } + pbjson_types::value::Kind::BoolValue(v) => prost_types::value::Kind::BoolValue(*v), + pbjson_types::value::Kind::StructValue(v) => { + prost_types::value::Kind::StructValue(prost_types::Struct { + fields: v + .fields + .iter() + .map(|(k, v)| (k.to_string(), _pb_json_to_prost(v))) + .collect(), + }) + } + pbjson_types::value::Kind::ListValue(v) => { + prost_types::value::Kind::ListValue(prost_types::ListValue { + values: v.values.iter().map(|i| _pb_json_to_prost(i)).collect(), + }) + } + }), + }, + } +} diff --git a/chirpstack/src/codec/mod.rs b/chirpstack/src/codec/mod.rs index ab617057..89f971e8 100644 --- a/chirpstack/src/codec/mod.rs +++ b/chirpstack/src/codec/mod.rs @@ -10,7 +10,7 @@ use diesel::{deserialize, serialize}; use serde::{Deserialize, Serialize}; mod cayenne_lpp; -mod convert; +pub mod convert; mod js; #[derive(Deserialize, Serialize, Copy, Clone, Debug, Eq, PartialEq, AsExpression, FromSqlRow)] diff --git a/chirpstack/src/config.rs b/chirpstack/src/config.rs index 5e4fda86..1c0813df 100644 --- a/chirpstack/src/config.rs +++ b/chirpstack/src/config.rs @@ -244,6 +244,7 @@ pub struct MqttIntegration { pub client: MqttIntegrationClient, pub event_topic: String, pub state_topic: String, + pub command_topic: String, pub json: bool, pub server: String, pub username: String, @@ -262,6 +263,8 @@ impl Default for MqttIntegration { client: Default::default(), event_topic: "application/{{application_id}}/device/{{dev_eui}}/event/{{event}}".into(), state_topic: "application/{{application_id}}/device/{{dev_eui}}/state/{{state}}".into(), + command_topic: "application/{{application_id}}/device/{{dev_eui}}/command/{{command}}" + .into(), json: true, server: "tcp://127.0.0.1:1883/".into(), username: "".into(), diff --git a/chirpstack/src/integration/mod.rs b/chirpstack/src/integration/mod.rs index d00dc3ae..20624c30 100644 --- a/chirpstack/src/integration/mod.rs +++ b/chirpstack/src/integration/mod.rs @@ -1,15 +1,17 @@ use std::collections::HashMap; +use std::str::FromStr; use anyhow::{Context, Result}; use async_trait::async_trait; use futures::future::join_all; use tokio::sync::RwLock; -use tracing::info; +use tracing::{error, info}; use uuid::Uuid; -use crate::config; -use crate::storage::application; +use crate::storage::{application, device, device_profile, device_queue}; +use crate::{codec, config}; use chirpstack_api::integration; +use lrwn::EUI64; mod aws_sns; mod azure_service_bus; @@ -363,3 +365,56 @@ pub async fn integration_event( Ok(()) } + +async fn handle_down_command(application_id: String, pl: integration::DownlinkCommand) { + let err = async { + info!(dev_eui = %pl.dev_eui, "Handling downlink command for device"); + let dev_eui = EUI64::from_str(&pl.dev_eui)?; + let app_id = Uuid::from_str(&application_id)?; + + // Validate that the application_id from the topic is indeed the application ID to which + // the device belongs. + let dev = device::get(&dev_eui).await?; + if dev.application_id != app_id { + return Err(anyhow!( + "Application ID from topic does not match application ID from device" + )); + } + + let mut data = pl.data.clone(); + if let Some(obj) = &pl.object { + let dp = device_profile::get(&dev.device_profile_id).await?; + + data = codec::struct_to_binary( + dp.payload_codec_runtime, + pl.f_port as u8, + &dev.variables, + &dp.payload_codec_script, + &codec::convert::pb_json_to_prost(obj), + ) + .await?; + } + + let qi = device_queue::DeviceQueueItem { + id: match pl.id.is_empty() { + true => Uuid::new_v4(), + false => Uuid::from_str(&pl.id)?, + }, + f_port: pl.f_port as i16, + confirmed: pl.confirmed, + data, + dev_eui, + ..Default::default() + }; + + let _ = device_queue::enqueue_item(qi).await?; + + Ok(()) + } + .await + .err(); + + if err.is_some() { + error!(dev_eui = %pl.dev_eui, error = %err.as_ref().unwrap(), "Handling downlink command error"); + } +} diff --git a/chirpstack/src/integration/mqtt.rs b/chirpstack/src/integration/mqtt.rs index 6eae4f51..91750463 100644 --- a/chirpstack/src/integration/mqtt.rs +++ b/chirpstack/src/integration/mqtt.rs @@ -1,12 +1,15 @@ use std::collections::HashMap; +use std::io::Cursor; use anyhow::{Context, Result}; use async_trait::async_trait; +use futures::stream::StreamExt; use handlebars::Handlebars; use paho_mqtt as mqtt; use prost::Message; +use regex::Regex; use serde::Serialize; -use tracing::info; +use tracing::{error, info}; use super::Integration as IntegrationTrait; use crate::config::MqttIntegration as Config; @@ -17,6 +20,7 @@ pub struct Integration<'a> { templates: Handlebars<'a>, json: bool, qos: usize, + command_regex: Regex, } #[derive(Serialize)] @@ -33,14 +37,23 @@ struct StateTopicContext { pub state: String, } +#[derive(Serialize)] +struct CommandTopicContext { + pub application_id: String, + pub dev_eui: String, + pub command: String, +} + impl<'a> Integration<'a> { pub async fn new(conf: &Config) -> Result> { info!("Initializing MQTT integration"); // topic templates let mut templates = Handlebars::new(); + templates.register_escape_fn(handlebars::no_escape); templates.register_template_string("event_topic", &conf.event_topic)?; templates.register_template_string("state_topic", &conf.state_topic)?; + templates.register_template_string("command_topic", &conf.command_topic)?; // create client let create_opts = mqtt::CreateOptionsBuilder::new() @@ -88,11 +101,22 @@ impl<'a> Integration<'a> { } let conn_opts = conn_opts_b.finalize(); + // get message stream + let mut stream = client.get_stream(25); + let i = Integration { - client, - templates, + command_regex: Regex::new(&templates.render( + "command_topic", + &CommandTopicContext { + application_id: r#"(?P[\w-]+)"#.to_string(), + dev_eui: r#"(?P[\w]+)"#.to_string(), + command: r#"(?P[\w]+)"#.to_string(), + }, + )?)?, qos: conf.qos, json: conf.json, + client, + templates, }; // connect @@ -102,6 +126,55 @@ impl<'a> Integration<'a> { .await .context("Connect to MQTT broker")?; + let command_topic = i.templates.render( + "command_topic", + &CommandTopicContext { + application_id: "+".into(), + dev_eui: "+".into(), + command: "+".into(), + }, + )?; + info!( + command_topic = %command_topic, + "Subscribing to command topic" + ); + i.client + .subscribe(&command_topic, conf.qos as i32) + .await + .context("MQTT subscribe")?; + + tokio::spawn({ + let command_regex = i.command_regex.clone(); + + async move { + info!("Starting MQTT consumer loop"); + while let Some(msg_opt) = stream.next().await { + if let Some(msg) = msg_opt { + let caps = match command_regex.captures(&msg.topic()) { + Some(v) => v, + None => { + error!(topic = %msg.topic(), "Error parsing command topic (regex captures returned None)"); + continue; + } + }; + if caps.len() != 4 { + error!(topic = %msg.topic(), "Parsing command topic returned invalid match count"); + continue; + } + + message_callback( + caps.get(1).map_or("", |m| m.as_str()).to_string(), + caps.get(2).map_or("", |m| m.as_str()).to_string(), + caps.get(3).map_or("", |m| m.as_str()).to_string(), + i.json, + msg, + ) + .await; + } + } + } + }); + // Return integration. Ok(i) } @@ -308,13 +381,65 @@ fn connection_lost_callback(_: &mqtt::AsyncClient) { info!("Connection to MQTT broker lost"); } +async fn message_callback( + application_id: String, + dev_eui: String, + command: String, + json: bool, + msg: mqtt::Message, +) { + let topic = msg.topic(); + let qos = msg.qos(); + let b = msg.payload(); + + info!(topic = topic, qos = qos, "Command received for device"); + + let err = || -> Result<()> { + match command.as_ref() { + "down" => { + let cmd: integration::DownlinkCommand = match json { + true => serde_json::from_slice(&b)?, + false => integration::DownlinkCommand::decode(&mut Cursor::new(b))?, + }; + if dev_eui != cmd.dev_eui { + return Err(anyhow!( + "Payload dev_eui {} does not match topic dev_eui {}", + cmd.dev_eui, + dev_eui + )); + } + tokio::spawn(super::handle_down_command(application_id, cmd)); + } + _ => { + return Err(anyhow!("Unknown command type")); + } + } + + Ok(()) + }() + .err(); + + if err.is_some() { + error!( + topic = topic, + qos = qos, + "Processing command error: {}", + err.as_ref().unwrap() + ); + } +} + #[cfg(test)] pub mod test { + use super::*; use crate::config::MqttIntegration; + use crate::storage::{application, device, device_profile, device_queue, tenant}; use crate::test; use futures::stream::StreamExt; + use lrwn::EUI64; use paho_mqtt as mqtt; + use tokio::time::{sleep, Duration}; use uuid::Uuid; #[tokio::test] @@ -322,6 +447,37 @@ pub mod test { // to avoid race-conditions with other tests using MQTT let _guard = test::prepare().await; + // setup base objects + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + ..Default::default() + }) + .await + .unwrap(); + let app = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + let dp = device_profile::create(device_profile::DeviceProfile { + name: "test-dp".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + let dev = device::create(device::Device { + name: "test-device".into(), + dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), + application_id: app.id, + device_profile_id: dp.id, + ..Default::default() + }) + .await + .unwrap(); + // setup of integration and MQTT client let conf = MqttIntegration { event_topic: "application/{{application_id}}/device/{{dev_eui}}/event/{{event}}".into(), @@ -498,5 +654,34 @@ pub mod test { msg.topic() ); assert_eq!(serde_json::to_string(&pl).unwrap(), msg.payload_str()); + + // downlink command + let down_cmd = integration::DownlinkCommand { + id: Uuid::new_v4().to_string(), + dev_eui: dev.dev_eui.to_string(), + confirmed: false, + f_port: 10, + data: vec![1, 2, 3], + object: None, + }; + let down_cmd_json = serde_json::to_string(&down_cmd).unwrap(); + client + .publish(mqtt::Message::new( + format!("application/{}/device/{}/command/down", app.id, dev.dev_eui), + down_cmd_json, + mqtt::QOS_0, + )) + .await + .unwrap(); + + // give the async consumer some time to process + sleep(Duration::from_millis(200)).await; + + let queue_items = device_queue::get_for_dev_eui(&dev.dev_eui).await.unwrap(); + assert_eq!(1, queue_items.len()); + assert_eq!(down_cmd.id, queue_items[0].id.to_string()); + assert_eq!(dev.dev_eui, queue_items[0].dev_eui); + assert_eq!(10, queue_items[0].f_port); + assert_eq!(vec![1, 2, 3], queue_items[0].data); } }