mqtt离线缓存、通网续传

This commit is contained in:
iioter 2024-06-01 09:14:17 +08:00
parent e717ef6274
commit b88b2a5083
6 changed files with 50 additions and 45 deletions

Binary file not shown.

View File

@ -259,8 +259,7 @@ namespace IoTGateway.Controllers
var data = myExporter.Export(); var data = myExporter.Export();
string ContentType = "application/vnd.ms-excel"; string ContentType = "application/vnd.ms-excel";
string exportName = "DeviceSettings"; string exportName = $"iotgateway.net_bakup_{DateTime.Now.ToString("yyyyMMddHHmmssffff")}.xlsx";
exportName = $"Export_{exportName}_{DateTime.Now.ToString("yyyyMMddHHmmssffff")}.xlsx";
FileContentResult Result = new FileContentResult(data, ContentType); FileContentResult Result = new FileContentResult(data, ContentType);
Result.FileDownloadName = exportName; Result.FileDownloadName = exportName;
return Result; return Result;

View File

@ -16,11 +16,11 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" /> <PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" />
<PackageReference Include="MQTTnet" Version="4.3.1.873" /> <PackageReference Include="MQTTnet" Version="4.3.6.1152" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.3.1.873" /> <PackageReference Include="MQTTnet.AspNetCore" Version="4.3.6.1152" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="4.3.1.873" /> <PackageReference Include="MQTTnet.Extensions.Rpc" Version="4.3.6.1152" />
<PackageReference Include="NLog" Version="5.2.5" /> <PackageReference Include="NLog" Version="5.3.2" />
<PackageReference Include="NLog.Web.AspNetCore" Version="5.3.5" /> <PackageReference Include="NLog.Web.AspNetCore" Version="5.3.11" />
<PackageReference Include="System.IO.Ports" Version="6.0.0" /> <PackageReference Include="System.IO.Ports" Version="6.0.0" />
</ItemGroup> </ItemGroup>

View File

@ -10,6 +10,7 @@ using PluginInterface.IoTSharp;
using PluginInterface.HuaWeiRoma; using PluginInterface.HuaWeiRoma;
using PluginInterface.ThingsBoard; using PluginInterface.ThingsBoard;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
namespace Plugin namespace Plugin
{ {
@ -19,9 +20,9 @@ namespace Plugin
//private readonly ReferenceNodeManager? _uaNodeManager; //private readonly ReferenceNodeManager? _uaNodeManager;
private SystemConfig _systemConfig; private SystemConfig _systemConfig;
private MqttClientOptions _options; private ManagedMqttClientOptions _options;
public bool IsConnected => (Client.IsConnected); public bool IsConnected => (Client.IsConnected);
private IMqttClient? Client { get; set; } private IManagedMqttClient? Client { get; set; }
public event EventHandler<RpcRequest> OnExcRpc; public event EventHandler<RpcRequest> OnExcRpc;
public event EventHandler<ISAttributeResponse> OnReceiveAttributes; public event EventHandler<ISAttributeResponse> OnReceiveAttributes;
private readonly string _tbRpcTopic = "v1/gateway/rpc"; private readonly string _tbRpcTopic = "v1/gateway/rpc";
@ -43,12 +44,16 @@ namespace Plugin
{ {
Client.Dispose(); Client.Dispose();
} }
Client = new MqttFactory().CreateMqttClient(); Client = new MqttFactory().CreateManagedMqttClient();
await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType); await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
_systemConfig = dc.Set<SystemConfig>().First(); _systemConfig = dc.Set<SystemConfig>().First();
#region ClientOptions #region ClientOptions
_options = new MqttClientOptionsBuilder()
_options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithMaxPendingMessages(100000)
.WithClientOptions(new MqttClientOptionsBuilder()
.WithClientId(string.IsNullOrEmpty(_systemConfig.ClientId) .WithClientId(string.IsNullOrEmpty(_systemConfig.ClientId)
? Guid.NewGuid().ToString() ? Guid.NewGuid().ToString()
: _systemConfig.ClientId) : _systemConfig.ClientId)
@ -58,17 +63,17 @@ namespace Plugin
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithProtocolVersion(MqttProtocolVersion.V311) .WithProtocolVersion(MqttProtocolVersion.V311)
.WithCleanSession(true) .WithCleanSession(true)
.Build())
.Build(); .Build();
#endregion #endregion
Client.ConnectedAsync += Client_ConnectedAsync; Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync; Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
if(Client.ConnectAsync(_options).IsCompletedSuccessfully) ; await Client.StartAsync(_options);
{
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
}
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
} }
catch (Exception ex) catch (Exception ex)
@ -129,7 +134,7 @@ namespace Plugin
try try
{ {
_logger.LogError($"MQTT DISCONNECTED WITH SERVER "); _logger.LogError($"MQTT DISCONNECTED WITH SERVER ");
await Client.ConnectAsync(_options); //await Client.ConnectAsync(_options);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -259,7 +264,7 @@ namespace Plugin
private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse) private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse)
{ {
await Client.PublishAsync(new MqttApplicationMessageBuilder() await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(_tbRpcTopic) .WithTopic(_tbRpcTopic)
.WithPayload(JsonConvert.SerializeObject(tBRpcResponse)) .WithPayload(JsonConvert.SerializeObject(tBRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
@ -268,7 +273,7 @@ namespace Plugin
private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse) private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse)
{ {
var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}"; var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}";
await Client.PublishAsync(new MqttApplicationMessageBuilder() await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(tCRpcResponse)) .WithPayload(JsonConvert.SerializeObject(tCRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -278,7 +283,7 @@ namespace Plugin
{ {
//var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}"; //var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}"; var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}";
await Client.PublishAsync(new MqttApplicationMessageBuilder() await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(rpcResult)) .WithPayload(JsonConvert.SerializeObject(rpcResult))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -316,7 +321,7 @@ namespace Plugin
//Message: {"Device A":{"attribute1":"value1", "attribute2": 42}, "Device B":{"attribute1":"value1", "attribute2": 42} //Message: {"Device A":{"attribute1":"value1", "attribute2": 42}, "Device B":{"attribute1":"value1", "attribute2": 42}
try try
{ {
return Client.PublishAsync(new MqttApplicationMessageBuilder() return Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj)) .WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build()); .Build());
@ -331,14 +336,14 @@ namespace Plugin
public async Task UploadIsTelemetryDataAsync(string deviceName, object obj) public async Task UploadIsTelemetryDataAsync(string deviceName, object obj)
{ {
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry")
.WithPayload(JsonConvert.SerializeObject(obj)).Build()); .WithPayload(JsonConvert.SerializeObject(obj)).Build());
} }
public async Task UploadTcTelemetryDataAsync(string deviceName, object obj) public async Task UploadTcTelemetryDataAsync(string deviceName, object obj)
{ {
var toSend = new Dictionary<string, object> { { deviceName, obj } }; var toSend = new Dictionary<string, object> { { deviceName, obj } };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes")
.WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
} }
@ -365,7 +370,7 @@ namespace Plugin
Devices = hwTelemetry Devices = hwTelemetry
}; };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/datas") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/datas")
.WithPayload(JsonConvert.SerializeObject(hwTelemetrys)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithPayload(JsonConvert.SerializeObject(hwTelemetrys)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
} }
@ -433,7 +438,7 @@ namespace Plugin
{ "client", true }, { "client", true },
{ "key", args[0] } { "key", args[0] }
}; };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/attributes/request") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/attributes/request")
.WithPayload(JsonConvert.SerializeObject(tbRequestData)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithPayload(JsonConvert.SerializeObject(tbRequestData)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
@ -443,7 +448,7 @@ namespace Plugin
keys.Add(anySide ? "anySide" : "server", string.Join(",", args)); keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}", await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}",
MqttQualityOfServiceLevel.ExactlyOnce); MqttQualityOfServiceLevel.ExactlyOnce);
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic) await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(keys)) .WithPayload(JsonConvert.SerializeObject(keys))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
@ -520,7 +525,7 @@ namespace Plugin
switch (_systemConfig.IoTPlatformType) switch (_systemConfig.IoTPlatformType)
{ {
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/telemetry") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/telemetry")
.WithPayload(JsonConvert.SerializeObject(sendModel)) .WithPayload(JsonConvert.SerializeObject(sendModel))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
break; break;
@ -591,7 +596,7 @@ namespace Plugin
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway: case IoTPlatformType.IoTGateway:
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } })) { { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
@ -605,7 +610,7 @@ namespace Plugin
case IoTPlatformType.OneNET: case IoTPlatformType.OneNET:
break; break;
case IoTPlatformType.ThingsCloud: case IoTPlatformType.ThingsCloud:
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } })) { { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -625,7 +630,7 @@ namespace Plugin
} }
} }
}; };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update")
.WithPayload(JsonConvert.SerializeObject(deviceOnLine)) .WithPayload(JsonConvert.SerializeObject(deviceOnLine))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
@ -646,7 +651,7 @@ namespace Plugin
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway: case IoTPlatformType.IoTGateway:
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } })) { { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
@ -660,7 +665,7 @@ namespace Plugin
case IoTPlatformType.OneNET: case IoTPlatformType.OneNET:
break; break;
case IoTPlatformType.ThingsCloud: case IoTPlatformType.ThingsCloud:
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } })) { { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -680,7 +685,7 @@ namespace Plugin
} }
} }
}; };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update")
.WithPayload(JsonConvert.SerializeObject(deviceOnLine)) .WithPayload(JsonConvert.SerializeObject(deviceOnLine))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
@ -715,7 +720,7 @@ namespace Plugin
ProductType = "A_n" ProductType = "A_n"
} }
); );
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic) await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(addDeviceDto)) .WithPayload(JsonConvert.SerializeObject(addDeviceDto))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
@ -753,7 +758,7 @@ namespace Plugin
ProductType = "A_n" ProductType = "A_n"
} }
}; };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic) await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(deleteDeviceDto)) .WithPayload(JsonConvert.SerializeObject(deleteDeviceDto))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
} }

View File

@ -9,7 +9,8 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="DynamicExpresso.Core" Version="2.16.1" /> <PackageReference Include="DynamicExpresso.Core" Version="2.16.1" />
<PackageReference Include="Mono.Options" Version="6.12.0.148" /> <PackageReference Include="Mono.Options" Version="6.12.0.148" />
<PackageReference Include="MQTTnet" Version="4.3.1.873" /> <PackageReference Include="MQTTnet" Version="4.3.6.1152" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.6.1152" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.370.12" /> <PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.370.12" />
</ItemGroup> </ItemGroup>

Binary file not shown.