Show
Ignore:
Timestamp:
10/26/07 05:48:32 (1 year ago)
Author:
amcgregor
Message:

TurboMail 3.0 is now usable: the smtp and debug providers are working, immediate and demand managers are good-to-go, and S/MIME signatures work, too\!

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • branches/3.0/turbomail/managers/demand.py

    r40 r52  
     1# encoding: utf-8 
     2 
     3"""TurboMail extension API.""" 
     4 
     5import logging 
     6log = logging.getLogger("turbomail.manager") 
     7 
     8import turbomail 
     9from turbomail.api import Manager 
     10from turbomail.exceptions import ProviderExhaustedException 
     11 
     12import math, copy 
     13from Queue import Queue, Empty 
     14from threading import Event, Thread 
     15from turbomail.dispatch import Dispatch 
     16 
     17__all__ = ['load'] 
     18 
     19 
     20def load(): 
     21        return DemandManager() 
     22 
     23 
     24class DemandManager(Manager): 
     25        name = "Demand" 
     26        version = "1.0" 
     27        url = "http://www.python-turbomail.org/wiki/DemandManager" 
     28         
     29        def __init__(self): 
     30                log.info("Demand manager starting up.") 
     31                 
     32                super(DemandManager, self).__init__() 
     33                 
     34                self.pool = 0 
     35                self.queue = Queue() 
     36                self.finished = Event() 
     37                 
     38                self.threads = turbomail.config.get("mail.demand.threads", 4) # Maximum number of threads to create. 
     39                self.divisor = turbomail.config.get("mail.demand.divisor", 10) # Estimate the number of required threads by dividing the queue size by this. 
     40                self.timeout = turbomail.config.get("mail.demand.timeout", 60) 
     41                 
     42                log.info("Demand manager ready.") 
     43         
     44        def optimum(self): 
     45                return min(self.threads, math.ceil(self.queue.qsize() / float(self.divisor))) 
     46         
     47        optimum = property(optimum) 
     48         
     49        def stop(self): 
     50                log.info("Demand manager shutting down.") 
     51                self.finished.set() 
     52         
     53        def spawn(self): 
     54                thread = Thread(target=self.wrapper) 
     55                thread.start() 
     56                self.pool += 1 
     57                 
     58        def deliver(self, message): 
     59                log.info("Adding message %s to the queue for background delivery." % message.id) 
     60                self.queue.put(copy.deepcopy(message)) 
     61                message._processed = True 
     62                message._dirty = True 
     63                 
     64                if not self.queue.empty() and self.pool < self.optimum: 
     65                        tospawn = int(self.optimum - self.pool) 
     66                        log.debug("Spawning %d thread%s." % (tospawn, tospawn != 1 and "s" or "")) 
     67                        for i in range(tospawn): 
     68                                self.spawn() 
     69                 
     70        def wrapper(self): 
     71                log.debug("Mail queue worker starting up.") 
     72                 
     73                self.worker() 
     74                 
     75                self.pool -= 1 
     76                log.debug("Mail queue worker finished.") 
     77         
     78        def worker(self): 
     79                log.debug("Requesting new provider instance.") 
     80                provider = turbomail.provider.new() 
     81                if not provider: raise ManagerException, "Unable to allocate new provider." 
     82                 
     83                while True: 
     84                        try: 
     85                                message = self.queue.get(True, self.timeout) 
     86                                provider.deliver(message) 
     87                         
     88                        except Empty: 
     89                                log.debug("Worker death from starvation.") 
     90                                break 
     91                         
     92                        except ProviderExhaustedException: 
     93                                log.debug("Worker death from provider exhaustion - spawning child.") 
     94                                self.deliver(message) 
     95                                self.spawn() 
     96                                break 
     97                         
     98                        except: 
     99                                log.exception("Delivery of message %s failed." % message.id) 
     100                                break 
     101                         
     102                        else: 
     103                                log.info("Delivery of message %s successful or deferred." % message.id) 
     104