iotgateway/Plugins/Plugin/MyMqttClient.cs

788 lines
35 KiB
C#
Raw Normal View History

2022-08-23 08:29:29 +00:00
using IoTGateway.DataAccess;
2022-03-24 13:38:11 +00:00
using IoTGateway.Model;
using Microsoft.Extensions.Logging;
2021-12-12 06:55:48 +00:00
using MQTTnet;
using MQTTnet.Client;
2022-08-23 08:29:29 +00:00
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Packets;
2022-03-24 13:38:11 +00:00
using MQTTnet.Protocol;
2021-12-12 06:55:48 +00:00
using Newtonsoft.Json;
using PluginInterface;
2022-08-10 08:55:44 +00:00
using PluginInterface.HuaWeiRoma;
2022-06-06 07:15:19 +00:00
using PluginInterface.IotDB;
2022-04-13 09:01:24 +00:00
using PluginInterface.IoTSharp;
using PluginInterface.ThingsBoard;
2022-08-29 05:45:46 +00:00
using System.Xml.Linq;
2021-12-12 06:55:48 +00:00
namespace Plugin
{
2022-03-24 13:38:11 +00:00
public class MyMqttClient
2021-12-12 06:55:48 +00:00
{
2022-03-24 13:38:11 +00:00
/// <summary>Logger that receives every diagnostic message written by this client.</summary>
private readonly ILogger<MyMqttClient> _logger;
//private readonly ReferenceNodeManager? _uaNodeManager;

/// <summary>System configuration row; assigned by <see cref="StartManagedClientAsync"/>.</summary>
private SystemConfig _systemConfig;

/// <summary>Managed-client options; rebuilt by <see cref="StartManagedClientAsync"/>.</summary>
private ManagedMqttClientOptions _options;

/// <summary>
/// True when a managed client exists and reports a connection.
/// Returns false (instead of throwing) while no client has been created yet.
/// </summary>
public bool IsConnected => Client?.IsConnected ?? false;

/// <summary>The managed MQTT client; null until <see cref="StartManagedClientAsync"/> creates it.</summary>
private IManagedMqttClient? Client { get; set; }

/// <summary>Raised by the Receive*Rpc handlers with the decoded RPC request.</summary>
public event EventHandler<RpcRequest> OnExcRpc;

/// <summary>Raised by ReceiveAttributes with the decoded attribute response.</summary>
public event EventHandler<ISAttributeResponse> OnReceiveAttributes;

/// <summary>Topic used both for the ThingsBoard RPC subscription and for RPC replies.</summary>
private readonly string _tbRpcTopic = "v1/gateway/rpc";
2022-08-10 08:55:44 +00:00
2022-08-23 08:29:29 +00:00
//UAService uaService,
/// <summary>
/// Stores the logger and then blocks until the first managed-client start-up attempt has completed.
/// </summary>
/// <param name="logger">Logger used for this client's diagnostics.</param>
public MyMqttClient(ILogger<MyMqttClient> logger)
{
    _logger = logger;
    //_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager;

    // A constructor cannot await, so the start-up task is waited on synchronously.
    var startup = StartManagedClientAsync();
    startup.Wait();
}
2022-03-24 13:38:11 +00:00
2022-08-23 08:29:29 +00:00
/// <summary>
/// (Re)creates the managed MQTT client, loads the system configuration from the database,
/// registers the platform-specific subscriptions and starts the client.
/// Any failure is logged and swallowed, so this method does not throw.
/// </summary>
public async Task StartManagedClientAsync()
{
    try
    {
        // Dispose a previous instance so a restart does not keep the old client alive.
        Client?.Dispose();

        var client = new MqttFactory().CreateManagedMqttClient();
        Client = client;

        await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
        _systemConfig = dc.Set<SystemConfig>().First();

        #region ClientOptions
        // Setup and start a managed MQTT client.
        var clientOptions = new MqttClientOptionsBuilder()
            .WithClientId(_systemConfig.ClientId ?? Guid.NewGuid().ToString())
            .WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort)
            .WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd)
            .WithTimeout(TimeSpan.FromSeconds(30))
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
            .Build();

        _options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithMaxPendingMessages(10000)
            .WithClientOptions(clientOptions)
            .Build();
        #endregion

        #region Topics
        // Every subscription below is requested with QoS 2 (exactly once).
        static MqttTopicFilter ExactlyOnce(string topic) =>
            new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build();

        List<MqttTopicFilter> subTopics = new();
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.ThingsBoard:
                //{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
                subTopics.Add(ExactlyOnce(_tbRpcTopic));
                //Message: {"id": $request_id, "device": "Device A", "value": "value1"}
                subTopics.Add(ExactlyOnce("v1/gateway/attributes/response"));
                //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
                // Fixed topic name: it was misspelled "v1/gateway/attributese", which never matches.
                subTopics.Add(ExactlyOnce("v1/gateway/attributes"));
                break;

            case IoTPlatformType.IoTSharp:
                subTopics.Add(ExactlyOnce("devices/+/rpc/request/+/+"));
                subTopics.Add(ExactlyOnce("devices/+/attributes/update"));
                //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
                subTopics.Add(ExactlyOnce("devices/+/attributes/response/+"));
                break;

            case IoTPlatformType.ThingsCloud:
                subTopics.Add(ExactlyOnce("gateway/attributes/response"));
                subTopics.Add(ExactlyOnce("gateway/attributes/get/response"));
                subTopics.Add(ExactlyOnce("gateway/attributes/push"));
                subTopics.Add(ExactlyOnce("gateway/event/response"));
                subTopics.Add(ExactlyOnce("gateway/command/send"));
                break;

            // No subscriptions are registered for these platforms.
            case IoTPlatformType.AliCloudIoT:
            case IoTPlatformType.TencentIoTHub:
            case IoTPlatformType.BaiduIoTCore:
            case IoTPlatformType.OneNET:
                break;
        }
        #endregion

        client.ConnectedAsync += Client_ConnectedAsync;
        client.DisconnectedAsync += Client_DisconnectedAsync;
        client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;

        await client.SubscribeAsync(subTopics);
        await client.StartAsync(_options);
        _logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
    }
    catch (Exception ex)
    {
        // Pass the exception as the exception argument so the logger records it
        // (stack trace included) rather than embedding it in the message text.
        _logger.LogError(ex, "StartManagedClientAsync FAILED");
    }
}
2022-08-23 08:29:29 +00:00
/// <summary>
/// Connected-event handler: writes an informational log entry and completes synchronously.
/// </summary>
/// <param name="arg">Event data supplied by the client; not used.</param>
/// <returns>An already completed task.</returns>
private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    const string connectedText = "MQTT CONNECTED WITH SERVER ";
    _logger.LogInformation(connectedText);
    return Task.CompletedTask;
}
2022-04-13 09:01:24 +00:00
2022-08-23 08:29:29 +00:00
/// <summary>
/// Disconnected-event handler: logs the reason string at error level and completes synchronously.
/// </summary>
/// <param name="arg">Event data; its ReasonString is included in the log entry.</param>
/// <returns>An already completed task.</returns>
private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    var failureText = $"MQTT CONNECTING FAILED, {arg.ReasonString}";
    _logger.LogError(failureText);
    return Task.CompletedTask;
}
/// <summary>
/// Routes an incoming MQTT message to the matching handler based on its topic.
/// Messages on topics that match no rule are ignored. Handler exceptions are logged, not rethrown.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
/// <returns>An already completed task.</returns>
private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
    var message = e.ApplicationMessage;
    var topic = message.Topic;

    // Values are passed as template arguments so braces inside them are never parsed as placeholders.
    _logger.LogDebug(
        "ApplicationMessageReceived Topic {Topic} QualityOfServiceLevel:{QualityOfServiceLevel} Retain:{Retain} ",
        topic, message.QualityOfServiceLevel, message.Retain);

    try
    {
        // Topic names are protocol identifiers: compare ordinally, not with the current culture.
        var isDeviceTopic = topic.StartsWith("devices/", StringComparison.Ordinal);

        if (topic == _tbRpcTopic)
        {
            ReceiveTbRpc(e);
        }
        else if (isDeviceTopic && topic.Contains("/response/"))
        {
            ReceiveAttributes(e);
        }
        else if (isDeviceTopic && topic.Contains("/rpc/request/"))
        {
            ReceiveIsRpc(e);
        }
        else if (topic == "gateway/command/send")
        {
            ReceiveTcRpc(e);
        }
    }
    catch (Exception ex)
    {
        // The exception must be the first argument; passed last it is treated as a format
        // argument and dropped, and a JSON payload in the message text would be parsed as a template.
        _logger.LogError(ex, "ClientId:{ClientId} Topic:{Topic},Payload:{Payload}",
            e.ClientId, topic, message.ConvertPayloadToString());
    }

    return Task.CompletedTask;
}
2022-04-20 06:05:44 +00:00
/// <summary>
/// Handles a ThingsBoard RPC message: deserializes the payload and raises <see cref="OnExcRpc"/>
/// when the request carries a non-blank method name. Errors are logged, not rethrown.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private void ReceiveTbRpc(MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        var request =
            JsonConvert.DeserializeObject<TBRpcRequest>(e.ApplicationMessage.ConvertPayloadToString());

        // A payload without a "data" section or without a method is ignored instead of raising an NRE.
        if (request?.RequestData is { } data && !string.IsNullOrWhiteSpace(data.Method))
        {
            // Null-conditional invoke: there may be no subscriber yet.
            OnExcRpc?.Invoke(Client, new RpcRequest
            {
                Method = data.Method,
                DeviceName = request.DeviceName,
                RequestId = data.RequestId,
                Params = data.Params
            });
        }
    }
    catch (Exception ex)
    {
        // Exception goes first so it is recorded; the payload is a template argument, not message text.
        _logger.LogError(ex, "ReceiveTbRpc:Topic:{Topic},Payload:{Payload}",
            e.ApplicationMessage.Topic, e.ApplicationMessage.ConvertPayloadToString());
    }
}
2022-04-20 06:05:44 +00:00
/// <summary>
/// Handles a ThingsCloud command message: deserializes the payload and raises <see cref="OnExcRpc"/>.
/// Errors are logged, not rethrown.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private void ReceiveTcRpc(MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        var request =
            JsonConvert.DeserializeObject<TCRpcRequest>(e.ApplicationMessage.ConvertPayloadToString());

        // A payload without request data is ignored instead of raising an NRE.
        if (request?.RequestData is { } data)
        {
            // Null-conditional invoke: there may be no subscriber yet.
            OnExcRpc?.Invoke(Client, new RpcRequest
            {
                Method = data.Method,
                DeviceName = request.DeviceName,
                RequestId = data.RequestId,
                Params = data.Params
            });
        }
    }
    catch (Exception ex)
    {
        // Label corrected (was "ReceiveTbRpc"); exception goes first so it is actually recorded.
        _logger.LogError(ex, "ReceiveTcRpc:Topic:{Topic},Payload:{Payload}",
            e.ApplicationMessage.Topic, e.ApplicationMessage.ConvertPayloadToString());
    }
}
2022-08-10 08:55:44 +00:00
2022-04-13 09:01:24 +00:00
/// <summary>
/// Handles an RPC request whose topic is split on '/': segment 1 is read as the device name,
/// segment 4 as the method and segment 5 as the request id. The payload is deserialized into the
/// parameter dictionary and <see cref="OnExcRpc"/> is raised. Errors are logged, not rethrown.
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private void ReceiveIsRpc(MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        var topic = e.ApplicationMessage.Topic;
        var segments = topic.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

        // Indexes up to 5 are read below; reject shorter topics explicitly.
        if (segments.Length < 6)
        {
            _logger.LogWarning("ReceiveIsRpc: unexpected topic {Topic}", topic);
            return;
        }

        var rpcDeviceName = segments[1];
        var rpcMethodName = segments[4];
        var rpcRequestId = segments[5];
        _logger.LogInformation("rpcMethodName={RpcMethodName} ", rpcMethodName);
        _logger.LogInformation("rpcDeviceName={RpcDeviceName} ", rpcDeviceName);
        _logger.LogInformation("rpcRequestId={RpcRequestId} ", rpcRequestId);

        // Null-conditional invoke: there may be no subscriber yet.
        OnExcRpc?.Invoke(Client, new RpcRequest
        {
            Method = rpcMethodName,
            DeviceName = rpcDeviceName,
            RequestId = rpcRequestId,
            Params = JsonConvert.DeserializeObject<Dictionary<string, object>>(
                e.ApplicationMessage.ConvertPayloadToString())
        });
    }
    catch (Exception ex)
    {
        // Exception goes first so it is recorded; the payload is a template argument, not message text.
        _logger.LogError(ex, "ReceiveIsRpc:Topic:{Topic},Payload:{Payload}",
            e.ApplicationMessage.Topic, e.ApplicationMessage.ConvertPayloadToString());
    }
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Serializes the response to JSON and enqueues it on the ThingsBoard RPC topic with QoS 2.
/// </summary>
/// <param name="tBRpcResponse">Response object to serialize and send.</param>
private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse)
{
    var body = JsonConvert.SerializeObject(tBRpcResponse);
    var reply = new MqttApplicationMessageBuilder()
        .WithTopic(_tbRpcTopic)
        .WithPayload(body)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
        .Build();

    await Client.EnqueueAsync(reply);
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Serializes the given object to JSON and enqueues it on "command/reply/{request id}" with QoS 2.
/// </summary>
/// <param name="tCRpcResponse">Object to send; its request id is used to build the topic.</param>
private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse)
{
    var replyTopic = $"command/reply/{tCRpcResponse.RequestData.RequestId}";
    var body = JsonConvert.SerializeObject(tCRpcResponse);
    var reply = new MqttApplicationMessageBuilder()
        .WithTopic(replyTopic)
        .WithPayload(body)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
        .Build();

    await Client.EnqueueAsync(reply);
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Serializes the RPC result to JSON and enqueues it on
/// "devices/{DeviceId}/rpc/response/{Method}/{ResponseId}" with QoS 2.
/// </summary>
/// <param name="rpcResult">Result to send; its fields are also used to build the topic.</param>
private async Task ResponseIsRpcAsync(ISRpcResponse rpcResult)
{
    //var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
    var replyTopic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}";
    var body = JsonConvert.SerializeObject(rpcResult);
    var reply = new MqttApplicationMessageBuilder()
        .WithTopic(replyTopic)
        .WithPayload(body)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
        .Build();

    await Client.EnqueueAsync(reply);
}
2022-03-24 13:38:11 +00:00
/// <summary>
/// Handles an attribute response: splits the topic on '/', reads segment 1 as the device name,
/// segment 2 as the key name and segment 4 as the request id, and raises
/// <see cref="OnReceiveAttributes"/> when the topic contains "/attributes/".
/// </summary>
/// <param name="e">Event data carrying the received application message.</param>
private void ReceiveAttributes(MqttApplicationMessageReceivedEventArgs e)
{
    var topic = e.ApplicationMessage.Topic;
    var segments = topic.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

    // Indexes up to 4 are read below; the caller's topic filter does not guarantee that many segments.
    if (segments.Length < 5)
    {
        _logger.LogWarning("ReceiveAttributes: unexpected topic {Topic}", topic);
        return;
    }

    var rpcDeviceName = segments[1];
    var rpcMethodName = segments[2];
    var rpcRequestId = segments[4];
    _logger.LogInformation("rpcMethodName={RpcMethodName}", rpcMethodName);
    _logger.LogInformation("rpcDeviceName={RpcDeviceName}", rpcDeviceName);
    _logger.LogInformation("rpcRequestId={RpcRequestId}", rpcRequestId);

    if (!topic.Contains("/attributes/"))
    {
        return;
    }

    // Null-conditional invoke: there may be no subscriber yet.
    OnReceiveAttributes?.Invoke(Client, new ISAttributeResponse
    {
        KeyName = rpcMethodName,
        DeviceName = rpcDeviceName,
        Id = rpcRequestId,
        Data = e.ApplicationMessage.ConvertPayloadToString()
    });
}
2021-12-20 15:38:59 +00:00
2022-08-10 08:55:44 +00:00
/// <summary>
/// Serializes <paramref name="obj"/> to JSON and enqueues it on "devices/{deviceName}/attributes".
/// Does nothing while the client is missing or disconnected. Failures are logged, not rethrown.
/// </summary>
/// <param name="deviceName">Device name inserted into the topic.</param>
/// <param name="obj">Attribute object to serialize as the payload.</param>
public async Task UploadAttributeAsync(string deviceName, object obj)
{
    try
    {
        var client = Client;
        if (client == null || !client.IsConnected)
        {
            return;
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic($"devices/{deviceName}/attributes")
            .WithPayload(JsonConvert.SerializeObject(obj))
            .Build();

        // Awaited inside the try block: returning the task un-awaited let asynchronous
        // failures escape the catch below.
        await client.EnqueueAsync(message);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Device:{DeviceName} UploadAttributeAsync Failed", deviceName);
    }
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Serializes <paramref name="obj"/> to JSON and enqueues it on "devices/{deviceName}/telemetry".
/// </summary>
/// <param name="deviceName">Device name inserted into the topic.</param>
/// <param name="obj">Telemetry object to serialize as the payload.</param>
public async Task UploadIsTelemetryDataAsync(string deviceName, object obj)
{
    var builder = new MqttApplicationMessageBuilder();
    builder = builder.WithTopic($"devices/{deviceName}/telemetry");
    builder = builder.WithPayload(JsonConvert.SerializeObject(obj));

    await Client.EnqueueAsync(builder.Build());
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Wraps <paramref name="obj"/> in a dictionary keyed by the device name, serializes it to JSON
/// and enqueues it on "gateway/attributes".
/// </summary>
/// <param name="deviceName">Key under which the values are sent.</param>
/// <param name="obj">Values to send for that device.</param>
public async Task UploadTcTelemetryDataAsync(string deviceName, object obj)
{
    var envelope = new Dictionary<string, object>();
    envelope.Add(deviceName, obj);

    var json = JsonConvert.SerializeObject(envelope);
    await Client.EnqueueAsync("gateway/attributes", json);
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Builds a one-device, one-service telemetry message and enqueues it on
/// "/v1/devices/{GatewayName}/datas". The device id is read from the device config named "DeviceId".
/// </summary>
/// <param name="device">Device whose "DeviceId" config value identifies it in the message.</param>
/// <param name="obj">Data object placed in the service entry.</param>
public async Task UploadHwTelemetryDataAsync(Device device, object obj)
{
    // The format ends with a literal 'Z' (UTC designator), so the value must come from UtcNow;
    // DateTime.Now produced local time mislabeled as UTC. Invariant culture keeps the digits
    // and calendar independent of the host's regional settings.
    var eventTime = DateTime.UtcNow.ToString("yyyyMMddTHHmmssZ",
        System.Globalization.CultureInfo.InvariantCulture);

    var hwTelemetry = new List<HwTelemetry>()
    {
        new HwTelemetry()
        {
            DeviceId = device.DeviceConfigs.FirstOrDefault(x => x.DeviceConfigName == "DeviceId")?.Value,
            Services = new()
            {
                new Service()
                {
                    ServiceId = "serviceId",
                    EventTime = eventTime,
                    Data = obj
                }
            }
        }
    };

    var hwTelemetrys = new HwTelemetrys()
    {
        Devices = hwTelemetry
    };

    await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/datas",
        JsonConvert.SerializeObject(hwTelemetrys));
}
/// <summary>
/// Sends an RPC reply in the format of the configured platform. Only ThingsBoard and IoTSharp
/// send a reply; every other platform is a no-op. Failures are logged, not rethrown.
/// </summary>
/// <param name="rpcResponse">Outcome of the RPC (device, request id, success flag, description).</param>
public async Task ResponseRpcAsync(RpcResponse rpcResponse)
{
    try
    {
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.ThingsBoard:
                var tRpcResponse = new TBRpcResponse
                {
                    DeviceName = rpcResponse.DeviceName,
                    RequestId = rpcResponse.RequestId,
                    ResponseData = new Dictionary<string, object>
                    {
                        { "success", rpcResponse.IsSuccess },
                        { "description", rpcResponse.Description }
                    }
                };
                await ResponseTbRpcAsync(tRpcResponse);
                break;

            case IoTPlatformType.IoTSharp:
                await ResponseIsRpcAsync(new ISRpcResponse
                {
                    DeviceId = rpcResponse.DeviceName,
                    // NOTE(review): the method name is a fixed literal, not the method of the
                    // original request — confirm this is what the platform expects.
                    Method = "Method",
                    ResponseId = rpcResponse.RequestId,
                    Data = JsonConvert.SerializeObject(new Dictionary<string, object>
                    {
                        { "success", rpcResponse.IsSuccess },
                        { "description", rpcResponse.Description }
                    })
                });
                break;

            case IoTPlatformType.ThingsCloud:
                // The official API does not require a reply.
                break;

            case IoTPlatformType.AliCloudIoT:
            case IoTPlatformType.TencentIoTHub:
            case IoTPlatformType.BaiduIoTCore:
            case IoTPlatformType.OneNET:
                break;
        }
    }
    catch (Exception ex)
    {
        // Exception goes first; passed last it was consumed as a format argument and lost.
        _logger.LogError(ex, "ResponseRpc Error,{RpcResponse}", rpcResponse);
    }
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Requests attribute values for a device using the configured platform's topic and payload format.
/// ThingsBoard sends a single-key request (the first entry of <paramref name="args"/>); IoTSharp
/// subscribes to the per-request response topic and sends all keys comma-joined.
/// Other platforms are a no-op. Failures are logged, not rethrown.
/// </summary>
/// <param name="deviceName">Device whose attributes are requested.</param>
/// <param name="anySide">IoTSharp only: selects the "anySide" key instead of "server".</param>
/// <param name="args">Attribute keys to request.</param>
public async Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
    try
    {
        string id = Guid.NewGuid().ToString();
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.ThingsBoard:
                // Only the first key is sent; bail out clearly instead of failing on args[0].
                if (args == null || args.Length == 0)
                {
                    _logger.LogWarning("RequestAttributes:{DeviceName} called without attribute keys", deviceName);
                    break;
                }

                //{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"}
                Dictionary<string, object> tbRequestData = new Dictionary<string, object>
                {
                    { "id", id },
                    { "device", deviceName },
                    { "client", true },
                    { "key", args[0] }
                };
                await Client.EnqueueAsync("v1/gateway/attributes/request",
                    JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce);
                break;

            case IoTPlatformType.IoTSharp:
                string topic = $"devices/{deviceName}/attributes/request/{id}";
                Dictionary<string, string> keys = new Dictionary<string, string>
                {
                    { anySide ? "anySide" : "server", string.Join(",", args) }
                };
                // Subscribe to the per-request response topic before sending the request.
                await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}",
                    MqttQualityOfServiceLevel.ExactlyOnce);
                await Client.EnqueueAsync(topic, JsonConvert.SerializeObject(keys),
                    MqttQualityOfServiceLevel.ExactlyOnce);
                break;

            case IoTPlatformType.AliCloudIoT:
            case IoTPlatformType.TencentIoTHub:
            case IoTPlatformType.BaiduIoTCore:
            case IoTPlatformType.OneNET:
                break;
        }
    }
    catch (Exception ex)
    {
        // Exception goes first; passed last it was consumed as a format argument and lost.
        _logger.LogError(ex, "RequestAttributes:{DeviceName}", deviceName);
    }
}
2022-08-10 08:55:44 +00:00
/// <summary>Last telemetry accepted for publishing, keyed by device name.</summary>
private Dictionary<string, List<PayLoad>> _lastTelemetrys = new(0);

/// <summary>
/// Decides whether the telemetry in <paramref name="sendModel"/> should be published and, when it
/// should, remembers it as the device's last published telemetry.
/// </summary>
/// <param name="device">Device the telemetry belongs to.</param>
/// <param name="sendModel">Telemetry keyed by device name.</param>
/// <returns>
/// True on the first upload for a device, when change-based upload is disabled, when the enforce
/// period has elapsed, when the values differ from the last published ones, or when the
/// comparison itself fails; otherwise false.
/// </returns>
private bool CanPubTelemetry(Device device, Dictionary<string, List<PayLoad>> sendModel)
{
    bool canPub = false;
    try
    {
        if (!_lastTelemetrys.TryGetValue(device.DeviceName, out var previous))
        {
            // First upload for this device.
            canPub = true;
        }
        else if (!device.CgUpload)
        {
            // Change-based upload is off: always publish.
            canPub = true;
        }
        else
        {
            var current = sendModel[device.DeviceName][0];
            var last = previous[0];

            if (current.TS - last.TS > device.EnforcePeriod)
            {
                // The enforce period has elapsed since the last published sample.
                canPub = true;
            }
            else if (JsonConvert.SerializeObject(current.Values) !=
                     JsonConvert.SerializeObject(last.Values))
            {
                // Values changed (compared through their JSON representation).
                canPub = true;
            }
        }
    }
    catch (Exception e)
    {
        // Publishing is preferred over dropping data when the comparison fails.
        canPub = true;
        _logger.LogError(e, "CanPubTelemetry failed for device {DeviceName}", device.DeviceName);
    }

    if (canPub)
    {
        _lastTelemetrys[device.DeviceName] = sendModel[device.DeviceName];
    }

    return canPub;
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Publishes a device's telemetry in the format of the configured platform, provided
/// CanPubTelemetry allows it. Exceptions are logged and not rethrown.
/// </summary>
/// <param name="device">Device the telemetry belongs to.</param>
/// <param name="sendModel">Telemetry keyed by device name.</param>
public async Task PublishTelemetryAsync(Device device, Dictionary<string, List<PayLoad>> sendModel)
{
    try
    {
        if (!CanPubTelemetry(device, sendModel))
        {
            return;
        }

        var platform = _systemConfig.IoTPlatformType;

        if (platform == IoTPlatformType.ThingsBoard)
        {
            // The whole model is sent as one message.
            var json = JsonConvert.SerializeObject(sendModel);
            await Client.EnqueueAsync("v1/gateway/telemetry", json);
        }
        else if (platform == IoTPlatformType.IoTSharp)
        {
            foreach (var item in sendModel[device.DeviceName].Where(p => p.Values != null))
            {
                await UploadIsTelemetryDataAsync(device.DeviceName, item.Values);
            }
        }
        else if (platform == IoTPlatformType.ThingsCloud)
        {
            foreach (var item in sendModel[device.DeviceName].Where(p => p.Values != null))
            {
                await UploadTcTelemetryDataAsync(device.DeviceName, item.Values);
            }
        }
        else if (platform == IoTPlatformType.IotDB)
        {
            foreach (var item in sendModel[device.DeviceName])
            {
                // Only samples with a Good device status are forwarded.
                if (item.DeviceStatus == DeviceStatusTypeEnum.Good)
                {
                    var tsData = new IotTsData();
                    tsData.device = _systemConfig.GatewayName + device.DeviceName;
                    tsData.timestamp = item.TS;
                    tsData.measurements = item.Values?.Keys.ToList();
                    tsData.values = item.Values?.Values.ToList();

                    await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName,
                        JsonConvert.SerializeObject(tsData));
                }
            }
        }
        else if (platform == IoTPlatformType.HuaWei)
        {
            foreach (var item in sendModel[device.DeviceName].Where(p => p.Values != null))
            {
                await UploadHwTelemetryDataAsync(device, item.Values);
            }
        }
        // AliCloudIoT, TencentIoTHub, BaiduIoTCore, OneNET and any other value: nothing is sent.

        // (A previously commented-out loop here pushed each value to the UA service node manager.)
    }
    catch (Exception ex)
    {
        _logger.LogError($"PublishTelemetryAsync Error:{ex}");
    }
}
2022-08-29 05:45:46 +00:00
/// <summary>1970-01-01; subtracted from the current UTC time to obtain a millisecond timestamp.</summary>
private readonly DateTime _tsStartDt = new DateTime(1970, 1, 1);

/// <summary>Measurement name list used for IoTDB online/offline records.</summary>
private readonly List<string> iotDbOnLineMeasurement = new List<string> { "online" };

/// <summary>Value list sent with an IoTDB record when a device connects.</summary>
private readonly List<object> iotDbOnLine = new List<object> { true };

/// <summary>Value list sent with an IoTDB record when a device disconnects.</summary>
private readonly List<object> iotDbOffLine = new List<object> { false };
2022-08-10 08:55:44 +00:00
/// <summary>
/// Announces that a device came online, using the configured platform's topic and payload.
/// Platforms without a branch below send nothing. Failures are logged, not rethrown.
/// </summary>
/// <param name="device">Device that connected.</param>
public async Task DeviceConnected(Device device)
{
    try
    {
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.ThingsBoard:
            case IoTPlatformType.IoTSharp:
                await Client.EnqueueAsync("v1/gateway/connect",
                    JsonConvert.SerializeObject(new Dictionary<string, string>
                        { { "device", device.DeviceName } }));
                break;

            case IoTPlatformType.ThingsCloud:
                await Client.EnqueueAsync("gateway/connect",
                    JsonConvert.SerializeObject(new Dictionary<string, string>
                        { { "device", device.DeviceName } }));
                break;

            case IoTPlatformType.IotDB:
                // An "online = true" record, timestamped in milliseconds since 1970-01-01.
                IotTsData onlineData = new IotTsData()
                {
                    device = _systemConfig.GatewayName + device.DeviceName,
                    timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds,
                    measurements = iotDbOnLineMeasurement,
                    values = iotDbOnLine
                };
                await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName,
                    JsonConvert.SerializeObject(onlineData));
                break;

            case IoTPlatformType.HuaWei:
                var deviceOnLine = new HwDeviceOnOffLine()
                {
                    MId = new Random().Next(0, 1024), // command ID
                    DeviceStatuses = new List<DeviceStatus>()
                    {
                        new DeviceStatus()
                        {
                            DeviceId = device.DeviceConfigs
                                .FirstOrDefault(x => x.DeviceConfigName == "DeviceId")
                                ?.Value,
                            Status = "ONLINE"
                        }
                    }
                };
                await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update",
                    JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce,
                    retain: false);
                break;

            case IoTPlatformType.AliCloudIoT:
            case IoTPlatformType.TencentIoTHub:
            case IoTPlatformType.BaiduIoTCore:
            case IoTPlatformType.OneNET:
                break;
        }
    }
    catch (Exception ex)
    {
        // Exception goes first; passed last it was consumed as a format argument and lost.
        _logger.LogError(ex, "DeviceConnected:{DeviceName}", device.DeviceName);
    }
}
2022-06-06 07:15:19 +00:00
2022-08-10 08:55:44 +00:00
/// <summary>
/// Announces that a device went offline, using the configured platform's topic and payload.
/// Platforms without a branch below send nothing. Failures are logged, not rethrown.
/// </summary>
/// <param name="device">Device that disconnected.</param>
public async Task DeviceDisconnected(Device device)
{
    try
    {
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.ThingsBoard:
            case IoTPlatformType.IoTSharp:
                await Client.EnqueueAsync("v1/gateway/disconnect",
                    JsonConvert.SerializeObject(new Dictionary<string, string>
                        { { "device", device.DeviceName } }));
                break;

            case IoTPlatformType.ThingsCloud:
                await Client.EnqueueAsync("gateway/disconnect",
                    JsonConvert.SerializeObject(new Dictionary<string, string>
                        { { "device", device.DeviceName } }));
                break;

            case IoTPlatformType.IotDB:
                // An "online = false" record, timestamped in milliseconds since 1970-01-01.
                IotTsData offlineData = new IotTsData()
                {
                    device = _systemConfig.GatewayName + device.DeviceName,
                    timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds,
                    measurements = iotDbOnLineMeasurement,
                    values = iotDbOffLine
                };
                await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName,
                    JsonConvert.SerializeObject(offlineData));
                break;

            case IoTPlatformType.HuaWei:
                var deviceOffLine = new HwDeviceOnOffLine()
                {
                    MId = new Random().Next(0, 1024), // command ID
                    DeviceStatuses = new List<DeviceStatus>()
                    {
                        new DeviceStatus()
                        {
                            DeviceId = device.DeviceConfigs
                                .FirstOrDefault(x => x.DeviceConfigName == "DeviceId")
                                ?.Value,
                            Status = "OFFLINE"
                        }
                    }
                };
                await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update",
                    JsonConvert.SerializeObject(deviceOffLine), MqttQualityOfServiceLevel.AtLeastOnce,
                    retain: false);
                break;

            case IoTPlatformType.AliCloudIoT:
            case IoTPlatformType.TencentIoTHub:
            case IoTPlatformType.BaiduIoTCore:
            case IoTPlatformType.OneNET:
                break;
        }
    }
    catch (Exception ex)
    {
        // Exception goes first; passed last it was consumed as a format argument and lost.
        _logger.LogError(ex, "DeviceDisconnected:{DeviceName}", device.DeviceName);
    }
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Announces a newly added device. Only the HuaWei platform sends a message
/// (to "/v1/devices/{GatewayName}/topo/add"); every other platform is a no-op.
/// Failures are logged, not rethrown.
/// </summary>
/// <param name="device">Device that was added.</param>
public async Task DeviceAdded(Device device)
{
    try
    {
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.HuaWei:
                var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/add";

                var addDeviceDto = new HwAddDeviceDto
                {
                    MId = new Random().Next(0, 1024), // command ID
                };
                addDeviceDto.DeviceInfos.Add(
                    new DeviceInfo
                    {
                        NodeId = device.DeviceName,
                        Name = device.DeviceName,
                        Description = device.Description,
                        ManufacturerId = "Test_n",
                        ProductType = "A_n"
                    }
                );

                await Client.EnqueueAsync(topic,
                    JsonConvert.SerializeObject(addDeviceDto));
                break;
        }
    }
    catch (Exception ex)
    {
        // Exception goes first; passed last it was consumed as a format argument and lost.
        _logger.LogError(ex, "DeviceAdded:{DeviceName}", device.DeviceName);
    }
}
2022-08-10 08:55:44 +00:00
/// <summary>
/// Announces a deleted device. Only the HuaWei platform sends a message
/// (to "/v1/devices/{GatewayName}/topo/delete"); the platform device id is looked up in the
/// database from the device's "DeviceId" config entry. Failures are logged, not rethrown.
/// </summary>
/// <param name="device">Device that was deleted.</param>
public async Task DeviceDeleted(Device device)
{
    try
    {
        switch (_systemConfig.IoTPlatformType)
        {
            case IoTPlatformType.HuaWei:
                var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/delete";

                await using (var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType))
                {
                    var deviceId = dc.Set<DeviceConfig>().FirstOrDefault(x =>
                        x.DeviceId == device.ID && x.DeviceConfigName == "DeviceId")?.Value;

                    var deleteDeviceDto = new HwDeleteDeviceDto
                    {
                        Id = new Random().Next(0, 1024), // command ID
                        DeviceId = deviceId,
                        // Milliseconds since 1970-01-01, based on UTC.
                        RequestTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds,
                        Request = new()
                        {
                            ManufacturerId = "Test_n",
                            ManufacturerName = "Test_n",
                            ProductType = "A_n"
                        }
                    };

                    await Client.EnqueueAsync(topic,
                        JsonConvert.SerializeObject(deleteDeviceDto));
                }
                break;
        }
    }
    catch (Exception ex)
    {
        // Label corrected (was "DeviceAdded"); exception goes first so it is actually recorded.
        _logger.LogError(ex, "DeviceDeleted:{DeviceName}", device.DeviceName);
    }
}
2021-12-12 06:55:48 +00:00
}
2022-08-10 08:55:44 +00:00
}