iotgateway/Plugins/Plugin/DeviceThread.cs

402 lines
18 KiB
C#
Raw Normal View History

using PluginInterface;
2021-12-12 06:55:48 +00:00
using System.Reflection;
2022-08-23 08:29:29 +00:00
using System.Text;
2021-12-12 06:55:48 +00:00
using IoTGateway.DataAccess;
using IoTGateway.Model;
using DynamicExpresso;
using MQTTnet.Server;
using Newtonsoft.Json;
2022-03-24 13:38:11 +00:00
using Microsoft.Extensions.Logging;
2022-08-23 08:29:29 +00:00
using MQTTnet;
2021-12-12 06:55:48 +00:00
namespace Plugin
{
public class DeviceThread : IDisposable
{
2023-11-09 05:35:07 +00:00
// ---- Collaborators supplied through the constructor ----

/// <summary>MQTT server that <c>ReadVariables</c> injects changed variable values into.</summary>
private readonly MqttServer _mqttServer;

/// <summary>Logger used for lifecycle and error messages of this device thread.</summary>
private readonly ILogger _logger;

/// <summary>MQTT client used for telemetry, attributes, connection state and RPC responses.</summary>
private readonly MyMqttClient? _myMqttClient;

/// <summary>Project id handed to the constructor; only stored by this class.</summary>
private readonly string _projectId;

/// <summary>The device this thread polls.</summary>
public readonly Device Device;

/// <summary>The driver used to connect to, read from and write to the device.</summary>
public readonly IDriver Driver;

// ---- Runtime state ----

/// <summary>Evaluates the per-variable expressions; set to null by <c>Dispose</c>.</summary>
private Interpreter? _interpreter;

/// <summary>Driver methods that carry a <c>MethodAttribute</c>; set to null by <c>Dispose</c>.</summary>
internal List<MethodInfo>? Methods { get; set; }

/// <summary>Most recent read result per variable, keyed by the variable's ID.</summary>
public Dictionary<Guid, DriverReturnValueModel> DeviceValues { get; set; } = new();

/// <summary>The polling task created by <c>CreateThread</c>; null until it has been started.</summary>
private Task? _task;

/// <summary>Base date (1970-01-01) subtracted from UTC now to build telemetry timestamps in milliseconds.</summary>
private readonly DateTime _tsStartDt = new(1970, 1, 1);

/// <summary>Cancelled by <c>StopThread</c> to end the polling loop.</summary>
private readonly CancellationTokenSource _tokenSource = new();

/// <summary>
/// Gate the polling loop waits on before each cycle; the RPC write handler resets it
/// while a write is in progress and sets it again afterwards. Initially signalled.
/// </summary>
private readonly ManualResetEvent resetEvent = new(true);
2022-08-10 08:55:44 +00:00
/// <summary>
/// Wires up a device thread: subscribes to the MQTT client's RPC event, stores the collaborators,
/// discovers the driver methods marked with <c>MethodAttribute</c> and, when the device is
/// configured for auto start, marks every variable as bad, normalizes blank aliases to an
/// empty string and starts the polling loop.
/// </summary>
/// <param name="device">Device to poll.</param>
/// <param name="driver">Driver used to talk to the device.</param>
/// <param name="projectId">Project id; only stored.</param>
/// <param name="myMqttClient">MQTT client whose RPC event is handled by this instance.</param>
/// <param name="mqttServer">MQTT server used for injecting changed values.</param>
/// <param name="logger">Logger for this instance.</param>
public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient,
    MqttServer mqttServer, ILogger logger)
{
    _myMqttClient = myMqttClient;
    _myMqttClient.OnExcRpc += MyMqttClient_OnExcRpc;

    Device = device;
    Driver = driver;
    _projectId = projectId;
    _interpreter = new Interpreter();
    _logger = logger;
    _mqttServer = mqttServer;

    // Collect the driver's public methods that are tagged as callable read methods.
    var taggedMethods = new List<MethodInfo>();
    foreach (var candidate in Driver.GetType().GetMethods())
    {
        if (candidate.GetCustomAttribute(typeof(MethodAttribute)) != null)
        {
            taggedMethods.Add(candidate);
        }
    }
    Methods = taggedMethods;

    // Nothing more to do unless the device starts on its own.
    if (!Device.AutoStart)
    {
        return;
    }

    _logger.LogInformation($"线程已启动:{Device.DeviceName}");

    if (Device.DeviceVariables != null)
    {
        foreach (var variable in Device.DeviceVariables)
        {
            variable.StatusType = VaribaleStatusTypeEnum.Bad;
            if (string.IsNullOrWhiteSpace(variable.Alias))
            {
                variable.Alias = string.Empty;
            }
        }
    }

    // Blocks only until the polling task has been started, not until it ends.
    CreateThread().Wait();
}
/// <summary>
/// Starts the background polling loop for this device and stores the loop's task in <c>_task</c>.
/// </summary>
/// <remarks>
/// The loop waits 5 seconds, uploads the client-side device configs as attributes once, and then
/// repeats until <c>_tokenSource</c> is cancelled: wait for the RPC gate, poll the device when the
/// driver is connected (or try to connect it otherwise), then sleep for the driver's minimum
/// period (10 seconds when the device has no variables). Both sleeps observe the cancellation
/// token so that <c>StopThread</c> ends the loop without waiting for the full period.
/// </remarks>
/// <returns>A task that completes once the loop has been started, not when the loop ends.</returns>
public async Task CreateThread()
{
    _task = await Task.Factory.StartNew(async () =>
    {
        var token = _tokenSource.Token;
        try
        {
            await Task.Delay(5000, token);

            // Upload client-side attributes once before polling starts.
            UploadClientAttributes();

            while (!token.IsCancellationRequested)
            {
                // Blocks while the RPC write handler has reset the gate.
                resetEvent.WaitOne();
                try
                {
                    if (Driver.IsConnected)
                    {
                        await PollConnectedDeviceAsync();
                    }
                    else
                    {
                        ReconnectDevice();
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, $"线程循环异常,{Device.DeviceName}");
                }

                // Null-safe: this statement is outside the try above, so an exception here would end the loop.
                bool hasVariables = Device.DeviceVariables?.Any() == true;
                await Task.Delay(hasVariables ? (int)Driver.MinPeriod : 10000, token);
            }
        }
        catch (OperationCanceledException)
        {
            // The token was cancelled during one of the delays; fall through to the stop message.
        }

        _logger.LogInformation($"停止线程:{Device.DeviceName}");
    }, TaskCreationOptions.LongRunning);
}

/// <summary>Returns the alias when it is non-blank, otherwise the device's own name.</summary>
private string ResolveDeviceName(string? alias)
{
    return string.IsNullOrWhiteSpace(alias) ? Device.DeviceName : alias;
}

/// <summary>
/// Uploads the device configs whose data side is client or any as attributes, once per alias group.
/// Failures are logged instead of ending the polling task before its loop starts.
/// </summary>
private void UploadClientAttributes()
{
    try
    {
        // The config set is the same for every alias group, so build it once.
        var clientSideConfigs = Device.DeviceConfigs
            .Where(x => x.DataSide == DataSide.ClientSide || x.DataSide == DataSide.AnySide)
            .ToDictionary(x => x.DeviceConfigName, x => x.Value);

        foreach (var group in Device.DeviceVariables!.GroupBy(x => x.Alias))
        {
            _myMqttClient?.UploadAttributeAsync(ResolveDeviceName(group.Key), clientSideConfigs);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"上传客户端属性异常,{Device.DeviceName}");
    }
}

/// <summary>
/// Runs one polling cycle while the driver is connected: for every alias group of readable
/// (not write-only) variables, reads the trigger variables, and - when a trigger's cooked value
/// is <c>true</c> or the group has no triggers - reads the remaining variables of the group and
/// publishes the good, upload-enabled values as telemetry. Afterwards the driver is closed and
/// disposed if any stored value is not good.
/// </summary>
private async Task PollConnectedDeviceAsync()
{
    var readableGroups = Device.DeviceVariables!
        .Where(x => x.ProtectType != ProtectTypeEnum.WriteOnly)
        .GroupBy(x => x.Alias);

    foreach (var group in readableGroups)
    {
        string deviceName = ResolveDeviceName(group.Key);

        var triggerVariables = group.Where(x => x.IsTrigger).ToList();
        var payLoadTrigger = new PayLoad() { Values = new() };
        ReadVariables(ref triggerVariables, ref payLoadTrigger, _mqttServer);

        // A trigger fires when its most recent cooked value is the boolean true.
        bool triggered = triggerVariables.Any(v =>
            DeviceValues.TryGetValue(v.ID, out var latest) && latest.CookedValue is true);

        // Publish when a trigger fired, or when the group has no trigger variables at all.
        if (triggered || !triggerVariables.Any())
        {
            // Only this group's non-trigger variables are read. Going back to all device
            // variables would re-read other alias groups on every iteration and would also
            // read the write-only variables that were filtered out above.
            var variables = group.Where(x => !x.IsTrigger).ToList();
            var payLoadUnTrigger = new PayLoad() { Values = new() };
            ReadVariables(ref variables, ref payLoadUnTrigger, _mqttServer);

            var uploadNames = group.Where(x => x.IsUpload).ToDictionary(x => x.ID, x => x.Name);
            var payLoad = new PayLoad()
            {
                Values = DeviceValues
                    .Where(kv => kv.Value.StatusType == VaribaleStatusTypeEnum.Good &&
                                 uploadNames.ContainsKey(kv.Key))
                    .ToDictionary(kv => uploadNames[kv.Key], kv => kv.Value.CookedValue),
                TS = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds,
                DeviceStatus = DeviceStatusTypeEnum.Good
            };

            var sendModel = new Dictionary<string, List<PayLoad>>
            {
                { deviceName, new List<PayLoad> { payLoad } }
            };

            if (_myMqttClient != null)
            {
                // Awaited rather than blocked on: this runs inside an async method.
                await _myMqttClient.PublishTelemetryAsync(deviceName, Device, sendModel);
            }
        }

        if (group.Any(x => x.StatusType == VaribaleStatusTypeEnum.Bad))
        {
            _myMqttClient?.DeviceDisconnected(deviceName, Device);
        }
    }

    // Any read problem while still connected: drop the connection so the next cycle reconnects.
    if (DeviceValues.Any(x => x.Value.StatusType != VaribaleStatusTypeEnum.Good) && Driver.IsConnected)
    {
        Driver.Close();
        Driver.Dispose();
    }
}

/// <summary>
/// Runs one cycle while the driver is disconnected: reports every alias group as disconnected,
/// tries to connect, and reports every group as connected when that succeeds.
/// </summary>
private void ReconnectDevice()
{
    var deviceNames = Device.DeviceVariables!
        .GroupBy(x => x.Alias)
        .Select(g => ResolveDeviceName(g.Key))
        .ToList();

    foreach (var deviceName in deviceNames)
    {
        _myMqttClient?.DeviceDisconnected(deviceName, Device);
    }

    if (!Driver.Connect())
    {
        return;
    }

    foreach (var deviceName in deviceNames)
    {
        _myMqttClient?.DeviceConnected(deviceName, Device);
    }
}
/// <summary>
/// Reads the given variables from the driver in <c>Index</c> order, applies each variable's
/// optional expression, records the result in <paramref name="payLoad"/> and <c>DeviceValues</c>,
/// and injects a message into the MQTT server when the raw or cooked value changed.
/// Sleeps for the device's command period after each variable.
/// </summary>
/// <param name="variables">Variables to read; an empty list is a no-op.</param>
/// <param name="payLoad">Payload whose <c>Values</c> receives the cooked value of each variable by name.</param>
/// <param name="mqttServer">Server that changed values are injected into.</param>
private void ReadVariables(ref List<DeviceVariable> variables, ref PayLoad payLoad, MqttServer mqttServer)
{
    foreach (var item in variables.OrderBy(x => x.Index))
    {
        var ioarg = new DriverAddressIoArgModel
        {
            ID = item.ID,
            Address = item.DeviceAddress,
            ValueType = item.DataType
        };

        // Methods is set to null by Dispose, so look it up null-safely.
        var method = Methods?.FirstOrDefault(x => x.Name == item.Method);

        DriverReturnValueModel ret;
        if (method == null)
        {
            ret = new DriverReturnValueModel { StatusType = VaribaleStatusTypeEnum.MethodError };
        }
        else
        {
            // A driver method that returns null is treated as a bad read instead of crashing below.
            ret = (DriverReturnValueModel?)method.Invoke(Driver, new object[] { ioarg })
                  ?? new DriverReturnValueModel { StatusType = VaribaleStatusTypeEnum.Bad };
        }

        ret.Timestamp = DateTime.Now;
        item.EnqueueVariable(ret.Value);

        if (ret.StatusType == VaribaleStatusTypeEnum.Good && !string.IsNullOrWhiteSpace(item.Expressions))
        {
            // Substitute the placeholders: raw = Values[0], $pv = Values[1], $ppv = Values[2].
            var expressionText = DealMysqlStr(item.Expressions)
                .Replace("raw", ToExpressionOperand(item.Values[0]))
                .Replace("$ppv", ToExpressionOperand(item.Values[2]))
                .Replace("$pv", ToExpressionOperand(item.Values[1]));
            try
            {
                ret.CookedValue = _interpreter.Eval(expressionText);
            }
            catch (Exception)
            {
                ret.Message = $"表达式错误:{expressionText}";
                ret.StatusType = VaribaleStatusTypeEnum.ExpressionError;
            }
        }
        else
        {
            ret.CookedValue = ret.Value;
        }

        item.EnqueueCookedVariable(ret.CookedValue);
        payLoad.Values[item.Name] = ret.CookedValue;
        ret.VarId = item.ID;

        // Only push to the MQTT server (used for front-end display) when something changed.
        bool rawChanged = JsonConvert.SerializeObject(item.Values[1]) !=
                          JsonConvert.SerializeObject(item.Values[0]);
        bool cookedChanged = JsonConvert.SerializeObject(item.CookedValues[1]) !=
                             JsonConvert.SerializeObject(item.CookedValues[0]);
        if (rawChanged || cookedChanged)
        {
            var msgInternal = new InjectedMqttApplicationMessage(
                new MqttApplicationMessage()
                {
                    Topic = $"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
                    PayloadSegment = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(ret))
                });
            mqttServer.InjectApplicationMessage(msgInternal);
        }

        DeviceValues[item.ID] = ret;
        Thread.Sleep((int)Device.CmdPeriod);
    }

    // Renders a value as text to splice into an expression. Booleans are wrapped in
    // Convert.ToBoolean("..."); everything else is formatted with the invariant culture so a
    // comma-decimal machine culture cannot turn 1.5 into "1,5" and break the expression.
    static string? ToExpressionOperand(object? value)
    {
        return value is bool
            ? $"Convert.ToBoolean(\"{value}\")"
            : Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
    }
}
2022-05-09 15:57:46 +00:00
/// <summary>
/// Handles an RPC request raised by the MQTT client. Requests addressed to another device
/// (neither this device's name nor one of its variable aliases) are ignored. For the
/// <c>write</c> method every parameter is written to the matching writable variable; any other
/// method is answered as not implemented. The response is sent back and the call is stored
/// as an <c>RpcLog</c> row.
/// </summary>
/// <param name="sender">Event sender; not used.</param>
/// <param name="e">The RPC request.</param>
public void MyMqttClient_OnExcRpc(object? sender, RpcRequest e)
{
    // Match on the device name or on a device alias.
    bool addressedToThisDevice =
        e.DeviceName == Device.DeviceName ||
        Device.DeviceVariables?.Any(x => x.Alias == e.DeviceName) == true;
    if (!addressedToThisDevice)
    {
        return;
    }

    RpcLog rpcLog = new RpcLog()
    {
        DeviceId = Device.ID,
        StartTime = DateTime.Now,
        Method = e.Method,
        RpcSide = RpcSide.ServerSide,
        Params = JsonConvert.SerializeObject(e.Params)
    };
    _logger.LogInformation($"{e.DeviceName}收到RPC,{e}");

    RpcResponse rpcResponse = new()
    {
        DeviceName = e.DeviceName,
        RequestId = e.RequestId,
        IsSuccess = false,
        Method = e.Method
    };

    // Ordinal, case-insensitive match: ToLower() is culture-sensitive and null-unsafe.
    if (string.Equals(e.Method, "write", StringComparison.OrdinalIgnoreCase))
    {
        // Hold the polling loop off while writing; the finally below always releases it,
        // otherwise an exception during the write would block polling forever.
        resetEvent.Reset();
        bool rpcConnected = false;
        try
        {
            // Connect on demand when the driver is not connected yet.
            if (!Driver.IsConnected && Driver.Connect())
            {
                rpcConnected = true;
            }

            if (Driver.IsConnected)
            {
                // Write the parameters one by one. The write address is the variable's read address.
                int attempted = 0;
                bool allSucceeded = true;

                if (e.Params != null)
                {
                    foreach (var para in e.Params)
                    {
                        attempted++;

                        // The variable config supplies address, data type and method.
                        // Look it up under the alias, or among un-aliased variables when
                        // the request uses the device's own name.
                        DeviceVariable? deviceVariable = e.DeviceName == Device.DeviceName
                            ? Device.DeviceVariables?.FirstOrDefault(x =>
                                x.Name == para.Key && string.IsNullOrWhiteSpace(x.Alias))
                            : Device.DeviceVariables?.FirstOrDefault(x =>
                                x.Name == para.Key && x.Alias == e.DeviceName);

                        if (deviceVariable != null && deviceVariable.ProtectType != ProtectTypeEnum.ReadOnly)
                        {
                            DriverAddressIoArgModel ioArgModel = new()
                            {
                                Address = deviceVariable.DeviceAddress,
                                Value = para.Value,
                                ValueType = deviceVariable.DataType,
                                EndianType = deviceVariable.EndianType
                            };
                            var writeResponse = Driver
                                .WriteAsync(e.RequestId, deviceVariable.Method, ioArgModel)
                                .GetAwaiter().GetResult();
                            if (!writeResponse.IsSuccess)
                            {
                                allSucceeded = false;
                                rpcResponse.Description += writeResponse.Description;
                            }
                        }
                        else
                        {
                            allSucceeded = false;
                            rpcResponse.Description += $"未能找到支持写入的变量:{para.Key},";
                        }
                    }
                }

                // Success only when at least one parameter was written and none failed;
                // a later successful write must not hide an earlier failure.
                rpcResponse.IsSuccess = attempted > 0 && allSucceeded;
            }
            else
            {
                // Connecting failed.
                rpcResponse.IsSuccess = false;
                rpcResponse.Description = $"{e.DeviceName} 连接失败";
            }
        }
        catch (Exception ex)
        {
            // Report the failure to the RPC caller instead of leaving the request unanswered.
            rpcResponse.IsSuccess = false;
            rpcResponse.Description += ex.Message;
            _logger.LogError(ex, $"RPC写入异常,{Device.DeviceName}");
        }
        finally
        {
            try
            {
                // Close the connection again if it was opened only for this RPC.
                if (rpcConnected)
                {
                    Driver.Close();
                }
            }
            finally
            {
                resetEvent.Set();
            }
        }
    }
    else
    {
        // Other RPC methods: TODO.
        rpcResponse.IsSuccess = false;
        rpcResponse.Description = $"方法:{e.Method}暂未实现";
    }

    // Send the RPC response.
    _myMqttClient?.ResponseRpcAsync(rpcResponse).Wait();

    // Persist the RPC record.
    rpcLog.IsSuccess = rpcResponse.IsSuccess;
    rpcLog.Description = rpcResponse.Description;
    rpcLog.EndTime = DateTime.Now;

    using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
    dc.Set<RpcLog>().Add(rpcLog);
    dc.SaveChanges();
}
2021-12-12 06:55:48 +00:00
/// <summary>
/// Stops this device thread: unsubscribes from the MQTT client's RPC event, reports every alias
/// group as disconnected and, when the polling task was started, cancels it and closes the driver.
/// </summary>
public void StopThread()
{
    _logger.LogInformation($"线程停止:{Device.DeviceName}");

    // Unsubscribe before the early return below. The handler is subscribed in the constructor
    // even when the polling task is never started, so a stopped instance would otherwise stay
    // referenced by the client and keep answering RPC requests.
    if (_myMqttClient != null)
    {
        _myMqttClient.OnExcRpc -= MyMqttClient_OnExcRpc;
    }

    if (Device.DeviceVariables != null)
    {
        foreach (var deviceVariables in Device.DeviceVariables.GroupBy(x => x.Alias))
        {
            string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
                ? Device.DeviceName
                : deviceVariables.Key;
            _myMqttClient?.DeviceDisconnected(deviceName, Device);
        }
    }

    // No polling task was started, so there is nothing to cancel or close.
    if (_task == null)
    {
        return;
    }

    _tokenSource.Cancel();
    Driver.Close();
}
/// <summary>
/// Releases this device thread: detaches the RPC handler from the MQTT client, disposes the
/// driver and drops the expression interpreter and the cached driver methods.
/// </summary>
public void Dispose()
{
    // Detach first so a disposed instance is never invoked for an RPC against a disposed
    // driver. Removing a handler that is no longer subscribed is a no-op.
    if (_myMqttClient != null)
    {
        _myMqttClient.OnExcRpc -= MyMqttClient_OnExcRpc;
    }

    Driver.Dispose();
    _interpreter = null;
    Methods = null;
    // NOTE(review): _tokenSource and resetEvent are IDisposable but are left alone here because
    // the polling loop and StopThread may still touch them - confirm the call order before disposing them.
    _logger.LogInformation($"线程释放,{Device.DeviceName}");
}
// Per the original author's note: MySQL stores some symbols escaped and the cause has not
// been found, so this is a temporary workaround.
/// <summary>
/// Decodes the HTML entities <c>&amp;lt;</c>, <c>&amp;gt;</c>, <c>&amp;quot;</c> and
/// <c>&amp;amp;</c> in an expression back to <c>&lt;</c>, <c>&gt;</c>, <c>"</c> and <c>&amp;</c>.
/// </summary>
/// <param name="expression">Expression text, possibly containing the escaped symbols.</param>
/// <returns>The expression with exactly one level of escaping removed.</returns>
private static string DealMysqlStr(string expression)
{
    // "&amp;" is decoded last so that text such as "&amp;quot;" becomes "&quot;" rather than
    // being decoded a second time into a quote character.
    return expression
        .Replace("&lt;", "<")
        .Replace("&gt;", ">")
        .Replace("&quot;", "\"")
        .Replace("&amp;", "&");
}
2021-12-12 06:55:48 +00:00
}
2022-08-10 08:55:44 +00:00
}