A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
ipython_view.py
Go to the documentation of this file.
1"""
2Backend to the console plugin.
3
4@author: Eitan Isaacson
5@organization: IBM Corporation
6@copyright: Copyright (c) 2007 IBM Corporation
7@license: BSD
8
9All rights reserved. This program and the accompanying materials are made
10available under the terms of the BSD which accompanies this distribution, and
11is available at U{http://www.opensource.org/licenses/bsd-license.php}
12"""
13
14# this file is a modified version of source code from the Accerciser project
15# https://wiki.gnome.org/Apps/Accerciser
16
17import gi
18
19gi.require_version("Gtk", "3.0")
20import os
21import re
22import sys
23from functools import reduce
24from io import StringIO
25
26from gi.repository import Gdk, GLib, Gtk, Pango
27from pkg_resources import parse_version
28
29
30try:
31 import IPython
32except ImportError:
33
35 IPython = None
36
37
38
40
64
def __init__(
    self,
    argv=None,
    user_ns=None,
    user_global_ns=None,
    cin=None,
    cout=None,
    cerr=None,
    input_func=None,
):
    """!
    Constructor for the IterableIPShell class
    @param self: this object
    @param argv: Command line options for IPython (not read by this constructor)
    @param user_ns: User namespace.
    @param user_global_ns: User global namespace (not read by this constructor).
    @param cin: Console standard input.
    @param cout: Console standard output.
    @param cerr: Console standard error.
    @param input_func: Replacement for builtin raw_input()
    """
    io = IPython.utils.io
    # IPython < 8 routes console I/O through IPython.utils.io; from 8 on the
    # plain sys streams are replaced instead.
    legacy_io = IPython.version_info < (8,)

    if input_func:
        IPython.terminal.interactiveshell.raw_input_original = input_func
    if legacy_io:
        if cin:
            io.stdin = io.IOStream(cin)
        if cout:
            io.stdout = io.IOStream(cout)
        if cerr:
            io.stderr = io.IOStream(cerr)
    else:
        if cin:
            sys.stdin = cin
        if cout:
            sys.stdout = cout
        if cerr:
            sys.stderr = cerr

    # This is to get rid of the blockage that occurs during
    # IPython.Shell.InteractiveShell.user_setup()
    io.raw_input = lambda x: None

    os.environ["TERM"] = "dumb"
    # Saved here and put back once the shell exists (see below).
    excepthook = sys.excepthook

    from traitlets.config.loader import Config

    cfg = Config()
    cfg.InteractiveShell.colors = "Linux"
    cfg.Completer.use_jedi = False

    if legacy_io:
        # InteractiveShell's __init__ overwrites io.stdout,io.stderr with
        # sys.stdout, sys.stderr, this makes sure they are right
        old_stdout, old_stderr = sys.stdout, sys.stderr
        sys.stdout, sys.stderr = io.stdout.stream, io.stderr.stream
    try:
        # InteractiveShell inherits from SingletonConfigurable, so use instance()
        self.IP = IPython.terminal.embed.InteractiveShellEmbed.instance(
            config=cfg, user_ns=user_ns
        )
    finally:
        # Put the process-wide streams back even if shell creation fails.
        if legacy_io:
            sys.stdout, sys.stderr = old_stdout, old_stderr

    # Route IPython's system-call hook through the shell() replacement.
    self.IP.system = lambda cmd: self.shell(
        self.IP.var_expand(cmd), header="IPython system call: "
    )

    self.IP.raw_input = input_func
    sys.excepthook = excepthook
    self.iter_more = 0
    # NOTE(review): the listing this block was recovered from skips one line
    # here (between iter_more and complete_sep) -- restore it from upstream.
    # Raw string: the pattern relies on regex escapes, not string escapes.
    self.complete_sep = re.compile(r"[\s\{\}\[\]\(\)]")
    self.updateNamespace({"exit": lambda: None})
    self.updateNamespace({"quit": lambda: None})
    # Workaround for updating namespace with sys.modules
    self.__update_namespace()

    # Avoid using input splitter when not really needed.
    # Perhaps it could work even before 5.8.0
    # But it definitely does not work any more with >= 7.0.0
    self.no_input_splitter = parse_version(IPython.release.version) >= parse_version("5.8.0")
    self.lines = []
    # NOTE(review): the recovered listing skips one more line after this
    # assignment -- confirm against upstream.
154
def __update_namespace(self):
    """!
    Update self.IP namespace for autocompletion with sys.modules

    Only modules whose name contains no dot are added.
    @return none
    """
    # Iterate over a snapshot of sys.modules rather than the live dict.
    top_level = {name: module for name, module in list(sys.modules.items()) if "." not in name}
    self.IP.user_ns.update(top_level)
163
def execute(self):
    """!
    Executes the current line provided by the shell object.

    Reads one line through self.IP.raw_input, accumulates it, and runs the
    accumulated source once it is complete. self.prompt is refreshed at the
    end to reflect whether more input is expected.
    @return none
    """
    self.history_level = 0

    legacy_io = IPython.version_info < (8,)
    if legacy_io:
        orig_stdout, orig_stdin = sys.stdout, sys.stdin

    try:
        if legacy_io:
            # this is needed because some functions in IPython use 'print' to print
            # output (like 'who')
            sys.stdout = IPython.utils.io.stdout
            sys.stdin = IPython.utils.io.stdin

        # NOTE(review): the listing this block was recovered from skips one
        # line just before the pre-prompt hook -- restore it from upstream.
        self.IP.hooks.pre_prompt_hook()
        if self.iter_more:
            try:
                self.prompt = self.generatePrompt(True)
            except Exception:
                self.IP.showtraceback()
            if self.IP.autoindent:
                self.IP.rl_do_indent = True

        try:
            line = self.IP.raw_input(self.prompt)
        except KeyboardInterrupt:
            self.IP.write("\nKeyboardInterrupt\n")
            if self.no_input_splitter:
                self.lines = []
            else:
                self.IP.input_splitter.reset()
            # The pending block was discarded, so drop the continuation
            # state too; otherwise the next prompt would still be "... ".
            self.iter_more = False
            self.IP.rl_do_indent = False
        except Exception:
            self.IP.showtraceback()
        else:
            if self.no_input_splitter:
                self.lines.append(line)
                (status, self.indent_spaces) = self.IP.check_complete("\n".join(self.lines))
                self.iter_more = status == "incomplete"
            else:
                self.IP.input_splitter.push(line)
                self.iter_more = self.IP.input_splitter.push_accepts_more()

            if self.iter_more:
                # TODO: Auto-indent
                self.IP.rl_do_indent = True
            else:
                if self.no_input_splitter:
                    source_raw = "\n".join(self.lines)
                    self.lines = []
                else:
                    source_raw = self.IP.input_splitter.raw_reset()
                self.IP.run_cell(source_raw, store_history=True)
                self.IP.rl_do_indent = False

        self.prompt = self.generatePrompt(self.iter_more)
    finally:
        # Always undo the stream redirection, even if something above raised.
        if legacy_io:
            sys.stdout = orig_stdout
            sys.stdin = orig_stdin
228
def generatePrompt(self, is_continuation):
    """!
    Generate prompt depending on is_continuation value

    @param is_continuation: truthy when the shell is waiting for more lines
           of an unfinished statement
    @return: The prompt string representation ("... " for a continuation,
             ">>> " otherwise)
    """
    return "... " if is_continuation else ">>> "
243
def historyBack(self):
    """!
    Provides one history command back.

    @param self this object
    @return: The command string.
    """
    self.history_level -= 1
    command = self._getHistory()
    if not command:
        # Nothing stored at that level: undo the step and report the entry
        # we were already on.
        self.history_level += 1
        command = self._getHistory()
    return command
255
def historyForward(self):
    """!
    Provides one history command forward.

    @param self this object
    @return: The command string.
    """
    # Level 0 is the newest position; never step past it.
    if self.history_level < 0:
        self.history_level += 1
    return self._getHistory()
266
def _getHistory(self):
    """!
    Gets the command string of the current history level.

    @param self this object
    @return: Historic command string, or an empty string when there is no
             entry at the current level.
    """
    try:
        return self.IP.user_ns["In"][self.history_level].strip("\n")
    except (IndexError, KeyError):
        # Level outside the recorded history, or no "In" list in the namespace.
        return ""
279
def updateNamespace(self, ns_dict):
    """!
    Add the current dictionary to the shell namespace.

    @param ns_dict: A dictionary of symbol-values.
    @return none
    """
    self.IP.user_ns.update(ns_dict)
288
def complete(self, line):
    """!
    Returns an auto completed line and/or possibilities for completion.

    Only the last token of the line (as split by self.complete_sep) is
    handed to the IPython completer.

    @param line: Given line so far.
    @return: Line completed as far as possible, and possible further completions.
    """
    fragment = self.complete_sep.split(line)[-1]
    if not fragment:
        # Line ends on a separator: nothing to complete.
        return line, []

    possibilities = self.IP.complete(fragment)
    if not possibilities:
        # Completer produced nothing usable; leave the line untouched.
        return line, []

    candidates = possibilities[1]
    if not candidates:
        return line, candidates

    # Extend the fragment to the longest prefix shared by every candidate.
    common_prefix = os.path.commonprefix(candidates) or fragment
    completed = line[: -len(fragment)] + common_prefix
    return completed, candidates
325
def shell(self, cmd, verbose=0, debug=0, header=""):
    """!
    Replacement method to allow shell commands without them blocking.

    The command's standard output and standard error are captured together
    and printed once the command has finished. With debug set, the command
    is only echoed, never run.

    @param cmd: Shell command to execute.
    @param verbose: Verbosity
    @param debug: Debug level
    @param header: Header to be printed before output
    @return none
    """
    # Local import: only this method needs subprocess.
    import subprocess

    if verbose or debug:
        print(header + cmd)
    if debug:
        return

    # cmd is passed to the system shell as a single string, as the original
    # os.popen4 call did. stdin is closed off so a command waiting for input
    # cannot hang the console.
    result = subprocess.run(
        cmd,
        shell=True,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        errors="replace",
    )
    print(result.stdout)
345
346
347
class ConsoleView(Gtk.TextView):
    """
    Specialized text view for console-like workflow.

    Several methods read attributes that this class does not set itself
    (self.prompt, self.IP, self.no_input_splitter, self.indent_spaces); they
    are expected to come from the shell class it is combined with.

    @cvar ANSI_COLORS: Mapping of terminal control sequence values to
                       tuples containing foreground and background color names.
    @type ANSI_COLORS: dictionary

    @ivar text_buffer: Widget's text buffer.
    @type text_buffer: Gtk.TextBuffer
    @ivar color_pat: Regex of terminal color pattern
    @type color_pat: _sre.SRE_Pattern
    @ivar mark: Scroll mark for automatic scrolling on input.
    @type mark: Gtk.TextMark
    @ivar line_start: Start of command line mark.
    @type line_start: Gtk.TextMark
    """

    ANSI_COLORS = {
        "0;30": ("Black", None),
        "0;31": ("Red", None),
        "0;32": ("Green", None),
        "0;33": ("Brown", None),
        "0;34": ("Blue", None),
        "0;35": ("Purple", None),
        "0;36": ("Cyan", None),
        "0;37": ("LightGray", None),
        "1;30": ("DarkGray", None),
        "1;31": ("DarkRed", None),
        "1;32": ("SeaGreen", None),
        "1;33": ("Yellow", None),
        "1;34": ("LightBlue", None),
        "1;35": ("MediumPurple", None),
        "1;36": ("LightCyan", None),
        "1;37": ("White", None),
        "38;5;124;43": ("DarkRed", "Yellow"),
        "38;5;241": ("Gray", None),
        "38;5;241;43": ("Gray", "Yellow"),
        "39": ("Black", None),
        "39;49": ("Red", "White"),
        "43": (None, "Yellow"),
        "49": (None, "White"),
    }

    def __init__(self):
        """
        Initialize console view.
        """
        Gtk.TextView.__init__(self)
        self.modify_font(Pango.FontDescription("Mono"))
        self.set_cursor_visible(True)
        self.text_buffer = self.get_buffer()
        self.mark = self.text_buffer.create_mark(
            "scroll_mark", self.text_buffer.get_end_iter(), False
        )
        # One text tag per known colour code, named after the code itself.
        for code, (foreground, background) in self.ANSI_COLORS.items():
            self.text_buffer.create_tag(
                code,
                foreground=foreground,
                background=background,
                weight=700,
            )
        self.text_buffer.create_tag("0")
        self.text_buffer.create_tag("notouch", editable=False)
        # Raw string so the backslashes reach the regex engine untouched.
        self.color_pat = re.compile(r"\x01?\x1b\[(.*?)m\x02?")
        self.line_start = self.text_buffer.create_mark(
            "line_start", self.text_buffer.get_end_iter(), True
        )
        self.connect("key-press-event", self.onKeyPress)

    def write(self, text, editable=False):
        """!
        Write given text to buffer.

        The actual insertion is scheduled through GLib.idle_add.

        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        GLib.idle_add(self._write, text, editable)

    def _write(self, text, editable=False):
        """!
        Write given text to buffer.

        Colour escape sequences matched by self.color_pat are removed from
        the text and applied as text tags instead.

        @param text: Text to append.
        @param editable: If true, added text is editable.
        @return none
        """
        # Splitting on a pattern with one capture group yields
        # [plain, code, chunk, code, chunk, ...].
        segments = self.color_pat.split(text)
        leading = segments.pop(0)
        start_mark = self.text_buffer.create_mark(None, self.text_buffer.get_end_iter(), True)
        self.text_buffer.insert(self.text_buffer.get_end_iter(), leading)

        # Pair each code with the chunk that follows it by position, so a
        # chunk whose text happens to equal a code cannot be mistaken for one.
        tag_table = self.text_buffer.get_tag_table()
        for code, chunk in zip(segments[0::2], segments[1::2]):
            if tag_table.lookup(code) is not None:
                self.text_buffer.insert_with_tags_by_name(
                    self.text_buffer.get_end_iter(), chunk, code
                )
            else:
                # Code without a registered tag: keep the text, drop the colour.
                self.text_buffer.insert(self.text_buffer.get_end_iter(), chunk)

        if not editable:
            self.text_buffer.apply_tag_by_name(
                "notouch",
                self.text_buffer.get_iter_at_mark(start_mark),
                self.text_buffer.get_end_iter(),
            )
        self.text_buffer.delete_mark(start_mark)
        self.scroll_mark_onscreen(self.mark)

    def showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        @param prompt: Prompt to print.
        @return none
        """
        GLib.idle_add(self._showPrompt, prompt)

    def _showPrompt(self, prompt):
        """!
        Prints prompt at start of line.

        @param prompt: Prompt to print.
        @return none
        """
        self._write(prompt)
        # The editable command line starts right after the prompt.
        self.text_buffer.move_mark(self.line_start, self.text_buffer.get_end_iter())

    def changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        @param text: Text to use as replacement.
        @return none
        """
        GLib.idle_add(self._changeLine, text)

    def _changeLine(self, text):
        """!
        Replace currently entered command line with given text.

        @param text: Text to use as replacement.
        @return none
        """
        line_end = self.text_buffer.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        self.text_buffer.delete(self.text_buffer.get_iter_at_mark(self.line_start), line_end)
        self._write(text, True)

    def getCurrentLine(self):
        """!
        Get text in current command line.

        @return Text of current command line.
        """
        return self.text_buffer.get_slice(
            self.text_buffer.get_iter_at_mark(self.line_start),
            self.text_buffer.get_end_iter(),
            False,
        )

    def showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        @param text: Text to show.
        @return none
        """
        GLib.idle_add(self._showReturned, text)

    def _showReturned(self, text):
        """!
        Show returned text from last command and print new prompt.

        @param text: Text to show.
        @return none
        """
        # Freeze the line that was just submitted.
        line_end = self.text_buffer.get_iter_at_mark(self.line_start)
        line_end.forward_to_line_end()
        self.text_buffer.apply_tag_by_name(
            "notouch", self.text_buffer.get_iter_at_mark(self.line_start), line_end
        )
        self._write("\n" + text)
        if text:
            self._write("\n")
        self._showPrompt(self.prompt)
        self.text_buffer.move_mark(self.line_start, self.text_buffer.get_end_iter())
        self.text_buffer.place_cursor(self.text_buffer.get_end_iter())

        if self.IP.rl_do_indent:
            if self.no_input_splitter:
                indentation = self.indent_spaces
            else:
                indentation = self.IP.input_splitter.indent_spaces * " "
            self.text_buffer.insert_at_cursor(indentation)

    def onKeyPress(self, widget, event):
        """!
        Key press callback used for correcting behavior for console-like
        interfaces. For example 'home' should go to prompt, not to beginning of
        line.

        @param widget: Widget that key press occurred in.
        @param event: Event object
        @return Return True if event should not trickle.
        """
        insert_mark = self.text_buffer.get_insert()
        insert_iter = self.text_buffer.get_iter_at_mark(insert_mark)
        selection_mark = self.text_buffer.get_selection_bound()
        selection_iter = self.text_buffer.get_iter_at_mark(selection_mark)
        start_iter = self.text_buffer.get_iter_at_mark(self.line_start)
        ctrl_held = event.state & Gdk.ModifierType.CONTROL_MASK

        if event.keyval == Gdk.KEY_Home:
            if ctrl_held or event.state & Gdk.ModifierType.MOD1_MASK:
                pass
            elif event.state & Gdk.ModifierType.SHIFT_MASK:
                # Shift+Home: extend the selection back to the prompt.
                self.text_buffer.move_mark(insert_mark, start_iter)
                return True
            else:
                self.text_buffer.place_cursor(start_iter)
                return True
        elif event.keyval == Gdk.KEY_Left:
            # Swallow the key if moving left would enter non-editable text.
            insert_iter.backward_cursor_position()
            if not insert_iter.editable(True):
                return True
        elif ctrl_held and event.keyval in (Gdk.KEY_L, Gdk.KEY_l):
            # clear previous output on Ctrl+L, but remember current input line + cursor position
            cursor_offset = self.text_buffer.get_property("cursor-position")
            cursor_pos_in_line = cursor_offset - start_iter.get_offset() + len(self.prompt)
            current_input = self.text_buffer.get_text(
                start_iter, self.text_buffer.get_end_iter(), False
            )
            self.text_buffer.set_text(self.prompt + current_input)
            self.text_buffer.move_mark(
                self.line_start, self.text_buffer.get_iter_at_offset(len(self.prompt))
            )
            self.text_buffer.place_cursor(self.text_buffer.get_iter_at_offset(cursor_pos_in_line))
            return True
        elif ctrl_held and event.keyval in (Gdk.KEY_k, Gdk.KEY_K):
            # clear text after input cursor on Ctrl+K
            if insert_iter.editable(True):
                self.text_buffer.delete(insert_iter, self.text_buffer.get_end_iter())
            return True
        elif ctrl_held and event.keyval == Gdk.KEY_C:
            # copy selection on Ctrl+C (upper-case 'C' only)
            self.text_buffer.copy_clipboard(Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD))
            return True
        elif not event.string:
            pass
        elif start_iter.compare(insert_iter) <= 0 and start_iter.compare(selection_iter) <= 0:
            # Cursor and selection are both inside the command line.
            pass
        elif start_iter.compare(insert_iter) > 0 and start_iter.compare(selection_iter) > 0:
            # Both are before the command line: jump to its start.
            self.text_buffer.place_cursor(start_iter)
        elif insert_iter.compare(selection_iter) < 0:
            self.text_buffer.move_mark(insert_mark, start_iter)
        elif insert_iter.compare(selection_iter) > 0:
            self.text_buffer.move_mark(selection_mark, start_iter)

        return self.onKeyPressExtend(event)

    def onKeyPressExtend(self, event):
        """!
        For some reason we can't extend onKeyPress directly (bug #500900).
        @param event key press
        @return False, so the event keeps propagating
        """
        return False
640
641
642
658 """
659 Sub-class of both modified IPython shell and L{ConsoleView} this makes
660 a GTK+ IPython console.
661 """
662
def __init__(self):
    """
    Initialize. Redirect I/O to console.
    """
    ConsoleView.__init__(self)
    # Shell output and errors are both collected in this in-memory buffer.
    self.cout = StringIO()
    IterableIPShell.__init__(self, cout=self.cout, cerr=self.cout, input_func=self.raw_input)
    self.interrupt = False
    self.execute()
    # NOTE(review): "promptprompt" looks like an extraction duplicate of
    # "prompt"; kept because onKeyPressExtend reads the same attribute name.
    self.promptprompt = self.generatePrompt(False)
    # Discard whatever the first execute() printed. Rewind as well, matching
    # _processLine, so the next write starts at offset 0.
    self.cout.truncate(0)
    self.cout.seek(0)
    self.showPrompt(self.promptprompt)
675
def raw_input(self, prompt=""):
    """!
    Custom raw_input() replacement. Gets current line from console buffer.

    @param prompt: Prompt to print. Here for compatibility as replacement.
    @return The current command line text.
    @exception KeyboardInterrupt raised once if self.interrupt was set; the
               flag is cleared before raising.
    """
    if self.interrupt:
        self.interrupt = False
        raise KeyboardInterrupt
    return self.getCurrentLine()
687
def onKeyPressExtend(self, event):
    """!
    Key press callback with plenty of shell goodness, like history,
    autocompletions, etc.

    @param event: Event object.
    @return True if event should not trickle.
    """
    if event.get_state() & Gdk.ModifierType.CONTROL_MASK and event.keyval == Gdk.KEY_c:
        # Ctrl+c (lower case): flag an interrupt; raw_input raises it when
        # the line is processed.
        self.interrupt = True
        self._processLine()
        return True
    if event.keyval == Gdk.KEY_Return:
        self._processLine()
        return True
    if event.keyval == Gdk.KEY_Up:
        self.changeLine(self.historyBack())
        return True
    if event.keyval == Gdk.KEY_Down:
        self.changeLine(self.historyForward())
        return True
    if event.keyval == Gdk.KEY_Tab:
        current_line = self.getCurrentLine()
        if not current_line.strip():
            # Blank line: let Tab through.
            return False
        completed, possibilities = self.complete(current_line)
        if len(possibilities) > 1:
            # Several candidates: list them, then print the prompt again.
            self.write("\n")
            for symbol in possibilities:
                self.write(symbol + "\n")
            self.showPrompt(self.promptprompt)
        self.changeLine(completed or current_line)
        return True
    # Any other key is left for the default handling.
    return False
722
def _processLine(self):
    """!
    Process current command line.

    Runs the line through execute(), shows whatever was written to
    self.cout, then empties that buffer.
    @return none
    """
    # NOTE(review): history_pos is not read anywhere in the visible code
    # (execute() resets history_level itself); kept in case other code uses it.
    self.history_pos = 0
    self.execute()
    self.showReturned(self.cout.getvalue().strip("\n"))
    # Empty the buffer and rewind so the next output starts at offset 0.
    self.cout.truncate(0)
    self.cout.seek(0)
736
737
if __name__ == "__main__":
    # Stand-alone demo: a window holding a single IPython console view.
    window = Gtk.Window()
    window.set_default_size(640, 320)
    window.connect("delete-event", lambda x, y: Gtk.main_quit())
    window.add(IPythonView())
    window.show_all()
    Gtk.main()
745
def write(self, text, editable=False)
Write given text to buffer.
def changeLine(self, text)
Replace currently entered command line with given text.
def _showReturned(self, text)
Show returned text from last command and print new prompt.
def getCurrentLine(self)
Get text in current command line.
def _showPrompt(self, prompt)
Prints prompt at start of line.
def onKeyPressExtend(self, event)
For some reason we can't extend onKeyPress directly (bug #500900).
def onKeyPress(self, widget, event)
Key press callback used for correcting behavior for console-like interfaces.
def showPrompt(self, prompt)
Prints prompt at start of line.
def _write(self, text, editable=False)
Write given text to buffer.
def _changeLine(self, text)
Replace currently entered command line with given text.
def showReturned(self, text)
Show returned text from last command and print new prompt.
def onKeyPressExtend(self, event)
Key press callback with plenty of shell goodness, like history, autocompletions, etc.
def _processLine(self)
Process current command line.
def raw_input(self, prompt="")
Custom raw_input() replacement.
def updateNamespace(self, ns_dict)
Add the current dictionary to the shell namespace.
def _getHistory(self)
Gets the command string of the current history level.
def __update_namespace(self)
Update self.IP namespace for autocompletion with sys.modules.
def __init__(self, argv=[], user_ns=None, user_global_ns=None, cin=None, cout=None, cerr=None, input_func=None)
Constructor for the IterableIPShell class.
Definition: ipython_view.py:74
def historyBack(self)
Provides one history command back.
def generatePrompt(self, is_continuation)
Generate prompt depending on is_continuation value.
def historyForward(self)
Provides one history command forward.
def shell(self, cmd, verbose=0, debug=0, header="")
Replacement method to allow shell commands without them blocking.
def execute(self)
Executes the current line provided by the shell object.
def complete(self, line)
Returns an auto completed line and/or possibilities for completion.
#define delete
#define list