Skip to content

Commit

Permalink
pythongh-111495: Add PyFile tests (python#129449)
Browse files Browse the repository at this point in the history
Add tests for the following functions in test_capi.test_file:

* PyFile_FromFd()
* PyFile_GetLine()
* PyFile_NewStdPrinter()
* PyFile_WriteObject()
* PyFile_WriteString()
* PyObject_AsFileDescriptor()

Add Modules/_testlimitedcapi/file.c file.

Remove test_embed.StdPrinterTests which became redundant.
  • Loading branch information
vstinner authored Jan 30, 2025
1 parent 4e47e05 commit 4ca9fc0
Show file tree
Hide file tree
Showing 11 changed files with 500 additions and 72 deletions.
249 changes: 235 additions & 14 deletions Lib/test/test_capi/test_file.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,242 @@
import io
import os
import unittest
import warnings
from test import support
from test.support import import_helper, os_helper
from test.support import import_helper, os_helper, warnings_helper

_testcapi = import_helper.import_module('_testcapi')

_testcapi = import_helper.import_module('_testcapi')
_testlimitedcapi = import_helper.import_module('_testlimitedcapi')
_io = import_helper.import_module('_io')
NULL = None
STDOUT_FD = 1

with open(__file__, 'rb') as fp:
FIRST_LINE = next(fp).decode()
FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n'


class CAPIFileTest(unittest.TestCase):
def test_pyfile_fromfd(self):
# Test PyFile_FromFd() which is a thin wrapper to _io.open()
pyfile_fromfd = _testlimitedcapi.pyfile_fromfd
filename = __file__
with open(filename, "rb") as fp:
fd = fp.fileno()

# FileIO
fp.seek(0)
obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0)
try:
self.assertIsInstance(obj, _io.FileIO)
self.assertEqual(obj.readline(), FIRST_LINE.encode())
finally:
obj.close()

# BufferedReader
fp.seek(0)
obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0)
try:
self.assertIsInstance(obj, _io.BufferedReader)
self.assertEqual(obj.readline(), FIRST_LINE.encode())
finally:
obj.close()

# TextIOWrapper
fp.seek(0)
obj = pyfile_fromfd(fd, filename, "r", 1,
"utf-8", "replace", NULL, 0)
try:
self.assertIsInstance(obj, _io.TextIOWrapper)
self.assertEqual(obj.encoding, "utf-8")
self.assertEqual(obj.errors, "replace")
self.assertEqual(obj.readline(), FIRST_LINE_NORM)
finally:
obj.close()

def test_pyfile_getline(self):
# Test PyFile_GetLine(file, n): call file.readline()
# and strip "\n" suffix if n < 0.
pyfile_getline = _testlimitedcapi.pyfile_getline

# Test Unicode
with open(__file__, "r") as fp:
fp.seek(0)
self.assertEqual(pyfile_getline(fp, -1),
FIRST_LINE_NORM.rstrip('\n'))
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 0),
FIRST_LINE_NORM)
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 6),
FIRST_LINE_NORM[:6])

# Test bytes
with open(__file__, "rb") as fp:
fp.seek(0)
self.assertEqual(pyfile_getline(fp, -1),
FIRST_LINE.rstrip('\n').encode())
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 0),
FIRST_LINE.encode())
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 6),
FIRST_LINE.encode()[:6])

def test_pyfile_writestring(self):
# Test PyFile_WriteString(str, file): call file.write(str)
writestr = _testlimitedcapi.pyfile_writestring

with io.StringIO() as fp:
self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0)
with self.assertRaises(UnicodeDecodeError):
writestr(b"\xff", fp)
with self.assertRaises(UnicodeDecodeError):
writestr("\udc80".encode("utf-8", "surrogatepass"), fp)

text = fp.getvalue()
self.assertEqual(text, "a\xe9\u20ac\U0010FFFF")

with self.assertRaises(SystemError):
writestr(b"abc", NULL)

def test_pyfile_writeobject(self):
# Test PyFile_WriteObject(obj, file, flags):
# - Call file.write(str(obj)) if flags equals Py_PRINT_RAW.
# - Call file.write(repr(obj)) otherwise.
writeobject = _testlimitedcapi.pyfile_writeobject
Py_PRINT_RAW = 1

with io.StringIO() as fp:
# Test flags=Py_PRINT_RAW
self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0)
writeobject(NULL, fp, Py_PRINT_RAW)

# Test flags=0
self.assertEqual(writeobject("repr", fp, 0), 0)
writeobject(NULL, fp, 0)

text = fp.getvalue()
self.assertEqual(text, "raw<NULL>'repr'<NULL>")

# invalid file type
for invalid_file in (123, "abc", object()):
with self.subTest(file=invalid_file):
with self.assertRaises(AttributeError):
writeobject("abc", invalid_file, Py_PRINT_RAW)

with self.assertRaises(TypeError):
writeobject("abc", NULL, 0)

def test_pyobject_asfiledescriptor(self):
# Test PyObject_AsFileDescriptor(obj):
# - Return obj if obj is an integer.
# - Return obj.fileno() otherwise.
# File descriptor must be >= 0.
asfd = _testlimitedcapi.pyobject_asfiledescriptor

self.assertEqual(asfd(123), 123)
self.assertEqual(asfd(0), 0)

with open(__file__, "rb") as fp:
self.assertEqual(asfd(fp), fp.fileno())

# bool emits RuntimeWarning
msg = r"bool is used as a file descriptor"
with warnings_helper.check_warnings((msg, RuntimeWarning)):
self.assertEqual(asfd(True), 1)

class FakeFile:
def __init__(self, fd):
self.fd = fd
def fileno(self):
return self.fd

# file descriptor must be positive
with self.assertRaises(ValueError):
asfd(-1)
with self.assertRaises(ValueError):
asfd(FakeFile(-1))

# fileno() result must be an integer
with self.assertRaises(TypeError):
asfd(FakeFile("text"))

# unsupported types
for obj in ("string", ["list"], object()):
with self.subTest(obj=obj):
with self.assertRaises(TypeError):
asfd(obj)

# CRASHES asfd(NULL)

def test_pyfile_newstdprinter(self):
# Test PyFile_NewStdPrinter()
pyfile_newstdprinter = _testcapi.pyfile_newstdprinter

file = pyfile_newstdprinter(STDOUT_FD)
self.assertEqual(file.closed, False)
self.assertIsNone(file.encoding)
self.assertEqual(file.mode, "w")

self.assertEqual(file.fileno(), STDOUT_FD)
self.assertEqual(file.isatty(), os.isatty(STDOUT_FD))

# flush() is a no-op
self.assertIsNone(file.flush())

# close() is a no-op
self.assertIsNone(file.close())
self.assertEqual(file.closed, False)

support.check_disallow_instantiation(self, type(file))

def test_pyfile_newstdprinter_write(self):
# Test the write() method of PyFile_NewStdPrinter()
pyfile_newstdprinter = _testcapi.pyfile_newstdprinter

filename = os_helper.TESTFN
self.addCleanup(os_helper.unlink, filename)

try:
old_stdout = os.dup(STDOUT_FD)
except OSError as exc:
# os.dup(STDOUT_FD) is not supported on WASI
self.skipTest(f"os.dup() failed with {exc!r}")

try:
with open(filename, "wb") as fp:
# PyFile_NewStdPrinter() only accepts fileno(stdout)
# or fileno(stderr) file descriptor.
fd = fp.fileno()
os.dup2(fd, STDOUT_FD)

file = pyfile_newstdprinter(STDOUT_FD)
self.assertEqual(file.write("text"), 4)
# The surrogate character is encoded with
# the "surrogateescape" error handler
self.assertEqual(file.write("[\udc80]"), 8)
finally:
os.dup2(old_stdout, STDOUT_FD)
os.close(old_stdout)

with open(filename, "r") as fp:
self.assertEqual(fp.read(), "text[\\udc80]")

def test_py_fopen(self):
# Test Py_fopen() and Py_fclose()
py_fopen = _testcapi.py_fopen

with open(__file__, "rb") as fp:
source = fp.read()

for filename in (__file__, os.fsencode(__file__)):
with self.subTest(filename=filename):
data = _testcapi.py_fopen(filename, "rb")
data = py_fopen(filename, "rb")
self.assertEqual(data, source[:256])

data = _testcapi.py_fopen(os_helper.FakePath(filename), "rb")
data = py_fopen(os_helper.FakePath(filename), "rb")
self.assertEqual(data, source[:256])

filenames = [
Expand All @@ -43,41 +259,46 @@ def test_py_fopen(self):
filename = None
continue
try:
data = _testcapi.py_fopen(filename, "rb")
data = py_fopen(filename, "rb")
self.assertEqual(data, source[:256])
finally:
os_helper.unlink(filename)

# embedded null character/byte in the filename
with self.assertRaises(ValueError):
_testcapi.py_fopen("a\x00b", "rb")
py_fopen("a\x00b", "rb")
with self.assertRaises(ValueError):
_testcapi.py_fopen(b"a\x00b", "rb")
py_fopen(b"a\x00b", "rb")

# non-ASCII mode failing with "Invalid argument"
with self.assertRaises(OSError):
_testcapi.py_fopen(__file__, b"\xc2\x80")
py_fopen(__file__, b"\xc2\x80")
with self.assertRaises(OSError):
# \x98 is invalid in cp1250, cp1251, cp1257
# \x9d is invalid in cp1252-cp1255, cp1258
_testcapi.py_fopen(__file__, b"\xc2\x98\xc2\x9d")
py_fopen(__file__, b"\xc2\x98\xc2\x9d")
# UnicodeDecodeError can come from the audit hook code
with self.assertRaises((UnicodeDecodeError, OSError)):
_testcapi.py_fopen(__file__, b"\x98\x9d")
py_fopen(__file__, b"\x98\x9d")

# invalid filename type
for invalid_type in (123, object()):
with self.subTest(filename=invalid_type):
with self.assertRaises(TypeError):
_testcapi.py_fopen(invalid_type, "rb")
py_fopen(invalid_type, "rb")

if support.MS_WINDOWS:
with self.assertRaises(OSError):
# On Windows, the file mode is limited to 10 characters
_testcapi.py_fopen(__file__, "rt+, ccs=UTF-8")
py_fopen(__file__, "rt+, ccs=UTF-8")

# CRASHES py_fopen(NULL, 'rb')
# CRASHES py_fopen(__file__, NULL)

# TODO: Test Py_UniversalNewlineFgets()

# CRASHES _testcapi.py_fopen(NULL, 'rb')
# CRASHES _testcapi.py_fopen(__file__, NULL)
# PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by
# test_embed.test_open_code_hook()


if __name__ == "__main__":
Expand Down
51 changes: 0 additions & 51 deletions Lib/test/test_embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1985,56 +1985,5 @@ def test_presite(self):
self.assertIn("unique-python-message", out)


class StdPrinterTests(EmbeddingTestsMixin, unittest.TestCase):
# Test PyStdPrinter_Type which is used by _PySys_SetPreliminaryStderr():
# "Set up a preliminary stderr printer until we have enough
# infrastructure for the io module in place."

STDOUT_FD = 1

def create_printer(self, fd):
ctypes = import_helper.import_module('ctypes')
PyFile_NewStdPrinter = ctypes.pythonapi.PyFile_NewStdPrinter
PyFile_NewStdPrinter.argtypes = (ctypes.c_int,)
PyFile_NewStdPrinter.restype = ctypes.py_object
return PyFile_NewStdPrinter(fd)

def test_write(self):
message = "unicode:\xe9-\u20ac-\udc80!\n"

stdout_fd = self.STDOUT_FD
stdout_fd_copy = os.dup(stdout_fd)
self.addCleanup(os.close, stdout_fd_copy)

rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
self.addCleanup(os.close, wfd)
try:
# PyFile_NewStdPrinter() only accepts fileno(stdout)
# or fileno(stderr) file descriptor.
os.dup2(wfd, stdout_fd)

printer = self.create_printer(stdout_fd)
printer.write(message)
finally:
os.dup2(stdout_fd_copy, stdout_fd)

data = os.read(rfd, 100)
self.assertEqual(data, message.encode('utf8', 'backslashreplace'))

def test_methods(self):
fd = self.STDOUT_FD
printer = self.create_printer(fd)
self.assertEqual(printer.fileno(), fd)
self.assertEqual(printer.isatty(), os.isatty(fd))
printer.flush() # noop
printer.close() # noop

def test_disallow_instantiation(self):
fd = self.STDOUT_FD
printer = self.create_printer(fd)
support.check_disallow_instantiation(self, type(printer))


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion Modules/Setup.stdlib.in
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
@MODULE__TESTBUFFER_TRUE@_testbuffer _testbuffer.c
@MODULE__TESTINTERNALCAPI_TRUE@_testinternalcapi _testinternalcapi.c _testinternalcapi/test_lock.c _testinternalcapi/pytime.c _testinternalcapi/set.c _testinternalcapi/test_critical_sections.c
@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/heaptype.c _testcapi/abstract.c _testcapi/unicode.c _testcapi/dict.c _testcapi/set.c _testcapi/list.c _testcapi/tuple.c _testcapi/getargs.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c _testcapi/watchers.c _testcapi/long.c _testcapi/float.c _testcapi/complex.c _testcapi/numbers.c _testcapi/structmember.c _testcapi/exceptions.c _testcapi/code.c _testcapi/buffer.c _testcapi/pyatomic.c _testcapi/run.c _testcapi/file.c _testcapi/codec.c _testcapi/immortal.c _testcapi/gc.c _testcapi/hash.c _testcapi/time.c _testcapi/bytes.c _testcapi/object.c _testcapi/monitoring.c _testcapi/config.c _testcapi/import.c
@MODULE__TESTLIMITEDCAPI_TRUE@_testlimitedcapi _testlimitedcapi.c _testlimitedcapi/abstract.c _testlimitedcapi/bytearray.c _testlimitedcapi/bytes.c _testlimitedcapi/codec.c _testlimitedcapi/complex.c _testlimitedcapi/dict.c _testlimitedcapi/eval.c _testlimitedcapi/float.c _testlimitedcapi/heaptype_relative.c _testlimitedcapi/import.c _testlimitedcapi/list.c _testlimitedcapi/long.c _testlimitedcapi/object.c _testlimitedcapi/pyos.c _testlimitedcapi/set.c _testlimitedcapi/sys.c _testlimitedcapi/tuple.c _testlimitedcapi/unicode.c _testlimitedcapi/vectorcall_limited.c _testlimitedcapi/version.c
@MODULE__TESTLIMITEDCAPI_TRUE@_testlimitedcapi _testlimitedcapi.c _testlimitedcapi/abstract.c _testlimitedcapi/bytearray.c _testlimitedcapi/bytes.c _testlimitedcapi/codec.c _testlimitedcapi/complex.c _testlimitedcapi/dict.c _testlimitedcapi/eval.c _testlimitedcapi/float.c _testlimitedcapi/heaptype_relative.c _testlimitedcapi/import.c _testlimitedcapi/list.c _testlimitedcapi/long.c _testlimitedcapi/object.c _testlimitedcapi/pyos.c _testlimitedcapi/set.c _testlimitedcapi/sys.c _testlimitedcapi/tuple.c _testlimitedcapi/unicode.c _testlimitedcapi/vectorcall_limited.c _testlimitedcapi/version.c _testlimitedcapi/file.c
@MODULE__TESTCLINIC_TRUE@_testclinic _testclinic.c
@MODULE__TESTCLINIC_LIMITED_TRUE@_testclinic_limited _testclinic_limited.c

Expand Down
Loading

0 comments on commit 4ca9fc0

Please sign in to comment.