diff --git a/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs b/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs index 0befebb..10295c7 100644 --- a/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs +++ b/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs @@ -30,7 +30,7 @@ namespace IoTGateway.ViewModel.Config.SystemConfigVMs { base.DoEdit(updateAllFields); var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient; - myMqttClient.StartManagedClientAsync().Wait(); + myMqttClient.StartClientAsync().Wait(); } public override void DoDelete() diff --git a/IoTGateway/wwwroot/3d.rar b/IoTGateway/wwwroot/3d.rar deleted file mode 100644 index 080933e..0000000 Binary files a/IoTGateway/wwwroot/3d.rar and /dev/null differ diff --git a/IoTGateway/wwwroot/3d.zip b/IoTGateway/wwwroot/3d.zip new file mode 100644 index 0000000..b162858 Binary files /dev/null and b/IoTGateway/wwwroot/3d.zip differ diff --git a/Plugins/Plugin/MyMqttClient.cs b/Plugins/Plugin/MyMqttClient.cs index cc062ce..180918a 100644 --- a/Plugins/Plugin/MyMqttClient.cs +++ b/Plugins/Plugin/MyMqttClient.cs @@ -3,8 +3,6 @@ using IoTGateway.Model; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client; -using MQTTnet.Extensions.ManagedClient; -using MQTTnet.Packets; using MQTTnet.Protocol; using Newtonsoft.Json; using PluginInterface; @@ -12,7 +10,6 @@ using PluginInterface.HuaWeiRoma; using PluginInterface.IotDB; using PluginInterface.IoTSharp; using PluginInterface.ThingsBoard; -using System.Xml.Linq; namespace Plugin { @@ -22,9 +19,9 @@ namespace Plugin //private readonly ReferenceNodeManager? _uaNodeManager; private SystemConfig _systemConfig; - private ManagedMqttClientOptions _options; + private MqttClientOptions _options; public bool IsConnected => (Client.IsConnected); - private IManagedMqttClient? Client { get; set; } + private IMqttClient Client { get; set; } public event EventHandler OnExcRpc; public event EventHandler OnReceiveAttributes; private readonly string _tbRpcTopic = "v1/gateway/rpc"; @@ -35,10 +32,10 @@ namespace Plugin _logger = logger; //_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager; - StartManagedClientAsync().Wait(); + StartClientAsync().Wait(); } - public async Task StartManagedClientAsync() + public async Task StartClientAsync() { try { @@ -46,50 +43,71 @@ namespace Plugin { Client.Dispose(); } - Client = new MqttFactory().CreateManagedMqttClient(); + Client = new MqttFactory().CreateMqttClient(); await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType); _systemConfig = dc.Set().First(); #region ClientOptions // Setup and start a managed MQTT client. - _options = new ManagedMqttClientOptionsBuilder() - .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) - .WithMaxPendingMessages(10000) - .WithClientOptions(new MqttClientOptionsBuilder() - .WithClientId(string.IsNullOrWhiteSpace( _systemConfig.ClientId) ? Guid.NewGuid().ToString() : _systemConfig.ClientId) + _options = new MqttClientOptionsBuilder() + .WithClientId(string.IsNullOrWhiteSpace(_systemConfig.ClientId) + ? Guid.NewGuid().ToString() + : _systemConfig.ClientId) .WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort) .WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd) .WithTimeout(TimeSpan.FromSeconds(30)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) - .Build()) - .Build(); + .Build(); #endregion - #region Topics - List subTopics = new(); + Client.ConnectedAsync += Client_ConnectedAsync; + Client.DisconnectedAsync += Client_DisconnectedAsync; + Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; + + try + { + await Client.ConnectAsync(_options); + } + catch (Exception ex) + { + _logger.LogError(ex, "MQTT CONNECTING FAILED"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"StartManagedClientAsync FAILED "); + } + } + + private async Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg) + { + _logger.LogInformation($"MQTT CONNECTED WITH SERVER "); + #region Topics + try + { switch (_systemConfig.IoTPlatformType) { case IoTPlatformType.ThingsBoard: //{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}} - subTopics.Add(new MqttTopicFilterBuilder().WithTopic(_tbRpcTopic).WithExactlyOnceQoS().Build()); + await Client.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce); //Message: {"id": $request_id, "device": "Device A", "value": "value1"} - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributes/response").WithExactlyOnceQoS().Build()); + await Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributese").WithExactlyOnceQoS().Build()); + await Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce); break; - case IoTPlatformType.IotDB: case IoTPlatformType.IoTSharp: - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/rpc/request/+/+").WithExactlyOnceQoS().Build()); - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/update").WithExactlyOnceQoS().Build()); + case IoTPlatformType.IotDB: + await Client.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce); + await Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce); //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/response/+").WithExactlyOnceQoS().Build()); + await Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce); break; case IoTPlatformType.ThingsCloud: - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/response").WithExactlyOnceQoS().Build()); - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/get/response").WithExactlyOnceQoS().Build()); - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/push").WithExactlyOnceQoS().Build()); - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/event/response").WithExactlyOnceQoS().Build()); - subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/command/send").WithExactlyOnceQoS().Build()); + await Client.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); + await Client.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce); + await Client.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce); + await Client.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce); + await Client.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce); break; case IoTPlatformType.AliCloudIoT: break; @@ -100,34 +118,26 @@ namespace Plugin case IoTPlatformType.OneNET: break; } - - #endregion - Client.ConnectedAsync += Client_ConnectedAsync; - Client.DisconnectedAsync += Client_DisconnectedAsync; - Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; - await Client.SubscribeAsync(subTopics); - await Client.StartAsync(_options); - - _logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES"); } catch (Exception ex) { - _logger.LogError($"StartManagedClientAsync FAILED, {ex}"); + _logger.LogError(ex, "MQTT Subscribe FAILED"); + } + #endregion + } + + private async Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) + { + try + { + await Client.ConnectAsync(_options); + } + catch (Exception ex) + { + _logger.LogError(ex, "MQTT CONNECTING FAILED"); } } - private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg) - { - _logger.LogInformation($"MQTT CONNECTED WITH SERVER "); - return Task.CompletedTask; - } - - private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) - { - _logger.LogError($"MQTT CONNECTING FAILED, {arg.ReasonString}"); - return Task.CompletedTask; - } - private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e) { _logger.LogDebug( @@ -154,8 +164,7 @@ namespace Plugin catch (Exception ex) { _logger.LogError( - $"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", - ex); + ex, $"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}"); } return Task.CompletedTask; @@ -184,9 +193,8 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError( - $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", - ex); + _logger.LogError(ex, + $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}"); } } @@ -211,9 +219,8 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError( - $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", - ex); + _logger.LogError(ex, + $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}"); } } @@ -246,15 +253,14 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError( - $"ReceiveIsRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", - ex); + _logger.LogError(ex, + $"ReceiveIsRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}"); } } private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse) { - await Client.EnqueueAsync(new MqttApplicationMessageBuilder() + await Client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(_tbRpcTopic) .WithPayload(JsonConvert.SerializeObject(tBRpcResponse)) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); @@ -263,7 +269,7 @@ namespace Plugin private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse) { var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}"; - await Client.EnqueueAsync(new MqttApplicationMessageBuilder() + await Client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(JsonConvert.SerializeObject(tCRpcResponse)) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); @@ -273,7 +279,7 @@ namespace Plugin { //var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}"; var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}"; - await Client.EnqueueAsync(new MqttApplicationMessageBuilder() + await Client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(JsonConvert.SerializeObject(rpcResult)) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); @@ -312,13 +318,13 @@ namespace Plugin try { if (Client.IsConnected) - return Client.EnqueueAsync(new MqttApplicationMessageBuilder() + return Client.PublishAsync(new MqttApplicationMessageBuilder() .WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj)) .Build()); } catch (Exception ex) { - _logger.LogError($"Device:{deviceName} UploadAttributeAsync Failed,{ex}"); + _logger.LogError(ex, $"Device:{deviceName} UploadAttributeAsync Failed"); } return Task.CompletedTask; @@ -326,14 +332,15 @@ namespace Plugin public async Task UploadIsTelemetryDataAsync(string deviceName, object obj) { - await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") .WithPayload(JsonConvert.SerializeObject(obj)).Build()); } public async Task UploadTcTelemetryDataAsync(string deviceName, object obj) { var toSend = new Dictionary { { deviceName, obj } }; - await Client.EnqueueAsync("gateway/attributes", JsonConvert.SerializeObject(toSend)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes") + .WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); } public async Task UploadHwTelemetryDataAsync(Device device, object obj) @@ -359,8 +366,8 @@ namespace Plugin Devices = hwTelemetry }; - await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/datas", - JsonConvert.SerializeObject(hwTelemetrys)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/datas") + .WithPayload(JsonConvert.SerializeObject(hwTelemetrys)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); } public async Task ResponseRpcAsync(RpcResponse rpcResponse) @@ -406,7 +413,7 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError($"ResponseRpc Error,{rpcResponse}", ex); + _logger.LogError(ex, $"ResponseRpc Error,{rpcResponse}"); } } @@ -426,8 +433,8 @@ namespace Plugin { "client", true }, { "key", args[0] } }; - await Client.EnqueueAsync("v1/gateway/attributes/request", - JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/attributes/request") + .WithPayload(JsonConvert.SerializeObject(tbRequestData)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.IoTSharp: string topic = $"devices/{deviceName}/attributes/request/{id}"; @@ -435,8 +442,9 @@ namespace Plugin keys.Add(anySide ? "anySide" : "server", string.Join(",", args)); await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}", MqttQualityOfServiceLevel.ExactlyOnce); - await Client.EnqueueAsync(topic, JsonConvert.SerializeObject(keys), - MqttQualityOfServiceLevel.ExactlyOnce); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic) + .WithPayload(JsonConvert.SerializeObject(keys)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.AliCloudIoT: break; @@ -450,7 +458,7 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError($"RequestAttributes:{deviceName}", ex); + _logger.LogError(ex, $"RequestAttributes:{deviceName}"); } } @@ -512,8 +520,9 @@ namespace Plugin switch (_systemConfig.IoTPlatformType) { case IoTPlatformType.ThingsBoard: - await Client.EnqueueAsync("v1/gateway/telemetry", - JsonConvert.SerializeObject(sendModel)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/telemetry") + .WithPayload(JsonConvert.SerializeObject(sendModel)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.IoTSharp: foreach (var payload in sendModel[device.DeviceName]) @@ -532,24 +541,26 @@ namespace Plugin break; case IoTPlatformType.IotDB: - { - foreach (var payload in sendModel[device.DeviceName]) { - if (payload.DeviceStatus != DeviceStatusTypeEnum.Good) - continue; - - IotTsData tsData = new IotTsData() + foreach (var payload in sendModel[device.DeviceName]) { - device = _systemConfig.GatewayName + device.DeviceName, - timestamp = payload.TS, - measurements = payload.Values?.Keys.ToList(), - values = payload.Values?.Values.ToList() - }; - await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName, JsonConvert.SerializeObject(tsData)); - } + if (payload.DeviceStatus != DeviceStatusTypeEnum.Good) + continue; - break; - } + IotTsData tsData = new IotTsData() + { + device = _systemConfig.GatewayName + device.DeviceName, + timestamp = payload.TS, + measurements = payload.Values?.Keys.ToList(), + values = payload.Values?.Values.ToList() + }; + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(_systemConfig.GatewayName + device.DeviceName) + .WithPayload(JsonConvert.SerializeObject(tsData)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); + } + + break; + } case IoTPlatformType.HuaWei: foreach (var payload in sendModel[device.DeviceName]) { @@ -581,14 +592,14 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError($"PublishTelemetryAsync Error:{ex}"); + _logger.LogError(ex, $"PublishTelemetryAsync Error"); } } private readonly DateTime _tsStartDt = new(1970, 1, 1); - private readonly List iotDbOnLineMeasurement = new() { "online" }; - private readonly List iotDbOnLine = new() { true }; - private readonly List iotDbOffLine = new() { false }; + private readonly List _iotDbOnLineMeasurement = new() { "online" }; + private readonly List _iotDbOnLine = new() { true }; + private readonly List _iotDbOffLine = new() { false }; public async Task DeviceConnected(Device device) { try @@ -597,9 +608,10 @@ namespace Plugin { case IoTPlatformType.ThingsBoard: case IoTPlatformType.IoTSharp: - await Client.EnqueueAsync("v1/gateway/connect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect") + .WithPayload(JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.AliCloudIoT: break; @@ -610,20 +622,22 @@ namespace Plugin case IoTPlatformType.OneNET: break; case IoTPlatformType.ThingsCloud: - await Client.EnqueueAsync("gateway/connect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect") + .WithPayload(JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.IotDB: IotTsData onlineData = new IotTsData() { device = _systemConfig.GatewayName + device.DeviceName, timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds, - measurements = iotDbOnLineMeasurement, - values = iotDbOnLine + measurements = _iotDbOnLineMeasurement, + values = _iotDbOnLine }; - await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName, - JsonConvert.SerializeObject(onlineData)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(_systemConfig.GatewayName + device.DeviceName) + .WithPayload(JsonConvert.SerializeObject(onlineData)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.HuaWei: var deviceOnLine = new HwDeviceOnOffLine() @@ -640,15 +654,15 @@ namespace Plugin } } }; - await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", - JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, - retain: false); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update") + .WithPayload(JsonConvert.SerializeObject(deviceOnLine)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; } } catch (Exception ex) { - _logger.LogError($"DeviceConnected:{device.DeviceName}", ex); + _logger.LogError(ex, $"DeviceConnected:{device.DeviceName}"); } } @@ -660,9 +674,10 @@ namespace Plugin { case IoTPlatformType.ThingsBoard: case IoTPlatformType.IoTSharp: - await Client.EnqueueAsync("v1/gateway/disconnect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect") + .WithPayload(JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.AliCloudIoT: break; @@ -673,20 +688,22 @@ namespace Plugin case IoTPlatformType.OneNET: break; case IoTPlatformType.ThingsCloud: - await Client.EnqueueAsync("gateway/disconnect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect") + .WithPayload(JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.IotDB: IotTsData onlineData = new IotTsData() { device = _systemConfig.GatewayName + device.DeviceName, timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds, - measurements = iotDbOnLineMeasurement, - values = iotDbOffLine + measurements = _iotDbOnLineMeasurement, + values = _iotDbOffLine }; - await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName, - JsonConvert.SerializeObject(onlineData)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(_systemConfig.GatewayName + device.DeviceName) + .WithPayload(JsonConvert.SerializeObject(onlineData)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; case IoTPlatformType.HuaWei: var deviceOnLine = new HwDeviceOnOffLine() @@ -703,15 +720,15 @@ namespace Plugin } } }; - await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", - JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, - retain: false); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update") + .WithPayload(JsonConvert.SerializeObject(deviceOnLine)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; } } catch (Exception ex) { - _logger.LogError($"DeviceDisconnected:{device.DeviceName}", ex); + _logger.LogError(ex, $"DeviceDisconnected:{device.DeviceName}"); } } @@ -738,15 +755,15 @@ namespace Plugin ProductType = "A_n" } ); - - await Client.EnqueueAsync(topic, - JsonConvert.SerializeObject(addDeviceDto)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic) + .WithPayload(JsonConvert.SerializeObject(addDeviceDto)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); break; } } catch (Exception ex) { - _logger.LogError($"DeviceAdded:{device.DeviceName}", ex); + _logger.LogError(ex, $"DeviceAdded:{device.DeviceName}"); } } @@ -776,16 +793,16 @@ namespace Plugin ProductType = "A_n" } }; - - await Client.EnqueueAsync(topic, - JsonConvert.SerializeObject(deleteDeviceDto)); + await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic) + .WithPayload(JsonConvert.SerializeObject(deleteDeviceDto)) + .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); } break; } } catch (Exception ex) { - _logger.LogError($"DeviceAdded:{device.DeviceName}", ex); + _logger.LogError(ex, $"DeviceAdded:{device.DeviceName}"); } } } diff --git a/Plugins/Plugin/Plugin.csproj b/Plugins/Plugin/Plugin.csproj index 9f1bcd3..e52292c 100644 --- a/Plugins/Plugin/Plugin.csproj +++ b/Plugins/Plugin/Plugin.csproj @@ -11,7 +11,6 @@ -