Hilos en Python y nautilus-document-convert

Introducción

En la última versión de nautilus-document-converter he introducido algunos cambios que comenté en el artículo ”Convetir documentos de LibreOffice con progreso y multitarea”, pero quería entrar mas en profundidad en ellos, con la intención de orientarme a aquellos que quieran programar o que programen en Python.

Lo cierto es que, últimamente no había escrito nada sobre desarrollo de aplicaciones, ya sea en este lenguaje o en otro. Sin embargo, y tal y como comentaré mas adelante, quiero dedicarle algo mas de tiempo a estos temas, tanto en lo relacionado con Python, Vala como Java. Los dos primeros relacionados con Ubuntu y el segundo en su relación con Android. Por supuesto todo desde el punto de vista del software libre.

En este caso quiero entrar en lo referente al tratamiento de ejecución de varios hilos en paralelo para reducir de esta forma el tiempo de ejecución.

Multitarea

Con los equipos actuales con varios procesadores (incluidos los móviles, quien lo iba a pensar hace pocos años), realizar varias tareas de formas simultánea es una posibilidad a tener en cuenta. Esto, en el caso de procesamiento de archivos por lotes es todavía mas interesante si cabe. De esta manera, tal y como comenté en el artículo anterior, he reducido el tiempo de procesado de 60 archivos para cambiarlos de formato, de 60 a 40 segundos, lo cual es para tener en muy en cuenta.

Sistema de trabajo

Para realizar la operación se utilizan un conjunto de trabajadores que están siempre en funcionamiento mientras exista trabajo por realizar en la cola.

Worker

Se trata de una clase que deriva de Thread, y que procesa las tareas existentes (tasks), de forma que Cada vez que termina con una tarea, solicita una nueva, hasta que se procesan todas.

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()

El equipo

La siguiente clase es ThreadPool que es la encargada de organizar las tareas a realizar por los diferentes trabajadores, iniciar tantos trabajadores como necesitemos y esperar hasta que se hayan completado todas las tareas.

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

El número de miembros del equipo

Para determinar el número de miembros del equipo, lo he realizado en función del número de procesadores que tiene el equipo, pero se debería de considerar el número de tareas a realizar, para evitar en cualquier momento lanzar mas trabajadores que tareas a realizar. Esto no debe suponer mucho problema, puesto que en caso de producirse esto el Worker terminará de inmediato al quedarse sin tareas a realizar.

try:
    import multiprocessing
    WORKERS = multiprocessing.cpu_count()
except:
    WORKERS = 1

El indicador de progreso

Lo siguiente a tener en cuenta es el cuadro de diálogo que muestra la barra de progreso. He incluido un sistema para parar el procesado de archivos. Esto solo funciona para las tareas pendientes de asignar, aquellas que se estén ejecutando en el momento de parar el procesado continuarán hasta su conclusión.

class Progreso(Gtk.Dialog):
    def __init__(self,title,max_value):
        #
        Gtk.Dialog.__init__(self,title)
        self.set_position(Gtk.WindowPosition.CENTER_ALWAYS)
        self.set_size_request(500, 40)
        self.set_resizable(False)
        self.connect('destroy', self.close)
        #
        vbox1 = Gtk.VBox(spacing = 5)
        vbox1.set_border_width(5)
        self.get_content_area().add(vbox1)
        #
        self.label = Gtk.Label(' '*50)
        vbox1.pack_start(self.label,False,False,0)
        #
        hbox1 = Gtk.HBox(spacing = 5)
        vbox1.pack_start(hbox1,True,True,0)
        #
        self.progressbar = Gtk.ProgressBar()
        self.progressbar.set_text('0%')
        hbox1.pack_start(self.progressbar,True,True,0)
        #
        image = Gtk.Image()
        image.set_from_stock(Gtk.STOCK_STOP, Gtk.IconSize.BUTTON)
        self.button = Gtk.Button()
        self.button.set_image(image)
        self.button.connect('clicked', self.on_button_clicked)
        hbox1.pack_start(self.button,False,False,0)
        #
        self.show_all()
        #
        self.stop = False
        self.value = 0.0
        self.max_value = max_value

    def on_button_clicked(self,widget):
        self.stop = True

    def is_stopped(self):
        return self.stop

    def set_filename(self,filename):
        self.label.set_text(filename)

    def close(self,widget=None):
        self.destroy()

    def increase(self):
        self.value+=1.0
        fraction=self.value/self.max_value
        self.progressbar.set_fraction(fraction)
        self.progressbar.set_text('%d%%' % (100*fraction))

Todo junto

En las siguientes líneas de código se puede ver todo el conjunto, en el que se puede apreciar el cuadro de diálogo que informa del progreso de conversión, y el modo en que se asignan tareas al equipo.

def convert_files(files,extension): 
    if len(files)>0:
        ft = time.time()
        MULTIPROCESSING = (WORKERS>1 and len(files)>1)
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')
        print(MULTIPROCESSING)
        print(WORKERS)
        print(len(files))
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')

        progress_dialog = Progreso(_('Converting files')+'...',len(files))
        progress_dialog.show()
        progress_dialog.map()
        while Gtk.events_pending():
            Gtk.main_iteration()
        the_converter = subprocess.Popen(['/usr/bin/unoconv','--listener'])
        time.sleep(1)
        if MULTIPROCESSING:
            pool = ThreadPool(WORKERS)      
        for index,afile in enumerate(files):
            basefile,extile = os.path.splitext(afile)
            outputfile = os.path.splitext(afile)[0]+'.'+extension
            print('Init... %s'%index)
            progress_dialog.set_filename(os.path.basename(afile)[:50])  
            progress_dialog.increase()
            progress_dialog.map()
            while Gtk.events_pending():
                Gtk.main_iteration()
            print('Start %s'%index)
            if MULTIPROCESSING:
                pool.add_task(convert_file, extension,outputfile,afile)
            else:
                convert_file(extension,outputfile,afile)
            print('End %s'%index)
            #print(convert_file(extension,outputfile,afile,progress_dialog))
            if progress_dialog.is_stopped():
                break
        if MULTIPROCESSING:
            pool.wait_completion()
        the_converter.terminate()
        progress_dialog.hide()
        progress_dialog.destroy()
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')
        print(time.time()-ft)
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')
        print('--------------------------------------------------------')

Conclusiones

Como ves se trata de un proceso realmente sencillo, y que se puede aplicar a multitud de operaciones, sobre todo a las relacionadas con el procesamiento de archivos en lote. Actualmente estoy trabajando con este mismo sistema, en otro complemento para Nautilus, pero esta vez relacionado con el procesado de imágenes (girar, invertir, pasar a blanco y negro, añadir borde, etc), donde esto tiene mas interés si cabe, dado el tiempo que puede llevar procesar una única foto.

<

Deja una respuesta

Tu dirección de correo electrónico no será publicada.