import sys
import pdb
from bdb import BdbQuit
from pdb import Pdb
import os
import signal
from functools import partial
import traceback
#from vsi.tools import static
if os.name == 'nt':
from vsi.windows.named_pipes import Pipe
import threading
import ctypes
ATTACH_SIGNAL = signal.SIGINT
else:
ATTACH_SIGNAL=signal.SIGUSR1
[docs]def find_frame(frame, depth=0):
''' Parameters
---------
frame : str
The Frame
depth : int
The Depth
'''
if not frame:
frame = sys._getframe()
for d in range(depth):
if frame.f_back is None:
break
frame = frame.f_back
return frame
[docs]def set_attach(db_cmd=None, signal=ATTACH_SIGNAL):
''' Set up this process to be "debugger attachable"
Parameters
----------
db_cmd : str
The Debugger Command
signal : int
The Attach Signal
Just like gdb can attach to a running process, if you execute this on a
process, now you can "attach" to the running python using the attach
command
This works pretty well, and allows you to resume the code UNLESS you are
running in windows and happen to interrupt a sleep command.
'''
#Todo: Add tcp OPTION?
signal.signal(signal, partial(handle_db, db_cmd=db_cmd))
if os.name == 'nt':
thread = threading.Thread(target=pipe_server)
thread.daemon = True
thread.start()
#print(os.getpid())
[docs]def attach(pid, signal=ATTACH_SIGNAL):
''' Trigger a python pid that's been already run set_attach
Parameters
----------
pid : str
The Process ID
signal : int
The Attach Signal
This is the second part of attaching to a python process. Once
set_attach is run, on another prompt running attach will trigger
the interrupt thing attaching or triggering whatever db_cmd was
'''
if os.name == 'nt':
pipe = Pipe('vdb_%d' % pid)
pipe.write('vsi')
pipe.close()
else:
os.kill(pid, signal)
[docs]def pipe_server():
''' Part of attach/set_attach for Windows '''
while 1:
pipe = Pipe('vdb_%d' % os.getpid(), server=True)
knock = pipe.read(3)
if knock == 'vsi':
os.kill(0, signal.CTRL_C_EVENT)
#ctypes.windll.kernel32.GenerateConsoleCtrlEvent(0, os.getpid())
pipe.disconnect()
pipe.close()
[docs]def handle_db(sig, frame, db_cmd=None, signal=ATTACH_SIGNAL):
''' signal handler part of attach/set_attach
Parameters
----------
sig :
frame :
db_cmd : str
The Debugger Command
signal : int
The Attach Signal
'''
if sig == signal:
#if not hasattr(sys, 'ps1'): #If not interactive
if db_cmd:
db_cmd()
else: #default behavior
set_trace(frame)
[docs]class RunningTrace():
''' Mixin to give any bdb a running settrace
Example::
from pdb import Pdb as OldPdb
from vsi.tools.vdb import RunningTrace
class Pdb(OldPdb, RunningTrace):
pass
p = Pdb()
p.set_running_trace()
p.onecmd('b my_function')
'''
[docs] def set_running_trace(self, frame=None):
''' Start debugging from frame, but don't enter interactive mode
If frame is not specified, debugging starts from caller's frame.
'''
if frame is None:
frame = sys._getframe().f_back
self.botframe = None
self.setup(frame, None)
while frame:
frame.f_trace = self.trace_dispatch
self.botframe = frame
frame = frame.f_back
self.set_continue()
sys.settrace(self.trace_dispatch)
[docs] @classmethod
def get_db(cls, debugger_cls=Pdb):
''' Helper function to mixin and instantiate in simple cases
Parameters
----------
debugger_cls : :term:`class`
Bdb based Debugger class to be mixed in
Returns
-------
object
An instance of the debugger class with ``set_runnable_trace`` added
'''
new_cls = type(debugger_cls.__name__+"Runnable", (debugger_cls, cls), {})
return new_cls()
[docs]class PostMortemHook(object):
original_excepthook = None
[docs] @staticmethod
def dbclear_if_error():
if PostMortemHook.original_excepthook != None:
sys.excepthook = PostMortemHook.original_excepthook
PostMortemHook.original_excepthook = None
[docs] @classmethod
def dbstop_if_error(cls, interactive=False, *args, **kwargs):
''' Parameters
----------
interactive : bool
True if interactive. False if not.
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
'''
if PostMortemHook.original_excepthook == None:
PostMortemHook.original_excepthook = sys.excepthook
cls.set_post_mortem(interactive, *args, **kwargs)
[docs] @staticmethod
def set_post_mortem(interactive=False):
''' Overrite this function for each debugger
Parameters
----------
interactive : bool
True if interactive. False if not.
Raises
------
Exception
Makes users aware that this is purely a virtual function
'''
raise Exception('Purely virtual function')
[docs]class DbStopIfErrorGeneric(object):
''' With statement for local dbstop situations '''
ignore_exception = False
def __init__(self, threading_support=False, *args, **kwargs):
self.args = args
self.kwargs = kwargs
''' Parameters
----------
threading_support : bool
Optional. Support the threading module and patch a bug preventing
catching exceptions in other threads. See add_threading_excepthook
for more info. Only neccesary if you want to catch exceptions not
on the main thread. This is only patched after __enter__ unpatched
at __exit__
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
All other args from db_stop_if_error()
'''
self.threading_support=threading_support
def __enter__(self):
if self.threading_support:
import threading
self.threading_init = threading.Thread.__init__
add_threading_excepthook()
self.original_excepthook = sys.excepthook
self.get_post_mortem_class().set_post_mortem(*self.args, **self.kwargs)
[docs] def __exit__(self, exc_type, exc_value, tb):
''' Parameters
----------
exc_type : str
The Execption Type
exc_value : float
The Exception Value
tb : str
The Traceback
'''
if self.threading_support:
import threading
threading.Thread.__init__ = self.threading_init
sys.excepthook = self.original_excepthook
if tb is not None:
print('Exception detected!!!')
pm = self.get_post_mortem()
pm(tb, *self.args, **self.kwargs)
if self.ignore_exception:
return True
[docs] def get_post_mortem(self):
''' Returns
-------
:term:`function`
Should return a function that takes a traceback as the first
argument and any additional args/kwargs sent to __init__ after that
Raises
------
Exception
For a virtual function
'''
raise Exception('Purely virtual function')
[docs] @classmethod
def set_continue_exception(cls):
''' Continue running code after exception
Parameters
---------
cls : bool
True to continue to run code after the exception. Default: False.
After the with statement scope fails, if this is called, python will
continue running as if there was no error. Can be useful, can also be
dangerous. So don't abuse it!
'''
cls.ignore_exception = True
[docs]def dbstop_exception_hook(type, value, tb,
post_mortem,
interactive=False):
''' Parameters
----------
type : :term:`class`
sys.excepthook variable
value : object
sys.excepthook variable
tb : object
sys.excepthook variable
post_mortem : :term:`function`
The Post Mortem Handler Function
Interactive : bool
True if interactive. False if not.
'''
if not interactive and (hasattr(sys, 'ps1') or not sys.stderr.isatty()):
# we are in interactive mode or we don't have a tty-like
# device, so we call the default hook
sys.__excepthook__(type, value, tb)
else:
#we are NOT in interactive mode, print the exception
traceback.print_exception(type, value, tb)
# ...then start the debugger in post-mortem mode.
post_mortem(tb)
[docs]def break_pool_worker():
''' Setup the ThreadPool to break when an exception occurs (so that it can
be debugged)
The ThreadPool class (and the Pool class too, but not useful here)
always catches any exception and raises it in the main thread. This
is nice for normal behavior, but for debugging, it makes it impossible
to do post mortem debugging. In order to automatically attach a post
mortem debugger, the exception has to be thrown. An exception being
thrown WILL BREAK the pool call, and not allow your main function to
continue, however you can now attach a debugger post_mortem. Useful
with dbstop_if_error
Threading has a "bug" where exceptions are also automatically caught.
http://bugs.python.org/issue1230540
In order to make THIS work, call add_threading_excepthook too
Example::
>>> from multiprocessing.pool import ThreadPool
>>> import vsi.tools.vdb as vdb
>>> def a(b):
... print(b)
... if b==3:
... does_not_exist()
>>> vdb.dbstop_if_error()
>>> vdb.break_pool_worker()
>>> vdb.add_threading_excepthook()
>>> tp = ThreadPool(3)
>>> tp.map(a, range(10))
'''
import multiprocessing.pool
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
inqueue._writer.close()
outqueue._reader.close()
if initializer is not None:
initializer(*initargs)
completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
except (EOFError, IOError):
multiprocessing.pool.debug('worker got EOFError or IOError -- exiting')
break
if task is None:
multiprocessing.pool.debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
# try:
result = (True, func(*args, **kwds))
# except Exception as e:
# result = (False, e)
try:
put((job, i, result))
except Exception as e:
wrapped = multiprocessing.pool.MaybeEncodingError(e, result[1])
multiprocessing.pool.debug("Possible encoding error while sending result: %s" % (wrapped))
put((job, i, (False, wrapped)))
completed += 1
multiprocessing.pool.debug('worker exiting after %d tasks' % completed)
multiprocessing.pool.worker = worker
[docs]def add_threading_excepthook():
"""
Workaround for sys.excepthook thread bug
From
http://spyced.blogspot.com/2007/06/workaround-for-sysexcepthook-bug.html
(https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1230540&group_id=5470).
Call once from __main__ before creating any threads.
If using psyco, call psyco.cannotcompile(threading.Thread.run)
since this replaces a new-style class method.
Raises
------
KeyboardInterrupt
SystemExit
"""
import threading, sys
init_old = threading.Thread.__init__
def init(self, *args, **kwargs):
''' Parameters
----------
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
'''
import sys
init_old(self, *args, **kwargs)
run_old = self.run
def run_with_except_hook(*args, **kw):
''' Parameters
----------
*args
Variable length argument list.
**kw
Arbitrary keyword arguments.
'''
try:
run_old(*args, **kw)
except (KeyboardInterrupt, SystemExit):
raise
except:
sys.excepthook(*sys.exc_info())
self.run = run_with_except_hook
threading.Thread.__init__ = init
[docs]def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--pid', '-p', type=int, default=None)
db = parser.add_mutually_exclusive_group(required=False)
db.add_argument('--rpdb2', '-r', default=False, action='store_true',
help='Attach using rpdb2')
db.add_argument('--rpdb', default=False, action='store_true',
help='Attach using rpdb (Client not implemented, use putty)')
db.add_argument('--winpdb', '--gui', '-g', default=False, action='store_true',
help='Attach using winpdb')
parser.add_argument('--ip', default='127.0.0.1',
help='Set ip address for rpdb/rpdb2/winpdb to attach on')
parser.add_argument('--port', default=4444, type=int,
help='Set port for rpdb to attach on')
parser.add_argument('--password', '--pw', default='vsi')
parser.add_argument('args', nargs='*',
help='Command to run with vdb attached. Not implemented yet')
args = parser.parse_args()
if args.pid:
#attach to a pid
if args.rpdb2 or args.winpdb:
from .vdb_rpdb2 import attach as rpdb2_attach
rpdb2_attach(args.pid, password=args.password, ip=args.ip, gui=args.winpdb)
elif args.rpdb:
from .vdb_rpdb import attach as rpdb_attach
rpdb_attach(args.pid, ip=args.ip, port=args.port)
else:
attach(args.pid)
else:
pass #Do whatever pdb does to run the command
#Copy pdb.main or ipdb.main
if __name__=='__main__':
main()