Wie effizient viele Aufgaben ein "wenig später" in Python?

Ich habe einen Prozess, der eine Reihe von Aktionen "später" (nach 10-60 Sekunden in der Regel) durchführen muss. Das Problem ist, dass diese "späteren" Aktionen können viel (1000s), so dass ein Thread pro Aufgabe ist nicht lebensfähig. Ich weiß für die Existenz von Werkzeugen wie gevent und eventlet , aber eines der Problem ist, dass der Prozess neromq für die Kommunikation verwendet, so dass ich eine Integration benötigen (eventlet hat es bereits).

Was frage ich mich, was sind meine Optionen? Also, Anregungen sind willkommen, in den Zeilen der Bibliotheken (wenn Sie irgendwelche der erwähnten teilen, teilen Sie bitte Ihre Erfahrungen), Techniken ( Pythons "Coroutine" Unterstützung , verwenden Sie einen Thread, der für eine Weile schläft und überprüft eine Warteschlange), wie Von neromqs Umfrage oder Eventloop Gebrauch machen, um den Job zu machen, oder etwas anderes.

  • Schnellste Weg, um 3 Millionen Objekte aus einem S3-Eimer herunterzuladen
  • Was sind die Vorteile der Multithread-Programmierung in Python?
  • Unterhaltende eventlet.wsgi.server
  • Eventlet Timeout nicht verlassen
  • Fehler: Befehl 'gcc' fehlgeschlagen mit Exit Status 1 bei der Installation von eventlet
  • Ab ist fehlerhaft mit apr_socket_recv: Verbindung abgelehnt (61)
  • Wie schreibe ich die Funktion mit der Variablen von außen?
  • Java-Äquivalent der Funktionszuordnung in Python
  • Python: Zugriff auf zurückgegebene Werte aus einer Funktion, durch eine andere Funktion
  • Wie gibt Python mehrere Werte aus einer Funktion zurück?
  • Wie man Instanzmethodenaufrufe abfängt?
  • Aufruffunktion in einer anderen Klasse durch Kivy Button
  • 11 Solutions collect form web for “Wie effizient viele Aufgaben ein "wenig später" in Python?”

    Erwäge die Verwendung einer Prioritätswarteschlange mit einem oder mehreren Worker-Threads, um die Aufgaben zu bedienen. Der Haupt-Thread kann Arbeit hinzufügen, um die Warteschlange, mit einem Zeitstempel der am schnellsten sollte es gewartet werden. Worker Threads Pop-Arbeit aus der Warteschlange, schlafen, bis die Zeit der Priorität Wert erreicht ist, die Arbeit, und dann Pop ein anderes Element aus der Warteschlange.

    Wie wäre es mit einer ausgefeilteren Antwort. Mklauber macht einen guten punkt Wenn es eine Chance gibt, dass alle deine Arbeiter schlafen könnten, wenn du neue, dringendere Arbeit queue.PriorityQueue , dann ist eine queue.PriorityQueue ist nicht wirklich die Lösung, obwohl eine "Prioritätswarteschlange" immer noch die Technik ist, die von der heapq Modul Stattdessen verwenden wir ein anderes Synchronisations-Primitiv; Eine Bedingungsvariable, die in Python ist geschrieben threading.Condition .

    Der Ansatz ist ziemlich einfach, auf den Haufen zu sehen, und wenn die Arbeit aktuell ist, schalte ihn aus und mach das. Wenn es Arbeit gab, aber es ist in die Zukunft geplant, nur auf den Zustand warten bis dahin, oder wenn es überhaupt keine Arbeit gibt, schlafe für immer.

    Der Produzent macht es einen fairen Anteil an der Arbeit; Jedes Mal, wenn es neue Arbeit hinzufügt, benachrichtigt es die Bedingung, also wenn es schlafende Arbeiter gibt, werden sie aufwachen und die Warteschlange für neuere Arbeit überprüfen.

     import heapq, time, threading START_TIME = time.time() SERIALIZE_STDOUT = threading.Lock() def consumer(message): """the actual work function. nevermind the locks here, this just keeps the output nicely formatted. a real work function probably won't need it, or might need quite different synchronization""" SERIALIZE_STDOUT.acquire() print time.time() - START_TIME, message SERIALIZE_STDOUT.release() def produce(work_queue, condition, timeout, message): """called to put a single item onto the work queue.""" prio = time.time() + float(timeout) condition.acquire() heapq.heappush(work_queue, (prio, message)) condition.notify() condition.release() def worker(work_queue, condition): condition.acquire() stopped = False while not stopped: now = time.time() if work_queue: prio, data = work_queue[0] if data == 'stop': stopped = True continue if prio < now: heapq.heappop(work_queue) condition.release() # do some work! consumer(data) condition.acquire() else: condition.wait(prio - now) else: # the queue is empty, wait until notified condition.wait() condition.release() if __name__ == '__main__': # first set up the work queue and worker pool work_queue = [] cond = threading.Condition() pool = [threading.Thread(target=worker, args=(work_queue, cond)) for _ignored in range(4)] map(threading.Thread.start, pool) # now add some work produce(work_queue, cond, 10, 'Grumpy') produce(work_queue, cond, 10, 'Sneezy') produce(work_queue, cond, 5, 'Happy') produce(work_queue, cond, 10, 'Dopey') produce(work_queue, cond, 15, 'Bashful') time.sleep(5) produce(work_queue, cond, 5, 'Sleepy') produce(work_queue, cond, 10, 'Doc') # and just to make the example a bit more friendly, tell the threads to stop after all # the work is done produce(work_queue, cond, float('inf'), 'stop') map(threading.Thread.join, pool) 

    Diese Antwort hat eigentlich zwei Vorschläge – meine erste und andere habe ich nach dem ersten entdeckt.

    Planen

    Ich vermute, dass Sie nach dem sched suchen.

    EDIT : mein bloßer Vorschlag schien wenig hilfreich zu sein, nachdem ich es gelesen habe. Also habe ich beschlossen, die sched zu testen, um zu sehen, ob es funktionieren kann, wie ich vorgeschlagen habe. Hier kommt mein Test: Ich würde es mit einem einzigen Faden verwenden, mehr oder weniger auf diese Weise:

     class SchedulingThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.scheduler = sched.scheduler(time.time, time.sleep) self.queue = [] self.queue_lock = threading.Lock() self.scheduler.enter(1, 1, self._schedule_in_scheduler, ()) def run(self): self.scheduler.run() def schedule(self, function, delay): with self.queue_lock: self.queue.append((delay, 1, function, ())) def _schedule_in_scheduler(self): with self.queue_lock: for event in self.queue: self.scheduler.enter(*event) print "Registerd event", event self.queue = [] self.scheduler.enter(1, 1, self._schedule_in_scheduler, ()) 

    Zuerst würde ich eine Thread-Klasse erstellen, die einen eigenen Scheduler und eine Warteschlange haben würde. Mindestens ein Ereignis würde im Scheduler registriert werden: Eins zum Aufrufen einer Methode zum Planen von Ereignissen aus der Warteschlange.

     class SchedulingThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.scheduler = sched.scheduler(time.time, time.sleep) self.queue = [] self.queue_lock = threading.Lock() self.scheduler.enter(1, 1, self._schedule_in_scheduler, ()) 

    Die Methode für die Terminierung von Ereignissen aus der Warteschlange würde die Warteschlange sperren, jedes Ereignis planen, die Warteschlange leeren und sich selbst planen, um nach neuen Ereignissen einige Zeit in der Zukunft zu suchen. Beachten Sie, dass der Zeitraum für die Suche nach neuen Veranstaltungen kurz (eine Sekunde) ist, können Sie es ändern:

      def _schedule_in_scheduler(self): with self.queue_lock: for event in self.queue: self.scheduler.enter(*event) print "Registerd event", event self.queue = [] self.scheduler.enter(1, 1, self._schedule_in_scheduler, ()) 

    Die Klasse sollte auch eine Methode zur Terminierung von Benutzerereignissen haben. Natürlich sollte diese Methode die Warteschlange beim Aktualisieren sperren:

      def schedule(self, function, delay): with self.queue_lock: self.queue.append((delay, 1, function, ())) 

    Schließlich sollte die Klasse die Scheduler-Hauptmethode aufrufen:

      def run(self): self.scheduler.run() 

    Hier kommt ein Beispiel für die Verwendung von:

     def print_time(): print "scheduled:", time.time() if __name__ == "__main__": st = SchedulingThread() st.start() st.schedule(print_time, 10) while True: print "main thread:", time.time() time.sleep(5) st.join() 

    Seine Ausgabe in meiner Maschine ist:

     $ python schedthread.py main thread: 1311089765.77 Registerd event (10, 1, <function print_time at 0x2f4bb0>, ()) main thread: 1311089770.77 main thread: 1311089775.77 scheduled: 1311089776.77 main thread: 1311089780.77 main thread: 1311089785.77 

    Dieser Code ist nur ein kurzes Problem, es kann etwas Arbeit brauchen. Allerdings muss ich gestehen, dass ich ein bisschen fasziniert von der sched Modul bin, also habe ich es vorgeschlagen. Vielleicht möchten Sie auch nach anderen Vorschlägen suchen 🙂

    APScheduler

    Wenn ich in Google nach Lösungen suche, wie die, die ich Post habe, fand ich dieses erstaunliche APScheduler-Modul . Es ist so praktisch und nützlich, dass ich wette, es ist deine Lösung. Mein früheres Beispiel wäre einfacher mit diesem Modul:

     from apscheduler.scheduler import Scheduler import time sch = Scheduler() sch.start() @sch.interval_schedule(seconds=10) def print_time(): print "scheduled:", time.time() sch.unschedule_func(print_time) while True: print "main thread:", time.time() time.sleep(5) 

    (Leider habe ich nicht gefunden, wie man ein Event nur einmal ausführen kann, also sollte das Funktionsereignis sich selbst ausschreiben. Ich wette, es kann mit einem Dekorateur gelöst werden.)

    Wenn du eine Menge von Aufgaben hast, die später ausgehen müssen, und du willst sie auch weiterhin bestehen, auch wenn du das rufende Programm oder deine Arbeiter heruntergefahren hast, dann solltest du wirklich in Sellerie schauen, was es super macht, neue Aufgaben zu schaffen Sie hingerichtet auf jeder Maschine, die du möchtest, und warte auf die Ergebnisse.

    Von der Celery Seite, "Dies ist eine einfache Aufgabe, die zwei Zahlen:"

     from celery.task import task @task def add(x, y): return x + y 

    Sie können die Aufgabe im Hintergrund ausführen oder darauf warten, dass sie beendet ist:

     >>> result = add.delay(8, 8) >>> result.wait() # wait for and return the result 16 

    Sellerie

    Sie schrieben:

    Eines der Probleme ist, dass der Prozess neromq für die Kommunikation verwendet, so dass ich etwas Integration benötigen (eventlet hat es bereits)

    Scheint, wie Ihre Wahl wird stark von diesen Details beeinflusst werden, die sind ein bisschen unklar – wie ist zeromq für die Kommunikation verwendet werden, wie viel Ressourcen wird die Integration erfordern, und was sind Ihre Anforderungen und verfügbaren Ressourcen.


    Es gibt ein Projekt namens django-ztask, die zeromq und bietet eine task Dekorateur ähnlich wie Sellerie ist ein. Allerdings ist es (offensichtlich) Django-spezifisch und so kann nicht in Ihrem Fall geeignet sein. Ich habe es nicht benutzt, bevorzugen Sie Sellerie selbst.

    Mit Sellerie für ein paar Projekte (diese sind gehostet bei ep.io PaaS Hosting, die eine einfache Möglichkeit, es zu benutzen gehostet).

    Celery sieht aus wie eine sehr flexible Lösung, die Verzögerungsaufgaben, Rückrufe, Aufgabenablauf & Wiederholen, Begrenzung der Aufgabenausführungsrate usw. ermöglicht. Es kann mit Redis, Beanstalk, CouchDB, MongoDB oder einer SQL-Datenbank verwendet werden.

    Beispielcode (Definition der Task und asynchrone Ausführung nach einer Verzögerung):

     from celery.decorators import task @task def my_task(arg1, arg2): pass # Do something result = my_task.apply_async( args=[sth1, sth2], # Arguments that will be passed to `my_task()` function. countdown=3, # Time in seconds to wait before queueing the task. ) 

    Siehe auch einen Abschnitt in Sellerie-Dokumenten .

    Hast du das multiprocessing Modul gesehen? Es kommt standard mit Python. Es ähnelt dem threading Modul, läuft aber jede Aufgabe in einem Prozess. Sie können ein Pool() -Objekt verwenden, um einen Worker-Pool einzurichten, und verwenden .map() dann die Methode .map() , um eine Funktion mit den verschiedenen in der Warteschlange befindlichen Task-Argumenten aufzurufen.

    Pyzmq hat eine ioloop Implementierung mit einem ähnlichen API zu dem des Tornados ioloop. Es implementiert einen DelayedCallback der Ihnen helfen kann.

    Angenommen, Ihr Prozess hat eine Laufschleife, die Signale empfangen kann und die Länge der Zeit jeder Aktion ist innerhalb der Grenzen der sequentiellen Operation, verwenden Signale und posix Alarm ()

      signal.alarm(time) If time is non-zero, this function requests that a SIGALRM signal be sent to the process in time seconds. 

    Das hängt davon ab, was du mit " diesen" später "Handlungen kann viel " meint, und wenn dein Prozess bereits Signale verwendet. Aufgrund der Phrasierung der Frage ist es unklar, warum ein externes Python-Paket benötigt würde.

    Eine weitere Möglichkeit besteht darin, die Phyton-GLib-Bindungen zu verwenden , insbesondere ihre timeout Funktionen.

    Es ist eine gute Wahl, solange Sie nicht wollen, um mehrere Kerne nutzen und solange die Abhängigkeit von GLib ist kein Problem. Es behandelt alle Ereignisse im selben Thread, die Synchronisierungsprobleme verhindern. Darüber hinaus kann sein Event-Framework auch verwendet werden, um IO-basierte (dh Sockets) Events zu beobachten und zu behandeln.

    AKTUALISIEREN:

    Hier ist eine Live-Session mit GLib:

     >>> import time >>> import glib >>> >>> def workon(thing): ... print("%s: working on %s" % (time.time(), thing)) ... return True # use True for repetitive and False for one-time tasks ... >>> ml = glib.MainLoop() >>> >>> glib.timeout_add(1000, workon, "this") 2 >>> glib.timeout_add(2000, workon, "that") 3 >>> >>> ml.run() 1311343177.61: working on this 1311343178.61: working on that 1311343178.61: working on this 1311343179.61: working on this 1311343180.61: working on this 1311343180.61: working on that 1311343181.61: working on this 1311343182.61: working on this 1311343182.61: working on that 1311343183.61: working on this 

    Nun, meiner Meinung nach könnte man etwas namens "kooperatives Multitasking" verwenden. Es ist verdreht-basierte Sache und es ist wirklich cool. Schauen Sie sich einfach die PyCon-Präsentation von 2010 an: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-Cooperative-Multitasking-with-twisted-getting-things-done-concurrently-11- 3352182

    Nun, Sie brauchen Transportwarteschlange, um das auch zu tun …

    Einfach. Sie können Ihre Klasse von Thread erben und erstellen Sie Instanz Ihrer Klasse mit Param wie Timeout so für jede Instanz Ihrer Klasse können Sie sagen, Zeitüberschreitung, die Ihren Thread auf diese Zeit warten wird

    Python ist die beste Programmiersprache der Welt.