feat: 北向使用工厂模式

This commit is contained in:
iioter 2025-01-11 23:23:33 +08:00
parent b5d7508216
commit c4d3bcd5f6
21 changed files with 772 additions and 5582 deletions

View File

@ -26,8 +26,8 @@ namespace IoTGateway.ViewModel.BasicData.DeviceVMs
= "复制失败,找不到设备";
else
{
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.RequestAttributes(device.DeviceName, true, device.DeviceConfigs.Where(x => x.DataSide == DataSide.AnySide).Select(x => x.DeviceConfigName).ToArray());
var messageService = Wtm.ServiceProvider.GetService(typeof(MessageService)) as MessageService;
messageService.RequestAttributes(device.DeviceName, true, device.DeviceConfigs.Where(x => x.DataSide == DataSide.AnySide).Select(x => x.DeviceConfigName).ToArray());
}
DC.SaveChanges();
transaction.Commit();

View File

@ -39,8 +39,8 @@ namespace IoTGateway.ViewModel.BasicData.DeviceVMs
var device = DC.Set<Device>().Where(x => x.ID == Entity.ID).Include(x => x.Parent).Include(x => x.Driver).Include(x => x.DeviceVariables).SingleOrDefault();
deviceService.CreateDeviceThread(device);
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.DeviceAdded(device);
var messageService = Wtm.ServiceProvider.GetService(typeof(MessageService)) as MessageService;
messageService.DeviceAdded(device);
}
}
catch (Exception ex)
@ -62,8 +62,8 @@ namespace IoTGateway.ViewModel.BasicData.DeviceVMs
List<Guid> Ids = new List<Guid>() { Guid.Parse(FC["id"].ToString()) };
var pluginManager = Wtm.ServiceProvider.GetService(typeof(DeviceService)) as DeviceService;
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.DeviceDeleted(Entity);
var messageService = Wtm.ServiceProvider.GetService(typeof(MessageService)) as MessageService;
messageService.DeviceDeleted(Entity);
var ret = DeleteDevices.doDelete(pluginManager, DC, Ids);
if (!ret.IsSuccess)
{

View File

@ -97,8 +97,8 @@ namespace IoTGateway.ViewModel.BasicData
transaction.Commit();
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.StartClientAsync().Wait();
var messageService = Wtm.ServiceProvider.GetService(typeof(MessageService)) as MessageService;
messageService.StartClientAsync().Wait();
//重新启动采集
deviceService.CreateDeviceThreads();

View File

@ -29,8 +29,8 @@ namespace IoTGateway.ViewModel.Config.SystemConfigVMs
public override void DoEdit(bool updateAllFields = false)
{
base.DoEdit(updateAllFields);
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.StartClientAsync().Wait();
var messageService = Wtm.ServiceProvider.GetService(typeof(MessageService)) as MessageService;
messageService.StartClientAsync().Wait();
}
public override void DoDelete()

View File

@ -74,8 +74,7 @@ namespace IoTGateway
services.AddHostedService<IoTBackgroundService>();
services.AddSingleton<DeviceService>();
services.AddSingleton<DriverService>();
//services.AddSingleton<UAService>();
services.AddSingleton<MyMqttClient>();
services.AddSingleton<MessageService>();
services.AddSingleton<ModbusSlaveService>();
}

View File

@ -17,19 +17,18 @@ namespace Plugin
public DriverService DrvierManager;
public List<DeviceThread> DeviceThreads = new List<DeviceThread>();
private readonly MyMqttClient _myMqttClient;
private readonly UAService _uAService;
private readonly MessageService _messageService;
private readonly MqttServer _mqttServer;
private readonly string _connnectSetting = IoTBackgroundService.connnectSetting;
private readonly DBTypeEnum _dbType = IoTBackgroundService.DbType;
//UAService? uAService,
public DeviceService(IConfiguration configRoot, DriverService drvierManager, MyMqttClient myMqttClient,
public DeviceService(IConfiguration configRoot, DriverService drvierManager, MessageService messageService,
MqttServer mqttServer, ILogger<DeviceService> logger)
{
_logger = logger;
DrvierManager = drvierManager;
_myMqttClient = myMqttClient;
_messageService = messageService;
//_uAService = uAService;
_mqttServer = mqttServer ?? throw new ArgumentNullException(nameof(mqttServer));
@ -152,7 +151,7 @@ namespace Plugin
if (deviceObj != null && systemManage != null)
{
var deviceThread = new DeviceThread(device, deviceObj, systemManage.GatewayName,
_myMqttClient,
_messageService,
_mqttServer, _logger);
DeviceThreads.Add(deviceThread);
}

View File

@ -18,7 +18,7 @@ namespace Plugin
public readonly Device Device;
public readonly IDriver Driver;
private readonly string _projectId;
private readonly MyMqttClient? _myMqttClient;
private readonly MessageService _messageService;
private Interpreter? _interpreter;
internal List<MethodInfo>? Methods { get; set; }
private Task? _task;
@ -26,11 +26,11 @@ namespace Plugin
private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();
private ManualResetEvent resetEvent = new(true);
public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient,
public DeviceThread(Device device, IDriver driver, string projectId, MessageService messageService,
MqttServer mqttServer, ILogger logger)
{
_myMqttClient = myMqttClient;
_myMqttClient.OnExcRpc += MyMqttClient_OnExcRpc;
_messageService = messageService;
_messageService.OnExcRpc += MyMqttClient_OnExcRpc;
Device = device;
Driver = driver;
_projectId = projectId;
@ -64,7 +64,7 @@ namespace Plugin
//上传客户端属性
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
{
_myMqttClient.UploadAttributeAsync(string.IsNullOrWhiteSpace(deviceVariables.Key)
_messageService.UploadAttributeAsync(string.IsNullOrWhiteSpace(deviceVariables.Key)
? Device.DeviceName
: deviceVariables.Key,
Device.DeviceConfigs.Where(x => x.DataSide == DataSide.ClientSide || x.DataSide == DataSide.AnySide)
@ -126,13 +126,13 @@ namespace Plugin
payLoad.TS = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds;
payLoad.DeviceStatus = DeviceStatusTypeEnum.Good;
sendModel[deviceName] = new List<PayLoad> { payLoad };
_myMqttClient
_messageService
.PublishTelemetryAsync(deviceName,
Device, sendModel).Wait();
}
if (deviceVariables.Any(x => x.StatusType == VaribaleStatusTypeEnum.Bad))
_myMqttClient?.DeviceDisconnected(deviceName, Device);
_messageService?.DeviceDisconnected(deviceName, Device);
}
}
@ -153,7 +153,7 @@ namespace Plugin
? Device.DeviceName
: deviceVariables.Key;
_myMqttClient?.DeviceDisconnected(deviceName, Device);
_messageService?.DeviceDisconnected(deviceName, Device);
}
if (Driver.Connect())
@ -164,7 +164,7 @@ namespace Plugin
? Device.DeviceName
: deviceVariables.Key;
_myMqttClient?.DeviceConnected(deviceName, Device);
_messageService?.DeviceConnected(deviceName, Device);
}
}
}
@ -348,7 +348,7 @@ namespace Plugin
}
//反馈RPC
_myMqttClient.ResponseRpcAsync(rpcResponse).Wait();
_messageService.ResponseRpcAsync(rpcResponse).Wait();
//纪录入库
rpcLog.IsSuccess = rpcResponse.IsSuccess;
rpcLog.Description = rpcResponse.Description;
@ -371,12 +371,12 @@ namespace Plugin
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
? Device.DeviceName
: deviceVariables.Key;
_myMqttClient?.DeviceDisconnected(deviceName, Device);
_messageService?.DeviceDisconnected(deviceName, Device);
}
}
if (_task == null) return;
if (_myMqttClient != null) _myMqttClient.OnExcRpc -= MyMqttClient_OnExcRpc;
if (_messageService != null) _messageService.OnExcRpc -= MyMqttClient_OnExcRpc;
_tokenSource.Cancel();
Driver.Close();
}

View File

@ -0,0 +1,294 @@
using IoTGateway.Model;
using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
using PluginInterface.IoTSharp;
using PluginInterface;
using IoTGateway.DataAccess;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet;
using MQTTnet.Protocol;
using Plugin.PlatformHandler;
using Newtonsoft.Json;
using Quartz.Logging;
namespace Plugin
{
public class MessageService
{
private readonly ILogger<MessageService> _logger;
private IPlatformHandler _platformHandler;
private SystemConfig _systemConfig;
private ManagedMqttClientOptions _options;
public bool IsConnected => (Client.IsConnected);
private IManagedMqttClient? Client { get; set; }
public event EventHandler<RpcRequest> OnExcRpc;
public event EventHandler<ISAttributeResponse> OnReceiveAttributes;
private readonly string _tbRpcTopic = "v1/gateway/rpc";
public MessageService(ILogger<MessageService> logger)
{
_logger = logger;
StartClientAsync().Wait();
}
public async Task StartClientAsync()
{
try
{
if (Client != null)
{
Client.Dispose();
}
Client = new MqttFactory().CreateManagedMqttClient();
await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
_systemConfig = dc.Set<SystemConfig>().First();
#region ClientOptions
_options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithMaxPendingMessages(100000)
.WithClientOptions(new MqttClientOptionsBuilder()
.WithClientId(string.IsNullOrEmpty(_systemConfig.ClientId)
? Guid.NewGuid().ToString()
: _systemConfig.ClientId)
.WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort)
.WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd)
.WithTimeout(TimeSpan.FromSeconds(30))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithProtocolVersion(MqttProtocolVersion.V311)
.WithCleanSession(true)
.Build())
.Build();
#endregion
Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
await Client.StartAsync(_options);
// 使用工厂模式创建对应平台的处理器
_platformHandler = PlatformHandlerFactory.CreateHandler(_systemConfig.IoTPlatformType, Client, _logger, OnExcRpc);
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
}
catch (Exception ex)
{
_logger.LogError(ex, $"StartManagedClientAsync FAILED ");
}
}
public async Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
#region Topics
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
//{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
await Client.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"id": $request_id, "device": "Device A", "value": "value1"}
await Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
await Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
await Client.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
await Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.ThingsCloud:
await Client.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT Subscribe FAILED");
}
#endregion
}
public async Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
try
{
_logger.LogError($"MQTT DISCONNECTED WITH SERVER ");
await Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT CONNECTING FAILED");
}
}
public Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
_logger.LogDebug(
$"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} ");
try
{
_platformHandler.ReceiveRpc(e);
}
catch (Exception ex)
{
_logger.LogError(
ex, $"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
return Task.CompletedTask;
}
public async Task PublishTelemetryAsync(string deviceName, Device device,
Dictionary<string, List<PayLoad>> sendModel)
{
if (CanPubTelemetry(deviceName, device, sendModel))
{
await _platformHandler.PublishTelemetryAsync(deviceName, device, sendModel);
}
}
public async Task UploadAttributeAsync(string deviceName, object obj)
{
await _platformHandler.UploadAttributeAsync(deviceName, obj);
}
public async Task DeviceConnected(string deviceName, Device device)
{
try
{
await _platformHandler.DeviceConnected(deviceName, device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceConnected:{deviceName}");
}
}
public async Task DeviceDisconnected(string deviceName, Device device)
{
try
{
await _platformHandler.DeviceDisconnected(deviceName, device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceDisconnected:{deviceName}");
}
}
public async Task ResponseRpcAsync(RpcResponse rpcResponse)
{
try
{
await _platformHandler.ResponseRpcAsync(rpcResponse);
}
catch (Exception ex)
{
_logger.LogError(ex, $"ResponseRpc Error,{rpcResponse}");
}
}
public async Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
await _platformHandler.RequestAttributes(deviceName, anySide, args);
}
public async Task DeviceAdded(Device device)
{
try
{
await _platformHandler.DeviceAdded(device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
}
}
public async Task DeviceDeleted(Device device)
{
try
{
await _platformHandler.DeviceDeleted(device);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
}
}
private Dictionary<string, List<PayLoad>> _lastTelemetrys = new(0);
/// <summary>
/// 判断是否推送遥测数据
/// </summary>
/// <param name="deviceName"></param>
/// <param name="device">设备</param>
/// <param name="sendModel">遥测</param>
/// <returns></returns>
private bool CanPubTelemetry(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
bool canPub = false;
try
{//第一次上传
if (!_lastTelemetrys.ContainsKey(deviceName))
canPub = true;
else
{
//变化上传
if (device.CgUpload)
{
//是否超过归档周期
if (sendModel[deviceName][0].TS - _lastTelemetrys[deviceName][0].TS >
device.EnforcePeriod)
canPub = true;
//是否变化 这里不好先用
else
{
if (JsonConvert.SerializeObject(sendModel[deviceName][0].Values) !=
JsonConvert.SerializeObject(_lastTelemetrys[deviceName][0].Values))
canPub = true;
}
}
//非变化上传
else
canPub = true;
}
}
catch (Exception e)
{
canPub = true;
Console.WriteLine(e);
}
if (canPub)
_lastTelemetrys[deviceName] = sendModel[deviceName];
return canPub;
}
}
}

View File

@ -1,774 +0,0 @@
using MQTTnet;
using MQTTnet.Client;
using Newtonsoft.Json;
using PluginInterface;
using IoTGateway.Model;
using MQTTnet.Protocol;
using MQTTnet.Formatter;
using IoTGateway.DataAccess;
using PluginInterface.IoTSharp;
using PluginInterface.HuaWeiRoma;
using PluginInterface.ThingsBoard;
using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
namespace Plugin
{
public class MyMqttClient
{
private readonly ILogger<MyMqttClient> _logger;
//private readonly ReferenceNodeManager? _uaNodeManager;
private SystemConfig _systemConfig;
private ManagedMqttClientOptions _options;
public bool IsConnected => (Client.IsConnected);
private IManagedMqttClient? Client { get; set; }
public event EventHandler<RpcRequest> OnExcRpc;
public event EventHandler<ISAttributeResponse> OnReceiveAttributes;
private readonly string _tbRpcTopic = "v1/gateway/rpc";
//UAService uaService,
public MyMqttClient(ILogger<MyMqttClient> logger)
{
_logger = logger;
//_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager;
StartClientAsync().Wait();
}
public async Task StartClientAsync()
{
try
{
if (Client != null)
{
Client.Dispose();
}
Client = new MqttFactory().CreateManagedMqttClient();
await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
_systemConfig = dc.Set<SystemConfig>().First();
#region ClientOptions
_options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithMaxPendingMessages(100000)
.WithClientOptions(new MqttClientOptionsBuilder()
.WithClientId(string.IsNullOrEmpty(_systemConfig.ClientId)
? Guid.NewGuid().ToString()
: _systemConfig.ClientId)
.WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort)
.WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd)
.WithTimeout(TimeSpan.FromSeconds(30))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithProtocolVersion(MqttProtocolVersion.V311)
.WithCleanSession(true)
.Build())
.Build();
#endregion
Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
await Client.StartAsync(_options);
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
}
catch (Exception ex)
{
_logger.LogError(ex, $"StartManagedClientAsync FAILED ");
}
}
private async Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
#region Topics
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
//{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
await Client.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"id": $request_id, "device": "Device A", "value": "value1"}
await Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
await Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
await Client.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
await Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.ThingsCloud:
await Client.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT Subscribe FAILED");
}
#endregion
}
private async Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
try
{
_logger.LogError($"MQTT DISCONNECTED WITH SERVER ");
//await Client.ConnectAsync(_options);
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT CONNECTING FAILED");
}
}
private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
_logger.LogDebug(
$"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} ");
try
{
if (e.ApplicationMessage.Topic == _tbRpcTopic)
ReceiveTbRpc(e);
else if (e.ApplicationMessage.Topic.StartsWith($"devices/") &&
e.ApplicationMessage.Topic.Contains("/response/"))
{
ReceiveAttributes(e);
}
else if (e.ApplicationMessage.Topic.StartsWith($"devices/") &&
e.ApplicationMessage.Topic.Contains("/rpc/request/"))
{
ReceiveIsRpc(e);
}
else if (e.ApplicationMessage.Topic == "gateway/command/send")
{
ReceiveTcRpc(e);
}
}
catch (Exception ex)
{
_logger.LogError(
ex, $"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
return Task.CompletedTask;
}
/// <summary>
/// thingsboard rpc
/// </summary>
/// <param name="e"></param>
private void ReceiveTbRpc(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var tBRpcRequest =
JsonConvert.DeserializeObject<TBRpcRequest>(e.ApplicationMessage.ConvertPayloadToString());
if (tBRpcRequest != null && !string.IsNullOrWhiteSpace(tBRpcRequest.RequestData.Method))
{
OnExcRpc(Client, new RpcRequest()
{
Method = tBRpcRequest.RequestData.Method,
DeviceName = tBRpcRequest.DeviceName,
RequestId = tBRpcRequest.RequestData.RequestId,
Params = tBRpcRequest.RequestData.Params
});
}
}
catch (Exception ex)
{
_logger.LogError(ex,
$"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
}
/// <summary>
/// thingscloud rpc
/// </summary>
/// <param name="e"></param>
private void ReceiveTcRpc(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var tCRpcRequest =
JsonConvert.DeserializeObject<TCRpcRequest>(e.ApplicationMessage.ConvertPayloadToString());
if (tCRpcRequest != null)
OnExcRpc.Invoke(Client, new RpcRequest()
{
Method = tCRpcRequest.RequestData.Method,
DeviceName = tCRpcRequest.DeviceName,
RequestId = tCRpcRequest.RequestData.RequestId,
Params = tCRpcRequest.RequestData.Params
});
}
catch (Exception ex)
{
_logger.LogError(ex,
$"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
}
private void ReceiveIsRpc(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var tps = e.ApplicationMessage.Topic.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
var rpcMethodName = tps[4];
var rpcDeviceName = tps[1];
var rpcRequestId = tps[5];
_logger.LogInformation($"rpcMethodName={rpcMethodName} ");
_logger.LogInformation($"rpcDeviceName={rpcDeviceName} ");
_logger.LogInformation($"rpcRequestId={rpcRequestId} ");
if (!string.IsNullOrEmpty(rpcMethodName) && !string.IsNullOrEmpty(rpcDeviceName) &&
!string.IsNullOrEmpty(rpcRequestId))
{
Task.Run(() =>
{
OnExcRpc(Client, new RpcRequest()
{
Method = rpcMethodName,
DeviceName = rpcDeviceName,
RequestId = rpcRequestId,
Params = JsonConvert.DeserializeObject<Dictionary<string, object>>(e.ApplicationMessage
.ConvertPayloadToString())
});
});
}
}
catch (Exception ex)
{
_logger.LogError(ex,
$"ReceiveIsRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
}
private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse)
{
await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(_tbRpcTopic)
.WithPayload(JsonConvert.SerializeObject(tBRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse)
{
var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}";
await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(tCRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
private async Task ResponseIsRpcAsync(ISRpcResponse rpcResult)
{
//var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}";
await Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(rpcResult))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
private void ReceiveAttributes(MqttApplicationMessageReceivedEventArgs e)
{
var tps = e.ApplicationMessage.Topic.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
var rpcMethodName = tps[2];
var rpcDeviceName = tps[1];
var rpcRequestId = tps[4];
_logger.LogInformation($"rpcMethodName={rpcMethodName}");
_logger.LogInformation($"rpcDeviceName={rpcDeviceName}");
_logger.LogInformation($"rpcRequestId={rpcRequestId}");
if (!string.IsNullOrEmpty(rpcMethodName) && !string.IsNullOrEmpty(rpcDeviceName) &&
!string.IsNullOrEmpty(rpcRequestId))
{
if (e.ApplicationMessage.Topic.Contains("/attributes/"))
{
OnReceiveAttributes.Invoke(Client, new ISAttributeResponse()
{
KeyName = rpcMethodName,
DeviceName = rpcDeviceName,
Id = rpcRequestId,
Data = e.ApplicationMessage.ConvertPayloadToString()
});
}
}
}
public Task UploadAttributeAsync(string deviceName, object obj)
{
//Topic: v1/gateway/attributes
//Message: {"Device A":{"attribute1":"value1", "attribute2": 42}, "Device B":{"attribute1":"value1", "attribute2": 42}
try
{
return Client.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build());
}
catch (Exception ex)
{
_logger.LogError(ex, $"Device:{deviceName} UploadAttributeAsync Failed");
}
return Task.CompletedTask;
}
public async Task UploadIsTelemetryDataAsync(string deviceName, object obj)
{
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry")
.WithPayload(JsonConvert.SerializeObject(obj)).Build());
}
public async Task UploadTcTelemetryDataAsync(string deviceName, object obj)
{
var toSend = new Dictionary<string, object> { { deviceName, obj } };
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes")
.WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public async Task UploadHwTelemetryDataAsync(Device device, object obj)
{
var hwTelemetry = new List<HwTelemetry>()
{
new HwTelemetry()
{
DeviceId = device.DeviceConfigs.FirstOrDefault(x => x.DeviceConfigName == "DeviceId")?.Value,
Services = new()
{
new Service()
{
ServiceId = "serviceId",
EventTime = DateTime.Now.ToString("yyyyMMddTHHmmssZ"),
Data = obj
}
}
}
};
var hwTelemetrys = new HwTelemetrys()
{
Devices = hwTelemetry
};
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/datas")
.WithPayload(JsonConvert.SerializeObject(hwTelemetrys)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public async Task ResponseRpcAsync(RpcResponse rpcResponse)
{
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
var tRpcResponse = new TBRpcResponse
{
DeviceName = rpcResponse.DeviceName,
RequestId = rpcResponse.RequestId,
ResponseData = new Dictionary<string, object>
{ { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } }
};
await ResponseTbRpcAsync(tRpcResponse);
break;
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
await ResponseIsRpcAsync(new ISRpcResponse
{
DeviceId = rpcResponse.DeviceName,
Method = rpcResponse.Method,
ResponseId = rpcResponse.RequestId,
Data = JsonConvert.SerializeObject(new Dictionary<string, object>
{
{ "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description }
})
});
break;
case IoTPlatformType.ThingsCloud:
//官网API不需要回复的
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"ResponseRpc Error,{rpcResponse}");
}
}
public async Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
try
{
string id = Guid.NewGuid().ToString();
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
//{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"}
Dictionary<string, object> tbRequestData = new Dictionary<string, object>
{
{ "id", id },
{ "device", deviceName },
{ "client", true },
{ "key", args[0] }
};
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/attributes/request")
.WithPayload(JsonConvert.SerializeObject(tbRequestData)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
string topic = $"devices/{deviceName}/attributes/request/{id}";
Dictionary<string, string> keys = new Dictionary<string, string>();
keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}",
MqttQualityOfServiceLevel.ExactlyOnce);
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(keys))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"RequestAttributes:{deviceName}");
}
}
private Dictionary<string, List<PayLoad>> _lastTelemetrys = new(0);
/// <summary>
/// 判断是否推送遥测数据
/// </summary>
/// <param name="device">设备</param>
/// <param name="sendModel">遥测</param>
/// <returns></returns>
private bool CanPubTelemetry(string DeviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
bool canPub = false;
try
{//第一次上传
if (!_lastTelemetrys.ContainsKey(DeviceName))
canPub = true;
else
{
//变化上传
if (device.CgUpload)
{
//是否超过归档周期
if (sendModel[DeviceName][0].TS - _lastTelemetrys[DeviceName][0].TS >
device.EnforcePeriod)
canPub = true;
//是否变化 这里不好先用
else
{
if (JsonConvert.SerializeObject(sendModel[DeviceName][0].Values) !=
JsonConvert.SerializeObject(_lastTelemetrys[DeviceName][0].Values))
canPub = true;
}
}
//非变化上传
else
canPub = true;
}
}
catch (Exception e)
{
canPub = true;
Console.WriteLine(e);
}
if (canPub)
_lastTelemetrys[DeviceName] = sendModel[DeviceName];
return canPub;
}
public async Task PublishTelemetryAsync(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
try
{
if (CanPubTelemetry(deviceName, device, sendModel))
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/telemetry")
.WithPayload(JsonConvert.SerializeObject(sendModel))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
break;
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
foreach (var payload in sendModel[deviceName])
{
if (payload.Values != null)
{
if (_systemConfig.IoTPlatformType == IoTPlatformType.IoTGateway)
payload.Values["_ts_"] = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds;
await UploadIsTelemetryDataAsync(deviceName, payload.Values);
}
}
break;
case IoTPlatformType.ThingsCloud:
foreach (var payload in sendModel[deviceName])
{
if (payload.Values != null)
await UploadTcTelemetryDataAsync(deviceName, payload.Values);
}
break;
case IoTPlatformType.HuaWei:
foreach (var payload in sendModel[deviceName])
{
if (payload.Values != null)
await UploadHwTelemetryDataAsync(device, payload.Values);
}
break;
case IoTPlatformType.AliCloudIoT:
case IoTPlatformType.TencentIoTHub:
case IoTPlatformType.BaiduIoTCore:
case IoTPlatformType.OneNET:
default:
break;
}
}
//foreach (var payload in sendModel[deviceName])
//{
// if (payload.Values != null)
// foreach (var kv in payload.Values)
// {
// //更新到UAService
// _uaNodeManager?.UpdateNode($"{device.Parent.DeviceName}.{deviceName}.{kv.Key}",
// kv.Value);
// }
//}
}
catch (Exception ex)
{
_logger.LogError(ex, $"PublishTelemetryAsync Error");
}
}
private readonly DateTime _tsStartDt = new(1970, 1, 1);
public async Task DeviceConnected(string DeviceName, Device device)
{
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
case IoTPlatformType.ThingsCloud:
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
case IoTPlatformType.HuaWei:
var deviceOnLine = new HwDeviceOnOffLine()
{
MId = new Random().Next(0, 1024), //命令ID
DeviceStatuses = new List<DeviceStatus>()
{
new DeviceStatus()
{
DeviceId = device.DeviceConfigs
.FirstOrDefault(x => x.DeviceConfigName == "DeviceId")
?.Value,
Status = "ONLINE"
}
}
};
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update")
.WithPayload(JsonConvert.SerializeObject(deviceOnLine))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceConnected:{DeviceName}");
}
}
public async Task DeviceDisconnected(string DeviceName, Device device)
{
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp:
case IoTPlatformType.IoTGateway:
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
case IoTPlatformType.ThingsCloud:
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
case IoTPlatformType.HuaWei:
var deviceOnLine = new HwDeviceOnOffLine()
{
MId = new Random().Next(0, 1024), //命令ID
DeviceStatuses = new List<DeviceStatus>()
{
new DeviceStatus()
{
DeviceId = device.DeviceConfigs
.FirstOrDefault(x => x.DeviceConfigName == "DeviceId")
?.Value,
Status = "OFFLINE"
}
}
};
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update")
.WithPayload(JsonConvert.SerializeObject(deviceOnLine))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceDisconnected:{DeviceName}");
}
}
public async Task DeviceAdded(Device device)
{
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.HuaWei:
var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/add";
var addDeviceDto = new HwAddDeviceDto
{
MId = new Random().Next(0, 1024), //命令ID
};
addDeviceDto.DeviceInfos.Add(
new DeviceInfo
{
NodeId = device.DeviceName,
Name = device.DeviceName,
Description = device.Description,
ManufacturerId = "Test_n",
ProductType = "A_n"
}
);
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(addDeviceDto))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
}
}
public async Task DeviceDeleted(Device device)
{
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.HuaWei:
var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/delete";
await using (var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType))
{
var deviceId = dc.Set<DeviceConfig>().FirstOrDefault(x =>
x.DeviceId == device.ID && x.DeviceConfigName == "DeviceId")?.Value;
var deleteDeviceDto = new HwDeleteDeviceDto
{
Id = new Random().Next(0, 1024), //命令ID
DeviceId = deviceId,
RequestTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds,
Request = new()
{
ManufacturerId = "Test_n",
ManufacturerName = "Test_n",
ProductType = "A_n"
}
};
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(deleteDeviceDto))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
}
}
}
}

View File

@ -0,0 +1,27 @@
using MQTTnet.Client;
using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
using PluginInterface;
using IoTGateway.Model;
namespace Plugin.PlatformHandler
{
public interface IPlatformHandler
{
IManagedMqttClient MqttClient { get; }
ILogger<MessageService> Logger { get; }
public event EventHandler<RpcRequest> OnExcRpc;
Task ClientConnected();
void ReceiveRpc(MqttApplicationMessageReceivedEventArgs e);
Task ResponseRpcAsync(RpcResponse rpcResponse);
Task PublishTelemetryAsync(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel);
Task UploadAttributeAsync(string deviceName, object obj);
Task RequestAttributes(string deviceName, bool anySide, params string[] args);
Task DeviceConnected(string deviceName, Device device);
Task DeviceDisconnected(string deviceName, Device device);
Task DeviceAdded(Device device);
Task DeviceDeleted(Device device);
}
}

View File

@ -0,0 +1,150 @@
using IoTGateway.Model;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using PluginInterface;
using PluginInterface.IoTSharp;
namespace Plugin.PlatformHandler
{
public class IoTSharpHandler : IPlatformHandler
{
public IManagedMqttClient MqttClient { get; }
public ILogger<MessageService> Logger { get; }
public event EventHandler<RpcRequest> OnExcRpc;
private readonly DateTime _tsStartDt = new(1970, 1, 1);
public IoTSharpHandler(IManagedMqttClient mqttClient, ILogger<MessageService> logger, EventHandler<RpcRequest> onExcRpc)
{
MqttClient = mqttClient;
Logger = logger;
OnExcRpc = onExcRpc;
}
public async Task ClientConnected()
{
await MqttClient.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce);
await MqttClient.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
await MqttClient.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce);
}
public void ReceiveRpc(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var tps = e.ApplicationMessage.Topic.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
var rpcMethodName = tps[4];
var rpcDeviceName = tps[1];
var rpcRequestId = tps[5];
Logger.LogInformation($"rpcMethodName={rpcMethodName} ");
Logger.LogInformation($"rpcDeviceName={rpcDeviceName} ");
Logger.LogInformation($"rpcRequestId={rpcRequestId} ");
if (!string.IsNullOrEmpty(rpcMethodName) && !string.IsNullOrEmpty(rpcDeviceName) &&
!string.IsNullOrEmpty(rpcRequestId))
{
Task.Run(() =>
{
OnExcRpc(MqttClient, new RpcRequest()
{
Method = rpcMethodName,
DeviceName = rpcDeviceName,
RequestId = rpcRequestId,
Params = JsonConvert.DeserializeObject<Dictionary<string, object>>(e.ApplicationMessage
.ConvertPayloadToString())
});
});
}
}
catch (Exception ex)
{
Logger.LogError(ex,
$"ReceiveIsRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
}
public async Task ResponseRpcAsync(RpcResponse rpcResponse)
{
var rpcResult = new ISRpcResponse
{
DeviceId = rpcResponse.DeviceName,
Method = rpcResponse.Method,
ResponseId = rpcResponse.RequestId,
Data = JsonConvert.SerializeObject(new Dictionary<string, object>
{
{ "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description }
})
};
//var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}";
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(rpcResult))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public async Task PublishTelemetryAsync(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
foreach (var payload in sendModel[deviceName])
{
if (payload.Values != null)
{
payload.Values["_ts_"] = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds;
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry")
.WithPayload(JsonConvert.SerializeObject(payload.Values)).Build());
}
}
}
public Task UploadAttributeAsync(string deviceName, object obj)
{
return Task.CompletedTask;
}
public async Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
string id = Guid.NewGuid().ToString();
string topic = $"devices/{deviceName}/attributes/request/{id}";
Dictionary<string, string> keys = new Dictionary<string, string>();
keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
await MqttClient.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}",
MqttQualityOfServiceLevel.ExactlyOnce);
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(keys))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public async Task DeviceConnected(string deviceName, Device device)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", deviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
public async Task DeviceDisconnected(string deviceName, Device device)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", deviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
public Task DeviceAdded(Device device)
{
return Task.CompletedTask;
}
public Task DeviceDeleted(Device device)
{
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,24 @@
using IoTGateway.Model;
using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
using PluginInterface;
namespace Plugin.PlatformHandler
{
public static class PlatformHandlerFactory
{
public static IPlatformHandler CreateHandler(IoTPlatformType platform, IManagedMqttClient mqttClient, ILogger<MessageService> logger, EventHandler<RpcRequest> onExcRpc)
{
switch (platform)
{
case IoTPlatformType.ThingsBoard:
return new ThingsBoardHandler(mqttClient, logger, onExcRpc);
case IoTPlatformType.ThingsCloud:
return new ThingsCloudHandler(mqttClient, logger, onExcRpc);
case IoTPlatformType.IoTSharp:
default:
return new IoTSharpHandler(mqttClient, logger, onExcRpc);
}
}
}
}

View File

@ -0,0 +1,134 @@
using MQTTnet.Extensions.ManagedClient;
using IoTGateway.Model;
using PluginInterface;
using MQTTnet.Protocol;
using MQTTnet.Client;
using Microsoft.Extensions.Logging;
using MQTTnet;
using Newtonsoft.Json;
using PluginInterface.ThingsBoard;
namespace Plugin.PlatformHandler
{
public class ThingsBoardHandler : IPlatformHandler
{
private readonly string _tbRpcTopic = "v1/gateway/rpc";
public IManagedMqttClient MqttClient { get; }
public ILogger<MessageService> Logger { get; }
public event EventHandler<RpcRequest> OnExcRpc;
public ThingsBoardHandler(IManagedMqttClient mqttClient, ILogger<MessageService> logger, EventHandler<RpcRequest> onExcRpc)
{
MqttClient = mqttClient;
Logger = logger;
OnExcRpc = onExcRpc;
}
public async Task ClientConnected()
{
//{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
await MqttClient.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"id": $request_id, "device": "Device A", "value": "value1"}
await MqttClient.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
await MqttClient.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce);
}
public void ReceiveRpc(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var tBRpcRequest =
JsonConvert.DeserializeObject<TBRpcRequest>(e.ApplicationMessage.ConvertPayloadToString());
if (tBRpcRequest != null && !string.IsNullOrWhiteSpace(tBRpcRequest.RequestData.Method))
{
OnExcRpc(MqttClient, new RpcRequest()
{
Method = tBRpcRequest.RequestData.Method,
DeviceName = tBRpcRequest.DeviceName,
RequestId = tBRpcRequest.RequestData.RequestId,
Params = tBRpcRequest.RequestData.Params
});
}
}
catch (Exception ex)
{
Logger.LogError(ex,
$"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
}
public async Task ResponseRpcAsync(RpcResponse rpcResponse)
{
var tBRpcResponse = new TBRpcResponse
{
DeviceName = rpcResponse.DeviceName,
RequestId = rpcResponse.RequestId,
ResponseData = new Dictionary<string, object>
{ { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } }
};
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic(_tbRpcTopic)
.WithPayload(JsonConvert.SerializeObject(tBRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
public async Task PublishTelemetryAsync(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/telemetry")
.WithPayload(JsonConvert.SerializeObject(sendModel))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
public Task UploadAttributeAsync(string deviceName, object obj)
{
return MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder()
.WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build());
}
public async Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
string id = Guid.NewGuid().ToString();
//{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"}
Dictionary<string, object> tbRequestData = new Dictionary<string, object>
{
{ "id", id },
{ "device", deviceName },
{ "client", true },
{ "key", args[0] }
};
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/attributes/request")
.WithPayload(JsonConvert.SerializeObject(tbRequestData)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public async Task DeviceConnected(string deviceName, Device device)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", deviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
public async Task DeviceDisconnected(string deviceName, Device device)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", deviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce).Build());
}
public Task DeviceAdded(Device device)
{
return Task.CompletedTask;
}
public Task DeviceDeleted(Device device)
{
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,111 @@
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using PluginInterface;
using IoTGateway.Model;
using Microsoft.Extensions.Logging;
using MQTTnet;
using Newtonsoft.Json;
using PluginInterface.ThingsBoard;
namespace Plugin.PlatformHandler
{
public class ThingsCloudHandler : IPlatformHandler
{
public IManagedMqttClient MqttClient { get; }
public ILogger<MessageService> Logger { get; }
public event EventHandler<RpcRequest> OnExcRpc;
public ThingsCloudHandler(IManagedMqttClient mqttClient, ILogger<MessageService> logger, EventHandler<RpcRequest> onExcRpc)
{
MqttClient = mqttClient;
Logger = logger;
OnExcRpc = onExcRpc;
}
public async Task ClientConnected()
{
await MqttClient.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
await MqttClient.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce);
await MqttClient.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce);
await MqttClient.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce);
await MqttClient.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce);
}
public void ReceiveRpc(MqttApplicationMessageReceivedEventArgs e)
{
try
{
var tCRpcRequest =
JsonConvert.DeserializeObject<TCRpcRequest>(e.ApplicationMessage.ConvertPayloadToString());
if (tCRpcRequest != null)
OnExcRpc.Invoke(MqttClient, new RpcRequest()
{
Method = tCRpcRequest.RequestData.Method,
DeviceName = tCRpcRequest.DeviceName,
RequestId = tCRpcRequest.RequestData.RequestId,
Params = tCRpcRequest.RequestData.Params
});
}
catch (Exception ex)
{
Logger.LogError(ex,
$"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
}
}
public Task ResponseRpcAsync(RpcResponse rpcResponse)
{
return Task.CompletedTask;
}
public async Task PublishTelemetryAsync(string deviceName, Device device, Dictionary<string, List<PayLoad>> sendModel)
{
foreach (var payload in sendModel[deviceName])
{
if (payload.Values != null)
{
var toSend = new Dictionary<string, object> { { deviceName, payload.Values } };
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes")
.WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
}
}
public Task UploadAttributeAsync(string deviceName, object obj)
{
return Task.CompletedTask;
}
public Task RequestAttributes(string deviceName, bool anySide, params string[] args)
{
return Task.CompletedTask;
}
public async Task DeviceConnected(string deviceName, Device device)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", deviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public async Task DeviceDisconnected(string deviceName, Device device)
{
await MqttClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect")
.WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", deviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public Task DeviceAdded(Device device)
{
return Task.CompletedTask;
}
public Task DeviceDeleted(Device device)
{
return Task.CompletedTask;
}
}
}

View File

@ -1,275 +0,0 @@
/* ========================================================================
* Copyright (c) 2005-2021 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Mono.Options;
using Opc.Ua;
using Opc.Ua.Configuration;
using static Opc.Ua.Utils;
namespace Quickstarts
{
/// <summary>
/// The log output implementation of a TextWriter.
/// </summary>
public class LogWriter : TextWriter
{
private StringBuilder m_builder = new StringBuilder();
public override void Write(char value)
{
m_builder.Append(value);
}
public override void WriteLine(char value)
{
m_builder.Append(value);
//LogInfo("{0}", m_builder.ToString());
m_builder.Clear();
}
public override void WriteLine()
{
//LogInfo("{0}", m_builder.ToString());
m_builder.Clear();
}
public override void WriteLine(string format, object arg0)
{
m_builder.Append(format);
//LogInfo(m_builder.ToString(), arg0);
m_builder.Clear();
}
public override void WriteLine(string format, object arg0, object arg1)
{
m_builder.Append(format);
//LogInfo(m_builder.ToString(), arg0, arg1);
m_builder.Clear();
}
public override void WriteLine(string format, params object[] arg)
{
m_builder.Append(format);
//LogInfo(m_builder.ToString(), arg);
m_builder.Clear();
}
public override void Write(string value)
{
m_builder.Append(value);
}
public override void WriteLine(string value)
{
m_builder.Append(value);
//LogInfo("{0}", m_builder.ToString());
m_builder.Clear();
}
public override Encoding Encoding
{
get { return Encoding.Default; }
}
}
/// <summary>
/// The error code why the application exit.
/// </summary>
public enum ExitCode : int
{
Ok = 0,
ErrorNotStarted = 0x80,
ErrorRunning = 0x81,
ErrorException = 0x82,
ErrorStopping = 0x83,
ErrorCertificate = 0x84,
ErrorInvalidCommandLine = 0x100
};
/// <summary>
/// An exception that occured and caused an exit of the application.
/// </summary>
public class ErrorExitException : Exception
{
public ExitCode ExitCode { get; }
public ErrorExitException(ExitCode exitCode)
{
ExitCode = exitCode;
}
public ErrorExitException()
{
ExitCode = ExitCode.Ok;
}
public ErrorExitException(string message) : base(message)
{
ExitCode = ExitCode.Ok;
}
public ErrorExitException(string message, ExitCode exitCode) : base(message)
{
ExitCode = exitCode;
}
public ErrorExitException(string message, Exception innerException) : base(message, innerException)
{
ExitCode = ExitCode.Ok;
}
public ErrorExitException(string message, Exception innerException, ExitCode exitCode) : base(message, innerException)
{
ExitCode = exitCode;
}
}
/// <summary>
/// A dialog which asks for user input.
/// </summary>
public class ApplicationMessageDlg : IApplicationMessageDlg
{
private TextWriter m_output;
private string m_message = string.Empty;
private bool m_ask;
public ApplicationMessageDlg(TextWriter output)
{
m_output = output;
}
public override void Message(string text, bool ask)
{
m_message = text;
m_ask = ask;
}
public override async Task<bool> ShowAsync()
{
if (m_ask)
{
var message = new StringBuilder(m_message);
message.Append(" (y/n, default y): ");
//m_output.Write(message.ToString());
try
{
ConsoleKeyInfo result = Console.ReadKey();
//m_output.WriteLine();
return await Task.FromResult((result.KeyChar == 'y') ||
(result.KeyChar == 'Y') || (result.KeyChar == '\r')).ConfigureAwait(false);
}
catch
{
// intentionally fall through
}
}
else
{
//m_output.WriteLine(m_message);
}
return await Task.FromResult(true).ConfigureAwait(false);
}
}
/// <summary>
/// Helper functions shared in various console applications.
/// </summary>
public static class ConsoleUtils
{
/// <summary>
/// Process a command line of the console sample application.
/// </summary>
public static string ProcessCommandLine(
TextWriter output,
string[] args,
Mono.Options.OptionSet options,
ref bool showHelp,
bool noExtraArgs = true)
{
IList<string> extraArgs = null;
try
{
extraArgs = options.Parse(args);
if (noExtraArgs)
{
foreach (string extraArg in extraArgs)
{
output.WriteLine("Error: Unknown option: {0}", extraArg);
showHelp = true;
}
}
}
catch (OptionException e)
{
output.WriteLine(e.Message);
showHelp = true;
}
if (showHelp)
{
options.WriteOptionDescriptions(output);
throw new ErrorExitException("Invalid Commandline or help requested.", ExitCode.ErrorInvalidCommandLine);
}
return extraArgs.FirstOrDefault();
}
/// <summary>
/// Create an event which is set if a user
/// enters the Ctrl-C key combination.
/// </summary>
public static ManualResetEvent CtrlCHandler()
{
var quitEvent = new ManualResetEvent(false);
try
{
Console.CancelKeyPress += (_, eArgs) => {
quitEvent.Set();
eArgs.Cancel = true;
};
}
catch
{
// intentionally left blank
}
return quitEvent;
}
}
}

View File

@ -1,42 +0,0 @@
/* ========================================================================
* Copyright (c) 2005-2020 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
namespace Quickstarts.ReferenceServer
{
/// <summary>
/// Defines constants for namespaces used by the servers.
/// </summary>
public static partial class Namespaces
{
/// <summary>
/// The namespace for the nodes provided by the reference server.
/// </summary>
public const string ReferenceServer = "http://opcfoundation.org/Quickstarts/ReferenceServer";
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,381 +0,0 @@
/* ========================================================================
* Copyright (c) 2005-2020 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using Opc.Ua;
using Opc.Ua.Server;
namespace Quickstarts.ReferenceServer
{
/// <summary>
/// Implements the Quickstart Reference Server.
/// </summary>
/// <remarks>
/// Each server instance must have one instance of a StandardServer object which is
/// responsible for reading the configuration file, creating the endpoints and dispatching
/// incoming requests to the appropriate handler.
///
/// This sub-class specifies non-configurable metadata such as Product Name and initializes
/// the EmptyNodeManager which provides access to the data exposed by the Server.
/// </remarks>
public partial class ReferenceServer : ReverseConnectServer
{
#region Overridden Methods
/// <summary>
/// Creates the node managers for the server.
/// </summary>
/// <remarks>
/// This method allows the sub-class create any additional node managers which it uses. The SDK
/// always creates a CoreNodeManager which handles the built-in nodes defined by the specification.
/// Any additional NodeManagers are expected to handle application specific nodes.
/// </remarks>
protected override MasterNodeManager CreateMasterNodeManager(IServerInternal server, ApplicationConfiguration configuration)
{
// create the custom node managers.
nodeManagers.Add(new ReferenceNodeManager(server, configuration));
if (m_nodeManagerFactory == null || m_nodeManagerFactory.Count == 0)
{
AddDefaultFactories();
}
foreach (var nodeManagerFactory in m_nodeManagerFactory)
{
nodeManagers.Add(nodeManagerFactory.Create(server, configuration));
}
// create master node manager.
return new MasterNodeManager(server, configuration, null, nodeManagers.ToArray());
}
/// <summary>
/// Loads the non-configurable properties for the application.
/// </summary>
/// <remarks>
/// These properties are exposed by the server but cannot be changed by administrators.
/// </remarks>
protected override ServerProperties LoadServerProperties()
{
ServerProperties properties = new ServerProperties();
properties.ManufacturerName = "OPC Foundation";
properties.ProductName = "Quickstart Reference Server";
properties.ProductUri = "http://opcfoundation.org/Quickstart/ReferenceServer/v1.04";
properties.SoftwareVersion = Utils.GetAssemblySoftwareVersion();
properties.BuildNumber = Utils.GetAssemblyBuildNumber();
properties.BuildDate = Utils.GetAssemblyTimestamp();
return properties;
}
/// <summary>
/// Creates the resource manager for the server.
/// </summary>
protected override ResourceManager CreateResourceManager(IServerInternal server, ApplicationConfiguration configuration)
{
ResourceManager resourceManager = new ResourceManager(server, configuration);
System.Reflection.FieldInfo[] fields = typeof(StatusCodes).GetFields(System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static);
foreach (System.Reflection.FieldInfo field in fields)
{
uint? id = field.GetValue(typeof(StatusCodes)) as uint?;
if (id != null)
{
resourceManager.Add(id.Value, "en-US", field.Name);
}
}
return resourceManager;
}
/// <summary>
/// Initializes the server before it starts up.
/// </summary>
/// <remarks>
/// This method is called before any startup processing occurs. The sub-class may update the
/// configuration object or do any other application specific startup tasks.
/// </remarks>
protected override void OnServerStarting(ApplicationConfiguration configuration)
{
base.OnServerStarting(configuration);
// it is up to the application to decide how to validate user identity tokens.
// this function creates validator for X509 identity tokens.
CreateUserIdentityValidators(configuration);
}
/// <summary>
/// Called after the server has been started.
/// </summary>
protected override void OnServerStarted(IServerInternal server)
{
base.OnServerStarted(server);
// request notifications when the user identity is changed. all valid users are accepted by default.
server.SessionManager.ImpersonateUser += new ImpersonateEventHandler(SessionManager_ImpersonateUser);
try
{
lock (ServerInternal.Status.Lock)
{
// allow a faster sampling interval for CurrentTime node.
ServerInternal.Status.Variable.CurrentTime.MinimumSamplingInterval = 250;
}
}
catch
{ }
}
#endregion
#region User Validation Functions
/// <summary>
/// Creates the objects used to validate the user identity tokens supported by the server.
/// </summary>
private void CreateUserIdentityValidators(ApplicationConfiguration configuration)
{
for (int ii = 0; ii < configuration.ServerConfiguration.UserTokenPolicies.Count; ii++)
{
UserTokenPolicy policy = configuration.ServerConfiguration.UserTokenPolicies[ii];
// create a validator for a certificate token policy.
if (policy.TokenType == UserTokenType.Certificate)
{
// check if user certificate trust lists are specified in configuration.
if (configuration.SecurityConfiguration.TrustedUserCertificates != null &&
configuration.SecurityConfiguration.UserIssuerCertificates != null)
{
CertificateValidator certificateValidator = new CertificateValidator();
certificateValidator.Update(configuration.SecurityConfiguration).Wait();
certificateValidator.Update(configuration.SecurityConfiguration.UserIssuerCertificates,
configuration.SecurityConfiguration.TrustedUserCertificates,
configuration.SecurityConfiguration.RejectedCertificateStore);
// set custom validator for user certificates.
m_userCertificateValidator = certificateValidator.GetChannelValidator();
}
}
}
}
/// <summary>
/// Called when a client tries to change its user identity.
/// </summary>
private void SessionManager_ImpersonateUser(Session session, ImpersonateEventArgs args)
{
// check for a user name token.
UserNameIdentityToken userNameToken = args.NewIdentity as UserNameIdentityToken;
if (userNameToken != null)
{
args.Identity = VerifyPassword(userNameToken);
// set AuthenticatedUser role for accepted user/password authentication
args.Identity.GrantedRoleIds.Add(ObjectIds.WellKnownRole_AuthenticatedUser);
if (args.Identity is SystemConfigurationIdentity)
{
// set ConfigureAdmin role for user with permission to configure server
args.Identity.GrantedRoleIds.Add(ObjectIds.WellKnownRole_ConfigureAdmin);
args.Identity.GrantedRoleIds.Add(ObjectIds.WellKnownRole_SecurityAdmin);
}
return;
}
// check for x509 user token.
X509IdentityToken x509Token = args.NewIdentity as X509IdentityToken;
if (x509Token != null)
{
VerifyUserTokenCertificate(x509Token.Certificate);
args.Identity = new UserIdentity(x509Token);
// set AuthenticatedUser role for accepted certificate authentication
args.Identity.GrantedRoleIds.Add(ObjectIds.WellKnownRole_AuthenticatedUser);
return;
}
// check for anonymous token.
if (args.NewIdentity is AnonymousIdentityToken || args.NewIdentity == null)
{
// allow anonymous authentication and set Anonymous role for this authentication
args.Identity = new UserIdentity();
args.Identity.GrantedRoleIds.Add(ObjectIds.WellKnownRole_Anonymous);
return;
}
// unsuported identity token type.
throw ServiceResultException.Create(StatusCodes.BadIdentityTokenInvalid,
"Not supported user token type: {0}.", args.NewIdentity);
}
/// <summary>
/// Validates the password for a username token.
/// </summary>
private IUserIdentity VerifyPassword(UserNameIdentityToken userNameToken)
{
var userName = userNameToken.UserName;
var password = userNameToken.DecryptedPassword;
if (String.IsNullOrEmpty(userName))
{
// an empty username is not accepted.
throw ServiceResultException.Create(StatusCodes.BadIdentityTokenInvalid,
"Security token is not a valid username token. An empty username is not accepted.");
}
if (String.IsNullOrEmpty(password))
{
// an empty password is not accepted.
throw ServiceResultException.Create(StatusCodes.BadIdentityTokenRejected,
"Security token is not a valid username token. An empty password is not accepted.");
}
// User with permission to configure server
if (userName == "sysadmin" && password == "demo")
{
return new SystemConfigurationIdentity(new UserIdentity(userNameToken));
}
// standard users for CTT verification
if (!((userName == "user1" && password == "password") ||
(userName == "user2" && password == "password1")))
{
// construct translation object with default text.
TranslationInfo info = new TranslationInfo(
"InvalidPassword",
"en-US",
"Invalid username or password.",
userName);
// create an exception with a vendor defined sub-code.
throw new ServiceResultException(new ServiceResult(
StatusCodes.BadUserAccessDenied,
"InvalidPassword",
LoadServerProperties().ProductUri,
new LocalizedText(info)));
}
return new UserIdentity(userNameToken);
}
/// <summary>
/// Verifies that a certificate user token is trusted.
/// </summary>
private void VerifyUserTokenCertificate(X509Certificate2 certificate)
{
try
{
if (m_userCertificateValidator != null)
{
m_userCertificateValidator.Validate(certificate);
}
else
{
CertificateValidator.Validate(certificate);
}
}
catch (Exception e)
{
TranslationInfo info;
StatusCode result = StatusCodes.BadIdentityTokenRejected;
ServiceResultException se = e as ServiceResultException;
if (se != null && se.StatusCode == StatusCodes.BadCertificateUseNotAllowed)
{
info = new TranslationInfo(
"InvalidCertificate",
"en-US",
"'{0}' is an invalid user certificate.",
certificate.Subject);
result = StatusCodes.BadIdentityTokenInvalid;
}
else
{
// construct translation object with default text.
info = new TranslationInfo(
"UntrustedCertificate",
"en-US",
"'{0}' is not a trusted user certificate.",
certificate.Subject);
}
// create an exception with a vendor defined sub-code.
throw new ServiceResultException(new ServiceResult(
result,
info.Key,
LoadServerProperties().ProductUri,
new LocalizedText(info)));
}
}
private static INodeManagerFactory IsINodeManagerFactoryType(Type type)
{
var nodeManagerTypeInfo = type.GetTypeInfo();
if (nodeManagerTypeInfo.IsAbstract ||
!typeof(INodeManagerFactory).IsAssignableFrom(type))
{
return null;
}
return Activator.CreateInstance(type) as INodeManagerFactory;
}
private void AddDefaultFactories()
{
var assembly = GetType().Assembly;
var factories = assembly.GetExportedTypes().Select(type => IsINodeManagerFactoryType(type)).Where(type => type != null);
m_nodeManagerFactory = new List<INodeManagerFactory>();
foreach (var nodeManagerFactory in factories)
{
m_nodeManagerFactory.Add(nodeManagerFactory);
}
}
#endregion
#region Private Fields
private IList<INodeManagerFactory> m_nodeManagerFactory;
private ICertificateValidator m_userCertificateValidator;
#endregion
public List<INodeManager> nodeManagers = new List<INodeManager>();
}
}

View File

@ -1,82 +0,0 @@
/* ========================================================================
* Copyright (c) 2005-2020 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System.Runtime.Serialization;
namespace Quickstarts.ReferenceServer
{
/// <summary>
/// Stores the configuration the data access node manager.
/// </summary>
[DataContract(Namespace = Namespaces.ReferenceServer)]
public class ReferenceServerConfiguration
{
#region Constructors
/// <summary>
/// The default constructor.
/// </summary>
public ReferenceServerConfiguration()
{
Initialize();
}
/// <summary>
/// Initializes the object during deserialization.
/// </summary>
[OnDeserializing()]
private void Initialize(StreamingContext context)
{
Initialize();
}
/// <summary>
/// Sets private members to default values.
/// </summary>
private static void Initialize()
{
}
#endregion
#region Public Properties
/// <summary>
/// Whether the user dialog for accepting invalid certificates should be displayed.
/// </summary>
[DataMember(Order = 1)]
public bool ShowCertificateValidationDialog
{
get { return m_showCertificateValidationDialog; }
set { m_showCertificateValidationDialog = value; }
}
#endregion
#region Private Members
private bool m_showCertificateValidationDialog;
#endregion
}
}

View File

@ -1,267 +0,0 @@
/* ========================================================================
* Copyright (c) 2005-2020 The OPC Foundation, Inc. All rights reserved.
*
* OPC Foundation MIT License 1.00
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* The complete license agreement can be found here:
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Configuration;
using Opc.Ua.Server;
namespace Quickstarts
{
public class UAServer<T> where T : StandardServer, new()
{
public ApplicationInstance Application => m_application;
public ApplicationConfiguration Configuration => m_application.ApplicationConfiguration;
public bool AutoAccept { get; set; }
public string Password { get; set; }
public ExitCode ExitCode { get; private set; }
/// <summary>
/// Ctor of the server.
/// </summary>
/// <param name="writer">The text output.</param>
public UAServer(TextWriter writer)
{
m_output = writer;
}
/// <summary>
/// Load the application configuration.
/// </summary>
public async Task LoadAsync(string applicationName, string configSectionName)
{
try
{
ExitCode = ExitCode.ErrorNotStarted;
ApplicationInstance.MessageDlg = new ApplicationMessageDlg(m_output);
CertificatePasswordProvider PasswordProvider = new CertificatePasswordProvider(Password);
m_application = new ApplicationInstance {
ApplicationName = applicationName,
ApplicationType = ApplicationType.Server,
ConfigSectionName = configSectionName,
CertificatePasswordProvider = PasswordProvider
};
// load the application configuration.
await m_application.LoadApplicationConfiguration(false).ConfigureAwait(false);
}
catch (Exception ex)
{
throw new ErrorExitException(ex.Message, ExitCode);
}
}
/// <summary>
/// Load the application configuration.
/// </summary>
public async Task CheckCertificateAsync(bool renewCertificate)
{
try
{
var config = m_application.ApplicationConfiguration;
if (renewCertificate)
{
//await m_application.DeleteApplicationInstanceCertificate().ConfigureAwait(false);
}
// check the application certificate.
bool haveAppCertificate = await m_application.CheckApplicationInstanceCertificate(false, minimumKeySize: 0).ConfigureAwait(false);
if (!haveAppCertificate)
{
throw new Exception("Application instance certificate invalid!");
}
if (!config.SecurityConfiguration.AutoAcceptUntrustedCertificates)
{
config.CertificateValidator.CertificateValidation += new CertificateValidationEventHandler(CertificateValidator_CertificateValidation);
}
}
catch (Exception ex)
{
throw new ErrorExitException(ex.Message, ExitCode);
}
}
/// <summary>
/// Start the server.
/// </summary>
public async Task StartAsync()
{
try
{
// create the server.
m_server = new T();
// start the server
await m_application.Start(m_server).ConfigureAwait(false);
// save state
ExitCode = ExitCode.ErrorRunning;
// print endpoint info
var endpoints = m_application.Server.GetEndpoints().Select(e => e.EndpointUrl).Distinct();
foreach (var endpoint in endpoints)
{
Console.WriteLine(endpoint);
}
// start the status thread
m_status = Task.Run(StatusThreadAsync);
// print notification on session events
m_server.CurrentInstance.SessionManager.SessionActivated += EventStatus;
m_server.CurrentInstance.SessionManager.SessionClosing += EventStatus;
m_server.CurrentInstance.SessionManager.SessionCreated += EventStatus;
}
catch (Exception ex)
{
throw new ErrorExitException(ex.Message, ExitCode);
}
}
/// <summary>
/// Stops the server.
/// </summary>
public async Task StopAsync()
{
try
{
if (m_server != null)
{
using (T server = m_server)
{
// Stop status thread
m_server = null;
await m_status.ConfigureAwait(false);
// Stop server and dispose
server.Stop();
}
}
ExitCode = ExitCode.Ok;
}
catch (Exception ex)
{
throw new ErrorExitException(ex.Message, ExitCode.ErrorStopping);
}
}
/// <summary>
/// The certificate validator is used
/// if auto accept is not selected in the configuration.
/// </summary>
private void CertificateValidator_CertificateValidation(CertificateValidator validator, CertificateValidationEventArgs e)
{
if (e.Error.StatusCode == StatusCodes.BadCertificateUntrusted)
{
if (AutoAccept)
{
Console.WriteLine("Accepted Certificate: [{0}] [{1}]", e.Certificate.Subject, e.Certificate.Thumbprint);
e.Accept = true;
return;
}
}
Console.WriteLine("Rejected Certificate: {0} [{1}] [{2}]", e.Error, e.Certificate.Subject, e.Certificate.Thumbprint);
}
/// <summary>
/// Update the session status.
/// </summary>
private void EventStatus(Session session, SessionEventReason reason)
{
m_lastEventTime = DateTime.UtcNow;
PrintSessionStatus(session, reason.ToString());
}
/// <summary>
/// Output the status of a connected session.
/// </summary>
private void PrintSessionStatus(Session session, string reason, bool lastContact = false)
{
lock (session.DiagnosticsLock)
{
StringBuilder item = new StringBuilder();
item.AppendFormat("{0,9}:{1,20}:", reason, session.SessionDiagnostics.SessionName);
if (lastContact)
{
item.AppendFormat("Last Event:{0:HH:mm:ss}", session.SessionDiagnostics.ClientLastContactTime.ToLocalTime());
}
else
{
if (session.Identity != null)
{
item.AppendFormat(":{0,20}", session.Identity.DisplayName);
}
item.AppendFormat(":{0}", session.Id);
}
//Console.WriteLine(item.ToString());
}
}
/// <summary>
/// Status thread, prints connection status every 10 seconds.
/// </summary>
private async Task StatusThreadAsync()
{
while (m_server != null)
{
if (DateTime.UtcNow - m_lastEventTime > TimeSpan.FromMilliseconds(10000))
{
IList<Session> sessions = m_server.CurrentInstance.SessionManager.GetSessions();
for (int ii = 0; ii < sessions.Count; ii++)
{
Session session = sessions[ii];
PrintSessionStatus(session, "-Status-", true);
}
m_lastEventTime = DateTime.UtcNow;
}
await Task.Delay(1000).ConfigureAwait(false);
}
}
#region Private Members
private readonly TextWriter m_output;
private ApplicationInstance m_application;
public T m_server;
private Task m_status;
private DateTime m_lastEventTime;
#endregion
}
}

View File

@ -1,31 +0,0 @@
using Quickstarts;
using Quickstarts.ReferenceServer;
namespace Plugin
{
public class UAService :IDisposable
{
string applicationName = "ConsoleReferenceServer";
string configSectionName = "Quickstarts.ReferenceServer";
public UAServer<ReferenceServer> server = null;
public UAService()
{
server = new UAServer<ReferenceServer>(null)
{
AutoAccept = false,
Password = null
};
server.LoadAsync(applicationName, configSectionName).ConfigureAwait(false);
server.CheckCertificateAsync(false).ConfigureAwait(false);
server.StartAsync().ConfigureAwait(false);
}
public void Dispose()
{
server.StopAsync().ConfigureAwait(false);
}
}
}