iotgateway/Plugins/Plugin/MessageService.cs
2025-01-12 20:08:29 +08:00

263 lines
8.8 KiB
C#

using IoTGateway.Model;
using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
using PluginInterface.IoTSharp;
using PluginInterface;
using IoTGateway.DataAccess;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet;
using MQTTnet.Protocol;
using Plugin.PlatformHandler;
using Newtonsoft.Json;
using Quartz.Logging;
namespace Plugin
{
public class MessageService
{
private readonly ILogger<MessageService> _logger;
private IPlatformHandler _platformHandler;
private SystemConfig _systemConfig;
private ManagedMqttClientOptions _options;
public bool IsConnected => (Client.IsConnected);
private IManagedMqttClient? Client { get; set; }
public event EventHandler<RpcRequest> OnExcRpc;
public event EventHandler<ISAttributeResponse> OnReceiveAttributes;
private readonly string _tbRpcTopic = "v1/gateway/rpc";
public MessageService(ILogger<MessageService> logger)
{
_logger = logger;
StartClientAsync().Wait();
}
public async Task StartClientAsync()
{
try
{
if (Client != null)
{
Client.Dispose();
}
Client = new MqttFactory().CreateManagedMqttClient();
await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
_systemConfig = dc.Set<SystemConfig>().First();
#region ClientOptions
_options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithMaxPendingMessages(100000)
.WithClientOptions(new MqttClientOptionsBuilder()
.WithClientId(string.IsNullOrEmpty(_systemConfig.ClientId)
? Guid.NewGuid().ToString()
: _systemConfig.ClientId)
.WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort)
.WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd)
.WithTimeout(TimeSpan.FromSeconds(30))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithProtocolVersion(MqttProtocolVersion.V311)
.WithCleanSession(true)
.Build())
.Build();
#endregion
Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
await Client.StartAsync(_options);
// 使用工厂模式创建对应平台的处理器
_platformHandler = PlatformHandlerFactory.CreateHandler(_systemConfig.IoTPlatformType, Client, _logger, OnExcRpc);
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
}
catch (Exception ex)
{
_logger.LogError(ex, $"StartManagedClientAsync FAILED ");
}
}
public async Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
#region Topics
try
{
await _platformHandler.ClientConnected();
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT Subscribe FAILED");
}
#endregion
}
public async Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
try
{
_logger.LogError($"MQTT DISCONNECTED WITH SERVER ");
await Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT CONNECTING FAILED");
}
}
public Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
_logger.LogDebug(
$"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} ");
try
{
_platformHandler.ReceiveRpc(e);
}
catch (Exception ex)
{
_logger.LogError(
ex, $"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
return Task.CompletedTask;
}
public async Task PublishTelemetryAsync(string deviceName, Device device,
Dictionary<string, List<PayLoad>> sendModel)
{
if (CanPubTelemetry(deviceName, device, sendModel))
{
await _platformHandler.PublishTelemetryAsync(deviceName, device, sendModel);
}
}
public async Task UploadAttributeAsync(string deviceName, object obj)
{
await _platformHandler.UploadAttributeAsync(deviceName, obj);
}
public async Task DeviceConnected(string deviceName, Device device)
{
try
{
await _platformHandler.DeviceConnected(deviceName, device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceConnected:{deviceName}");
}
}
public async Task DeviceDisconnected(string deviceName, Device device)
{
try
{
await _platformHandler.DeviceDisconnected(deviceName, device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceDisconnected:{deviceName}");
}
}
public async Task ResponseRpcAsync(RpcResponse rpcResponse)
{
try
{
await _platformHandler.ResponseRpcAsync(rpcResponse);
}
catch (Exception ex)
{
_logger.LogError(ex, $"ResponseRpc Error,{rpcResponse}");
}
}
public async Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
await _platformHandler.RequestAttributes(deviceName, anySide, args);
}
public async Task DeviceAdded(Device device)
{
try
{
await _platformHandler.DeviceAdded(device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
}
}
public async Task DeviceDeleted(Device device)
{
try
{
await _platformHandler.DeviceDeleted(device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
}
}
private Dictionary<string, List<PayLoad>> _lastTelemetrys = new(0);
/// <summary>
/// 判断是否推送遥测数据
/// </summary>
/// <param name="deviceName"></param>
/// <param name="device">设备</param>
/// <param name="sendModel">遥测</param>
/// <returns></returns>
private bool CanPubTelemetry(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
bool canPub = false;
try
{//第一次上传
if (!_lastTelemetrys.ContainsKey(deviceName))
canPub = true;
else
{
//变化上传
if (device.CgUpload)
{
//是否超过归档周期
if (sendModel[deviceName][0].TS - _lastTelemetrys[deviceName][0].TS >
device.EnforcePeriod)
canPub = true;
//是否变化 这里不好先用
else
{
if (JsonConvert.SerializeObject(sendModel[deviceName][0].Values) !=
JsonConvert.SerializeObject(_lastTelemetrys[deviceName][0].Values))
canPub = true;
}
}
//非变化上传
else
canPub = true;
}
}
catch (Exception e)
{
canPub = true;
Console.WriteLine(e);
}
if (canPub)
_lastTelemetrys[deviceName] = sendModel[deviceName];
return canPub;
}
}
}