设备线程内使用ManualResetEvent

This commit is contained in:
iioter 2023-10-21 22:14:18 +08:00
parent 4152b8c598
commit 64e950c52b

View File

@ -23,7 +23,7 @@ namespace Plugin
private Task? _task; private Task? _task;
private readonly DateTime _tsStartDt = new(1970, 1, 1); private readonly DateTime _tsStartDt = new(1970, 1, 1);
private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource(); private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();
private readonly object _lock = new(); private ManualResetEvent resetEvent = new(true);
public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient, public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient,
MqttServer mqttServer, ILogger logger) MqttServer mqttServer, ILogger logger)
@ -68,135 +68,145 @@ namespace Plugin
return; return;
} }
lock (_lock) resetEvent.WaitOne();
try
{ {
try if (driver.IsConnected)
{ {
if (driver.IsConnected) foreach (var deviceVariables in Device.DeviceVariables.Where(x => x.ProtectType != ProtectTypeEnum.WriteOnly).GroupBy(x => x.Alias))
{ {
foreach (var deviceVariables in Device.DeviceVariables.Where(x=>x.ProtectType!= ProtectTypeEnum.WriteOnly).GroupBy(x => x.Alias)) string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
{ ? Device.DeviceName
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key) : deviceVariables.Key;
? Device.DeviceName
: deviceVariables.Key;
Dictionary<string, List<PayLoad>> sendModel = new() Dictionary<string, List<PayLoad>> sendModel = new()
{ { deviceName, new() } }; { { deviceName, new() } };
var payLoad = new PayLoad() { Values = new() }; var payLoad = new PayLoad() { Values = new() };
if (deviceVariables.Any()) if (deviceVariables.Any())
{
foreach (var item in deviceVariables.OrderBy(x => x.Index))
{ {
foreach (var item in deviceVariables.OrderBy(x => x.Index)) item.Value = null;
item.CookedValue = null;
item.StatusType = VaribaleStatusTypeEnum.Bad;
Thread.Sleep((int)Device.CmdPeriod);
var ret = new DriverReturnValueModel();
var ioarg = new DriverAddressIoArgModel
{ {
item.Value = null; ID = item.ID,
item.CookedValue = null; Address = item.DeviceAddress,
item.StatusType = VaribaleStatusTypeEnum.Bad; ValueType = item.DataType,
EndianType = item.EndianType
};
var method = Methods.FirstOrDefault(x => x.Name == item.Method);
if (method == null)
ret.StatusType = VaribaleStatusTypeEnum.MethodError;
else
ret = (DriverReturnValueModel)method.Invoke(Driver,
new object[] { ioarg })!;
Thread.Sleep((int)Device.CmdPeriod); item.EnqueueVariable(ret.Value);
if (ret.StatusType == VaribaleStatusTypeEnum.Good &&
!string.IsNullOrWhiteSpace(item.Expressions?.Trim()))
{
var expressionText = DealMysqlStr(item.Expressions)
.Replace("raw",
item.Values[0] is bool
? $"Convert.ToBoolean(\"{item.Values[0]}\")"
: item.Values[0]?.ToString())
.Replace("$ppv",
item.Values[2] is bool
? $"Convert.ToBoolean(\"{item.Values[2]}\")"
: item.Values[2]?.ToString())
.Replace("$pv",
item.Values[1] is bool
? $"Convert.ToBoolean(\"{item.Values[1]}\")"
: item.Values[1]?.ToString());
var ret = new DriverReturnValueModel(); try
var ioarg = new DriverAddressIoArgModel
{ {
ID = item.ID, ret.CookedValue = _interpreter.Eval(expressionText);
Address = item.DeviceAddress,
ValueType = item.DataType,
EndianType = item.EndianType
};
var method = Methods.FirstOrDefault(x => x.Name == item.Method);
if (method == null)
ret.StatusType = VaribaleStatusTypeEnum.MethodError;
else
ret = (DriverReturnValueModel)method.Invoke(Driver,
new object[] { ioarg })!;
item.EnqueueVariable(ret.Value);
if (ret.StatusType == VaribaleStatusTypeEnum.Good &&
!string.IsNullOrWhiteSpace(item.Expressions?.Trim()))
{
var expressionText = DealMysqlStr(item.Expressions)
.Replace("raw",
item.Values[0] is bool
? $"Convert.ToBoolean(\"{item.Values[0]}\")"
: item.Values[0]?.ToString())
.Replace("$ppv",
item.Values[2] is bool
? $"Convert.ToBoolean(\"{item.Values[2]}\")"
: item.Values[2]?.ToString())
.Replace("$pv",
item.Values[1] is bool
? $"Convert.ToBoolean(\"{item.Values[1]}\")"
: item.Values[1]?.ToString());
try
{
ret.CookedValue = _interpreter.Eval(expressionText);
}
catch (Exception)
{
ret.StatusType = VaribaleStatusTypeEnum.ExpressionError;
}
} }
else catch (Exception)
ret.CookedValue = ret.Value;
if(item.IsUpload)
payLoad.Values[item.Name] = ret.CookedValue;
ret.VarId = item.ID;
//变化了才推送到mqttserver用于前端展示
if ((item.Values[1] == null && item.Values[0] != null) ||
(item.Values[1] != null && item.Values[0] != null && item.Values[1].ToString() != item.Values[0].ToString()))
{ {
//这是设备变量列表要用的 ret.StatusType = VaribaleStatusTypeEnum.ExpressionError;
var msgInternal = new InjectedMqttApplicationMessage(
new MqttApplicationMessage()
{
Topic = $"internal/v1/gateway/telemetry/{deviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(JsonUtility.SerializeToJson(ret))
});
mqttServer.InjectApplicationMessage(msgInternal);
//这是在线组态要用的
var msgConfigure = new InjectedMqttApplicationMessage(
new MqttApplicationMessage()
{
Topic = $"v1/gateway/telemetry/{deviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(ret.CookedValue))
});
mqttServer.InjectApplicationMessage(msgConfigure);
} }
}
else
ret.CookedValue = ret.Value;
item.Value = ret.Value;
item.CookedValue = ret.CookedValue; if (item.IsUpload)
item.Timestamp = ret.Timestamp; payLoad.Values[item.Name] = ret.CookedValue;
item.StatusType = ret.StatusType;
ret.VarId = item.ID;
//变化了才推送到mqttserver用于前端展示
if ((item.Values[1] == null && item.Values[0] != null) ||
(item.Values[1] != null && item.Values[0] != null && item.Values[1].ToString() != item.Values[0].ToString()))
{
//这是设备变量列表要用的
var msgInternal = new InjectedMqttApplicationMessage(
new MqttApplicationMessage()
{
Topic = $"internal/v1/gateway/telemetry/{deviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(JsonUtility.SerializeToJson(ret))
});
mqttServer.InjectApplicationMessage(msgInternal);
//这是在线组态要用的
var msgConfigure = new InjectedMqttApplicationMessage(
new MqttApplicationMessage()
{
Topic = $"v1/gateway/telemetry/{deviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(ret.CookedValue))
});
mqttServer.InjectApplicationMessage(msgConfigure);
} }
payLoad.TS = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds; item.Value = ret.Value;
item.CookedValue = ret.CookedValue;
if (deviceVariables.Where(x=>x.IsUpload&&x.ProtectType!=ProtectTypeEnum.WriteOnly).All(x => x.StatusType == VaribaleStatusTypeEnum.Good)) item.Timestamp = ret.Timestamp;
{ item.StatusType = ret.StatusType;
payLoad.DeviceStatus = DeviceStatusTypeEnum.Good;
sendModel[deviceName] = new List<PayLoad> { payLoad };
myMqttClient
.PublishTelemetryAsync(deviceName,
Device, sendModel).Wait();
}
else if (deviceVariables.Any(x => x.StatusType == VaribaleStatusTypeEnum.Bad))
_myMqttClient?.DeviceDisconnected(deviceName, Device);
} }
payLoad.TS = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds;
if (deviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).All(x => x.StatusType == VaribaleStatusTypeEnum.Good))
{
payLoad.DeviceStatus = DeviceStatusTypeEnum.Good;
sendModel[deviceName] = new List<PayLoad> { payLoad };
myMqttClient
.PublishTelemetryAsync(deviceName,
Device, sendModel).Wait();
}
else if (deviceVariables.Any(x => x.StatusType == VaribaleStatusTypeEnum.Bad))
_myMqttClient?.DeviceDisconnected(deviceName, Device);
} }
//只要有读取异常且连接正常就断开
if (Device.DeviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).Any(x => x.StatusType != VaribaleStatusTypeEnum.Good) && driver.IsConnected)
{
driver.Close();
driver.Dispose();
}
} }
else
//只要有读取异常且连接正常就断开
if (Device.DeviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).Any(x => x.StatusType != VaribaleStatusTypeEnum.Good) && driver.IsConnected)
{
driver.Close();
driver.Dispose();
}
}
else
{
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
{
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
? Device.DeviceName
: deviceVariables.Key;
_myMqttClient?.DeviceDisconnected(deviceName, Device);
}
if (driver.Connect())
{ {
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias)) foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
{ {
@ -204,26 +214,14 @@ namespace Plugin
? Device.DeviceName ? Device.DeviceName
: deviceVariables.Key; : deviceVariables.Key;
_myMqttClient?.DeviceDisconnected(deviceName, Device); _myMqttClient?.DeviceConnected(deviceName, Device);
}
if (driver.Connect())
{
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
{
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
? Device.DeviceName
: deviceVariables.Key;
_myMqttClient?.DeviceConnected(deviceName, Device);
}
} }
} }
} }
catch (Exception ex) }
{ catch (Exception ex)
_logger.LogError(ex, $"线程循环异常,{Device.DeviceName}"); {
} _logger.LogError(ex, $"线程循环异常,{Device.DeviceName}");
} }
@ -255,61 +253,61 @@ namespace Plugin
//执行写入变量RPC //执行写入变量RPC
if (e.Method.ToLower() == "write") if (e.Method.ToLower() == "write")
{ {
lock (_lock) resetEvent.Reset();
bool rpcConnected = false;
//没连接就连接
if (!Driver.IsConnected)
if (Driver.Connect())
rpcConnected = true;
//连接成功就尝试一个一个的写入,注意:目前写入地址和读取地址是相同的对于PLC来说没问题其他的要自己改........
if (Driver.IsConnected)
{ {
bool rpcConnected = false; foreach (var para in e.Params)
//没连接就连接
if (!Driver.IsConnected)
if (Driver.Connect())
rpcConnected = true;
//连接成功就尝试一个一个的写入,注意:目前写入地址和读取地址是相同的对于PLC来说没问题其他的要自己改........
if (Driver.IsConnected)
{ {
foreach (var para in e.Params) //先查配置项,要用到配置的地址、数据类型、方法(方法最主要是用于区分写入数据的辅助判断比如modbus不同的功能码)
{ //先找别名中的变量名,找不到就用设备名
//先查配置项,要用到配置的地址、数据类型、方法(方法最主要是用于区分写入数据的辅助判断比如modbus不同的功能码) DeviceVariable? deviceVariable;
//先找别名中的变量名,找不到就用设备名 if (e.DeviceName == Device.DeviceName)
DeviceVariable? deviceVariable; deviceVariable = Device.DeviceVariables.FirstOrDefault(x =>
if (e.DeviceName == Device.DeviceName) x.Name == para.Key && string.IsNullOrWhiteSpace(x.Alias));
deviceVariable = Device.DeviceVariables.FirstOrDefault(x => else
x.Name == para.Key && string.IsNullOrWhiteSpace(x.Alias)); deviceVariable = Device.DeviceVariables.FirstOrDefault(x =>
else x.Name == para.Key && x.Alias == e.DeviceName);
deviceVariable = Device.DeviceVariables.FirstOrDefault(x =>
x.Name == para.Key && x.Alias == e.DeviceName);
if (deviceVariable != null&& deviceVariable.ProtectType!= ProtectTypeEnum.ReadOnly) if (deviceVariable != null && deviceVariable.ProtectType != ProtectTypeEnum.ReadOnly)
{
DriverAddressIoArgModel ioArgModel = new()
{ {
DriverAddressIoArgModel ioArgModel = new() Address = deviceVariable.DeviceAddress,
{ Value = para.Value,
Address = deviceVariable.DeviceAddress, ValueType = deviceVariable.DataType
Value = para.Value, };
ValueType = deviceVariable.DataType var writeResponse = Driver
}; .WriteAsync(e.RequestId, deviceVariable.Method, ioArgModel).Result;
var writeResponse = Driver rpcResponse.IsSuccess = writeResponse.IsSuccess;
.WriteAsync(e.RequestId, deviceVariable.Method, ioArgModel).Result; if (!writeResponse.IsSuccess)
rpcResponse.IsSuccess = writeResponse.IsSuccess;
if (!writeResponse.IsSuccess)
{
rpcResponse.Description += writeResponse.Description;
}
}
else
{ {
rpcResponse.IsSuccess = false; rpcResponse.Description += writeResponse.Description;
rpcResponse.Description += $"未能找到支持写入的变量:{para.Key},";
} }
} }
else
{
rpcResponse.IsSuccess = false;
rpcResponse.Description += $"未能找到支持写入的变量:{para.Key},";
}
}
if (rpcConnected) if (rpcConnected)
Driver.Close(); Driver.Close();
}
else //连接失败
{
rpcResponse.IsSuccess = false;
rpcResponse.Description = $"{e.DeviceName} 连接失败";
}
} }
else //连接失败
{
rpcResponse.IsSuccess = false;
rpcResponse.Description = $"{e.DeviceName} 连接失败";
}
resetEvent.Set();
} }
//其他RPC TODO //其他RPC TODO
else else