using MQTTnet.Client; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Protocol; using PluginInterface; using IoTGateway.Model; using Microsoft.Extensions.Logging; using MQTTnet; using Newtonsoft.Json; using PluginInterface.ThingsBoard; namespace Plugin.PlatformHandler { public class ThingsCloudHandler : IPlatformHandler { public IManagedMqttClient MqttClient { get; } public ILogger Logger { get; } public event EventHandler OnExcRpc; public ThingsCloudHandler(IManagedMqttClient mqttClient, ILogger logger, EventHandler onExcRpc) { MqttClient = mqttClient; Logger = logger; OnExcRpc = onExcRpc; } public async Task ClientConnected() { await MqttClient.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); await MqttClient.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce); await MqttClient.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce); await MqttClient.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce); await MqttClient.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce); } public void ReceiveRpc(MqttApplicationMessageReceivedEventArgs e) { try { var tCRpcRequest = JsonConvert.DeserializeObject(e.ApplicationMessage.ConvertPayloadToString()); if (tCRpcRequest != null) OnExcRpc.Invoke(MqttClient, new RpcRequest() { Method = tCRpcRequest.RequestData.Method, DeviceName = tCRpcRequest.DeviceName, RequestId = tCRpcRequest.RequestData.RequestId, Params = tCRpcRequest.RequestData.Params }); } catch (Exception ex) { Logger.LogError(ex, $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}"); } } public Task ResponseRpcAsync(RpcResponse rpcResponse) { return Task.CompletedTask; } public async Task PublishTelemetryAsync(string deviceName, Device device, Dictionary> sendModel) { foreach (var payload in sendModel[deviceName]) { if (payload.Values != null) { var toSend = new Dictionary { { deviceName, payload.Values } }; await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes") .WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); } } } public Task UploadAttributeAsync(string deviceName, object obj) { return Task.CompletedTask; } public Task RequestAttributes(string deviceName, bool anySide, params string[] args) { return Task.CompletedTask; } public async Task DeviceConnected(string deviceName, Device device) { await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect") .WithPayload(JsonConvert.SerializeObject(new Dictionary { { "device", deviceName } })) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); } public async Task DeviceDisconnected(string deviceName, Device device) { await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect") .WithPayload(JsonConvert.SerializeObject(new Dictionary { { "device", deviceName } })) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); } public Task DeviceAdded(Device device) { return Task.CompletedTask; } public Task DeviceDeleted(Device device) { return Task.CompletedTask; } } }