MarkoprintWiFi/MkpWIFI/protocol.py

77 lines
2.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 27 14:35:31 2019
@author: Guido Longoni - guidolongoni@gmail.com
"""
import commands as cmd
import types
import copy
import functools
import re
def _cmdString(cmdName, **cmdArgs):
cmdDict = cmd.MKPcommands[cmdName]
if 'args' in cmdDict and cmdDict['args'] is not None:
mandatory_args = set(cmdDict['args'])
else:
mandatory_args = set()
if 'optArgs' in cmdDict and cmdDict['optArgs'] is not None:
optional_args = set(cmdDict['optArgs'])
else:
optional_args = set()
my_args = set(cmdArgs.keys())
if len(my_args) < len(mandatory_args) or my_args.intersection(mandatory_args) != my_args:
raise ValueError('Argomenti obbligatori errati o mancanti.')
if len(my_args-mandatory_args-optional_args) > 0:
raise Warning('Forniti argomenti sconosciuti!')
argDict = { ** { a: cmdArgs[a] for a in mandatory_args},
** { a: cmdArgs[a] if a in my_args else '' for a in optional_args} }
command = cmdDict['command']
arg_groups = [i.group(1) for i in re.finditer(r'[^;]((\{[^}]*\};)+\{[^}]*\})[^;]',command)]
replacements = [(g,';'.join([a.format(**argDict) for a in g.split(';') if len(a.format(**argDict)) > 0])) for g in arg_groups]
command = functools.reduce(lambda x,y: x.replace(y[0],y[1]),replacements,command).format(**argDict)
return str(cmd.DEFprefix['begin'] if cmdDict['defPrefix'] else cmdDict['begin']) \
+ command \
+ str(cmd.DEFprefix['end']
if cmdDict['defPrefix'] else cmdDict['end'])
def _checkError(cmdName, response):
errKey = 'error'
cmdDict = cmd.MKPcommands[cmdName]
if errKey in cmdDict and cmdDict[errKey] is None:
return None
error_codes = re.search(cmdDict[errKey], response, re.I)
if error_codes:
return error_codes.groups()
return None
def checkResponse(cmdName, response):
sucKey = 'success'
cmdDict = cmd.MKPcommands[cmdName]
if sucKey in cmdDict and cmdDict[sucKey] is not None:
if cmdDict[sucKey] == True:
error = _checkError(cmdName, response)
if error is None:
return None
else:
return error
if re.search(cmdDict[sucKey], response, re.I):
return True
error = _checkError(cmdName, response)
if error is None:
raise NotImplementedError(
'Ricevuta risposta sconosciuta: {}'.format(response))
else:
return error
else:
error = _checkError(cmdName, response)
return error
_cloneFcn = lambda f1,f2: types.FunctionType(copy.copy(f1.__code__), copy.copy(f1.__globals__), name=f2, argdefs=copy.copy(f1.__defaults__), closure=copy.copy(f1.__closure__))
for k in cmd.MKPcommands.keys():
globals()[k] = functools.partial(_cloneFcn(_cmdString,k),k)
del k