优化rpc;设备在线、离线状态上传

This commit is contained in:
dd 2022-04-15 16:49:58 +08:00
parent ca6f298bf2
commit c916cf1a04
14 changed files with 334 additions and 251 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -11,5 +11,5 @@
<wt:combobox field="Searcher.IsSuccess" empty-text="@Localizer["Sys.All"]" />
</wt:row>
</wt:searchpanel>
<wt:quote>注意不同平台的rpc的topic和payload均不同后期教程会更新到<a href="http://iotgateway.net/" target="_blank">http://iotgateway.net/</a></wt:quote>
<wt:quote>注意不同平台的rpc的topic和payload均不同后期教程会更新到<a href="http://42.193.160.84/" target="_blank">http://iotgateway.net/</a></wt:quote>
<wt:grid vm="@Model" url="/Rpc/RpcLog/Search" hidden-checkbox="true" hidden-grid-index="true" />

View File

@ -69,7 +69,7 @@
<span class="login-error">@Model.MSD.GetFirstError()</span>
<button type="submit" class="login-button" style="cursor:pointer">@Model.Localizer["Login.Login"]</button>
<div class="login-button" style="cursor:pointer">
<a href="http://iotgateway.net/" target="_blank">跳转教程文档,持续更新,收藏不迷路</a>
<a href="http://42.193.160.84/" target="_blank">跳转教程文档,持续更新,收藏不迷路</a>
</div>
@*<wt:linkbutton window-title="@Model.Localizer["Login.Register"]" class="login-button" style="cursor:pointer" target="ButtonTargetEnum.Layer" window-width="500" window-height="500" url="/Login/Reg" text="@Model.Localizer["Login.Register"]" />*@

Binary file not shown.

View File

@ -21,7 +21,6 @@ namespace DriverModbusMaster
private SerialPort port = null;
private Modbus.Device.ModbusMaster master = null;
private SerialPortAdapter adapter = null;
private object locker=new object();
#region
[ConfigParameter("设备Id")]
@ -198,8 +197,6 @@ namespace DriverModbusMaster
[Method("功能码:03", description: "ReadHoldingRegisters读保持寄存器")]
public DriverReturnValueModel ReadHoldingRegisters(DriverAddressIoArgModel ioarg)
{
lock (locker)
{
DriverReturnValueModel ret = new();
try
@ -221,13 +218,9 @@ namespace DriverModbusMaster
return ret;
}
}
[Method("功能码:04", description: "ReadHoldingRegisters读输入寄存器")]
public DriverReturnValueModel ReadInputRegisters(DriverAddressIoArgModel ioarg)
{
lock (locker)
{
DriverReturnValueModel ret = new();
try
@ -249,13 +242,9 @@ namespace DriverModbusMaster
return ret;
}
}
[Method("功能码:01", description: "ReadCoil读线圈")]
public DriverReturnValueModel ReadCoil(DriverAddressIoArgModel ioarg)
{
lock (locker)
{
DriverReturnValueModel ret = new();
try
@ -289,12 +278,8 @@ namespace DriverModbusMaster
return ret;
}
}
[Method("功能码:02", description: "ReadInput读输入")]
public DriverReturnValueModel ReadInput(DriverAddressIoArgModel ioarg)
{
lock (locker)
{
DriverReturnValueModel ret = new();
try
@ -328,8 +313,6 @@ namespace DriverModbusMaster
return ret;
}
}
[Method("Read方法样例", description: "Read方法样例描述")]
public DriverReturnValueModel Read(DriverAddressIoArgModel ioarg)
@ -462,8 +445,6 @@ namespace DriverModbusMaster
}
public async Task<RpcResponse> WriteAsync(string RequestId, string Method, DriverAddressIoArgModel Ioarg)
{
lock (locker)
{
RpcResponse rpcResponse = new() { IsSuccess = false };
try
@ -498,8 +479,6 @@ namespace DriverModbusMaster
}
return rpcResponse;
}
}
}
public enum PLC_TYPE
{

View File

@ -37,7 +37,6 @@ namespace Plugin
_DrvierManager = drvierManager;
_MyMqttClient = myMqttClient;
_MqttServer = mqttServer;
try
{
using (var DC = new DataContext(connnectSetting, DBType))

View File

@ -1,19 +1,11 @@
using Microsoft.EntityFrameworkCore;
using PluginInterface;
using System;
using System.Collections.Generic;
using System.Linq;
using PluginInterface;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using IoTGateway.DataAccess;
using IoTGateway.Model;
using WalkingTec.Mvvm.Core;
using DynamicExpresso;
using MQTTnet.Server;
using Newtonsoft.Json;
using Microsoft.Extensions.Logging;
using PluginInterface.IoTSharp;
namespace Plugin
{
@ -29,7 +21,8 @@ namespace Plugin
private DateTime TsStartDt = new DateTime(1970, 1, 1);
private CancellationTokenSource tokenSource = new CancellationTokenSource();
private Interpreter Interpreter = null;
private object _lock = new object();
private bool lastConnected = false;
public DeviceThread(Device device, IDriver driver, string ProjectId, MyMqttClient myMqttClient, Interpreter interpreter, IMqttServer mqttServer, ILogger logger)
{
_myMqttClient = myMqttClient;
@ -59,6 +52,7 @@ namespace Plugin
task = Task.Run(() =>
{
Thread.Sleep(5000);
while (true)
{
if (tokenSource.IsCancellationRequested)
@ -67,6 +61,8 @@ namespace Plugin
return;
}
lock (_lock)
{
try
{
Dictionary<string, List<PayLoad>> sendModel = new() { { _device.DeviceName, new() } };
@ -122,7 +118,7 @@ namespace Plugin
DeviceValues[item.ID] = ret;
}
payLoad.TS = (long)(DateTime.Now - TsStartDt).TotalMilliseconds;
payLoad.TS = (long)(DateTime.UtcNow - TsStartDt).TotalMilliseconds;
if (DeviceValues.Any(x => x.Value.Value == null))
{
@ -140,19 +136,28 @@ namespace Plugin
}
else
{
driver.Connect();
if (driver.Connect())
_myMqttClient.DeviceConnected(_device.DeviceName);
else if (lastConnected)
{
lastConnected = false;
_myMqttClient.DeviceDisconnected(_device.DeviceName);
}
}
}
catch (Exception ex)
{
_logger.LogError($"线程循环异常,{_device.DeviceName}", ex);
}
}
Thread.Sleep((int)_driver.MinPeriod);
}
});
}
else
_myMqttClient.DeviceDisconnected(_device.DeviceName);
}
private void MyMqttClient_OnExcRpc(object? sender, RpcRequest e)
@ -173,9 +178,13 @@ namespace Plugin
//执行写入变量RPC
if (e.Method.ToLower() == "write")
{
lock (_lock)
{
bool RpcConnected = false;
//没连接就连接
if (!_driver.IsConnected)
_driver.Connect();
if (_driver.Connect())
RpcConnected = true;
//连接成功就尝试一个一个的写入,注意:目前写入地址和读取地址是相同的对于PLC来说没问题其他的要自己改........
if (_driver.IsConnected)
@ -207,6 +216,8 @@ namespace Plugin
break;
}
}
if (RpcConnected)
_driver.Close();
}
else//连接失败
@ -215,6 +226,8 @@ namespace Plugin
rpcResponse.Description = $"{e.DeviceName} 连接失败";
}
}
}
//其他RPC TODO
else
{
@ -227,7 +240,7 @@ namespace Plugin
//纪录入库
rpcLog.IsSuccess = rpcResponse.IsSuccess;
rpcLog.Description = rpcResponse.Description;
rpcLog.EndTime=DateTime.Now;
rpcLog.EndTime = DateTime.Now;
using (var DC = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DBType))
@ -241,6 +254,7 @@ namespace Plugin
public void StopThread()
{
_logger.LogInformation($"线程停止:{_device.DeviceName}");
_myMqttClient.DeviceDisconnected(_device.DeviceName);
if (task != null)
{
_myMqttClient.OnExcRpc -= MyMqttClient_OnExcRpc;

View File

@ -101,21 +101,39 @@ namespace Plugin
}
}
private readonly string tbRpcTopic = "v1/gateway/rpc";
private void OnConnected()
{
//v1/gateway/attributes
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
//v1/gateway/rpc
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
//{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
Client.SubscribeAsync(tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce);//tb server side rpc
Client.SubscribeAsync($"devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync($"devices/Modbus/attributes/update/", MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync($"devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync(tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"id": $request_id, "device": "Device A", "value": "value1"}
Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.IoTSharp:
Client.SubscribeAsync("devices/+/rpc/response/+/+", MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce);
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
default:
break;
}
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
}
private readonly string tbRpcTopic = @"v1/gateway/rpc";
private Task Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
@ -299,18 +317,47 @@ namespace Plugin
}
public Task RequestAttributes(string _device, bool anySide, params string[] args)
public Task RequestAttributes(string _devicename, bool anySide, params string[] args)
{
try
{
//Topic: v1/gateway/attributes/request
// Message: { "id": $request_id, "device": "Device A", "client": true, "key": "attribute1"}
//Topic: v1/gateway/attributes/response
//Message: {"id": $request_id, "device": "Device A", "value": "value1"}
string id = Guid.NewGuid().ToString();
string topic = $"devices/{_device}/attributes/request/{id}";
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
//{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"}
Dictionary<string, object> tbRequestData = new Dictionary<string, object>
{
{ "id",id},
{ "device",_devicename},
{ "client",true},
{ "key",args[0]}
};
return Client.PublishAsync("v1/gateway/attributes/request", JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce);
case IoTPlatformType.IoTSharp:
string topic = $"devices/{_devicename}/attributes/request/{id}";
Dictionary<string, string> keys = new Dictionary<string, string>();
keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
Client.SubscribeAsync($"devices/{_device}/attributes/response/{id}", MqttQualityOfServiceLevel.ExactlyOnce);
return Client.PublishAsync(topic, Newtonsoft.Json.JsonConvert.SerializeObject(keys), MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync($"devices/{_devicename}/attributes/response/{id}", MqttQualityOfServiceLevel.ExactlyOnce);
return Client.PublishAsync(topic, JsonConvert.SerializeObject(keys), MqttQualityOfServiceLevel.ExactlyOnce);
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
default:
break;
}
}
catch (Exception ex)
{
_logger.LogError($"RequestAttributes:{_devicename}", ex);
}
return Task.CompletedTask;
}
public void PublishTelemetry(Device device, Dictionary<string, List<PayLoad>> SendModel)
@ -353,16 +400,60 @@ namespace Plugin
}
public void DeviceConnected()
public async Task DeviceConnected(string DeviceName)
{
//Topic: v1/gateway/connect
//Message: { "device":"Device A"}
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp:
await Client.PublishAsync("v1/gateway/connect", JsonConvert.SerializeObject(new Dictionary<string, string> { { "device", DeviceName } }));
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
default:
break;
}
}
catch (Exception ex)
{
_logger.LogError($"DeviceConnected:{DeviceName}", ex);
}
}
public void DeviceDisconnected()
public async Task DeviceDisconnected(string DeviceName)
{
//Topic: v1/gateway/connect
//Message: { "device":"Device A"}
try
{
switch (_systemConfig.IoTPlatformType)
{
case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp:
await Client.PublishAsync("v1/gateway/disconnect", JsonConvert.SerializeObject(new Dictionary<string, string> { { "device", DeviceName } }));
break;
case IoTPlatformType.AliCloudIoT:
break;
case IoTPlatformType.TencentIoTHub:
break;
case IoTPlatformType.BaiduIoTCore:
break;
case IoTPlatformType.OneNET:
break;
default:
break;
}
}
catch (Exception ex)
{
_logger.LogError($"DeviceDisconnected:{DeviceName}", ex);
}
}

Binary file not shown.