const Schema = require('validate'); const isIp = require('is-ip'); const NodeS7 = require('nodes7'); const slowConnect = 5000; function hop(obj, key) { return Object.prototype.hasOwnProperty.call(obj, key); } function pos_integer(n) { return parseInt(n) == n && n >= 0; } function is_ip(s) { return isIp.version(s) !== undefined; } const plc_schema = new Schema({ ip: { type: String, required: true, use: { is_ip } }, port: { type: Number, required: true, use: { pos_integer } }, rack: { type: Number, required: true, use: { pos_integer } }, slot: { type: Number, required: true, use: { pos_integer } } }); const write_msg = new Schema({ plc: plc_schema, payload: { variable: { type: String, required: true }, value: { required: true } } }); const read_payload = new Schema({ variable: { type: String, required: true }, }); const read_msg = new Schema({ plc: plc_schema, payload: read_payload }); let s7nodes = {}; let s7promises = {}; let s7_read_write_queue = {}; // lista di {msg, resolve, reject} let s7write_lock = {}; // si può scrivere function s7node_from_plc(plc) { if (hop(s7nodes, plc.ip)) { // c'è già un oggetto connessione let s7obj = s7nodes[plc.ip]; if (s7obj.isoConnectionState == 4) { // è connesso return new Promise((resolve, reject) => { resolve(s7obj) }); } else { // non è connesso: rifare delete s7nodes[plc.ip]; return s7node_from_plc(plc); } } else if (hop(s7promises, plc.ip)) { // sta già provando a connettersi ma bisogna aspettare la promise let prom = s7promises[plc.ip]; return prom.then(() => { return s7nodes[plc.ip]; }, () => { return s7node_from_plc(plc) }); } else { let conn_opt = { host: plc.ip, port: plc.port, rack: plc.rack, slot: plc.slot, timeout: plc.timeout || 10000, }; let nuovo = new NodeS7({ silent: !plc.debug, debug: plc.debug || false, }); let prom = new Promise( (resolve, reject) => { setTimeout(() => { nuovo.initiateConnection( conn_opt, (arg1) => { delete s7promises[plc.ip]; if (arg1 !== undefined) { reject(arg1) } else { s7nodes[plc.ip] = nuovo; resolve(nuovo) } }) }, slowConnect); }); s7promises[plc.ip] = prom; return prom; } }; function s7_write(plc, obj, variable, value) { if (!hop(s7_read_write_queue, plc.ip)) { s7_read_write_queue[plc.ip] = []; } let queue_prom = {}; let out = new Promise((resolve, reject) => { queue_prom.resolve = resolve; queue_prom.reject = reject; }); s7_read_write_queue[plc.ip].push({ action: "write", variable, value, reject: queue_prom.reject, resolve: queue_prom.resolve }); s7_read_write_loop(plc.ip, obj); return out; } function s7_read(plc, obj, variable) { if (!hop(s7_read_write_queue, plc.ip)) { s7_read_write_queue[plc.ip] = []; } let queue_prom = {}; let out = new Promise((resolve, reject) => { queue_prom.resolve = resolve; queue_prom.reject = reject; }); s7_read_write_queue[plc.ip].push({ action: "read", variable, reject: queue_prom.reject, resolve: queue_prom.resolve }); s7_read_write_loop(plc.ip, obj); return out; } function s7_read_write_loop(ip, obj) { let q = s7_read_write_queue[ip]; if (!hop(s7write_lock, ip)) { s7write_lock[ip] = false; } if (!s7write_lock[ip] && q.length > 0) { s7write_lock[ip] = true; let item = q.shift(); if (item.action == "write") { let timeout_id = null; let callback = (e) => { if (e) { // questa scrittura è fallita item.reject(e); } else { // questa scrittura è riuscita item.resolve(); } try { clearTimeout(timeout_id); } catch (e) { } s7write_lock[ip] = false; s7_read_write_loop(ip, obj); }; timeout_id = setTimeout(() => callback('timeout error'), 60000); obj.writeItems(item.variable, item.value, callback) } else if (item.action == "read") { obj.removeItems(undefined); // questo elimina tutte obj.addItems(item.variable); let timeout_id = null; let callback = (anyBadQualities, dataObject) => { if (anyBadQualities) { console.error("Can't read variables (" + ip + "): bad qualities", anyBadQualities) item.reject(anyBadQualities); } else { // console.log("letti", dataObject); item.resolve(dataObject[item.variable]); } try { clearTimeout(timeout_id); } catch (e) { } s7write_lock[ip] = false; s7_read_write_loop(ip, obj); }; timeout_id = setTimeout(() => callback('timeout error', null), 60000); obj.readAllItems(callback) } } else { // è già lockato o vuoto } } module.exports = function (RED) { function S7BriqWrite(config) { RED.nodes.createNode(this, config); var node = this; node.on('input', function (msg) { /*try { write_msg.assert(msg); } catch (e) { let errors = write_msg.validate(msg); node.error("Msg non valido: " + errors.map(x => x.path + ": " + x.message).join("\n")); return; }*/ s7node_from_plc(msg.plc).then(s7conn => { return s7_write(msg.plc, s7conn, msg.payload.variable, msg.payload.value); }).then(() => { msg.result = "ok"; node.send(msg); return true; }).catch(err => { msg.result = "err"; msg.payload.err = "(" + msg.plc.ip + ") Impossibile scrivere il valore: " + err; node.send(msg); }); }); } RED.nodes.registerType("s7briqwrite", S7BriqWrite); function S7BriqRead(config) { RED.nodes.createNode(this, config); var node = this; node.on('input', function (msg) { // console.log("input", msg); /*try { read_msg.assert(msg); } catch (e) { let errors = read_msg.validate(msg); node.error("Msg " + JSON.stringify(msg) + " non valido: " + errors.map(x => x.path + ": " + x.message).join("\n")); return; }*/ s7node_from_plc(msg.plc).then(s7conn => { return s7_read(msg.plc, s7conn, msg.payload.variable); }).then((out) => { msg.result = "ok"; msg.payload.value = out; node.send(msg); return true; }).catch(err => { msg.result = "err"; msg.payload.err = "(" + msg.plc.ip + ") Impossibile leggere il valore: " + err; node.send(msg); }); }); } RED.nodes.registerType("s7briqread", S7BriqRead); }