Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 111 additions & 11 deletions Lib/test/test_pty.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -14,9 +14,22 @@
import io # readline
import unittest

import struct
import tty
import fcntl
import platform
import warnings

TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"

try:
_TIOCGWINSZ = tty.TIOCGWINSZ
_TIOCSWINSZ = tty.TIOCSWINSZ
_HAVE_WINSZ = True
except AttributeError:
_HAVE_WINSZ = False

if verbose:
def debug(msg):
print(msg)
Expand DownExpand Up@@ -60,6 +73,27 @@ def _readline(fd):
reader = io.FileIO(fd, mode='rb', closefd=False)
return reader.readline()

def expectedFailureIfStdinIsTTY(fun):
# avoid isatty() for now
try:
tty.tcgetattr(pty.STDIN_FILENO)
return unittest.expectedFailure(fun)
except tty.error:
pass
return fun

def expectedFailureOnBSD(fun):
PLATFORM = platform.system()
if PLATFORM.endswith("BSD") or PLATFORM == "Darwin":
return unittest.expectedFailure(fun)
return fun

def _get_term_winsz(fd):
s = struct.pack("HHHH", 0, 0, 0, 0)
return fcntl.ioctl(fd, _TIOCGWINSZ, s)

def _set_term_winsz(fd, winsz):
fcntl.ioctl(fd, _TIOCSWINSZ, winsz)


# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
Expand All@@ -78,6 +112,20 @@ def setUp(self):
self.addCleanup(signal.alarm, 0)
signal.alarm(10)

# Save original stdin window size
self.stdin_rows = None
self.stdin_cols = None
if _HAVE_WINSZ:
try:
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
self.stdin_rows = stdin_dim.lines
self.stdin_cols = stdin_dim.columns
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
self.stdin_cols, 0, 0)
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
except OSError:
pass

def handle_sig(self, sig, frame):
self.fail("isatty hung")

Expand All@@ -86,26 +134,65 @@ def handle_sighup(signum, frame):
# bpo-38547: if the process is the session leader, os.close(master_fd)
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
# signal: just ignore the signal.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.
pass

def test_basic(self):
@expectedFailureIfStdinIsTTY
def test_openpty(self):
try:
debug("Calling master_open()")
master_fd, slave_name = pty.master_open()
debug("Got master_fd '%d', slave_name '%s'" %
(master_fd, slave_name))
debug("Calling slave_open(%r)" % (slave_name,))
slave_fd = pty.slave_open(slave_name)
debug("Got slave_fd '%d'" % slave_fd)
mode = tty.tcgetattr(pty.STDIN_FILENO)
except tty.error:
# not a tty or bad/closed fd
debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
mode = None

new_stdin_winsz = None
if self.stdin_rows != None and self.stdin_cols != None:
try:
debug("Setting pty.STDIN_FILENO window size")
# Set number of columns and rows to be the
# floors of 1/5 of respective original values
target_stdin_winsz = struct.pack("HHHH", self.stdin_rows//5,
self.stdin_cols//5, 0, 0)
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)

# Were we able to set the window size
# of pty.STDIN_FILENO successfully?
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
self.assertEqual(new_stdin_winsz, target_stdin_winsz,
"pty.STDIN_FILENO window size unchanged")
except OSError:
warnings.warn("Failed to set pty.STDIN_FILENO window size")
pass

try:
debug("Calling pty.openpty()")
try:
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
except TypeError:
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
except OSError:
# " An optional feature could not be imported " ... ?
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")

self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")

if mode:
self.assertEqual(tty.tcgetattr(slave_fd), mode,
"openpty() failed to set slave termios")
if new_stdin_winsz:
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
"openpty() failed to set slave window size")

# Solaris requires reading the fd before anything is returned.
# My guess is that since we open and close the slave fd
# in master_open(), we need to read the EOF.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.

# Ensure the fd is non-blocking in case there's nothing to read.
blocking = os.get_blocking(master_fd)
Expand DownExpand Up@@ -222,8 +309,20 @@ def test_fork(self):

os.close(master_fd)

# pty.fork() passed.
@expectedFailureOnBSD
def test_master_read(self):
debug("Calling pty.openpty()")
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")

debug("Closing slave_fd")
os.close(slave_fd)

debug("Reading from master_fd")
with self.assertRaises(OSError):
os.read(master_fd, 1)

os.close(master_fd)

class SmallPtyTests(unittest.TestCase):
"""These tests don't spawn children or hang."""
Expand DownExpand Up@@ -262,8 +361,9 @@ def _socketpair(self):
self.files.extend(socketpair)
return socketpair

def _mock_select(self, rfds, wfds, xfds):
def _mock_select(self, rfds, wfds, xfds, timeout=0):
# This will raise IndexError when no more expected calls exist.
# This ignores the timeout
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
return self.select_rfds_results.pop(0), [], []

Expand Down
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
Updated tests for the pty library. test_basic() has been changed to test_openpty(); this additionally checks if slave termios and slave winsize are being set properly by pty.openpty(). In order to add support for FreeBSD, NetBSD, OpenBSD, and Darwin, this also adds test_master_read(), which demonstrates that pty.spawn() should not depend on an OSError to exit from its copy loop.