J'ai un programme python qui possède plusieurs fonctions. Dans une de celles-ci, je dois faire une lecture du port série et réaliser quelques actions (pour remettre en forme les données reçues). Cependant, la fonction prend de temps en temps beaucoup plus de temps que prévu et donc pour y remédier, j'avais pensé mettre un genre de TimeOut sur la fonction mais cela n'existe (d'après ce que j'ai vu) pas tel quel et les méthodes que j'ai vues me retournent des erreurs. Voici le dernier essai:
#!/usr/bin/python
"""
##############################################################################
Importation des librairies
##############################################################################
"""
import array
import serial
import signal
import sys
import thread
import time
from multiprocessing import Process, Queue
#Fonction permettant d'envoyer les informations au DMX
from ola.ClientWrapper import ClientWrapper
from threading import Timer
"""
##############################################################################
Declaration des variables
##############################################################################
"""
#-----------------------------------------------------------------------------
# Initialisation des donnees pour la communication du micro-spectrophotometre
#-----------------------------------------------------------------------------
spectro_port = "/dev/ttyACM1"
spectro_baudrate = 57600
spectro_ser = serial.Serial(spectro_port, spectro_baudrate, timeout = 0.5)
#-----------------------------------------------------------------------------
# Initialisation des variables pour le micro-spectrophotometre
#-----------------------------------------------------------------------------
spectro_tab = ''
spectro_ancienTabWatt = []
spectro_ancienTabNm = []
spectro_ancienTabBrut = []
#-----------------------------------------------------------------------------
# Configuration de la communication avec l'interface USB/DMX
#-----------------------------------------------------------------------------
dmx_universe = 1
dmx_wrapper = ClientWrapper()
#-----------------------------------------------------------------------------
# Initialisation des variables pour le DMX
#-----------------------------------------------------------------------------
dmx_data = array.array('B', [255]*512)
dmx_suite = -1
nb_couleur = 5
"""
##############################################################################
Declaration des fonctions
##############################################################################
"""
class TimeoutException(Exception):
pass
def timeout():
thread.interrupt_main()
def deadline(timeout, *args):
def decorate(f):
def handler(signum, frame):
raise TimeoutException()
def new_f(*args):
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
res = f(*args)
signal.alarm(0)
new_f.__name__ = f.__name__
return new_f
return decorate
#-----------------------------------------------------------------------------
# Envoi des donnees a l'interface USB/DMX
#-----------------------------------------------------------------------------
def DmxSent(state):
dmx_wrapper.Stop()
def sendDataDMX(data):
dmx_client = dmx_wrapper.Client()
dmx_client.SendDmx(dmx_universe, data, DmxSent)
dmx_wrapper.Run()
return
#-----------------------------------------------------------------------------
# Reception donnees micro-spectrophotometre
#-----------------------------------------------------------------------------
def spectro_send(spectro_msg):
spectro_endValue = chr(0x0D) + chr(0x0A)
spectro_send = spectro_msg + spectro_endValue
spectro_ser.write(bytes(spectro_send.encode()))
return
@deadline(10)
def spectro_receive(mode):
if mode == "changementTemps":
spectro_ser.reset_input_buffer()
spectro_ser.reset_output_buffer()
time.sleep(1)
spectro_ser.flushInput()
spectro_ser.flushOutput()
time.sleep(1)
elif mode == "mesure":
#Reception des donnees jusqu'a avoir "---\r\n"
spectro_tab = ''
while True:
try:
spectro_lecture = str(spectro_ser.read(10))
spectro_tab += spectro_lecture
if "---\r\n" in spectro_lecture:
break
except timeout:
print("TimeOut")
#Suppression de certain caracteres dans le tableau
spectro_tab = spectro_tab.replace('-', '')
spectro_tab = spectro_tab.replace('b', '')
spectro_tab = spectro_tab.replace(chr(0x27), '')
#Selection et tri des informations necessaires dans les donnees
spectro_debut = spectro_tab.find("metre") + 6
spectro_fin = spectro_tab.find("\r\n\r\n") + 4
i = spectro_debut + 1
spectro_tab2 = ''
while i < spectro_fin:
spectro_tab2 += spectro_tab[i]
i += 1
spectro_tab2 = spectro_tab2.replace('\r', " ")
spectro_tab2 = spectro_tab2.replace('\n', '')
spectro_tab2 = spectro_tab2.split(" ")
spectro_tabValue = []
i = 0
while i < len(spectro_tab2):
if spectro_tab2[i] != '':
spectro_tabValue.append(spectro_tab2[i])
i += 1
print(spectro_tabValue)
erreur = False
#Mise en tableau des donnees suivant leur utilite
i = 0
spectro_tabNm = []
spectro_tabBrut = []
spectro_tabWatt = []
spectro_whichTab = 1
if "OVF" in spectro_tabValue:
print("OVF detected")
erreur = True
else:
while i < len(spectro_tabValue):
if spectro_whichTab == 4:
spectro_whichTab = 1
if spectro_whichTab == 1:
if "!OVF" == str(spectro_tabValue[i]):
erreur = True
break
else:
spectro_tabNm.append(round(float(spectro_tabValue[i]$
elif spectro_whichTab == 2:
valeurBrut = round(float(spectro_tabValue[i]),2)
spectro_tabBrut.append(valeurBrut)
elif spectro_whichTab == 3:
valeurWatt = round(float(spectro_tabValue[i]), 2)
spectro_tabWatt.append(valeurWatt)
spectro_whichTab += 1
i += 1
print("hello")
return(spectro_tabNm, spectro_tabWatt, spectro_tabBrut, erreur)
#-----------------------------------------------------------------------------
# Temporisation
#-----------------------------------------------------------------------------
def delay(tempo):
print("Attente de la chauffe des LED")
tempsDepart = time.time()
tempsRestant = tempo
while tempsRestant > 0:
#Ecriture du temps restant a attendre avant la prochaine mesure
sys.stdout.write("\rTemps d'attente restant: %s secondes " %tempsRestant)
sys.stdout.flush()
time.sleep(1)
tempsDifference = time.time()-tempsDepart
tempsRestant -= int(tempsDifference)
tempsDepart = time.time()
return
"""
##############################################################################
Programme principal
##############################################################################
"""
#-----------------------------------------------------------------------------
# Test et enregistrement du spectre lumineux de chaques LED a 100%
#-----------------------------------------------------------------------------
i = 0
while i < nb_couleur:
dmx_dataProvisoir = array.array('B', [0]*512)
dmx_dataProvisoir[i] = dmx_data[i]
sendDataDMX(dmx_dataProvisoir)
delay(10)
erreur = False
j = 0
while (erreur == False) and (j < 10):
spectro_msg = "t0"+str(j)
print(spectro_msg)
spectro_send(spectro_msg)
try:
spectro_receive("changementTemps")
except TimeoutException as e:
print("Error")
spectro_send("go")
print("\n Demarrage de la mesure du canal %s" %(i+1))
try:
spectro_tabNm, spectro_tabWatt, spectro_tabBrut, erreur = spectro_receive("m$
if erreur == False:
spectro_ancienTabWatt = spectro_tabWatt
spectro_ancienTabBrut = spectro_tabBrut
spectro_ancienTabNm = spectro_tabNm
except TimeoutException as e:
print("Too long")
j += 1
i += 1
#-----------------------------------------------------------------------------
# Extinction de toutes les LED
#-----------------------------------------------------------------------------
dmx_data = array.array('B', [0]*512)
sendDataDMX(dmx_data)
Et j'obtiens une erreur :
File "recherche-meilleur.py", line 216, in <module> spectro_tabNm, spectro_tabWatt, spectro_tabBrut, erreur = spectro_receive("mesure") TypeError: 'NoneType' object is not iterable
Ca vient du return de ma fonction mais je ne sais pas quoi faire.
Avez-vous une solution pour ce problème, ou alors une manière de faire un contrôle de temps d'exécution plus facile ?
Donc décorateur deadline remplace la fonction par une nouvelle, qui se charge d'appeler l'ancienne mais n'en renvoie pas la valeur de retour.
Il faudrait ajouter un return res à la fin de ta fonction new_f.
Par ailleurs, plutôt que de redéfinir __name__ sur ta fonction décorée, tu pourrais utiliser functools.wraps qui sert précisément à cela.
× Après avoir cliqué sur "Répondre" vous serez invité à vous connecter pour que votre message soit publié.
× Attention, ce sujet est très ancien. Le déterrer n'est pas forcément approprié. Nous te conseillons de créer un nouveau sujet pour poser ta question.
entwanne — @entwanne — Un zeste de Python — La POO en Python — Notions de Python avancées — Les secrets d'un code pythonique