A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
ipython_view.py
Go to the documentation of this file.
1"""
2Backend to the console plugin.
3
4@author: Eitan Isaacson
5@organization: IBM Corporation
6@copyright: Copyright (c) 2007 IBM Corporation
7@license: BSD
8
9All rights reserved. This program and the accompanying materials are made
10available under the terms of the BSD which accompanies this distribution, and
11is available at U{http://www.opensource.org/licenses/bsd-license.php}
12"""
13# this file is a modified version of source code from the Accerciser project
14# https://wiki.gnome.org/Apps/Accerciser
15
16import gi
17gi.require_version('Gtk', '3.0')
18from gi.repository import Gtk
19from gi.repository import Gdk
20from gi.repository import GLib
21from gi.repository import Pango
22
23from pkg_resources import parse_version
24
25import re
26import sys
27import os
28
29from io import StringIO
30from functools import reduce
31
32
33try:
34 import IPython
35except ImportError:
36
38 IPython = None
39
40
42
def __init__(self, argv=None, user_ns=None, user_global_ns=None,
             cin=None, cout=None, cerr=None, input_func=None):
    """! Initializer

    Obtains the embedded IPython shell instance, routes the console
    streams into it and prepares the bookkeeping used by execute() and
    complete().

    @param self: this object
    @param argv: Command line options for IPython (not used by this method)
    @param user_ns: User namespace.
    @param user_global_ns: User global namespace (not used by this method).
    @param cin: Console standard input.
    @param cout: Console standard output.
    @param cerr: Console standard error.
    @param input_func: Replacement for builtin raw_input()
    """
    io = IPython.utils.io
    if input_func:
        IPython.terminal.interactiveshell.raw_input_original = input_func

    legacy_streams = IPython.version_info < (8,)
    if legacy_streams:
        if cin:
            io.stdin = io.IOStream(cin)
        if cout:
            io.stdout = io.IOStream(cout)
        if cerr:
            io.stderr = io.IOStream(cerr)
    else:
        if cin:
            sys.stdin = cin
        if cout:
            sys.stdout = cout
        if cerr:
            sys.stderr = cerr

    # This is to get rid of the blockage that occurs during
    # IPython.Shell.InteractiveShell.user_setup()
    io.raw_input = lambda x: None

    os.environ['TERM'] = 'dumb'

    from traitlets.config.loader import Config
    cfg = Config()
    cfg.InteractiveShell.colors = "Linux"
    cfg.Completer.use_jedi = False

    # Remember the hook so it can be put back once the shell exists,
    # even when shell creation fails.
    saved_excepthook = sys.excepthook
    try:
        if legacy_streams:
            # InteractiveShell's __init__ overwrites io.stdout, io.stderr
            # with sys.stdout, sys.stderr; this makes sure they are right
            old_stdout, old_stderr = sys.stdout, sys.stderr
            sys.stdout, sys.stderr = io.stdout.stream, io.stderr.stream
        try:
            # InteractiveShell inherits from SingletonConfigurable,
            # so use instance()
            self.IP = IPython.terminal.embed.InteractiveShellEmbed.instance(
                config=cfg, user_ns=user_ns)
        finally:
            if legacy_streams:
                sys.stdout, sys.stderr = old_stdout, old_stderr

        self.IP.system = lambda cmd: self.shell(
            self.IP.var_expand(cmd), header='IPython system call: ')
        self.IP.raw_input = input_func
    finally:
        sys.excepthook = saved_excepthook

    self.iter_more = 0
    # Start at the newest history entry so the history helpers work even
    # before the first execute() call.
    self.history_level = 0
    # Characters that separate the token being completed from the rest
    # of the line.
    self.complete_sep = re.compile(r'[\s\{\}\[\]\(\)]')
    self.updateNamespace({'exit': lambda: None})
    self.updateNamespace({'quit': lambda: None})
    # Workaround for updating namespace with sys.modules
    self.__update_namespace()

    # Avoid using input splitter when not really needed.
    # Perhaps it could work even before 5.8.0
    # But it definitely does not work any more with >= 7.0.0
    self.no_input_splitter = (parse_version(IPython.release.version)
                              >= parse_version('5.8.0'))
    self.lines = []
147
149 """!
150 Update self.IP namespace for autocompletion with sys.modules
151 @return none
152 """
153 for k, v in list(sys.modules.items()):
154 if not '.' in k:
155 self.IP.user_ns.update({k:v})
156
def execute(self):
    """!
    Executes the current line provided by the shell object.

    The line is fetched through self.IP.raw_input(); once the
    accumulated input is complete it is handed to self.IP.run_cell().
    self.prompt and self.iter_more are updated for the next call.

    @return none
    """
    self.history_level = 0

    legacy_streams = IPython.version_info < (8,)
    if legacy_streams:
        # this is needed because some functions in IPython use 'print' to
        # print output (like 'who')
        orig_stdout = sys.stdout
        sys.stdout = IPython.utils.io.stdout

        orig_stdin = sys.stdin
        sys.stdin = IPython.utils.io.stdin

    try:
        self.IP.hooks.pre_prompt_hook()
        if self.iter_more:
            try:
                self.prompt = self.generatePrompt(True)
            except Exception:
                self.IP.showtraceback()
            if self.IP.autoindent:
                self.IP.rl_do_indent = True

        try:
            # NOTE(review): self.prompt is only assigned inside this method,
            # so on a first call with iter_more false this lookup raises
            # AttributeError and ends up in the generic handler below --
            # confirm that this start-up behaviour is intended.
            line = self.IP.raw_input(self.prompt)
        except KeyboardInterrupt:
            self.IP.write('\nKeyboardInterrupt\n')
            # Drop whatever partial input had been collected so far.
            if self.no_input_splitter:
                self.lines = []
            else:
                self.IP.input_splitter.reset()
        except Exception:
            self.IP.showtraceback()
        else:
            if self.no_input_splitter:
                self.lines.append(line)
                (status, self.indent_spaces) = self.IP.check_complete(
                    '\n'.join(self.lines))
                self.iter_more = status == 'incomplete'
            else:
                self.IP.input_splitter.push(line)
                self.iter_more = self.IP.input_splitter.push_accepts_more()

            if not self.iter_more:
                if self.no_input_splitter:
                    source_raw = '\n'.join(self.lines)
                    self.lines = []
                else:
                    source_raw = self.IP.input_splitter.raw_reset()
                self.IP.run_cell(source_raw, store_history=True)
                self.IP.rl_do_indent = False
            else:
                # TODO: Auto-indent
                self.IP.rl_do_indent = True

        self.prompt = self.generatePrompt(self.iter_more)
    finally:
        # Always hand the process-wide streams back, even if something
        # above raised.
        if legacy_streams:
            sys.stdout = orig_stdout
            sys.stdin = orig_stdin
221
def generatePrompt(self, is_continuation):
    """!
    Generate prompt depending on is_continuation value

    @param is_continuation: true when the prompt introduces a
           continuation line of a multi-line input
    @return: The prompt string representation
    """
    return '... ' if is_continuation else '>>> '
236
237
def historyBack(self):
    """!
    Provides one history command back.

    The history level is only moved when the older entry is non-empty.

    @param self this object
    @return: The command string.
    """
    self.history_level -= 1
    if not self._getHistory():
        # Nothing stored at that depth: step back to where we were.
        self.history_level += 1
    return self._getHistory()
249
def historyForward(self):
    """!
    Provides one history command forward.

    The history level never advances past 0.

    @param self this object
    @return: The command string.
    """
    if self.history_level < 0:
        self.history_level += 1
    return self._getHistory()
260
261 def _getHistory(self):
262 """!
263 Gets the command string of the current history level.
264
265 @param self this object
266 @return: Historic command string.
267 """
268 try:
269 rv = self.IP.user_ns['In'][self.history_level].strip('\n')
270 except IndexError:
271 rv = ''
272 return rv
273
def updateNamespace(self, ns_dict):
    """!
    Add the current dictionary to the shell namespace.

    @param ns_dict: A dictionary of symbol-values.
    @return none
    """
    self.IP.user_ns.update(ns_dict)
282
def complete(self, line):
    """!
    Returns an auto completed line and/or possibilities for completion.

    Only the last token of the line (as split by self.complete_sep) is
    handed to the IPython completer.

    @param line: Given line so far.
    @return: Line completed as far as possible, and possible further completions.
    """
    from os.path import commonprefix

    token = self.complete_sep.split(line)[-1]
    if not token:
        # Nothing to complete after the last separator.
        return line, []

    possibilities = self.IP.complete(token)
    if not possibilities:
        # The completer gave no usable result; do not index into it.
        return line, []

    candidates = possibilities[1]
    if not candidates:
        return line, candidates

    # Extend the token as far as all candidates agree; keep the token
    # itself when they share nothing.
    shared = commonprefix(candidates) or token
    return line[:-len(token)] + shared, candidates
317
318
def shell(self, cmd, verbose=0, debug=0, header=''):
    """!
    Replacement method to allow shell commands without them blocking.

    The command's standard output and standard error are captured
    together and printed once the command has finished. With a true
    debug value the command is only echoed, not run.

    @param cmd: Shell command to execute.
    @param verbose: Verbosity
    @param debug: Debug level
    @param header: Header to be printed before output
    @return none
    """
    import subprocess

    if verbose or debug:
        print(header + cmd)
    if debug:
        return

    # os.popen4() is not available in Python 3; subprocess.run() gives the
    # same merged stdout/stderr capture. The command line is deliberately
    # passed through the shell. stdin is tied to the null device so a
    # command that reads input cannot hang the console.
    finished = subprocess.run(cmd, shell=True,
                              stdin=subprocess.DEVNULL,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              universal_newlines=True)
    print(finished.stdout)
337
338
class ConsoleView(Gtk.TextView):
    """
    Specialized text view for console-like workflow.

    @cvar ANSI_COLORS: Mapping of terminal control sequence values to
                       tuples containing foreground and background color names.
    @type ANSI_COLORS: dictionary

    @ivar text_buffer: Widget's text buffer.
    @type text_buffer: Gtk.TextBuffer
    @ivar color_pat: Regex of terminal color pattern
    @type color_pat: _sre.SRE_Pattern
    @ivar mark: Scroll mark for automatic scrolling on input.
    @type mark: Gtk.TextMark
    @ivar line_start: Start of command line mark.
    @type line_start: Gtk.TextMark
    """
    ANSI_COLORS = {'0;30': ('Black', None),
                   '0;31': ('Red', None),
                   '0;32': ('Green', None),
                   '0;33': ('Brown', None),
                   '0;34': ('Blue', None),
                   '0;35': ('Purple', None),
                   '0;36': ('Cyan', None),
                   '0;37': ('LightGray', None),
                   '1;30': ('DarkGray', None),
                   '1;31': ('DarkRed', None),
                   '1;32': ('SeaGreen', None),
                   '1;33': ('Yellow', None),
                   '1;34': ('LightBlue', None),
                   '1;35': ('MediumPurple', None),
                   '1;36': ('LightCyan', None),
                   '1;37': ('White', None),
                   '38;5;124;43': ('DarkRed', 'Yellow'),
                   '38;5;241': ('Gray', None),
                   '38;5;241;43': ('Gray', 'Yellow'),
                   '39': ('Black', None),
                   '39;49': ('Red', 'White'),
                   '43': (None, 'Yellow'),
                   '49': (None, 'White')}

    def __init__(self):
        """
        Initialize console view.
        """
        Gtk.TextView.__init__(self)
        self.modify_font(Pango.FontDescription('Mono'))
        self.set_cursor_visible(True)
        self.text_buffer = self.get_buffer()
        self.mark = self.text_buffer.create_mark(
            'scroll_mark', self.text_buffer.get_end_iter(), False)
        # One text tag per known colour code, named after the code itself.
        for code, (foreground, background) in self.ANSI_COLORS.items():
            self.text_buffer.create_tag(code,
                                        foreground=foreground,
                                        background=background,
                                        weight=700)
        self.text_buffer.create_tag('0')
        self.text_buffer.create_tag('notouch', editable=False)
        # Captures the code between "ESC[" and "m"; the optional \x01/\x02
        # bytes around the sequence are consumed as well.
        self.color_pat = re.compile(r'\x01?\x1b\[(.*?)m\x02?')
        self.line_start = self.text_buffer.create_mark(
            'line_start', self.text_buffer.get_end_iter(), True)
        self.connect('key-press-event', self.onKeyPress)

    def write(self, text, editable=False):
        """!
        Write given text to buffer.

        The actual insertion is scheduled through GLib.idle_add().

        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        GLib.idle_add(self._write, text, editable)

    def _write(self, text, editable=False):
        """!
        Write given text to buffer.

        Colour escape sequences found by color_pat are removed from the
        text; each piece of text following a sequence is inserted with the
        tag named after that sequence's code.

        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        buf = self.text_buffer
        # With one capture group, split() yields
        # [plain, code, text, code, text, ...].
        segments = self.color_pat.split(text)
        start_mark = buf.create_mark(None, buf.get_end_iter(), True)
        buf.insert(buf.get_end_iter(), segments[0])

        # Walk the (code, text) pairs by position rather than searching for
        # the code in the list, so a text chunk that happens to equal a
        # colour code cannot be mistaken for one.
        tag_table = buf.get_tag_table()
        for code, chunk in zip(segments[1::2], segments[2::2]):
            tag_name = str(code)
            if tag_table.lookup(tag_name) is None:
                # Unknown colour code: show the text without a tag.
                buf.insert(buf.get_end_iter(), chunk)
            else:
                buf.insert_with_tags_by_name(buf.get_end_iter(), chunk,
                                             tag_name)

        if not editable:
            buf.apply_tag_by_name('notouch',
                                  buf.get_iter_at_mark(start_mark),
                                  buf.get_end_iter())
        buf.delete_mark(start_mark)
        self.scroll_mark_onscreen(self.mark)

    def showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        The work is scheduled through GLib.idle_add().

        @param prompt: Prompt to print.
        @return none
        """
        GLib.idle_add(self._showPrompt, prompt)

    def _showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        @param prompt: Prompt to print.
        @return none
        """
        self._write(prompt)
        # The editable command line starts right after the prompt.
        self.text_buffer.move_mark(self.line_start,
                                   self.text_buffer.get_end_iter())

    def changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        The work is scheduled through GLib.idle_add().

        @param text: Text to use as replacement.
        @return none
        """
        GLib.idle_add(self._changeLine, text)

    def _changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        @param text: Text to use as replacement.
        @return none
        """
        line_end = self.text_buffer.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        self.text_buffer.delete(
            self.text_buffer.get_iter_at_mark(self.line_start), line_end)
        self._write(text, True)

    def getCurrentLine(self):
        """!
        Get text in current command line.

        @return Text of current command line.
        """
        return self.text_buffer.get_slice(
            self.text_buffer.get_iter_at_mark(self.line_start),
            self.text_buffer.get_end_iter(), False)

    def showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        The work is scheduled through GLib.idle_add().

        @param text: Text to show.
        @return none
        """
        GLib.idle_add(self._showReturned, text)

    def _showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        Reads self.prompt, self.IP, self.no_input_splitter and
        self.indent_spaces, none of which are set by this class.

        @param text: Text to show.
        @return none
        """
        buf = self.text_buffer
        line_end = buf.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        # Freeze the command line that was just submitted.
        buf.apply_tag_by_name('notouch',
                              buf.get_iter_at_mark(self.line_start),
                              line_end)
        self._write('\n' + text)
        if text:
            self._write('\n')
        self._showPrompt(self.prompt)
        buf.move_mark(self.line_start, buf.get_end_iter())
        buf.place_cursor(buf.get_end_iter())

        if self.IP.rl_do_indent:
            if self.no_input_splitter:
                indentation = self.indent_spaces
            else:
                indentation = self.IP.input_splitter.indent_spaces * ' '
            buf.insert_at_cursor(indentation)

    def onKeyPress(self, widget, event):
        """!
        Key press callback used for correcting behavior for console-like
        interfaces. For example 'home' should go to prompt, not to beginning of
        line.

        @param widget: Widget that key press occurred in.
        @param event: Event object
        @return Return True if event should not trickle.
        """
        buf = self.text_buffer
        insert_mark = buf.get_insert()
        insert_iter = buf.get_iter_at_mark(insert_mark)
        selection_mark = buf.get_selection_bound()
        selection_iter = buf.get_iter_at_mark(selection_mark)
        start_iter = buf.get_iter_at_mark(self.line_start)
        ctrl_held = event.state & Gdk.ModifierType.CONTROL_MASK

        if event.keyval == Gdk.KEY_Home:
            if ctrl_held or event.state & Gdk.ModifierType.MOD1_MASK:
                pass
            elif event.state & Gdk.ModifierType.SHIFT_MASK:
                buf.move_mark(insert_mark, start_iter)
                return True
            else:
                buf.place_cursor(start_iter)
                return True
        elif event.keyval == Gdk.KEY_Left:
            insert_iter.backward_cursor_position()
            if not insert_iter.editable(True):
                return True
        elif ctrl_held and event.keyval in [ord('L'), ord('l')]:
            # clear previous output on Ctrl+L, but remember current input
            # line + cursor position
            cursor_offset = buf.get_property('cursor-position')
            cursor_pos_in_line = (cursor_offset - start_iter.get_offset()
                                  + len(self.prompt))
            current_input = buf.get_text(start_iter, buf.get_end_iter(), False)
            buf.set_text(self.prompt + current_input)
            buf.move_mark(self.line_start,
                          buf.get_iter_at_offset(len(self.prompt)))
            buf.place_cursor(buf.get_iter_at_offset(cursor_pos_in_line))
            return True
        elif ctrl_held and event.keyval in [Gdk.KEY_k, Gdk.KEY_K]:
            # clear text after input cursor on Ctrl+K
            if insert_iter.editable(True):
                buf.delete(insert_iter, buf.get_end_iter())
            return True
        elif ctrl_held and event.keyval == Gdk.KEY_C:
            # copy selection on Ctrl+C (upper-case 'C' only)
            buf.copy_clipboard(Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD))
            return True
        elif not event.string:
            pass
        elif start_iter.compare(insert_iter) <= 0 and \
                start_iter.compare(selection_iter) <= 0:
            pass
        elif start_iter.compare(insert_iter) > 0 and \
                start_iter.compare(selection_iter) > 0:
            buf.place_cursor(start_iter)
        elif insert_iter.compare(selection_iter) < 0:
            buf.move_mark(insert_mark, start_iter)
        elif insert_iter.compare(selection_iter) > 0:
            buf.move_mark(selection_mark, start_iter)

        return self.onKeyPressExtend(event)

    def onKeyPressExtend(self, event):
        """!
        For some reason we can't extend onKeyPress directly (bug #500900).
        @param event key press
        @return none
        """
        pass
608
609
625 """
626 Sub-class of both modified IPython shell and L{ConsoleView} this makes
627 a GTK+ IPython console.
628 """
def __init__(self):
    """
    Initialize. Redirect I/O to console.
    """
    ConsoleView.__init__(self)
    self.cout = StringIO()
    IterableIPShell.__init__(self, cout=self.cout, cerr=self.cout,
                             input_func=self.raw_input)
    self.interrupt = False
    self.execute()
    self.promptprompt = self.generatePrompt(False)
    # Throw away whatever the start-up pass wrote. truncate() does not move
    # the stream position, so rewind too; otherwise the next write would
    # land past the end of the now-empty buffer.
    self.cout.truncate(0)
    self.cout.seek(0)
    self.showPrompt(self.promptprompt)
642
def raw_input(self, prompt=''):
    """!
    Custom raw_input() replacement. Gets current line from console buffer.

    If self.interrupt is set, the flag is cleared and KeyboardInterrupt
    is raised instead of returning a line.

    @param prompt: Prompt to print. Here for compatibility as replacement.
    @return The current command line text.
    """
    if self.interrupt:
        self.interrupt = False
        raise KeyboardInterrupt
    return self.getCurrentLine()
654
def onKeyPressExtend(self, event):
    """!
    Key press callback with plenty of shell goodness, like history,
    autocompletions, etc.

    @param event: Event object.
    @return True if event should not trickle.
    """
    ctrl_held = event.get_state() & Gdk.ModifierType.CONTROL_MASK
    if ctrl_held and event.keyval == Gdk.KEY_c:
        # Ctrl+c: have raw_input() raise KeyboardInterrupt on the next read.
        self.interrupt = True
        self._processLine()
        return True
    elif event.keyval == Gdk.KEY_Return:
        self._processLine()
        return True
    elif event.keyval == Gdk.KEY_Up:
        self.changeLine(self.historyBack())
        return True
    elif event.keyval == Gdk.KEY_Down:
        self.changeLine(self.historyForward())
        return True
    elif event.keyval == Gdk.KEY_Tab:
        current_line = self.getCurrentLine()
        if not current_line.strip():
            # Nothing typed yet: let the key through.
            return False
        completed, possibilities = self.complete(current_line)
        if len(possibilities) > 1:
            # Several candidates: list them, then print a fresh prompt.
            self.write('\n')
            for symbol in possibilities:
                self.write(symbol + '\n')
            self.showPrompt(self.promptprompt)
        self.changeLine(completed or current_line)
        return True
    return False
689
690 def _processLine(self):
691 """!
692 Process current command line.
693 @return none
694 """
695 self.history_pos = 0
696 self.execute()
697 rv = self.cout.getvalue()
698 if rv: rv = rv.strip('\n')
699 self.showReturned(rv)
700 self.cout.truncate(0)
701 self.cout.seek(0)
702
if __name__ == "__main__":
    # Stand-alone demo: a single window holding the IPython console.
    window = Gtk.Window()
    window.set_default_size(640, 320)
    window.connect('delete-event', lambda x, y: Gtk.main_quit())
    window.add(IPythonView())
    window.show_all()
    Gtk.main()
710
711
def write(self, text, editable=False)
Write given text to buffer.
def changeLine(self, text)
Replace currently entered command line with given text.
def _showPrompt(self, prompt)
Prints prompt at start of line.
def _showReturned(self, text)
Show returned text from last command and print new prompt.
def getCurrentLine(self)
Get text in current command line.
def onKeyPressExtend(self, event)
For some reason we can't extend onKeyPress directly (bug #500900).
def _write(self, text, editable=False)
Write given text to buffer.
def _changeLine(self, text)
Replace currently entered command line with given text.
def showPrompt(self, prompt)
Prints prompt at start of line.
def showReturned(self, text)
Show returned text from last command and print new prompt.
def onKeyPress(self, widget, event)
Key press callback used for correcting behavior for console-like interfaces.
def onKeyPressExtend(self, event)
Key press callback with plenty of shell goodness, like history, autocompletions, etc.
def raw_input(self, prompt='')
Custom raw_input() replacement.
def _processLine(self)
Process current command line.
def updateNamespace(self, ns_dict)
Add the current dictionary to the shell namespace.
def _getHistory(self)
Gets the command string of the current history level.
def __update_namespace(self)
Update self.IP namespace for autocompletion with sys.modules.
def __init__(self, argv=[], user_ns=None, user_global_ns=None, cin=None, cout=None, cerr=None, input_func=None)
Initializer.
Definition: ipython_view.py:68
def historyBack(self)
Provides one history command back.
def generatePrompt(self, is_continuation)
Generate prompt depending on is_continuation value.
def historyForward(self)
Provides one history command forward.
def shell(self, cmd, verbose=0, debug=0, header='')
Replacement method to allow shell commands without them blocking.
def execute(self)
Executes the current line provided by the shell object.
def complete(self, line)
Returns an auto completed line and/or possibilities for completion.
#define delete
#define list