# Source code for watchmen.watchmen

# watchmen is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# watchmen is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with watchmen.  If not, see <http://www.gnu.org/licenses/>.

'''
Created on Aug 8, 2016

author: jakeret
'''
from __future__ import print_function, division, absolute_import, unicode_literals

import threading
import psutil
import time
import sys
import os
import Queue
import functools
import inspect
import ctypes
import gc

# Names exported by ``from <module> import *``: only the ``watch`` decorator.
__all__ = ["watch"]

# Default pause (passed to ``time.sleep``, i.e. seconds) between two polls of a
# Watcher thread; used whenever a Watcher is created with ``sleep_time=None``.
SLEEP_TIME = 0.2

class WatchmenException(Exception):
    """Raised by the ``watch`` decorator when a watcher reports a limit event."""


class Event(object):
    """Message exchanged through the queue between the threads and the caller.

    ``type`` is one of the class constants below and ``value`` is the payload
    attached to it (a result, exception info or a limit message, depending on
    who created the event).
    """

    # Event kinds; the literal values are kept exactly as they were.
    SUCCESS, ERROR, LIMIT = "sucess", "error", "limit"

    def __init__(self, event_type, value):
        self.type, self.value = event_type, value
        

class Watcher(threading.Thread):
    """Base thread that periodically polls a process until cancelled.

    The polling loop calls ``self.update_state()``, which this class does not
    define itself; it has to be supplied by a subclass. Whenever that call
    returns something other than ``None`` the value is put on ``queue`` and
    the watcher cancels itself.

    :param queue: object with a ``put`` method receiving the reported state
    :param pid: id of the process to observe (handed to ``psutil.Process``)
    :param sleep_time: (optional) pause between two polls; falls back to
        ``SLEEP_TIME`` when ``None``
    """

    def __init__(self, queue, pid, sleep_time=None):
        super(Watcher, self).__init__()
        self.queue = queue
        self.sleep_time = SLEEP_TIME if sleep_time is None else sleep_time
        self.process = psutil.Process(pid)
        self._cancelled = False

    def _active(self):
        """Tell whether polling should continue."""
        if self._cancelled:
            return False
        return self.process.is_running()

    def run(self):
        """Poll until cancelled or until the observed process is gone."""
        try:
            while self._active():
                reported = self.update_state()
                if reported is not None:
                    self.queue.put(reported)
                    self.cancel()

                time.sleep(self.sleep_time)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process can no longer be inspected: stop watching.
            self.cancel()

    def cancel(self):
        """Ask the polling loop to stop at its next check."""
        self._cancelled = True

class MemoryWatcher(Watcher):
    """Watcher reporting when the process tree uses more memory than allowed.

    On every poll the first field of ``memory_info()`` (the resident set size)
    of the watched process and of all its recursive children is summed up and
    compared, converted to MB, against ``max_mem``.

    :param max_mem: memory limit in MB
    :param queue: queue receiving the limit event
    :param pid: id of the process to observe
    :param sleep_time: (optional) pause between two polls
    """

    def __init__(self, max_mem, queue, pid, sleep_time=None):
        super(MemoryWatcher, self).__init__(queue, pid, sleep_time)
        self.max_mem = max_mem

    def update_state(self):
        """Return a ``LIMIT`` event if the limit is exceeded, else ``None``."""
        rss_mem = self.process.memory_info()[0]

        for child in self.process.children(recursive=True):
            try:
                rss_mem += child.memory_info()[0]
            except psutil.NoSuchProcess:
                # The child exited between being listed and being sampled.
                # Letting this propagate would make Watcher.run() cancel the
                # whole watcher, silently disabling the memory limit.
                continue

        rss_mem = rss_mem / 1024 / 1024
        if rss_mem > self.max_mem:
            return Event(Event.LIMIT, "Memory limit exceeded. RSS: {} MB".format(rss_mem))
        return None

class TimeWatcher(Watcher):
    """Watcher reporting when more than ``max_time`` seconds have elapsed.

    The clock starts at the first call of ``update_state`` rather than at
    construction time.

    :param max_time: allowed duration, compared against ``time.time()`` deltas
    :param queue: queue receiving the limit event
    :param pid: id of the process to observe
    :param sleep_time: (optional) pause between two polls
    """

    def __init__(self, max_time, queue, pid, sleep_time=None):
        super(TimeWatcher, self).__init__(queue, pid, sleep_time)
        self.start_time = None
        self.max_time = max_time

    def update_state(self):
        """Return a ``LIMIT`` event once the time budget is used up, else ``None``."""
        started = self.start_time
        if started is None:
            # First poll: remember when watching began.
            started = self.start_time = time.time()

        elapsed = time.time() - started
        if elapsed > self.max_time:
            return Event(Event.LIMIT, "Time limit exceeded")
        return None

class CallableWrapper(threading.Thread):
    """Thread running a callable and reporting its outcome through a queue.

    :param func: the callable to execute
    :param queue: object with a ``put`` method receiving the resulting event
    :param args: positional arguments forwarded to ``func``
    :param kwargs: keyword arguments forwarded to ``func``
    """

    def __init__(self, func, queue, *args, **kwargs):
        super(CallableWrapper, self).__init__()
        self.callable = func
        self.queue = queue
        self.args, self.kwargs = args, kwargs
        self.exception = None

    def run(self):
        """Call the wrapped callable and enqueue a SUCCESS or ERROR event.

        On failure the event carries the ``sys.exc_info()`` triple.
        """
        try:
            outcome = self.callable(*self.args, **self.kwargs)
            self.queue.put(Event(Event.SUCCESS, outcome))
        except Exception:
            failure = sys.exc_info()
            self.queue.put(Event(Event.ERROR, failure))

    def cancel(self):
        """No-op; present so every thread can be cancelled the same way."""
        return None

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        """
        _async_raise(self.ident, exctype)

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    tid = ctypes.c_long(tid)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid,
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def watch(original_function=None, max_mem=None, max_time=None, sample_rate=None):
    """
    Decorator to watch a function or method.

    Usable both bare (``@watch``) and with arguments
    (``@watch(max_mem=..., max_time=...)``). The decorated callable runs in a
    separate thread while optional watcher threads observe the current process.

    :param original_function: (optional) the function to decorate when the
        decorator is applied without arguments
    :param max_mem: (optional) maximal memory limit in MB
    :param max_time: (optional) maximal execution time in seconds
    :param sample_rate: (optional) Rate at which the process is being queried

    :returns: the wrapped function, or the decorator itself when
        ``original_function`` is ``None``
    :raises WatchmenException: (from the wrapped function) if a limit is hit
    """
    def _decorate(function):

        @functools.wraps(function)
        def wrapped_function(*args, **kwargs):
            pid = os.getpid()
            queue = Queue.Queue()

            worker = CallableWrapper(function, queue, *args, **kwargs)
            threads = [worker]
            if max_mem is not None:
                threads.append(MemoryWatcher(max_mem, queue, pid, sample_rate))
            if max_time is not None:
                threads.append(TimeWatcher(max_time, queue, pid, sample_rate))

            # Start the watchers before the worker so they are already
            # running when the function begins.
            for t in threads[::-1]:
                t.start()

            # Block until the worker finishes or a watcher reports a limit.
            event = queue.get()

            for t in threads:
                t.cancel()

            if event.type == Event.LIMIT:
                # nasty: trying to kill the wrapper thread
                while worker.is_alive():
                    try:
                        worker.raiseExc(SystemError)
                    except ValueError:
                        # The worker ended between the liveness check and the
                        # raise ("invalid thread id"): nothing left to kill.
                        break
                    # Wait briefly for the worker to die before retrying.
                    worker.join(0.1)
                gc.collect()
                raise WatchmenException(event.value)

            if event.type == Event.ERROR:
                # The payload is the sys.exc_info() triple captured in the
                # worker thread; re-raise the exception instance itself
                # (raising the tuple loses the message or fails outright).
                _exc_type, exc_value, _exc_tb = event.value
                raise exc_value

            return event.value

        return wrapped_function

    if original_function is not None:
        return _decorate(original_function)

    return _decorate