优化Mqtt推送

This commit is contained in:
iioter 2022-08-23 16:29:29 +08:00
parent 90eac18785
commit 81b3d814b9
10 changed files with 368 additions and 382 deletions

View File

@ -30,7 +30,7 @@ namespace IoTGateway.ViewModel.Config.SystemConfigVMs
{ {
base.DoEdit(updateAllFields); base.DoEdit(updateAllFields);
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient; var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.ConnectAsync(); myMqttClient.StartManagedClientAsync().Wait();
} }
public override void DoDelete() public override void DoDelete()

View File

@ -9,7 +9,6 @@ using System.ComponentModel.DataAnnotations;
using IoTGateway.Model; using IoTGateway.Model;
using Microsoft.Extensions.Primitives; using Microsoft.Extensions.Primitives;
using MQTTnet.Server; using MQTTnet.Server;
using MQTTnet.Server.Status;
using MQTTnet.Formatter; using MQTTnet.Formatter;
namespace IoTGateway.ViewModel.MqttClient.MqttServerVMs namespace IoTGateway.ViewModel.MqttClient.MqttServerVMs
@ -45,12 +44,12 @@ namespace IoTGateway.ViewModel.MqttClient.MqttServerVMs
} }
public override void DoSearch() public override void DoSearch()
{ {
var mqttServer = Wtm.ServiceProvider.GetService(typeof(IMqttServer)) as IMqttServer; var mqttServer = Wtm.ServiceProvider.GetService(typeof(MqttServer)) as MqttServer;
foreach (var client in mqttServer.GetClientStatusAsync().Result) foreach (var client in mqttServer.GetClientsAsync().Result)
{ {
MqttClient_View mqttClient_ = new MqttClient_View MqttClient_View mqttClient_ = new MqttClient_View
{ {
ClientId = client.ClientId, ClientId = client.Id,
BytesReceived = client.BytesReceived, BytesReceived = client.BytesReceived,
BytesSent = client.BytesSent, BytesSent = client.BytesSent,
MqttProtocolVersion = client.ProtocolVersion, MqttProtocolVersion = client.ProtocolVersion,

View File

@ -19,12 +19,11 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" /> <PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="MQTTnet" Version="3.1.2" /> <PackageReference Include="MQTTnet" Version="4.1.0.247" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.2" /> <PackageReference Include="MQTTnet.AspNetCore" Version="4.1.0.247" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="3.1.2" /> <PackageReference Include="MQTTnet.Extensions.Rpc" Version="4.1.0.247" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="3.1.2" /> <PackageReference Include="NLog" Version="5.0.2" />
<PackageReference Include="NLog" Version="5.0.1" /> <PackageReference Include="NLog.Web.AspNetCore" Version="5.1.1" />
<PackageReference Include="NLog.Web.AspNetCore" Version="5.1.0" />
<PackageReference Include="System.IO.Ports" Version="6.0.0" /> <PackageReference Include="System.IO.Ports" Version="6.0.0" />
</ItemGroup> </ItemGroup>

View File

@ -6,7 +6,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet.AspNetCore.Extensions; using MQTTnet.AspNetCore;
using WalkingTec.Mvvm.Core; using WalkingTec.Mvvm.Core;
using NLog; using NLog;
using NLog.Web; using NLog.Web;

View File

@ -12,7 +12,6 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MQTTnet.AspNetCore; using MQTTnet.AspNetCore;
using MQTTnet.AspNetCore.Extensions;
using Plugin; using Plugin;
using WalkingTec.Mvvm.Core; using WalkingTec.Mvvm.Core;
using WalkingTec.Mvvm.Core.Extensions; using WalkingTec.Mvvm.Core.Extensions;
@ -66,11 +65,8 @@ namespace IoTGateway
options.ReloadUserFunc = ReloadUser; options.ReloadUserFunc = ReloadUser;
}); });
//MQTTServer //MqttServer
services.AddHostedMqttServer(mqttServer => services.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
{
mqttServer.WithoutDefaultEndpoint();
})
.AddMqttConnectionHandler() .AddMqttConnectionHandler()
.AddConnections(); .AddConnections();
@ -78,7 +74,7 @@ namespace IoTGateway
services.AddHostedService<IoTBackgroundService>(); services.AddHostedService<IoTBackgroundService>();
services.AddSingleton<DeviceService>(); services.AddSingleton<DeviceService>();
services.AddSingleton<DriverService>(); services.AddSingleton<DriverService>();
services.AddSingleton<UAService>(); //services.AddSingleton<UAService>();
services.AddSingleton<MyMqttClient>(); services.AddSingleton<MyMqttClient>();
services.AddSingleton<ModbusSlaveService>(); services.AddSingleton<ModbusSlaveService>();
@ -125,10 +121,15 @@ namespace IoTGateway
app.UseEndpoints(endpoints => app.UseEndpoints(endpoints =>
{ {
//MqttServerWebSocket //MqttServer
endpoints.MapConnectionHandler<MqttConnectionHandler>("/mqtt", options => app.UseEndpoints(endpoints =>
{ {
options.WebSockets.SubProtocolSelector = MqttSubProtocolSelector.SelectSubProtocol; endpoints.MapMqtt("/mqtt");
});
app.UseMqttServer(server =>
{
// Todo: Do something with the server
}); });
endpoints.MapControllerRoute( endpoints.MapControllerRoute(

Binary file not shown.

View File

@ -19,18 +19,18 @@ namespace Plugin
public List<DeviceThread> DeviceThreads = new List<DeviceThread>(); public List<DeviceThread> DeviceThreads = new List<DeviceThread>();
private readonly MyMqttClient _myMqttClient; private readonly MyMqttClient _myMqttClient;
private readonly UAService _uAService; private readonly UAService _uAService;
private readonly IMqttServer _mqttServer; private readonly MqttServer _mqttServer;
private readonly string _connnectSetting = IoTBackgroundService.connnectSetting; private readonly string _connnectSetting = IoTBackgroundService.connnectSetting;
private readonly DBTypeEnum _dbType = IoTBackgroundService.DbType; private readonly DBTypeEnum _dbType = IoTBackgroundService.DbType;
//UAService? uAService,
public DeviceService(IConfiguration configRoot, DriverService drvierManager, MyMqttClient myMqttClient, public DeviceService(IConfiguration configRoot, DriverService drvierManager, MyMqttClient myMqttClient,
UAService uAService, IMqttServer mqttServer, ILogger<DeviceService> logger) MqttServer mqttServer, ILogger<DeviceService> logger)
{ {
if (mqttServer == null) throw new ArgumentNullException(nameof(mqttServer));
_logger = logger; _logger = logger;
DrvierManager = drvierManager; DrvierManager = drvierManager;
_myMqttClient = myMqttClient; _myMqttClient = myMqttClient;
_uAService = uAService; //_uAService = uAService;
_mqttServer = mqttServer ?? throw new ArgumentNullException(nameof(mqttServer)); _mqttServer = mqttServer ?? throw new ArgumentNullException(nameof(mqttServer));
try try
{ {

View File

@ -1,11 +1,13 @@
using PluginInterface; using PluginInterface;
using System.Reflection; using System.Reflection;
using System.Text;
using IoTGateway.DataAccess; using IoTGateway.DataAccess;
using IoTGateway.Model; using IoTGateway.Model;
using DynamicExpresso; using DynamicExpresso;
using MQTTnet.Server; using MQTTnet.Server;
using Newtonsoft.Json; using Newtonsoft.Json;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet;
namespace Plugin namespace Plugin
{ {
@ -26,7 +28,7 @@ namespace Plugin
private bool _lastConnected; private bool _lastConnected;
public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient, public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient,
IMqttServer mqttServer, ILogger logger) MqttServer mqttServer, ILogger logger)
{ {
_myMqttClient = myMqttClient; _myMqttClient = myMqttClient;
_myMqttClient.OnExcRpc += MyMqttClient_OnExcRpc; _myMqttClient.OnExcRpc += MyMqttClient_OnExcRpc;
@ -119,13 +121,24 @@ namespace Plugin
ret.CookedValue?.ToString()) ret.CookedValue?.ToString())
{ {
//这是设备变量列表要用的 //这是设备变量列表要用的
mqttServer.PublishAsync( var msg = new InjectedMqttApplicationMessage(
$"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}", new MqttApplicationMessage()
JsonConvert.SerializeObject(ret)); {
Topic =
$"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(
JsonConvert.SerializeObject(ret))
});
mqttServer.InjectApplicationMessage(msg);
//这是在线组态要用的 //这是在线组态要用的
mqttServer.PublishAsync( msg = new InjectedMqttApplicationMessage(
$"v1/gateway/telemetry/{Device.DeviceName}/{item.Name}", new MqttApplicationMessage()
JsonConvert.SerializeObject(ret.CookedValue)); {
Topic =
$"v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(
JsonConvert.SerializeObject(ret.CookedValue))
});
} }
DeviceValues[item.ID] = ret; DeviceValues[item.ID] = ret;
@ -224,7 +237,7 @@ namespace Plugin
if (!writeResponse.IsSuccess) if (!writeResponse.IsSuccess)
{ {
rpcResponse.Description = writeResponse.Description; rpcResponse.Description = writeResponse.Description;
break; continue;
} }
} }
else else

View File

@ -1,13 +1,10 @@
using System.Text; using IoTGateway.DataAccess;
using IoTGateway.DataAccess;
using IoTGateway.Model; using IoTGateway.Model;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting; using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Client.Disconnecting; using MQTTnet.Packets;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using Newtonsoft.Json; using Newtonsoft.Json;
using PluginInterface; using PluginInterface;
@ -15,7 +12,6 @@ using PluginInterface.HuaWeiRoma;
using PluginInterface.IotDB; using PluginInterface.IotDB;
using PluginInterface.IoTSharp; using PluginInterface.IoTSharp;
using PluginInterface.ThingsBoard; using PluginInterface.ThingsBoard;
using Quickstarts.ReferenceServer;
namespace Plugin namespace Plugin
{ {
@ -24,113 +20,74 @@ namespace Plugin
private readonly ILogger<MyMqttClient> _logger; private readonly ILogger<MyMqttClient> _logger;
//private readonly ReferenceNodeManager? _uaNodeManager; //private readonly ReferenceNodeManager? _uaNodeManager;
private SystemConfig? _systemConfig; private SystemConfig _systemConfig;
private IMqttClientOptions _clientOptions; private ManagedMqttClientOptions _options;
public bool IsConnected => (Client.IsConnected); public bool IsConnected => (Client.IsConnected);
private IMqttClient Client { get; set; } private IManagedMqttClient? Client { get; set; }
public event EventHandler<RpcRequest> OnExcRpc; public event EventHandler<RpcRequest> OnExcRpc;
public event EventHandler<ISAttributeResponse> OnReceiveAttributes; public event EventHandler<ISAttributeResponse> OnReceiveAttributes;
private readonly string _tbRpcTopic = "v1/gateway/rpc";
public MyMqttClient(UAService uaService, ILogger<MyMqttClient> logger) //UAService uaService,
public MyMqttClient(ILogger<MyMqttClient> logger)
{ {
_logger = logger; _logger = logger;
//_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager; //_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager;
ConnectAsync();
StartManagedClientAsync().Wait();
} }
public async Task ConnectAsync() public async Task StartManagedClientAsync()
{ {
try try
{ {
if (Client != null)
{
Client.Dispose();
}
Client = new MqttFactory().CreateManagedMqttClient();
await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType); await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
_systemConfig = dc.Set<SystemConfig>().FirstOrDefault(); _systemConfig = dc.Set<SystemConfig>().First();
if (_systemConfig == null)
{
_systemConfig = new SystemConfig()
{
ID = Guid.NewGuid(),
GatewayName = "iotgateway",
ClientId = Guid.NewGuid().ToString(),
MqttIp = "localhost",
MqttPort = 1888,
MqttUName = "user",
MqttUPwd = "pwd",
IoTPlatformType = IoTPlatformType.IoTSharp
};
dc.Set<SystemConfig>().Add(_systemConfig);
await dc.SaveChangesAsync();
}
var factory = new MqttFactory(); #region ClientOptions
Client = (MqttClient)factory.CreateMqttClient(); // Setup and start a managed MQTT client.
_clientOptions = new MqttClientOptionsBuilder() _options = new ManagedMqttClientOptionsBuilder()
.WithClientId(_systemConfig.ClientId ?? Guid.NewGuid().ToString()) .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort) .WithMaxPendingMessages(10000)
.WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd) .WithClientOptions(new MqttClientOptionsBuilder()
.WithCommunicationTimeout(TimeSpan.FromSeconds(30)) .WithClientId(_systemConfig.ClientId ?? Guid.NewGuid().ToString())
.WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) .WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort)
.WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd)
.WithTimeout(TimeSpan.FromSeconds(30))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
.Build())
.Build(); .Build();
#endregion
#region Topics
Client.ApplicationMessageReceivedHandler = List<MqttTopicFilter> subTopics = new();
new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived);
Client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(_ => OnConnected());
Client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(_ => OnDisconnectedAsync());
try
{
await Client.ConnectAsync(_clientOptions);
}
catch (Exception ex)
{
_logger.LogError("MQTT CONNECTING FAILED", ex);
}
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
}
catch (Exception ex)
{
_logger.LogError("MQTT CONNECTING FAILED", ex);
}
}
private async Task OnDisconnectedAsync()
{
try
{
await Client.ConnectAsync(_clientOptions);
}
catch (Exception ex)
{
_logger.LogError("MQTT CONNECTING FAILED", ex);
}
}
private readonly string _tbRpcTopic = "v1/gateway/rpc";
private void OnConnected()
{
if (_systemConfig != null)
switch (_systemConfig.IoTPlatformType) switch (_systemConfig.IoTPlatformType)
{ {
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
//{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}} //{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
Client.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic(_tbRpcTopic).WithExactlyOnceQoS().Build());
//Message: {"id": $request_id, "device": "Device A", "value": "value1"} //Message: {"id": $request_id, "device": "Device A", "value": "value1"}
Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributes/response").WithExactlyOnceQoS().Build());
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributese").WithExactlyOnceQoS().Build());
break; break;
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
Client.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/rpc/request/+/+").WithExactlyOnceQoS().Build());
Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/update").WithExactlyOnceQoS().Build());
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/response/+").WithExactlyOnceQoS().Build());
break; break;
case IoTPlatformType.ThingsCloud: case IoTPlatformType.ThingsCloud:
Client.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/response").WithExactlyOnceQoS().Build());
Client.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/get/response").WithExactlyOnceQoS().Build());
Client.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/push").WithExactlyOnceQoS().Build());
Client.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/event/response").WithExactlyOnceQoS().Build());
Client.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce); subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/command/send").WithExactlyOnceQoS().Build());
break; break;
case IoTPlatformType.AliCloudIoT: case IoTPlatformType.AliCloudIoT:
break; break;
@ -142,11 +99,34 @@ namespace Plugin
break; break;
} }
_logger.LogInformation($"MQTT CONNECTED WITH SERVER "); #endregion
Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
await Client.SubscribeAsync(subTopics);
await Client.StartAsync(_options);
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
}
catch (Exception ex)
{
_logger.LogError($"StartManagedClientAsync FAILED, {ex}");
}
} }
private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
return Task.CompletedTask;
}
private Task Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
_logger.LogError($"MQTT CONNECTING FAILED, {arg.ReasonString}");
return Task.CompletedTask;
}
private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{ {
_logger.LogDebug( _logger.LogDebug(
$"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} "); $"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} ");
@ -269,7 +249,7 @@ namespace Plugin
private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse) private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse)
{ {
await Client.PublishAsync(new MqttApplicationMessageBuilder() await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(_tbRpcTopic) .WithTopic(_tbRpcTopic)
.WithPayload(JsonConvert.SerializeObject(tBRpcResponse)) .WithPayload(JsonConvert.SerializeObject(tBRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -278,7 +258,7 @@ namespace Plugin
private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse) private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse)
{ {
var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}"; var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}";
await Client.PublishAsync(new MqttApplicationMessageBuilder() await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(tCRpcResponse)) .WithPayload(JsonConvert.SerializeObject(tCRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -288,7 +268,7 @@ namespace Plugin
{ {
//var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}"; //var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}"; var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}";
await Client.PublishAsync(new MqttApplicationMessageBuilder() await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(rpcResult)) .WithPayload(JsonConvert.SerializeObject(rpcResult))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -327,7 +307,7 @@ namespace Plugin
try try
{ {
if (Client.IsConnected) if (Client.IsConnected)
return Client.PublishAsync(new MqttApplicationMessageBuilder() return Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj)) .WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj))
.Build()); .Build());
} }
@ -341,14 +321,14 @@ namespace Plugin
public async Task UploadIsTelemetryDataAsync(string deviceName, object obj) public async Task UploadIsTelemetryDataAsync(string deviceName, object obj)
{ {
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry")
.WithPayload(JsonConvert.SerializeObject(obj)).Build()); .WithPayload(JsonConvert.SerializeObject(obj)).Build());
} }
public async Task UploadTcTelemetryDataAsync(string deviceName, object obj) public async Task UploadTcTelemetryDataAsync(string deviceName, object obj)
{ {
var toSend = new Dictionary<string, object> { { deviceName, obj } }; var toSend = new Dictionary<string, object> { { deviceName, obj } };
await Client.PublishAsync("gateway/attributes", JsonConvert.SerializeObject(toSend)); await Client.EnqueueAsync("gateway/attributes", JsonConvert.SerializeObject(toSend));
} }
public async Task UploadHwTelemetryDataAsync(Device device, object obj) public async Task UploadHwTelemetryDataAsync(Device device, object obj)
@ -374,7 +354,7 @@ namespace Plugin
Devices = hwTelemetry Devices = hwTelemetry
}; };
await Client.PublishAsync($"/v1/devices/{_systemConfig?.GatewayName}/datas", await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/datas",
JsonConvert.SerializeObject(hwTelemetrys)); JsonConvert.SerializeObject(hwTelemetrys));
} }
@ -382,43 +362,42 @@ namespace Plugin
{ {
try try
{ {
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
{ case IoTPlatformType.ThingsBoard:
case IoTPlatformType.ThingsBoard: var tRpcResponse = new TBRpcResponse
var tRpcResponse = new TBRpcResponse {
DeviceName = rpcResponse.DeviceName,
RequestId = rpcResponse.RequestId,
ResponseData = new Dictionary<string, object>
{ { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } }
};
await ResponseTbRpcAsync(tRpcResponse);
break;
case IoTPlatformType.IoTSharp:
await ResponseIsRpcAsync(new ISRpcResponse
{
DeviceId = rpcResponse.DeviceName,
Method = "Method",
ResponseId = rpcResponse.RequestId,
Data = JsonConvert.SerializeObject(new Dictionary<string, object>
{ {
DeviceName = rpcResponse.DeviceName, { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description }
RequestId = rpcResponse.RequestId, })
ResponseData = new Dictionary<string, object> });
{ { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } } break;
}; case IoTPlatformType.ThingsCloud:
await ResponseTbRpcAsync(tRpcResponse); //官网API不需要回复的
break; break;
case IoTPlatformType.IoTSharp: case IoTPlatformType.AliCloudIoT:
await ResponseIsRpcAsync(new ISRpcResponse break;
{ case IoTPlatformType.TencentIoTHub:
DeviceId = rpcResponse.DeviceName, break;
Method = "Method", case IoTPlatformType.BaiduIoTCore:
ResponseId = rpcResponse.RequestId, break;
Data = JsonConvert.SerializeObject(new Dictionary<string, object> case IoTPlatformType.OneNET:
{ break;
{ "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } }
})
});
break;
case IoTPlatformType.ThingsCloud:
//官网API不需要回复的
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -431,39 +410,38 @@ namespace Plugin
try try
{ {
string id = Guid.NewGuid().ToString(); string id = Guid.NewGuid().ToString();
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
{ case IoTPlatformType.ThingsBoard:
case IoTPlatformType.ThingsBoard: //{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"}
//{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"} Dictionary<string, object> tbRequestData = new Dictionary<string, object>
Dictionary<string, object> tbRequestData = new Dictionary<string, object> {
{ { "id", id },
{ "id", id }, { "device", deviceName },
{ "device", deviceName }, { "client", true },
{ "client", true }, { "key", args[0] }
{ "key", args[0] } };
}; await Client.EnqueueAsync("v1/gateway/attributes/request",
await Client.PublishAsync("v1/gateway/attributes/request", JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce);
JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce); break;
break; case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTSharp: string topic = $"devices/{deviceName}/attributes/request/{id}";
string topic = $"devices/{deviceName}/attributes/request/{id}"; Dictionary<string, string> keys = new Dictionary<string, string>();
Dictionary<string, string> keys = new Dictionary<string, string>(); keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
keys.Add(anySide ? "anySide" : "server", string.Join(",", args)); await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}",
await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}", MqttQualityOfServiceLevel.ExactlyOnce);
MqttQualityOfServiceLevel.ExactlyOnce); await Client.EnqueueAsync(topic, JsonConvert.SerializeObject(keys),
await Client.PublishAsync(topic, JsonConvert.SerializeObject(keys), MqttQualityOfServiceLevel.ExactlyOnce);
MqttQualityOfServiceLevel.ExactlyOnce); break;
break; case IoTPlatformType.AliCloudIoT:
case IoTPlatformType.AliCloudIoT: break;
break; case IoTPlatformType.TencentIoTHub:
case IoTPlatformType.TencentIoTHub: break;
break; case IoTPlatformType.BaiduIoTCore:
case IoTPlatformType.BaiduIoTCore: break;
break; case IoTPlatformType.OneNET:
case IoTPlatformType.OneNET: break;
break; }
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -526,64 +504,63 @@ namespace Plugin
{ {
if (CanPubTelemetry(device, sendModel)) if (CanPubTelemetry(device, sendModel))
{ {
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
case IoTPlatformType.ThingsBoard:
await Client.EnqueueAsync("v1/gateway/telemetry",
JsonConvert.SerializeObject(sendModel));
break;
case IoTPlatformType.IoTSharp:
foreach (var payload in sendModel[device.DeviceName])
{
if (payload.Values != null)
await UploadIsTelemetryDataAsync(device.DeviceName, payload.Values);
}
break;
case IoTPlatformType.ThingsCloud:
foreach (var payload in sendModel[device.DeviceName])
{
if (payload.Values != null)
await UploadTcTelemetryDataAsync(device.DeviceName, payload.Values);
}
break;
case IoTPlatformType.IotDB:
{ {
case IoTPlatformType.ThingsBoard: foreach (var payload in sendModel[device.DeviceName])
await Client.PublishAsync("v1/gateway/telemetry", {
JsonConvert.SerializeObject(sendModel)); if (payload.DeviceStatus != DeviceStatusTypeEnum.Good)
break; continue;
case IoTPlatformType.IoTSharp:
foreach (var payload in sendModel[device.DeviceName]) IotTsData tsData = new IotTsData()
{ {
if (payload.Values != null) device = device.DeviceName,
await UploadIsTelemetryDataAsync(device.DeviceName, payload.Values); timestamp = payload.TS,
} measurements = payload.Values?.Keys.ToList(),
values = payload.Values?.Values.ToList()
};
await Client.EnqueueAsync(device.DeviceName, JsonConvert.SerializeObject(tsData));
}
break; break;
case IoTPlatformType.ThingsCloud:
foreach (var payload in sendModel[device.DeviceName])
{
if (payload.Values != null)
await UploadTcTelemetryDataAsync(device.DeviceName, payload.Values);
}
break;
case IoTPlatformType.IotDB:
{
foreach (var payload in sendModel[device.DeviceName])
{
if (payload.DeviceStatus != DeviceStatusTypeEnum.Good)
continue;
IotTsData tsData = new IotTsData()
{
device = device.DeviceName,
timestamp = payload.TS,
measurements = payload.Values?.Keys.ToList(),
values = payload.Values?.Values.ToList()
};
await Client.PublishAsync(device.DeviceName, JsonConvert.SerializeObject(tsData));
}
break;
}
case IoTPlatformType.HuaWei:
foreach (var payload in sendModel[device.DeviceName])
{
if (payload.Values != null)
await UploadHwTelemetryDataAsync(device, payload.Values);
}
break;
case IoTPlatformType.AliCloudIoT:
case IoTPlatformType.TencentIoTHub:
case IoTPlatformType.BaiduIoTCore:
case IoTPlatformType.OneNET:
default:
break;
} }
case IoTPlatformType.HuaWei:
foreach (var payload in sendModel[device.DeviceName])
{
if (payload.Values != null)
await UploadHwTelemetryDataAsync(device, payload.Values);
}
break;
case IoTPlatformType.AliCloudIoT:
case IoTPlatformType.TencentIoTHub:
case IoTPlatformType.BaiduIoTCore:
case IoTPlatformType.OneNET:
default:
break;
}
} }
//foreach (var payload in sendModel[device.DeviceName]) //foreach (var payload in sendModel[device.DeviceName])
@ -599,7 +576,7 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"PublishTelemetryAsync Error", ex); _logger.LogError($"PublishTelemetryAsync Error:{ex}");
} }
} }
@ -607,48 +584,47 @@ namespace Plugin
{ {
try try
{ {
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
{ case IoTPlatformType.ThingsBoard:
case IoTPlatformType.ThingsBoard: case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTSharp: await Client.EnqueueAsync("v1/gateway/connect",
await Client.PublishAsync("v1/gateway/connect", JsonConvert.SerializeObject(new Dictionary<string, string>
JsonConvert.SerializeObject(new Dictionary<string, string> { { "device", device.DeviceName } }));
{ { "device", device.DeviceName } })); break;
break; case IoTPlatformType.AliCloudIoT:
case IoTPlatformType.AliCloudIoT: break;
break; case IoTPlatformType.TencentIoTHub:
case IoTPlatformType.TencentIoTHub: break;
break; case IoTPlatformType.BaiduIoTCore:
case IoTPlatformType.BaiduIoTCore: break;
break; case IoTPlatformType.OneNET:
case IoTPlatformType.OneNET: break;
break; case IoTPlatformType.ThingsCloud:
case IoTPlatformType.ThingsCloud: await Client.EnqueueAsync("gateway/connect",
await Client.PublishAsync("gateway/connect", JsonConvert.SerializeObject(new Dictionary<string, string>
JsonConvert.SerializeObject(new Dictionary<string, string> { { "device", device.DeviceName } }));
{ { "device", device.DeviceName } })); break;
break; case IoTPlatformType.HuaWei:
case IoTPlatformType.HuaWei: var deviceOnLine = new HwDeviceOnOffLine()
var deviceOnLine = new HwDeviceOnOffLine() {
MId = new Random().Next(0, 1024), //命令ID
DeviceStatuses = new List<DeviceStatus>()
{ {
MId = new Random().Next(0, 1024), //命令ID new DeviceStatus()
DeviceStatuses = new List<DeviceStatus>()
{ {
new DeviceStatus() DeviceId = device.DeviceConfigs
{ .FirstOrDefault(x => x.DeviceConfigName == "DeviceId")
DeviceId = device.DeviceConfigs ?.Value,
.FirstOrDefault(x => x.DeviceConfigName == "DeviceId") Status = "ONLINE"
?.Value,
Status = "ONLINE"
}
} }
}; }
await Client.PublishAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", };
JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update",
retain: false); JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce,
break; retain: false);
} break;
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -660,48 +636,47 @@ namespace Plugin
{ {
try try
{ {
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
{ case IoTPlatformType.ThingsBoard:
case IoTPlatformType.ThingsBoard: case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTSharp: await Client.EnqueueAsync("v1/gateway/disconnect",
await Client.PublishAsync("v1/gateway/disconnect", JsonConvert.SerializeObject(new Dictionary<string, string>
JsonConvert.SerializeObject(new Dictionary<string, string> { { "device", device.DeviceName } }));
{ { "device", device.DeviceName } })); break;
break; case IoTPlatformType.AliCloudIoT:
case IoTPlatformType.AliCloudIoT: break;
break; case IoTPlatformType.TencentIoTHub:
case IoTPlatformType.TencentIoTHub: break;
break; case IoTPlatformType.BaiduIoTCore:
case IoTPlatformType.BaiduIoTCore: break;
break; case IoTPlatformType.OneNET:
case IoTPlatformType.OneNET: break;
break; case IoTPlatformType.ThingsCloud:
case IoTPlatformType.ThingsCloud: await Client.EnqueueAsync("gateway/disconnect",
await Client.PublishAsync("gateway/disconnect", JsonConvert.SerializeObject(new Dictionary<string, string>
JsonConvert.SerializeObject(new Dictionary<string, string> { { "device", device.DeviceName } }));
{ { "device", device.DeviceName } })); break;
break; case IoTPlatformType.HuaWei:
case IoTPlatformType.HuaWei: var deviceOnLine = new HwDeviceOnOffLine()
var deviceOnLine = new HwDeviceOnOffLine() {
MId = new Random().Next(0, 1024), //命令ID
DeviceStatuses = new List<DeviceStatus>()
{ {
MId = new Random().Next(0, 1024), //命令ID new DeviceStatus()
DeviceStatuses = new List<DeviceStatus>()
{ {
new DeviceStatus() DeviceId = device.DeviceConfigs
{ .FirstOrDefault(x => x.DeviceConfigName == "DeviceId")
DeviceId = device.DeviceConfigs ?.Value,
.FirstOrDefault(x => x.DeviceConfigName == "DeviceId") Status = "OFFLINE"
?.Value,
Status = "OFFLINE"
}
} }
}; }
await Client.PublishAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", };
JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update",
retain: false); JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce,
break; retain: false);
} break;
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -714,31 +689,30 @@ namespace Plugin
{ {
try try
{ {
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
{ case IoTPlatformType.HuaWei:
case IoTPlatformType.HuaWei: var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/add";
var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/add";
var addDeviceDto = new HwAddDeviceDto var addDeviceDto = new HwAddDeviceDto
{
MId = new Random().Next(0, 1024), //命令ID
};
addDeviceDto.DeviceInfos.Add(
new DeviceInfo
{ {
MId = new Random().Next(0, 1024), //命令ID NodeId = device.DeviceName,
}; Name = device.DeviceName,
addDeviceDto.DeviceInfos.Add( Description = device.Description,
new DeviceInfo ManufacturerId = "Test_n",
{ ProductType = "A_n"
NodeId = device.DeviceName, }
Name = device.DeviceName, );
Description = device.Description,
ManufacturerId = "Test_n",
ProductType = "A_n"
}
);
await Client.PublishAsync(topic, await Client.EnqueueAsync(topic,
JsonConvert.SerializeObject(addDeviceDto)); JsonConvert.SerializeObject(addDeviceDto));
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -751,35 +725,34 @@ namespace Plugin
{ {
try try
{ {
if (_systemConfig != null) switch (_systemConfig.IoTPlatformType)
switch (_systemConfig.IoTPlatformType) {
{ case IoTPlatformType.HuaWei:
case IoTPlatformType.HuaWei: var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/delete";
var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/delete";
await using (var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType)) await using (var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType))
{
var deviceId = dc.Set<DeviceConfig>().FirstOrDefault(x =>
x.DeviceId == device.ID && x.DeviceConfigName == "DeviceId")?.Value;
var deleteDeviceDto = new HwDeleteDeviceDto
{ {
var deviceId = dc.Set<DeviceConfig>().FirstOrDefault(x => Id = new Random().Next(0, 1024), //命令ID
x.DeviceId == device.ID && x.DeviceConfigName == "DeviceId")?.Value; DeviceId = deviceId,
RequestTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds,
var deleteDeviceDto = new HwDeleteDeviceDto Request = new()
{ {
Id = new Random().Next(0, 1024), //命令ID ManufacturerId = "Test_n",
DeviceId = deviceId, ManufacturerName = "Test_n",
RequestTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds, ProductType = "A_n"
Request = new() }
{ };
ManufacturerId = "Test_n",
ManufacturerName = "Test_n",
ProductType = "A_n"
}
};
await Client.PublishAsync(topic, await Client.EnqueueAsync(topic,
JsonConvert.SerializeObject(deleteDeviceDto)); JsonConvert.SerializeObject(deleteDeviceDto));
} }
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>net6.0</TargetFramework> <TargetFramework>net6.0</TargetFramework>
@ -9,8 +9,9 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="DynamicExpresso.Core" Version="2.13.0" /> <PackageReference Include="DynamicExpresso.Core" Version="2.13.0" />
<PackageReference Include="Mono.Options" Version="6.12.0.148" /> <PackageReference Include="Mono.Options" Version="6.12.0.148" />
<PackageReference Include="MQTTnet" Version="3.1.2" /> <PackageReference Include="MQTTnet" Version="4.1.0.247" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.369.30" /> <PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.370.12" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.1.0.247" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>