A Discrete-Event Network Simulator
API
ipython_view.py
Go to the documentation of this file.
1 """
2 Backend to the console plugin.
3 
4 @author: Eitan Isaacson
5 @organization: IBM Corporation
6 @copyright: Copyright (c) 2007 IBM Corporation
7 @license: BSD
8 
9 All rights reserved. This program and the accompanying materials are made
10 available under the terms of the BSD which accompanies this distribution, and
11 is available at U{http://www.opensource.org/licenses/bsd-license.php}
12 """
13 # this file is a modified version of source code from the Accerciser project
14 # http://live.gnome.org/accerciser
15 
16 from __future__ import print_function
17 import gtk, gobject
18 import re
19 import sys
20 import os
21 from gi.repository import Pango
22 from StringIO import StringIO
23 import IPython
24 
25 from pkg_resources import parse_version
26 
27 try:
28  import IPython
29 except ImportError:
30 
32  IPython = None
33 
34 
36 
def __init__(self, argv=None, user_ns=None, user_global_ns=None,
             cin=None, cout=None, cerr=None, input_func=None):
    """! Initializer

    Creates the embedded IPython shell and points its I/O at the given
    streams.

    @param self: this object
    @param argv: Command line options for IPython (not used by this method)
    @param user_ns: User namespace.
    @param user_global_ns: User global namespace (not used by this method).
    @param cin: Console standard input.
    @param cout: Console standard output.
    @param cerr: Console standard error.
    @param input_func: Replacement for builtin raw_input()
    @return none
    """
    io = IPython.utils.io

    # The terminal package lives under IPython.frontend in releases older
    # than 1.2.1; resolve it once and reuse it below.
    if parse_version(IPython.release.version) >= parse_version("1.2.1"):
        terminal = IPython.terminal
    else:
        terminal = IPython.frontend.terminal

    if input_func:
        terminal.interactiveshell.raw_input_original = input_func
    if cin:
        io.stdin = io.IOStream(cin)
    if cout:
        io.stdout = io.IOStream(cout)
    if cerr:
        io.stderr = io.IOStream(cerr)

    # This is to get rid of the blockage that occurs during
    # IPython.Shell.InteractiveShell.user_setup()
    io.raw_input = lambda x: None

    os.environ['TERM'] = 'dumb'

    from IPython.config.loader import Config
    cfg = Config()
    cfg.InteractiveShell.colors = "Linux"

    # InteractiveShell's __init__ overwrites io.stdout,io.stderr with
    # sys.stdout, sys.stderr, this makes sure they are right.
    # The process-wide streams and the exception hook are put back in a
    # finally block so that a failure while building the shell cannot
    # leave them hijacked.
    excepthook = sys.excepthook
    old_stdout, old_stderr = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = io.stdout.stream, io.stderr.stream
    try:
        # InteractiveShell inherits from SingletonConfigurable, so use
        # instance()
        self.IP = terminal.embed.InteractiveShellEmbed.instance(
            config=cfg, user_ns=user_ns)
    finally:
        sys.stdout, sys.stderr = old_stdout, old_stderr
        sys.excepthook = excepthook

    # Route IPython "system" calls through our own shell() replacement.
    self.IP.system = lambda cmd: self.shell(self.IP.var_expand(cmd),
                                            header='IPython system call: ')

    self.IP.raw_input = input_func
    self.iter_more = 0
    self.history_level = 0
    # Characters that separate completable tokens on the command line.
    self.complete_sep = re.compile(r'[\s\{\}\[\]\(\)]')
    # Neutralize exit/quit inside the embedded console.
    self.updateNamespace({'exit': lambda: None})
    self.updateNamespace({'quit': lambda: None})
    self.IP.readline_startup_hook(self.IP.pre_readline)
    # Workaround for updating namespace with sys.modules
    self.__update_namespace()
148 
150  """!
151  Update self.IP namespace for autocompletion with sys.modules
152  """
153  for k, v in list(sys.modules.items()):
154  if not '.' in k:
155  self.IP.user_ns.update({k:v})
156 
def execute(self):
    """!
    Executes the current line provided by the shell object.

    While the line is processed sys.stdout and sys.stdin are replaced by
    the IPython io streams; the originals are always put back on exit,
    including when an exception escapes.

    @param self: this object
    @return none
    """
    self.history_level = 0
    orig_stdout = sys.stdout
    orig_stdin = sys.stdin
    try:
        sys.stdout = IPython.utils.io.stdout
        sys.stdin = IPython.utils.io.stdin
        self.prompt = self.generatePrompt(self.iter_more)

        self.IP.hooks.pre_prompt_hook()
        if self.iter_more:
            # Broad on purpose: a prompt failure is reported through
            # IPython instead of propagating to the caller.
            try:
                self.prompt = self.generatePrompt(True)
            except:
                self.IP.showtraceback()
            if self.IP.autoindent:
                self.IP.rl_do_indent = True

        try:
            line = self.IP.raw_input(self.prompt)
        except KeyboardInterrupt:
            self.IP.write('\nKeyboardInterrupt\n')
            self.IP.input_splitter.reset()
        except:
            # Broad on purpose, see above.
            self.IP.showtraceback()
        else:
            self.IP.input_splitter.push(line)
            self.iter_more = self.IP.input_splitter.push_accepts_more()
            self.prompt = self.generatePrompt(self.iter_more)
            if (self.IP.SyntaxTB.last_syntax_error and
                    self.IP.autoedit_syntax):
                self.IP.edit_syntax_error()
            if not self.iter_more:
                # The splitter API for fetching the raw source differs
                # between IPython versions.
                if parse_version(IPython.release.version) >= parse_version("2.0.0-dev"):
                    source_raw = self.IP.input_splitter.raw_reset()
                else:
                    source_raw = self.IP.input_splitter.source_raw_reset()[1]
                self.IP.run_cell(source_raw, store_history=True)
                self.IP.rl_do_indent = False
            else:
                # TODO: Auto-indent
                self.IP.rl_do_indent = True
    finally:
        sys.stdout = orig_stdout
        sys.stdin = orig_stdin
207 
def generatePrompt(self, is_continuation):
    """!
    Generate prompt depending on is_continuation value

    @param self: this object
    @param is_continuation: true to get the continuation prompt
    @return: The prompt string representation
    """
    # Backwards compatibility with ipython-0.11, which exposes prompt
    # generation as a hook rather than through a prompt manager.
    if '0.11' in IPython.__version__:
        return self.IP.hooks.generate_prompt(is_continuation)

    template = 'in2' if is_continuation else 'in'
    return self.IP.prompt_manager.render(template)
229 
230 
def historyBack(self):
    """!
    Provides one history command back.

    The history level is moved one step back; when nothing is stored at
    that level the step is undone.

    @param self this object
    @return: The command string.
    """
    self.history_level -= 1
    command = self._getHistory()
    if command:
        return command
    # Nothing there: undo the step and report the entry we were on.
    self.history_level += 1
    return self._getHistory()
242 
def historyForward(self):
    """!
    Provides one history command forward.

    The history level is only advanced while it is negative.

    @param self this object
    @return: The command string.
    """
    step = 1 if self.history_level < 0 else 0
    self.history_level += step
    return self._getHistory()
253 
254  def _getHistory(self):
255  """!
256  Gets the command string of the current history level.
257 
258  @param self this object
259  @return: Historic command string.
260  """
261  try:
262  rv = self.IP.user_ns['In'][self.history_level].strip('\n')
263  except IndexError:
264  rv = ''
265  return rv
266 
def updateNamespace(self, ns_dict):
    """!
    Add the current dictionary to the shell namespace.

    Existing names are overwritten by the entries of ns_dict.

    @param self: this object
    @param ns_dict: A dictionary of symbol-values.
    @return none
    """
    namespace = self.IP.user_ns
    namespace.update(ns_dict)
275 
def complete(self, line):
    """!
    Returns an auto completed line and/or possibilities for completion.

    Only the last token of the line (as split by self.complete_sep) is
    completed; everything before it is kept untouched.

    @param self: this object
    @param line: Given line so far.
    @return: Line completed as far as possible, and possible further completions.
    """
    token = self.complete_sep.split(line)[-1]
    if not token:
        # The line ends with a separator: there is nothing to complete.
        return line, []

    possibilities = self.IP.complete(token)
    # The candidate list is the second element of what IP.complete returns.
    candidates = possibilities[1] if possibilities else []
    if not candidates:
        return line, candidates

    # os.path.commonprefix works character-wise on any list of strings,
    # which is exactly the fold that used to be done by hand with the
    # (Python-2-only) builtin reduce().
    # NOTE(review): when the candidates share no prefix the token is
    # replaced by the last typed character - kept from the original
    # behavior, confirm that this is intended.
    common_prefix = os.path.commonprefix(candidates) or line[-1]
    completed = line[:-len(token)] + common_prefix
    return completed, candidates
310 
311 
def shell(self, cmd, verbose=0, debug=0, header=''):
    """!
    Replacement method to allow shell commands without them blocking.

    The command is run through the system shell and its combined
    stdout/stderr is printed once it has finished. In debug mode the
    command is only echoed, not executed.

    @param self: this object
    @param cmd: Shell command to execute.
    @param verbose: Verbosity
    @param debug: Debug level
    @param header: Header to be printed before output
    @return none
    """
    # Local import: subprocess replaces os.popen4, which only exists on
    # Python 2.
    import subprocess

    if verbose or debug:
        print(header + cmd)
    if debug:
        return

    # shell=True is intentional: cmd is a shell command line typed by the
    # console user. stderr is merged into stdout, like popen4 did.
    process = subprocess.Popen(cmd, shell=True,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               universal_newlines=True)
    # communicate() closes the child's stdin before reading, so a command
    # that waits for input sees EOF instead of hanging the console.
    output, _ = process.communicate()
    print(output)
330 
331 
class ConsoleView(Gtk.TextView):
    """
    Specialized text view for console-like workflow.

    @cvar ANSI_COLORS: Mapping of terminal colors to X11 names.
    @type ANSI_COLORS: dictionary

    @ivar text_buffer: Widget's text buffer.
    @type text_buffer: Gtk.TextBuffer
    @ivar color_pat: Regex of terminal color pattern
    @type color_pat: _sre.SRE_Pattern
    @ivar mark: Scroll mark for automatic scrolling on input.
    @type mark: Gtk.TextMark
    @ivar line_start: Start of command line mark.
    @type line_start: Gtk.TextMark
    """
    ANSI_COLORS = {'0;30': 'Black', '0;31': 'Red',
                   '0;32': 'Green', '0;33': 'Brown',
                   '0;34': 'Blue', '0;35': 'Purple',
                   '0;36': 'Cyan', '0;37': 'LightGray',
                   '1;30': 'DarkGray', '1;31': 'DarkRed',
                   '1;32': 'SeaGreen', '1;33': 'Yellow',
                   '1;34': 'LightBlue', '1;35': 'MediumPurple',
                   '1;36': 'LightCyan', '1;37': 'White'}

    def __init__(self):
        """
        Initialize console view.
        """
        GObject.GObject.__init__(self)
        self.modify_font(Pango.FontDescription('Mono'))
        self.set_cursor_visible(True)
        self.text_buffer = self.get_buffer()
        self.mark = self.text_buffer.create_mark('scroll_mark',
                                                 self.text_buffer.get_end_iter(),
                                                 False)
        # One text tag per ANSI color code, named after the code itself.
        for code, color in self.ANSI_COLORS.items():
            self.text_buffer.create_tag(code, foreground=color, weight=700)
        self.text_buffer.create_tag('0')
        self.text_buffer.create_tag('notouch', editable=False)
        # ESC[<code>m, optionally wrapped in \x01 / \x02; the group
        # captures <code>.
        self.color_pat = re.compile(r'\x01?\x1b\[(.*?)m\x02?')
        self.line_start = self.text_buffer.create_mark(
            'line_start', self.text_buffer.get_end_iter(), True)
        self.connect('key-press-event', self.onKeyPress)

    def write(self, text, editable=False):
        """!
        Write given text to buffer (scheduled through GObject.idle_add).

        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        GObject.idle_add(self._write, text, editable)

    def _write(self, text, editable=False):
        """!
        Write given text to buffer.

        Color escape sequences found in the text are turned into the text
        tags of the same name.

        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        # Splitting on a pattern with one group yields
        # [plain, code1, text1, code2, text2, ...].
        segments = self.color_pat.split(text)
        start_mark = self.text_buffer.create_mark(None,
                                                  self.text_buffer.get_end_iter(),
                                                  True)
        self.text_buffer.insert(self.text_buffer.get_end_iter(), segments[0])

        # Walk the (code, text) pairs by position. Looking the code up with
        # list.index() could hit a text chunk that happens to equal a code.
        for tag, chunk in zip(segments[1::2], segments[2::2]):
            self.text_buffer.insert_with_tags_by_name(
                self.text_buffer.get_end_iter(), chunk, str(tag))

        if not editable:
            self.text_buffer.apply_tag_by_name(
                'notouch',
                self.text_buffer.get_iter_at_mark(start_mark),
                self.text_buffer.get_end_iter())
        self.text_buffer.delete_mark(start_mark)
        self.scroll_mark_onscreen(self.mark)

    def showPrompt(self, prompt):
        """!
        Prints prompt at start of line (scheduled through GObject.idle_add).

        @param prompt: Prompt to print.
        @return none
        """
        GObject.idle_add(self._showPrompt, prompt)

    def _showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        @param prompt: Prompt to print.
        @return none
        """
        self._write(prompt)
        # The editable command line starts right after the prompt.
        self.text_buffer.move_mark(self.line_start,
                                   self.text_buffer.get_end_iter())

    def changeLine(self, text):
        """!
        Replace currently entered command line with given text (scheduled
        through GObject.idle_add).

        @param text: Text to use as replacement.
        @return none
        """
        GObject.idle_add(self._changeLine, text)

    def _changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        @param text: Text to use as replacement.
        @return none
        """
        line_end = self.text_buffer.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        self.text_buffer.delete(
            self.text_buffer.get_iter_at_mark(self.line_start), line_end)
        self._write(text, True)

    def getCurrentLine(self):
        """!
        Get text in current command line.

        @return Text of current command line.
        """
        return self.text_buffer.get_slice(
            self.text_buffer.get_iter_at_mark(self.line_start),
            self.text_buffer.get_end_iter(), False)

    def showReturned(self, text):
        """!
        Show returned text from last command and print new prompt
        (scheduled through GObject.idle_add).

        @param text: Text to show.
        @return none
        """
        GObject.idle_add(self._showReturned, text)

    def _showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        Reads self.prompt and self.IP, which are not set by this class
        itself.

        @param text: Text to show.
        @return none
        """
        # Freeze the command line that was just entered.
        line_end = self.text_buffer.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        self.text_buffer.apply_tag_by_name(
            'notouch',
            self.text_buffer.get_iter_at_mark(self.line_start),
            line_end)
        self._write('\n' + text)
        if text:
            self._write('\n')
        self._showPrompt(self.prompt)
        self.text_buffer.move_mark(self.line_start,
                                   self.text_buffer.get_end_iter())
        self.text_buffer.place_cursor(self.text_buffer.get_end_iter())

        if self.IP.rl_do_indent:
            indentation = self.IP.input_splitter.indent_spaces * ' '
            self.text_buffer.insert_at_cursor(indentation)

    def onKeyPress(self, widget, event):
        """!
        Key press callback used for correcting behavior for console-like
        interfaces. For example 'home' should go to prompt, not to beginning of
        line.

        @param widget: Widget that key press occurred in.
        @param event: Event object
        @return Return True if event should not trickle.
        """
        insert_mark = self.text_buffer.get_insert()
        insert_iter = self.text_buffer.get_iter_at_mark(insert_mark)
        selection_mark = self.text_buffer.get_selection_bound()
        selection_iter = self.text_buffer.get_iter_at_mark(selection_mark)
        start_iter = self.text_buffer.get_iter_at_mark(self.line_start)
        if event.keyval == Gdk.KEY_Home:
            if event.get_state() & Gdk.ModifierType.CONTROL_MASK or event.get_state() & Gdk.ModifierType.MOD1_MASK:
                pass
            elif event.get_state() & Gdk.ModifierType.SHIFT_MASK:
                # Shift+Home: extend the selection back to the prompt.
                self.text_buffer.move_mark(insert_mark, start_iter)
                return True
            else:
                self.text_buffer.place_cursor(start_iter)
                return True
        elif event.keyval == Gdk.KEY_Left:
            # Swallow the key if the position left of the cursor is not
            # editable.
            insert_iter.backward_cursor_position()
            if not insert_iter.editable(True):
                return True
        elif not event.string:
            pass
        elif start_iter.compare(insert_iter) <= 0 and \
                start_iter.compare(selection_iter) <= 0:
            # Cursor and selection bound are both at or after line_start.
            pass
        elif start_iter.compare(insert_iter) > 0 and \
                start_iter.compare(selection_iter) > 0:
            # Both are before line_start: jump to the start of the
            # command line.
            self.text_buffer.place_cursor(start_iter)
        elif insert_iter.compare(selection_iter) < 0:
            self.text_buffer.move_mark(insert_mark, start_iter)
        elif insert_iter.compare(selection_iter) > 0:
            self.text_buffer.move_mark(selection_mark, start_iter)

        return self.onKeyPressExtend(event)

    def onKeyPressExtend(self, event):
        """!
        For some reason we can't extend onKeyPress directly (bug #500900).

        Hook called at the end of onKeyPress; this implementation does
        nothing.

        @param event key press
        @return none
        """
        pass
564 
565 
class IPythonView(ConsoleView, IterableIPShell):
    """
    Sub-class of both modified IPython shell and L{ConsoleView} this makes
    a GTK+ IPython console.
    """
    def __init__(self):
        """
        Initialize. Redirect I/O to console.
        """
        ConsoleView.__init__(self)
        # Everything the shell prints is captured in this buffer.
        self.cout = StringIO()
        IterableIPShell.__init__(self, cout=self.cout, cerr=self.cout,
                                 input_func=self.raw_input)
        self.interrupt = False
        self.execute()
        self.prompt = self.generatePrompt(False)
        # Discard what was captured so far. Rewind as well, as
        # _processLine does, so that later writes start at offset 0.
        self.cout.truncate(0)
        self.cout.seek(0)
        self.showPrompt(self.prompt)

    def raw_input(self, prompt=''):
        """!
        Custom raw_input() replacement. Gets current line from console buffer.

        @param prompt: Prompt to print. Here for compatibility as replacement.
        @return The current command line text.
        """
        if self.interrupt:
            self.interrupt = False
            raise KeyboardInterrupt
        return self.getCurrentLine()

    def onKeyPressExtend(self, event):
        """!
        Key press callback with plenty of shell goodness, like history,
        autocompletions, etc.

        @param event: Event object.
        @return True if event should not trickle.
        """
        # keyval 99 == ord('c'): Ctrl+C interrupts the current input.
        if event.get_state() & Gdk.ModifierType.CONTROL_MASK and event.keyval == 99:
            self.interrupt = True
            self._processLine()
            return True
        elif event.keyval == Gdk.KEY_Return:
            self._processLine()
            return True
        elif event.keyval == Gdk.KEY_Up:
            self.changeLine(self.historyBack())
            return True
        elif event.keyval == Gdk.KEY_Down:
            self.changeLine(self.historyForward())
            return True
        elif event.keyval == Gdk.KEY_Tab:
            # Read the line once, up front, so that the fallback passed to
            # changeLine() is always defined.
            current_line = self.getCurrentLine()
            if not current_line.strip():
                return False
            completed, possibilities = self.complete(current_line)
            if len(possibilities) > 1:
                # Several candidates: list them and print a fresh prompt.
                self.write('\n')
                for symbol in possibilities:
                    self.write(symbol + '\n')
                self.showPrompt(self.prompt)
            self.changeLine(completed or current_line)
            return True
        # Any other key: fall through (returns None).

    def _processLine(self):
        """!
        Process current command line.

        Executes the line, shows what the shell printed and empties the
        capture buffer.

        @return none
        """
        # NOTE(review): history_pos is not read anywhere in this file.
        self.history_pos = 0
        self.execute()
        rv = self.cout.getvalue()
        if rv:
            rv = rv.strip('\n')
        self.showReturned(rv)
        self.cout.truncate(0)
        self.cout.seek(0)
658 
if __name__ == "__main__":
    # Stand-alone demo: a single window hosting the IPython console.
    top_level = Gtk.Window()
    top_level.set_default_size(640, 320)
    # Closing the window ends the GTK main loop.
    top_level.connect('delete-event', lambda widget, event: Gtk.main_quit())
    console = IPythonView()
    top_level.add(console)
    top_level.show_all()
    Gtk.main()
666 
def onKeyPressExtend(self, event)
For some reason we can't extend onKeyPress directly (bug #500900).
def onKeyPressExtend(self, event)
Key press callback with plenty of shell goodness, like history, autocompletions, etc.
def __init__(self, argv=None, user_ns=None, user_global_ns=None, cin=None, cout=None, cerr=None, input_func=None)
Initializer.
Definition: ipython_view.py:76
def raw_input(self, prompt='')
Custom raw_input() replacement.
def getCurrentLine(self)
Get text in current command line.
def _processLine(self)
Process current command line.
def execute(self)
Executes the current line provided by the shell object.
def _getHistory(self)
Gets the command string of the current history level.
def changeLine(self, text)
Replace currently entered command line with given text.
#define list
def _showReturned(self, text)
Show returned text from last command and print new prompt.
def _write(self, text, editable=False)
Write given text to buffer.
#define delete
def onKeyPress(self, widget, event)
Key press callback used for correcting behavior for console-like interfaces.
dictionary ANSI_COLORS
color list
def shell(self, cmd, verbose=0, debug=0, header='')
Replacement method to allow shell commands without them blocking.
def __update_namespace(self)
Update self.IP namespace for autocompletion with sys.modules.
def historyBack(self)
Provides one history command back.
def write(self, text, editable=False)
Write given text to buffer.
def generatePrompt(self, is_continuation)
Generate prompt depending on is_continuation value.
def _changeLine(self, text)
Replace currently entered command line with given text.
def historyForward(self)
Provides one history command forward.
def updateNamespace(self, ns_dict)
Add the current dictionary to the shell namespace.
def _showPrompt(self, prompt)
Prints prompt at start of line.
def showPrompt(self, prompt)
Prints prompt at start of line.
def complete(self, line)
Returns an auto completed line and/or possibilities for completion.
def showReturned(self, text)
Show returned text from last command and print new prompt.