| | 1 | # encoding: utf-8 |
| | 2 | |
| | 3 | """TurboMail extension API.""" |
| | 4 | |
| | 5 | import logging |
| | 6 | log = logging.getLogger("turbomail.manager") |
| | 7 | |
| | 8 | import turbomail |
| | 9 | from turbomail.api import Manager |
| | 10 | from turbomail.exceptions import ProviderExhaustedException |
| | 11 | |
| | 12 | import math, copy |
| | 13 | from Queue import Queue, Empty |
| | 14 | from threading import Event, Thread |
| | 15 | from turbomail.dispatch import Dispatch |
| | 16 | |
| | 17 | __all__ = ['load'] |
| | 18 | |
| | 19 | |
| | 20 | def load(): |
| | 21 | return DemandManager() |
| | 22 | |
| | 23 | |
| | 24 | class DemandManager(Manager): |
| | 25 | name = "Demand" |
| | 26 | version = "1.0" |
| | 27 | url = "http://www.python-turbomail.org/wiki/DemandManager" |
| | 28 | |
| | 29 | def __init__(self): |
| | 30 | log.info("Demand manager starting up.") |
| | 31 | |
| | 32 | super(DemandManager, self).__init__() |
| | 33 | |
| | 34 | self.pool = 0 |
| | 35 | self.queue = Queue() |
| | 36 | self.finished = Event() |
| | 37 | |
| | 38 | self.threads = turbomail.config.get("mail.demand.threads", 4) # Maximum number of threads to create. |
| | 39 | self.divisor = turbomail.config.get("mail.demand.divisor", 10) # Estimate the number of required threads by dividing the queue size by this. |
| | 40 | self.timeout = turbomail.config.get("mail.demand.timeout", 60) |
| | 41 | |
| | 42 | log.info("Demand manager ready.") |
| | 43 | |
| | 44 | def optimum(self): |
| | 45 | return min(self.threads, math.ceil(self.queue.qsize() / float(self.divisor))) |
| | 46 | |
| | 47 | optimum = property(optimum) |
| | 48 | |
| | 49 | def stop(self): |
| | 50 | log.info("Demand manager shutting down.") |
| | 51 | self.finished.set() |
| | 52 | |
| | 53 | def spawn(self): |
| | 54 | thread = Thread(target=self.wrapper) |
| | 55 | thread.start() |
| | 56 | self.pool += 1 |
| | 57 | |
| | 58 | def deliver(self, message): |
| | 59 | log.info("Adding message %s to the queue for background delivery." % message.id) |
| | 60 | self.queue.put(copy.deepcopy(message)) |
| | 61 | message._processed = True |
| | 62 | message._dirty = True |
| | 63 | |
| | 64 | if not self.queue.empty() and self.pool < self.optimum: |
| | 65 | tospawn = int(self.optimum - self.pool) |
| | 66 | log.debug("Spawning %d thread%s." % (tospawn, tospawn != 1 and "s" or "")) |
| | 67 | for i in range(tospawn): |
| | 68 | self.spawn() |
| | 69 | |
| | 70 | def wrapper(self): |
| | 71 | log.debug("Mail queue worker starting up.") |
| | 72 | |
| | 73 | self.worker() |
| | 74 | |
| | 75 | self.pool -= 1 |
| | 76 | log.debug("Mail queue worker finished.") |
| | 77 | |
| | 78 | def worker(self): |
| | 79 | log.debug("Requesting new provider instance.") |
| | 80 | provider = turbomail.provider.new() |
| | 81 | if not provider: raise ManagerException, "Unable to allocate new provider." |
| | 82 | |
| | 83 | while True: |
| | 84 | try: |
| | 85 | message = self.queue.get(True, self.timeout) |
| | 86 | provider.deliver(message) |
| | 87 | |
| | 88 | except Empty: |
| | 89 | log.debug("Worker death from starvation.") |
| | 90 | break |
| | 91 | |
| | 92 | except ProviderExhaustedException: |
| | 93 | log.debug("Worker death from provider exhaustion - spawning child.") |
| | 94 | self.deliver(message) |
| | 95 | self.spawn() |
| | 96 | break |
| | 97 | |
| | 98 | except: |
| | 99 | log.exception("Delivery of message %s failed." % message.id) |
| | 100 | break |
| | 101 | |
| | 102 | else: |
| | 103 | log.info("Delivery of message %s successful or deferred." % message.id) |
| | 104 | |