Introducción
En la última versión de nautilus-document-converter he introducido algunos cambios que comenté en el artículo ”Convetir documentos de LibreOffice con progreso y multitarea”, pero quería entrar mas en profundidad en ellos, con la intención de orientarme a aquellos que quieran programar o que programen en Python.
Lo cierto es que, últimamente no había escrito nada sobre desarrollo de aplicaciones, ya sea en este lenguaje o en otro. Sin embargo, y tal y como comentaré mas adelante, quiero dedicarle algo mas de tiempo a estos temas, tanto en lo relacionado con Python, Vala como Java. Los dos primeros relacionados con Ubuntu y el segundo en su relación con Android. Por supuesto todo desde el punto de vista del software libre.
En este caso quiero entrar en lo referente al tratamiento de ejecución de varios hilos en paralelo para reducir de esta forma el tiempo de ejecución.
Multitarea
Con los equipos actuales con varios procesadores (incluidos los móviles, quien lo iba a pensar hace pocos años), realizar varias tareas de formas simultánea es una posibilidad a tener en cuenta. Esto, en el caso de procesamiento de archivos por lotes es todavía mas interesante si cabe. De esta manera, tal y como comenté en el artículo anterior, he reducido el tiempo de procesado de 60 archivos para cambiarlos de formato, de 60 a 40 segundos, lo cual es para tener en muy en cuenta.
Sistema de trabajo
Para realizar la operación se utilizan un conjunto de trabajadores que están siempre en funcionamiento mientras exista trabajo por realizar en la cola.
Worker
Se trata de una clase que deriva de Thread, y que procesa las tareas existentes (tasks), de forma que Cada vez que termina con una tarea, solicita una nueva, hasta que se procesan todas.
class Worker(Thread): """Thread executing tasks from a given tasks queue""" def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: func(*args, **kargs) except Exception, e: print e finally: self.tasks.task_done()
El equipo
La siguiente clase es ThreadPool que es la encargada de organizar las tareas a realizar por los diferentes trabajadores, iniciar tantos trabajadores como necesitemos y esperar hasta que se hayan completado todas las tareas.
class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads): self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) def add_task(self, func, *args, **kargs): """Add a task to the queue""" self.tasks.put((func, args, kargs)) def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join()
El número de miembros del equipo
Para determinar el número de miembros del equipo, lo he realizado en función del número de procesadores que tiene el equipo, pero se debería de considerar el número de tareas a realizar, para evitar en cualquier momento lanzar mas trabajadores que tareas a realizar. Esto no debe suponer mucho problema, puesto que en caso de producirse esto el Worker terminará de inmediato al quedarse sin tareas a realizar.
try: import multiprocessing WORKERS = multiprocessing.cpu_count() except: WORKERS = 1
El indicador de progreso
Lo siguiente a tener en cuenta es el cuadro de diálogo que muestra la barra de progreso. He incluido un sistema para parar el procesado de archivos. Esto solo funciona para las tareas pendientes de asignar, aquellas que se estén ejecutando en el momento de parar el procesado continuarán hasta su conclusión.
class Progreso(Gtk.Dialog): def __init__(self,title,max_value): # Gtk.Dialog.__init__(self,title) self.set_position(Gtk.WindowPosition.CENTER_ALWAYS) self.set_size_request(500, 40) self.set_resizable(False) self.connect('destroy', self.close) # vbox1 = Gtk.VBox(spacing = 5) vbox1.set_border_width(5) self.get_content_area().add(vbox1) # self.label = Gtk.Label(' '*50) vbox1.pack_start(self.label,False,False,0) # hbox1 = Gtk.HBox(spacing = 5) vbox1.pack_start(hbox1,True,True,0) # self.progressbar = Gtk.ProgressBar() self.progressbar.set_text('0%') hbox1.pack_start(self.progressbar,True,True,0) # image = Gtk.Image() image.set_from_stock(Gtk.STOCK_STOP, Gtk.IconSize.BUTTON) self.button = Gtk.Button() self.button.set_image(image) self.button.connect('clicked', self.on_button_clicked) hbox1.pack_start(self.button,False,False,0) # self.show_all() # self.stop = False self.value = 0.0 self.max_value = max_value def on_button_clicked(self,widget): self.stop = True def is_stopped(self): return self.stop def set_filename(self,filename): self.label.set_text(filename) def close(self,widget=None): self.destroy() def increase(self): self.value+=1.0 fraction=self.value/self.max_value self.progressbar.set_fraction(fraction) self.progressbar.set_text('%d%%' % (100*fraction))
Todo junto
En las siguientes líneas de código se puede ver todo el conjunto, en el que se puede apreciar el cuadro de diálogo que informa del progreso de conversión, y el modo en que se asignan tareas al equipo.
def convert_files(files,extension): if len(files)>0: ft = time.time() MULTIPROCESSING = (WORKERS>1 and len(files)>1) print('--------------------------------------------------------') print('--------------------------------------------------------') print('--------------------------------------------------------') print(MULTIPROCESSING) print(WORKERS) print(len(files)) print('--------------------------------------------------------') print('--------------------------------------------------------') print('--------------------------------------------------------') progress_dialog = Progreso(_('Converting files')+'...',len(files)) progress_dialog.show() progress_dialog.map() while Gtk.events_pending(): Gtk.main_iteration() the_converter = subprocess.Popen(['/usr/bin/unoconv','--listener']) time.sleep(1) if MULTIPROCESSING: pool = ThreadPool(WORKERS) for index,afile in enumerate(files): basefile,extile = os.path.splitext(afile) outputfile = os.path.splitext(afile)[0]+'.'+extension print('Init... %s'%index) progress_dialog.set_filename(os.path.basename(afile)[:50]) progress_dialog.increase() progress_dialog.map() while Gtk.events_pending(): Gtk.main_iteration() print('Start %s'%index) if MULTIPROCESSING: pool.add_task(convert_file, extension,outputfile,afile) else: convert_file(extension,outputfile,afile) print('End %s'%index) #print(convert_file(extension,outputfile,afile,progress_dialog)) if progress_dialog.is_stopped(): break if MULTIPROCESSING: pool.wait_completion() the_converter.terminate() progress_dialog.hide() progress_dialog.destroy() print('--------------------------------------------------------') print('--------------------------------------------------------') print('--------------------------------------------------------') print(time.time()-ft) print('--------------------------------------------------------') print('--------------------------------------------------------') print('--------------------------------------------------------')
Conclusiones
Como ves se trata de un proceso realmente sencillo, y que se puede aplicar a multitud de operaciones, sobre todo a las relacionadas con el procesamiento de archivos en lote. Actualmente estoy trabajando con este mismo sistema, en otro complemento para Nautilus, pero esta vez relacionado con el procesado de imágenes (girar, invertir, pasar a blanco y negro, añadir borde, etc), donde esto tiene mas interés si cabe, dado el tiempo que puede llevar procesar una única foto.