it-swarm-ru.tech

В Python, как узнать, когда процесс завершен?

Из Python GUI (PyGTK) я запускаю процесс (используя многопроцессорность). Процесс занимает много времени (~ 20 минут), чтобы закончить. Когда процесс закончится, я бы хотел его очистить (извлечь результаты и присоединиться к процессу). Как я узнаю, когда процесс закончится?

Мой коллега предложил занятый цикл в родительском процессе, который проверяет, завершен ли дочерний процесс. Конечно, есть лучший способ.

В Unix, когда процесс разветвляется, обработчик сигнала вызывается из родительского процесса, когда дочерний процесс завершил . Но я не вижу ничего подобного в Python. Я что-то пропустил?

Как получается, что конец дочернего процесса можно наблюдать из родительского процесса? (Конечно, я не хочу вызывать Process.join (), так как это приведет к зависанию интерфейса GUI.)

Этот вопрос не ограничивается многопоточностью: у меня точно такая же проблема с многопоточностью. 

23
Matthew Walker

Этот ответ действительно прост! (Это просто заняло у меня дней , чтобы решить это.)

В сочетании с idle_add () PyGTK вы можете создать AutoJoiningThread. Общий код является тривиальным:

class AutoJoiningThread(threading.Thread):
    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)

Если вы хотите сделать больше, чем просто объединение (например, сбор результатов), вы можете расширить класс выше, чтобы отправлять сигналы по завершении, как это сделано в следующем примере:

import threading
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = None

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
        print "Child finished playing."
        self.result = 42

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

if __== '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    thread = AutoJoiningThread(target=child.play,
                               args=(3,))
    thread.connect('finished', child.get_result)
    print "Starting thread"
    thread.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

Вывод приведенного выше примера будет зависеть от порядка выполнения потоков, но он будет похож на:

Создание ребенка 
 Создание потока 
 Начальная тема 
 Ребенок, начинающий играть .
 Ребенок играет .
 Запуск mainloop (Ctrl + C для выхода) 
 Ребенок играет .
 Ребенок играет .
 Ребенок закончил играть .
 Вызывается Thread.join () 
 Результат составил 42 
 ^ CReceived KeyboardInterrupt. Помнится.

Невозможно создать AutoJoiningProcess таким же образом (потому что мы не можем вызвать idle_add () для двух разных процессов), однако мы можем использовать AutoJoiningThread, чтобы получить то, что мы хотим:

class AutoJoiningProcess(multiprocessing.Process):
    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start() # automatically joins

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()

Чтобы продемонстрировать AutoJoiningProcess, вот еще один пример:

import threading
import multiprocessing
import time
import sys
import gobject
gobject.threads_init()

class Child:
    def __init__(self):
        self.result = multiprocessing.Manager().list()

    def play(self, count):
        print "Child starting to play."
        for i in range(count):
            print "Child playing."
            time.sleep(1)
    print "Child finished playing."
        self.result.append(42)

    def get_result(self, obj):
        print "The result was "+str(self.result)

class AutoJoiningThread(threading.Thread, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
    }

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def run(self):
        threading.Thread.run(self)
        gobject.idle_add(self.join)
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        threading.Thread.join(self)
        print "Called Thread.join()"

class AutoJoiningProcess(multiprocessing.Process, gobject.GObject):
    __gsignals__ = {
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ())
        }

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        gobject.GObject.__init__(self)

    def start(self):
        thread = AutoJoiningThread(target=self.start_process)
        thread.start()

    def start_process(self):
        multiprocessing.Process.start(self)
        self.join()
        gobject.idle_add(self.emit, 'finished')

    def join(self):
        multiprocessing.Process.join(self)
        print "Called Process.join()"

if __== '__main__':
    print "Creating child"
    child = Child()
    print "Creating thread"
    process = AutoJoiningProcess(target=child.play,
                               args=(3,))
    process.connect('finished',child.get_result)
    print "Starting thread"
    process.start()
    print "Running mainloop (Ctrl+C to exit)"
    mainloop = gobject.MainLoop()

    try:
        mainloop.run()
    except KeyboardInterrupt:
        print "Received KeyboardInterrupt.  Quiting."
        sys.exit()

    print "God knows how we got here.  Quiting."
    sys.exit()

Результирующий вывод будет очень похож на приведенный выше пример, за исключением того, что на этот раз у нас есть как присоединение процесса, так и присоединение его сопутствующего потока:

Создание дочернего 
 Создание потока 
 Начальный поток 
 Запуск mainloop (Ctrl + C для выхода) 
 Ребенок начинает играть .
 Ребенок играет .
 Ребенок играет .
 Ребенок играет .
 Ребенок закончил играть .
 Вызывается Process.join () 
 Результат был [42] 
 Вызывается Thread.join () 
 ^ CReceived KeyboardInterrupt. Помнится.

К несчастью:

  1. Это решение зависит от gobject из-за использования idle_add (). gobject используется PyGTK.
  2. Это не настоящие отношения родитель/ребенок. Если один из этих потоков запущен другим потоком, он, тем не менее, будет присоединен к потоку, выполняющему основной цикл, а не к родительскому потоку. Эта проблема сохраняется и для AutoJoiningProcess, за исключением того, что, я думаю, будет сгенерировано исключение.

Таким образом, чтобы использовать этот подход, было бы лучше всего создавать потоки/процесс только из mainloop/GUI.

2
Matthew Walker

Я думаю, что в рамках создания многоплатформенного Python простые вещи, такие как SIGCHLD, должны быть сделаны сами. Согласитесь, это немного больше работы, когда все, что вы хотите сделать, это знать, когда ребенок закончил, но на самом деле это НЕ ТАК больно. Рассмотрим следующее, в котором для выполнения работы используется дочерний процесс, два экземпляра multiprocessing.Event и поток для проверки выполнения дочернего процесса:

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def checkChild(event, killEvent):
    event.wait()
    print "Child checked, and is done playing"
    if raw_input("Do again? y/n:") == "y":
        event.clear()
        t = threading.Thread(target=checkChild, args=(event, killEvent))
        t.start()
        p = Process(target=childsPlay, args=(event,))
        p.start()
    else:
        cleanChild()
        killEvent.set()

def cleanChild():
    print "Cleaning up the child..."

if __== '__main__':
    event = Event()
    killEvent = Event()

    # process to do work
    p = Process(target=childsPlay, args=(event,))
    p.start()

    # thread to check on child process
    t = threading.Thread(target=checkChild, args=(event, killEvent))
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        print "Main done"

Правка

Присоединение ко всем созданным процессам и потокам является хорошей практикой, поскольку оно помогает указать, когда создаются процессы/потоки (не завершающиеся) в зомби. Я изменил приведенный выше код, сделав класс ChildChecker, который наследует от threading.Thread. Его единственная цель - запустить задание в отдельном процессе, дождаться его завершения и затем уведомить графический интерфейс, когда все будет завершено. Присоединение к ChildChecker также присоединится к процессу, который он «проверяет». Теперь, если процесс не включается через 5 секунд, поток принудительно завершит процесс. Ввод "y" создает запускает дочерний процесс, выполняющий "endlessChildsPlay", который должен продемонстрировать принудительное завершение. 

import threading
from multiprocessing import Process, Event
from time import sleep

def childsPlay(event):
    print "Child started"
    for i in range(3):
        print "Child is playing..."
        sleep(1)
    print "Child done"
    event.set()

def endlessChildsPlay(event):
    print "Endless child started"
    while True:
        print "Endless child is playing..."
        sleep(1)
        event.set()
    print "Endless child done"

class ChildChecker(threading.Thread):
    def __init__(self, killEvent):
        super(ChildChecker, self).__init__()
        self.killEvent = killEvent
        self.event = Event()
        self.process = Process(target=childsPlay, args=(self.event,))

    def run(self):
        self.process.start()

        while not self.killEvent.is_set():
            self.event.wait()
            print "Child checked, and is done playing"
            if raw_input("Do again? y/n:") == "y":
                self.event.clear()
                self.process = Process(target=endlessChildsPlay, args=(self.event,))
                self.process.start()
            else:
                self.cleanChild()
                self.killEvent.set()

    def join(self):
        print "Joining child process"
        # Timeout on 5 seconds
        self.process.join(5)

        if self.process.is_alive():
            print "Child did not join!  Killing.."
            self.process.terminate()
        print "Joining ChildChecker thread"
        super(ChildChecker, self).join()


    def cleanChild(self):
        print "Cleaning up the child..."

if __== '__main__':
    killEvent = Event()
    # thread to check on child process
    t = ChildChecker(killEvent)
    t.start()

    try:
        while not killEvent.is_set():
            print "GUI running..."
            sleep(1)
    except KeyboardInterrupt:
        print "Quitting..."
        exit(0)
    finally:
        t.join()
        print "Main done"
11
manifest

В своих попытках найти ответ на свой вопрос я наткнулся на функцию PyGTK idle_add () . Это дает мне следующую возможность:

  1. Создайте новый дочерний процесс, который связывается через очередь.
  2. Создайте поток слушателя, который слушает Очередь, когда дочерний процесс отправляет слушателю сообщение о том, что он завершен, слушатель вызывает idle_add (), который устанавливает обратный вызов.
  3. В течение следующего времени в основном цикле родительский процесс будет вызывать обратный вызов.
  4. Обратный вызов может извлечь результаты, присоединиться к дочернему процессу и присоединиться к потоку слушателя.

Это кажется слишком сложным способом воссоздания Unix-функции call-callback-when-child-process-done.

Это должно быть распространенной проблемой с GUI в Python. Наверняка есть стандартный шаблон для решения этой проблемы?

2
Matthew Walker

Вы можете использовать queue для связи с дочерними процессами. Вы можете прикрепить промежуточные результаты к нему, или сообщения, указывающие, что достигнуты вехи (для индикаторов выполнения), или просто сообщение, указывающее, что процесс готов к присоединению. Опросить его с пусто легко и быстро.

Если вы действительно хотите узнать, сделано ли это, вы можете посмотреть код выхода вашего процесса или опрос is_alive () .

2
nmichaels

взгляните на модуль подпроцесса:

http://docs.python.org/library/subprocess.html

import subprocess
let pipe = subprocess.Popen("ls -l", stdout=subprocess.PIPE)
allText = pipe.stdout.read()
pipe.wait()
retVal = pipe.returncode
0
Andy Skirrow