Ticket #10: refactor.diff
| File refactor.diff, 82.9 kB (added by alberto@…, 2 years ago) |
|---|
-
setup.py
2 2 # encoding: utf-8 3 3 4 4 import sys 5 import os 5 6 from setuptools import setup, find_packages 6 from turbogears.finddata import find_package_data 7 from fnmatch import fnmatchcase 8 from distutils.util import convert_path 7 9 8 10 if sys.version_info < (2, 3): 9 11 raise SystemExit("Python 2.3 or later is required") 10 12 11 import os 13 def find_package_data( package='', where='.', only_in_packages=True): 14 """Finds static resources in package. Adapted from turbogears.finddata.""" 15 out = {} 16 exclude = ('*.py', '*.pyc', '*~', '.*', '*.bak', '*.swp*') 17 exclude_directories = ('.*', 'CVS', '_darcs', './build', 18 './dist', 'EGG-INFO', '*.egg-info') 19 stack = [(convert_path(where), '', package, only_in_packages)] 20 while stack: 21 where, prefix, package, only_in_packages = stack.pop(0) 22 for name in os.listdir(where): 23 fn = os.path.join(where, name) 24 if os.path.isdir(fn): 25 bad_name = False 26 for pattern in exclude_directories: 27 if (fnmatchcase(name, pattern) 28 or fn.lower() == pattern.lower()): 29 bad_name = True 30 print >> sys.stderr, ( 31 "Directory %s ignored by pattern %s" 32 % (fn, pattern)) 33 break 34 if bad_name: 35 continue 36 if os.path.isfile(os.path.join(fn, '__init__.py')): 37 if not package: 38 new_package = name 39 else: 40 new_package = package + '.' + name 41 stack.append((fn, '', new_package, False)) 42 else: 43 stack.append((fn, prefix + name + '/', package, only_in_packages)) 44 elif package or not only_in_packages: 45 # is a file 46 bad_name = False 47 for pattern in exclude: 48 if (fnmatchcase(name, pattern) 49 or fn.lower() == pattern.lower()): 50 bad_name = True 51 print >> sys.stderr, ( 52 "File %s ignored by pattern %s" 53 % (fn, pattern)) 54 break 55 if bad_name: 56 continue 57 out.setdefault(package, []).append(prefix+name) 58 return out 59 12 60 execfile(os.path.join("turbomail", "release.py")) 13 61 14 62 setup( 15 name="TurboMail", 16 version=version, 17 18 description=description, 19 long_description=long_description, 20 author=author, 21 author_email=email, 22 url=url, 23 download_url=download_url, 24 license=license, 25 26 install_requires = ["TurboGears >= 0.9a9dev-r2003"], 27 zip_safe=True, 28 packages=find_packages(), 29 package_data = find_package_data(where='turbomail', package='turbomail'), 30 keywords = ["turbogears.extension"], 31 classifiers = [ 32 'Development Status :: 5 - Production/Stable', 33 'Framework :: TurboGears', 34 'Intended Audience :: Developers', 35 'License :: OSI Approved :: MIT License', 36 'Operating System :: OS Independent', 37 'Programming Language :: Python', 38 'Topic :: Communications :: Email', 39 'Topic :: Software Development :: Libraries :: Python Modules', 40 ], 41 test_suite = 'nose.collector', 42 entry_points = { 43 # 'paste.paster_create_template': ["turbomail = turbomail.startup:MailTemplate"] 44 'turbogears.extensions': ["turbomail = turbomail"] 45 } 46 ) 47 63 name="TurboMail", 64 version=version, 65 66 description=description, 67 long_description=long_description, 68 author=author, 69 author_email=email, 70 url=url, 71 download_url=download_url, 72 license=license, 73 74 install_requires = [], 75 extras_require = { 76 'turbogears' : ["TurboGears >= 0.9a9dev-r2003"], 77 }, 78 zip_safe=True, 79 packages=find_packages(), 80 package_data = find_package_data(where='turbomail', package='turbomail'), 81 keywords = ["turbogears.extension"], 82 classifiers = [ 83 'Development Status :: 5 - Production/Stable', 84 'Framework :: TurboGears', 85 'Intended Audience :: Developers', 86 'License :: OSI Approved :: MIT License', 87 'Operating System :: OS Independent', 88 'Programming Language :: Python', 89 'Topic :: Communications :: Email', 90 'Topic :: Software Development :: Libraries :: Python Modules', 91 ], 92 test_suite = 'nose.collector', 93 entry_points = { 94 # 'paste.paster_create_template': ["turbomail = turbomail.startup:MailTemplate"] 95 'turbogears.extensions': ["turbomail = turbomail.ext.tg"] 96 } 97 ) -
turbomail/pool.py
15 15 16 16 17 17 class Pool(Thread): 18 """A threadpool which checks regularily for new jobs and spawns processes 19 as needed. 20 21 Do not use this class directly. Always subclass and override the worker 22 method. 23 """ 24 25 def __init__(self, interval=10, threads=4, jobs=10, timeout=60, polling=False, **kw): 26 """Initialize the threadpool. 27 28 @param interval: A delay, in seconds, between spawn runs. 29 @type interval: int 30 31 @param threads: The maximum number of concurrent threads. 32 @type threads: int 33 34 @param jobs: The maximum number of jobs a single thread is 35 allowed to handle before dying of old age. 36 @type jobs: int 37 38 @param timeout: The amount of time, in seconds, a thread is 39 allowed to sit idle before dying of starvation. 40 @type timeout: int 41 42 @param polling: Enable or disable the periodic polling 43 mechanism. Disabled, threads will be created, 44 as required, when work is enqueued. 45 @type polling: bool 46 """ 47 48 super(Pool, self).__init__() 49 50 self._pool = 0 51 self._queue = Queue() 52 self._finished = Event() 53 self._interval = interval 54 self._threads = threads 55 self._jobs = jobs 56 self._timeout = timeout 57 self._polling = False 58 self._kw = kw 59 60 log.debug("Thread pool created.") 61 62 def enqueue(self, work, block=True, timeout=None): 63 """Enqueue a Message instance. 64 65 @param work: The unit of work can be any callable that returns a 66 three-item tuple containing the sender and 67 recipient addresses and a properly formatted MIME 68 message, in that order. The preferred type is an 69 instance of the Message class or subclass. 70 @type work: callable 71 72 @param block: Block code execution until there is a free slot in 73 the queue. If I{block} is True and I{timeout} is 74 None, block indefinately. 75 @type block: bool 76 77 @param timeout: How long to block execution (in seconds). If 78 the timeout expires (or block is false and the 79 queue is full) raise the Full exception. 80 @type timeout: int 81 """ 18 """A threadpool which checks regularily for new jobs and spawns processes 19 as needed. 82 20 83 if callable(work): 84 self._queue.put(work(), block=block, timeout=timeout) 85 else: 86 self._queue.put(work, block=block, timeout=timeout) 87 88 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs))) 21 Do not use this class directly. Always subclass and override the worker 22 method. 23 """ 89 24 90 if not self._polling and not self._queue.empty() and self._pool < optimum_threads: 91 log.debug("Work enqueued. Spawning %d threads." % (optimum_threads - self._pool)) 92 for i in range(int(optimum_threads - self._pool)): 93 self.spawn() 25 def __init__(self, interval=10, threads=4, jobs=10, timeout=60, polling=False, **kw): 26 """Initialize the threadpool. 94 27 95 else: 96 log.debug("Work enqueued.") 97 98 99 def shutdown(self): 100 """Quit the management thread and shutdown the queue.""" 101 102 log.debug("Shutdown requested.") 103 self._finished.set() 28 @param interval: A delay, in seconds, between spawn runs. 29 @type interval: int 104 30 105 def spawn(self): 106 thread = Thread(target=self.wrapper) 107 thread.start() 108 self._pool += 1 31 @param threads: The maximum number of concurrent threads. 32 @type threads: int 109 33 110 def run(self): 111 """The management thread. 112 113 Do not call directly. Instead, use the I{start} method. 114 """ 115 116 log.debug("Beginning thread pool main loop.") 117 118 while True: 119 if self._finished.isSet(): 120 log.debug("Shutdown request acknowledged.") 121 break 122 123 if not self._queue.empty(): 124 log.debug("Estimate %d work units in the queue." % self._queue.qsize()) 125 126 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs))) 127 128 if not self._queue.empty() and self._pool < optimum_threads: 129 log.debug("Creating %d threads." % (optimum_threads - self._pool)) 130 for i in range(int(optimum_threads - self._pool)): 131 self.spawn() 132 133 self._finished.wait(self._interval) 134 135 log.debug("Thread pool main loop has ended.") 136 137 def wrapper(self): 138 """Thread wrapper to log and keep track of the active thread count.""" 139 140 log.debug("Thread pool worker starting up.") 141 142 self.worker() 143 144 self._pool -= 1 145 log.debug("Thread pool worker finished.") 34 @param jobs: The maximum number of jobs a single thread is 35 allowed to handle before dying of old age. 36 @type jobs: int 146 37 147 def worker(self): 148 """This method must be overridden in a subclass and is used to 149 perform the work of the threadpool. 150 151 Will raise a NotImplementedError exception if not subclassed.""" 152 153 raise NotImplementedError 38 @param timeout: The amount of time, in seconds, a thread is 39 allowed to sit idle before dying of starvation. 40 @type timeout: int 154 41 42 @param polling: Enable or disable the periodic polling 43 mechanism. Disabled, threads will be created, 44 as required, when work is enqueued. 45 @type polling: bool 46 """ 155 47 48 super(Pool, self).__init__() 49 50 self._pool = 0 51 self._queue = Queue() 52 self._finished = Event() 53 self._interval = interval 54 self._threads = threads 55 self._jobs = jobs 56 self._timeout = timeout 57 self._polling = False 58 self._kw = kw 59 60 log.debug("Thread pool created.") 61 62 def enqueue(self, work, block=True, timeout=None): 63 """Enqueue a Message instance. 64 65 @param work: The unit of work can be any callable that returns a 66 three-item tuple containing the sender and 67 recipient addresses and a properly formatted MIME 68 message, in that order. The preferred type is an 69 instance of the Message class or subclass. 70 @type work: callable 71 72 @param block: Block code execution until there is a free slot in 73 the queue. If I{block} is True and I{timeout} is 74 None, block indefinately. 75 @type block: bool 76 77 @param timeout: How long to block execution (in seconds). If 78 the timeout expires (or block is false and the 79 queue is full) raise the Full exception. 80 @type timeout: int 81 """ 82 83 if callable(work): 84 self._queue.put(work(), block=block, timeout=timeout) 85 else: 86 self._queue.put(work, block=block, timeout=timeout) 87 88 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs))) 89 90 if not self._polling and not self._queue.empty() and self._pool < optimum_threads: 91 log.debug("Work enqueued. Spawning %d threads." % (optimum_threads - self._pool)) 92 for i in range(int(optimum_threads - self._pool)): 93 self.spawn() 94 95 else: 96 log.debug("Work enqueued.") 97 98 99 def shutdown(self): 100 """Quit the management thread and shutdown the queue.""" 101 102 log.debug("Shutdown requested.") 103 self._finished.set() 104 105 def spawn(self): 106 thread = Thread(target=self.wrapper) 107 thread.start() 108 self._pool += 1 109 110 def run(self): 111 """The management thread. 112 113 Do not call directly. Instead, use the I{start} method. 114 """ 115 116 log.debug("Beginning thread pool main loop.") 117 118 while True: 119 if self._finished.isSet(): 120 log.debug("Shutdown request acknowledged.") 121 break 122 123 if not self._queue.empty(): 124 log.debug("Estimate %d work units in the queue." % self._queue.qsize()) 125 126 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs))) 127 128 if not self._queue.empty() and self._pool < optimum_threads: 129 log.debug("Creating %d threads." % (optimum_threads - self._pool)) 130 for i in range(int(optimum_threads - self._pool)): 131 self.spawn() 132 133 self._finished.wait(self._interval) 134 135 log.debug("Thread pool main loop has ended.") 136 137 def wrapper(self): 138 """Thread wrapper to log and keep track of the active thread count.""" 139 140 log.debug("Thread pool worker starting up.") 141 142 self.worker() 143 144 self._pool -= 1 145 log.debug("Thread pool worker finished.") 146 147 def worker(self): 148 """This method must be overridden in a subclass and is used to 149 perform the work of the threadpool. 150 151 Will raise a NotImplementedError exception if not subclassed.""" 152 153 raise NotImplementedError 154 155 156 156 class MailPool(Pool): 157 """Mail delivery threadpool. 158 159 This class delivers messages from a queue using the Dispatch class. 157 """Mail delivery threadpool. 160 158 161 Example usage:: 162 163 import turbomail 164 pool = turbomail.MailPool() 165 message = turbomail.Message( 166 "from@localhost", 167 "to@localhost", 168 "Subject" 169 ) 170 message.plain = "Hello world!" 171 pool.enqueue(message) 172 # wait for message to send 173 pool.shutdown() 159 This class delivers messages from a queue using the Dispatch class. 174 160 175 """ 176 177 def worker(self): 178 """Deliver up to I{jobs} messages per queue. 179 180 If there are no messages available in the queue, the worker 181 will wait up to I{timeout} seconds for data. If the timeout 182 expires, the thread will exit gracefully.""" 183 184 count = 0 185 dispatch = Dispatch(**self._kw) 186 187 log.debug("Worker starting work.") 188 189 while True: 190 if not count < self._jobs: 191 log.debug("Worker death from old age - spawning child.") 192 self.spawn() 193 break 194 195 try: 196 unit = self._queue.get(True, self._timeout) 197 dispatch(unit) 161 Example usage:: 198 162 199 except Empty: 200 log.debug("Worker death from starvation.") 201 break 202 203 count += 1 163 import turbomail 164 pool = turbomail.MailPool() 165 message = turbomail.Message( 166 "from@localhost", 167 "to@localhost", 168 "Subject" 169 ) 170 message.plain = "Hello world!" 171 pool.enqueue(message) 172 # wait for message to send 173 pool.shutdown() 174 175 """ 176 177 def worker(self): 178 """Deliver up to I{jobs} messages per queue. 179 180 If there are no messages available in the queue, the worker 181 will wait up to I{timeout} seconds for data. If the timeout 182 expires, the thread will exit gracefully.""" 183 184 count = 0 185 dispatch = Dispatch(**self._kw) 186 187 log.debug("Worker starting work.") 188 189 while True: 190 if not count < self._jobs: 191 log.debug("Worker death from old age - spawning child.") 192 self.spawn() 193 break 194 195 try: 196 unit = self._queue.get(True, self._timeout) 197 dispatch(unit) 198 199 except Empty: 200 log.debug("Worker death from starvation.") 201 break 202 203 count += 1 -
turbomail/ext/tg.py
1 # encoding: utf-8 2 3 """TurboGears extension startup and shutdown interface.""" 4 5 import logging 6 log = logging.getLogger("turbomail.startup") 7 8 import turbogears 9 10 import turbomail 11 from turbomail.exceptions import * 12 from turbomail.pool import MailPool 13 from turbomail.message import Message 14 15 from email import Charset 16 17 18 __all__ = ['start_extension', 'shutdown_extension'] 19 20 21 def start_extension(): 22 """TurboGears extension startup. 23 24 Exits immediately if TurboMail is not enabled and creates a MailPool 25 instance if all is well. 26 """ 27 28 if not turbogears.config.get("mail.on", False): 29 return 30 31 if turbogears.config.get("mail.server", None) is None: 32 raise MailConfigurationException("Outbound server must be specified.") 33 34 log.info("Outbound mail queue manager starting.") 35 36 if turbogears.config.get("mail.encoding", "us-ascii") == "utf-8-qp": 37 Charset.add_charset('utf-8', Charset.SHORTEST, Charset.QP, 'utf-8') 38 turbogears.config.update({"mail.encoding": "utf-8"}) 39 40 # Monkey-patch Message for backwards compatibility 41 Message.default_encoding = turbogears.config.get("mail.encoding", 42 Message.default_encoding) 43 turbomail._queue = MailPool( 44 interval=turbogears.config.get("mail.interval", 10), 45 threads=turbogears.config.get("mail.threads", 4), 46 jobs=turbogears.config.get("mail.jobs", 10), 47 timeout=turbogears.config.get("mail.timeout", 60), 48 server=turbogears.config.get("mail.server"), 49 username=turbogears.config.get("mail.username", None), 50 password=turbogears.config.get("mail.password", None), 51 debug=turbogears.config.get("mail.debug", False), 52 tls=turbogears.config.get("mail.tls", None), 53 polling=turbogears.config.get("mail.polling", None) 54 ) 55 56 if turbomail._queue._polling: 57 turbomail.queue.start() 58 59 60 def shutdown_extension(): 61 """TurboGears extension shutdown. 62 63 Exits immediately if TurboMail is not enabled and shuts down the 64 MailPool object safely. 65 """ 66 67 if not turbogears.config.get("mail.on", False): 68 return 69 70 log.info("Outbound mail queue manager shutting down.") 71 72 # turbomail.queue.shutdown() 73 turbomail.queue = None -
turbomail/__init__.py
4 4 Introduction 5 5 ============ 6 6 7 TurboMail is a TurboGears extension - meaning that it starts up and 8 shuts down alongside TurboGears applications you write in the same 9 way that visit tracking and identity do. TurboMail uses built-in 10 Python modules for SMTP communication and MIME e-mail creation, but 11 greatly simplifies these tasks by performing the grunt-work for you. 12 13 Being multi-threaded, TurboMail allows you to enqueue messages to be 14 sent and then immediately continue with processing, resulting in a 15 much more fluid user experience. Threads are handled intelligently 16 (increasing the number of threads as demand increases) and they are 17 automatically recycled. There is only ever one SMTP connection per 18 thread. 19 20 Benchmarking 21 ------------ 22 23 Throughput using the default options is sufficient for most use: 24 100 messages in 45 seconds; just over 2 messages a second. Using 25 a greater number of threads, 10 vs. 4, 100 messags take 30 26 seconds; just over 3 messages a second. YMMV. Note that if a 27 thread is idle, it will immediately deliver requests added to 28 the queue, thus increasing the idle time will increase sparse 29 performance. 30 31 TurboMail is heavily inspired by PHPMailer, a very, very handy class 32 for PHP 4 & 5 by Brent R. Matzelle. 33 7 TurboMail is a TurboGears extension - meaning that it starts up and 8 shuts down alongside TurboGears applications you write in the same 9 way that visit tracking and identity do. TurboMail uses built-in 10 Python modules for SMTP communication and MIME e-mail creation, but 11 greatly simplifies these tasks by performing the grunt-work for you. 34 12 13 Being multi-threaded, TurboMail allows you to enqueue messages to be 14 sent and then immediately continue with processing, resulting in a 15 much more fluid user experience. Threads are handled intelligently 16 (increasing the number of threads as demand increases) and they are 17 automatically recycled. There is only ever one SMTP connection per 18 thread. 19 20 Benchmarking 21 ------------ 22 23 Throughput using the default options is sufficient for most use: 24 100 messages in 45 seconds; just over 2 messages a second. Using 25 a greater number of threads, 10 vs. 4, 100 messags take 30 26 seconds; just over 3 messages a second. YMMV. Note that if a 27 thread is idle, it will immediately deliver requests added to 28 the queue, thus increasing the idle time will increase sparse 29 performance. 30 31 TurboMail is heavily inspired by PHPMailer, a very, very handy class 32 for PHP 4 & 5 by Brent R. Matzelle. 33 34 35 35 Installation 36 36 ============ 37 37 38 Simply easy_install the package::38 Simply easy_install the package:: 39 39 40 easy_install TurboMail40 easy_install TurboMail 41 41 42 TurboMail installs no external scripts.42 TurboMail installs no external scripts. 43 43 44 44 Upgrade 45 45 ======= 46 46 47 Upgrading also uses easy_install:: 48 49 easy_install -U TurboMail 47 Upgrading also uses easy_install:: 50 48 49 easy_install -U TurboMail 50 51 51 Configuration 52 52 ============= 53 53 54 TurboMail understands a large number of configuration options, all 55 piggy-backed from your application's configuration. Organized into 56 two groups, the advanced set can be safely ignored in most 57 applications. Each option is listed with its default value. 58 59 Simple Options 60 -------------- 61 62 - I{mail.on} (Default: B{False}) Enable TurboMail. B{Required.} 63 - I{mail.server} (Default: B{None}) SMTP server address. 64 B{Required.} 65 - I{mail.username} (Default: B{None}) 66 - I{mail.password} (Default: B{None}) 67 68 Both a username and password are required to enable 69 authentication. 70 71 Advanced Options 72
