Pseudo Terminals in Python

Summary

In this post I’ll show you a Python script which becomes a pseudo terminal so that it can act as a man in the middle between your terminal emulator and a running process.  In my example, the Python script will:

  • spawn a child process (by default this will be a shell);
  • on start-up, detect the width and height of the window it’s in, and pass those details on to the child process;
  • catch signals from from its controlling terminal saying that the window has changed size, and pass the new details on to the child process;
  • detect when the child process tries to use the alternative screen buffer (this is what processes like Vim do), and inject some key presses as if the user had typed them; and
  • detect when the child process tries to return to the normal screen buffer (for example, by exiting Vim), and inject a different set of key presses.

What are pseudo terminals?

On Unix-based systems, running processes read from and write to streams for input and output. When a process wants to write to the terminal, it writes to the stdout stream. When a process wants to read input from the user, it reads from the stdin stream. A pseudo terminal basically intercepts the stdin and stdout streams of a process so that the process thinks that it is talking to the user via a terminal. Common examples of pseudo terminal programs are xterm and screen.

In Python, the pty module provides a few helpful functions related to pseudo terminals. I also found the development version of the docs helpful because it had an example.

Can’t I just use pipes?

No, you can’t. Of course, pseudo terminals do use pipes. But for terminals (and pseudo terminals), the kernel stores special information such as the window size of the terminal. If you just use pipes, a clever child process will be able to detect that its stdin and stdout streams are pipes and do not belong to a terminal. Some programs will change how they output results based on this fact.

Ok, show me the code

The code below is licensed under the MIT license. You may not use, modify, or distribute it unless you agree to the license. If you agree to the license and would like to download this example, click here (zipped Python source).

# Copyright (c) 2011 Joshua D. Bartlett
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import array
import fcntl
import os
import pty
import select
import signal
import sys
import termios
import tty

# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
START_ALTERNATE_MODE = set('\x1b[?{0}h'.format(i) for i in
…       ('1049', '47', '1047'))
END_ALTERNATE_MODE = set('\x1b[?{0}l'.format(i) for i in
…       ('1049', '47', '1047'))
ALTERNATE_MODE_FLAGS = tuple(START_ALTERNATE_MODE) +
…       tuple(END_ALTERNATE_MODE)

def findlast(s, substrs):
    '''
    Finds whichever of the given substrings occurs last in the given string
    …       and returns that substring, or returns None if no such strings
    …       occur.
    '''
    i = -1
    result = None
    for substr in substrs:
        pos = s.rfind(substr)
        if pos > i:
            i = pos
            result = substr
    return result

class Interceptor(object):
    '''
    This class does the actual work of the pseudo terminal. The spawn()
    …       function is the main entrypoint.
    '''

    def __init__(self):
        self.master_fd = None

    def spawn(self, argv=None):
        '''
        Create a spawned process.
        Based on the code for pty.spawn().
        '''
        assert self.master_fd is None
        if not argv:
            argv = [os.environ['SHELL']]

        pid, master_fd = pty.fork()
        self.master_fd = master_fd
        if pid == pty.CHILD:
            os.execlp(argv[0], *argv)

        old_handler = signal.signal(signal.SIGWINCH, self._signal_winch)
        try:
            mode = tty.tcgetattr(pty.STDIN_FILENO)
            tty.setraw(pty.STDIN_FILENO)
            restore = 1
        except tty.error:    # This is the same as termios.error
            restore = 0
        self._init_fd()
        try:
            self._copy()
        except (IOError, OSError):
            if restore:
                tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)

        os.close(master_fd)
        self.master_fd = None
        signal.signal(signal.SIGWINCH, old_handler)

    def _init_fd(self):
        '''
        Called once when the pty is first set up.
        '''
        self._set_pty_size()

    def _signal_winch(self, signum, frame):
        '''
        Signal handler for SIGWINCH - window size has changed.
        '''
        self._set_pty_size()

    def _set_pty_size(self):
        '''
        Sets the window size of the child pty based on the window size of
        …       our own controlling terminal.
        '''
        assert self.master_fd is not None

        # Get the terminal size of the real terminal, set it on the
        …       pseudoterminal.
        buf = array.array('h', [0, 0, 0, 0])
        fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
        fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, buf)

    def _copy(self):
        '''
        Main select loop. Passes all data to self.master_read() or
        …       self.stdin_read().
        '''
        assert self.master_fd is not None
        master_fd = self.master_fd
        while 1:
            try:
                rfds, wfds, xfds = select.select([master_fd,
                …       pty.STDIN_FILENO], [], [])
            except select.error, e:
                if e[0] == 4:   # Interrupted system call.
                    continue

            if master_fd in rfds:
                data = os.read(self.master_fd, 1024)
                self.master_read(data)
            if pty.STDIN_FILENO in rfds:
                data = os.read(pty.STDIN_FILENO, 1024)
                self.stdin_read(data)

    def write_stdout(self, data):
        '''
        Writes to stdout as if the child process had written the data.
        '''
        os.write(pty.STDOUT_FILENO, data)

    def write_master(self, data):
        '''
        Writes to the child process from its controlling terminal.
        '''
        master_fd = self.master_fd
        assert master_fd is not None
        while data != '':
            n = os.write(master_fd, data)
            data = data[n:]

    def master_read(self, data):
        '''
        Called when there is data to be sent from the child process back to
        …       the user.
        '''
        flag = findlast(data, ALTERNATE_MODE_FLAGS)
        if flag is not None:
            if flag in START_ALTERNATE_MODE:
                # This code is executed when the child process switches the
                …       terminal into alternate mode. The line below
                …       assumes that the user has opened vim, and writes a
                …       message.
                self.write_master('IEntering special mode.\x1b')
            elif flag in END_ALTERNATE_MODE:
                # This code is executed when the child process switches the
                …       terminal back out of alternate mode. The line below
                …       assumes that the user has returned to the command
                …       prompt.
                self.write_master('echo "Leaving special mode."\r')
        self.write_stdout(data)

    def stdin_read(self, data):
        '''
        Called when there is data to be sent from the user/controlling
        …       terminal down to the child process.
        '''
        self.write_master(data)

if __name__ == '__main__':
    i = Interceptor()
    i.write_stdout('\nThe dream has begun.\n')
    i.spawn(sys.argv[1:])
    i.write_stdout('\nThe dream is (probably) over.\n')

Useful links

Here are some resources which I found useful when writing this example:

This entry was posted in long and tagged , , . Bookmark the permalink.