iotgateway/Plugins/Plugin/MyMqttClient.cs

109 lines
3.8 KiB
C#
Raw Normal View History

2021-12-12 06:55:48 +00:00
using Microsoft.Extensions.Configuration;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using System;
using Newtonsoft.Json;
using WalkingTec.Mvvm.Core;
using System.Collections.Generic;
using IoTGateway.DataAccess;
using IoTGateway.Model;
using System.Linq;
using PluginInterface;
using Microsoft.Extensions.DependencyInjection;
namespace Plugin
{
public class MyMqttClient//: IDependency
{
private static IMqttClient _mqttClient = null;
private static MqttClientOptionsBuilder builder = null;
2021-12-20 15:38:59 +00:00
private SystemConfig systemConfig = null;
2021-12-12 06:55:48 +00:00
public MyMqttClient()
{
InitClient();
}
public void InitClient()
{
try
{
if (_mqttClient != null)
_mqttClient.Dispose();
using (var DC = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DBType))
{
2021-12-20 15:38:59 +00:00
systemConfig = DC.Set<SystemConfig>().FirstOrDefault();
if (systemConfig == null)
2021-12-12 06:55:48 +00:00
Console.WriteLine("配置信息错误,无法启动");
else
{
_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(x => OnConnected());
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(x => OnDisconnected());
_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnReceived);
builder = new MqttClientOptionsBuilder()
.WithCommunicationTimeout(TimeSpan.FromSeconds(60))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
2021-12-20 15:38:59 +00:00
.WithTcpServer(systemConfig.MqttIp, systemConfig.MqttPort)
.WithClientId(systemConfig.MqttUName + Guid.NewGuid().ToString())
.WithCredentials(systemConfig.MqttUName, systemConfig.MqttUPwd);
2021-12-12 06:55:48 +00:00
_mqttClient.ConnectAsync(builder.Build());
}
}
}
catch (Exception ex)
{
}
2021-12-20 15:38:59 +00:00
2021-12-12 06:55:48 +00:00
}
2021-12-20 15:38:59 +00:00
public void Publish(Device device, Dictionary<string, List<PayLoad>> SendModel)
2021-12-12 06:55:48 +00:00
{
2021-12-20 15:38:59 +00:00
try
{
string TopicBase = "v1/gateway/telemetry";
if (!systemConfig.Disperse)
_mqttClient.PublishAsync(TopicBase, JsonConvert.SerializeObject(SendModel));
else
{
foreach (var payload in SendModel[device.DeviceName])
{
foreach (var kv in payload.Values)
{
_mqttClient.PublishAsync($"{TopicBase}/{device.DeviceName}/{kv.Key}", kv.Value.ToString());
}
}
}
}
catch (Exception ex)
{
}
2021-12-12 06:55:48 +00:00
}
private void OnReceived(MqttApplicationMessageReceivedEventArgs obj)
{
var topic = obj.ApplicationMessage.Topic;
var msg = System.Text.Encoding.UTF8.GetString(obj.ApplicationMessage.Payload);
Console.WriteLine($"{topic}: {msg}");
}
private void OnDisconnected()
{
Console.WriteLine("Mqtt连接断开");
_mqttClient.ConnectAsync(builder.Build());
}
private void OnConnected()
{
Console.WriteLine("Mqtt连接正常");
}
}
}