Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 295 additions & 2 deletions Lib/test/test_capi/test_codecs.py
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
import unittest
import codecs
import contextlib
import io
import re
import sys
import unittest
import unittest.mock as mock
import _testcapi
from test.support import import_helper

_testlimitedcapi = import_helper.import_module('_testlimitedcapi')

NULL = None
BAD_ARGUMENT = re.escape('bad argument type for built-in operation')


class CAPITest(unittest.TestCase):
class CAPIUnicodeTest(unittest.TestCase):
# TODO: Test the following functions:
#
# PyUnicode_BuildEncodingMap
Expand DownExpand Up@@ -516,5 +523,291 @@ def test_asrawunicodeescapestring(self):
# CRASHES asrawunicodeescapestring(NULL)


class CAPICodecs(unittest.TestCase):

def setUp(self):
# Encoding names are normalized internally by converting them
# to lowercase and their hyphens are replaced by underscores.
self.encoding_name = 'test.test_capi.test_codecs.codec_reversed'
# Make sure that our custom codec is not already registered (that
# way we know whether we correctly unregistered the custom codec
# after a test or not).
self.assertRaises(LookupError, codecs.lookup, self.encoding_name)
# create the search function without registering yet
self._create_custom_codec()

def _create_custom_codec(self):
def codec_encoder(m, errors='strict'):
return (type(m)().join(reversed(m)), len(m))

def codec_decoder(c, errors='strict'):
return (type(c)().join(reversed(c)), len(c))

class IncrementalEncoder(codecs.IncrementalEncoder):
def encode(self, input, final=False):
return codec_encoder(input)

class IncrementalDecoder(codecs.IncrementalDecoder):
def decode(self, input, final=False):
return codec_decoder(input)

class StreamReader(codecs.StreamReader):
def encode(self, input, errors='strict'):
return codec_encoder(input, errors=errors)

def decode(self, input, errors='strict'):
return codec_decoder(input, errors=errors)

class StreamWriter(codecs.StreamWriter):
def encode(self, input, errors='strict'):
return codec_encoder(input, errors=errors)

def decode(self, input, errors='strict'):
return codec_decoder(input, errors=errors)

info = codecs.CodecInfo(
encode=codec_encoder,
decode=codec_decoder,
streamreader=StreamReader,
streamwriter=StreamWriter,
incrementalencoder=IncrementalEncoder,
incrementaldecoder=IncrementalDecoder,
name=self.encoding_name
)

def search_function(encoding):
if encoding == self.encoding_name:
return info
return None

self.codec_info = info
self.search_function = search_function

@contextlib.contextmanager
def use_custom_encoder(self):
self.assertRaises(LookupError, codecs.lookup, self.encoding_name)
codecs.register(self.search_function)
yield
codecs.unregister(self.search_function)
self.assertRaises(LookupError, codecs.lookup, self.encoding_name)

def test_codec_register(self):
search_function, encoding = self.search_function, self.encoding_name
# register the search function using the C API
self.assertIsNone(_testcapi.codec_register(search_function))
# in case the test failed before cleaning up
self.addCleanup(codecs.unregister, self.search_function)
self.assertIs(codecs.lookup(encoding), search_function(encoding))
self.assertEqual(codecs.encode('123', encoding=encoding), '321')
# unregister the search function using the regular API
codecs.unregister(search_function)
self.assertRaises(LookupError, codecs.lookup, encoding)

def test_codec_unregister(self):
search_function, encoding = self.search_function, self.encoding_name
self.assertRaises(LookupError, codecs.lookup, encoding)
# register the search function using the regular API
codecs.register(search_function)
# in case the test failed before cleaning up
self.addCleanup(codecs.unregister, self.search_function)
self.assertIsNotNone(codecs.lookup(encoding))
# unregister the search function using the C API
self.assertIsNone(_testcapi.codec_unregister(search_function))
self.assertRaises(LookupError, codecs.lookup, encoding)

def test_codec_known_encoding(self):
self.assertRaises(LookupError, codecs.lookup, 'unknown-codec')
self.assertFalse(_testcapi.codec_known_encoding('unknown-codec'))
self.assertFalse(_testcapi.codec_known_encoding('unknown_codec'))
self.assertFalse(_testcapi.codec_known_encoding('UNKNOWN-codec'))

encoding_name = self.encoding_name
self.assertRaises(LookupError, codecs.lookup, encoding_name)

codecs.register(self.search_function)
self.addCleanup(codecs.unregister, self.search_function)

for name in [
encoding_name,
encoding_name.upper(),
encoding_name.replace('_', '-'),
]:
with self.subTest(name):
self.assertTrue(_testcapi.codec_known_encoding(name))

def test_codec_encode(self):
encode = _testcapi.codec_encode
self.assertEqual(encode('a', 'utf-8', NULL), b'a')
self.assertEqual(encode('a', 'utf-8', 'strict'), b'a')
self.assertEqual(encode('[é]', 'ascii', 'ignore'), b'[]')

self.assertRaises(TypeError, encode, NULL, 'ascii', 'strict')
with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
encode('a', NULL, 'strict')

def test_codec_decode(self):
decode = _testcapi.codec_decode

s = 'a\xa1\u4f60\U0001f600'
b = s.encode()

self.assertEqual(decode(b, 'utf-8', 'strict'), s)
self.assertEqual(decode(b, 'utf-8', NULL), s)
self.assertEqual(decode(b, 'latin1', 'strict'), b.decode('latin1'))
self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', 'strict')
self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', NULL)
self.assertEqual(decode(b, 'ascii', 'replace'), 'a' + '\ufffd'*9)

# _codecs.decode() only reports an unknown error handling name when
# the corresponding error handling function is used; this difers
# from PyUnicode_Decode() which checks that both the encoding and
# the error handling name are recognized before even attempting to
# call the decoder.
self.assertEqual(decode(b'', 'utf-8', 'unknown-error-handler'), '')
self.assertEqual(decode(b'a', 'utf-8', 'unknown-error-handler'), 'a')

self.assertRaises(TypeError, decode, NULL, 'ascii', 'strict')
with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
decode(b, NULL, 'strict')

def test_codec_encoder(self):
codec_encoder = _testcapi.codec_encoder

with self.use_custom_encoder():
encoder = codec_encoder(self.encoding_name)
self.assertIs(encoder, self.codec_info.encode)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_encoder(NULL)

def test_codec_decoder(self):
codec_decoder = _testcapi.codec_decoder

with self.use_custom_encoder():
decoder = codec_decoder(self.encoding_name)
self.assertIs(decoder, self.codec_info.decode)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_decoder(NULL)

def test_codec_incremental_encoder(self):
codec_incremental_encoder = _testcapi.codec_incremental_encoder

with self.use_custom_encoder():
encoding = self.encoding_name

for errors in ['strict', NULL]:
with self.subTest(errors):
encoder = codec_incremental_encoder(encoding, errors)
self.assertIsInstance(encoder, self.codec_info.incrementalencoder)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_incremental_encoder(NULL, 'strict')

def test_codec_incremental_decoder(self):
codec_incremental_decoder = _testcapi.codec_incremental_decoder

with self.use_custom_encoder():
encoding = self.encoding_name

for errors in ['strict', NULL]:
with self.subTest(errors):
decoder = codec_incremental_decoder(encoding, errors)
self.assertIsInstance(decoder, self.codec_info.incrementaldecoder)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_incremental_decoder(NULL, 'strict')

def test_codec_stream_reader(self):
codec_stream_reader = _testcapi.codec_stream_reader

with self.use_custom_encoder():
encoding, stream = self.encoding_name, io.StringIO()
for errors in ['strict', NULL]:
with self.subTest(errors):
writer = codec_stream_reader(encoding, stream, errors)
self.assertIsInstance(writer, self.codec_info.streamreader)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_stream_reader(NULL, stream, 'strict')

def test_codec_stream_writer(self):
codec_stream_writer = _testcapi.codec_stream_writer

with self.use_custom_encoder():
encoding, stream = self.encoding_name, io.StringIO()
for errors in ['strict', NULL]:
with self.subTest(errors):
writer = codec_stream_writer(encoding, stream, errors)
self.assertIsInstance(writer, self.codec_info.streamwriter)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_stream_writer(NULL, stream, 'strict')


class CAPICodecErrors(unittest.TestCase):

def test_codec_register_error(self):
# for cleaning up between tests
from _codecs import _unregister_error as _codecs_unregister_error

self.assertRaises(LookupError, _testcapi.codec_lookup_error, 'custom')

def custom_error_handler(exc):
raise exc

error_handler = mock.Mock(wraps=custom_error_handler)
_testcapi.codec_register_error('custom', error_handler)
self.addCleanup(_codecs_unregister_error, 'custom')

self.assertRaises(UnicodeEncodeError, codecs.encode,
'\xff', 'ascii', errors='custom')
error_handler.assert_called_once()
error_handler.reset_mock()

self.assertRaises(UnicodeDecodeError, codecs.decode,
b'\xff', 'ascii', errors='custom')
error_handler.assert_called_once()

# _codecs._unregister_error directly delegates to the internal C
# function so a Python-level function test is sufficient (it is
# tested in test_codeccallbacks).

def test_codec_lookup_error(self):
codec_lookup_error = _testcapi.codec_lookup_error
self.assertIs(codec_lookup_error(NULL), codecs.strict_errors)
self.assertIs(codec_lookup_error('strict'), codecs.strict_errors)
self.assertIs(codec_lookup_error('ignore'), codecs.ignore_errors)
self.assertIs(codec_lookup_error('replace'), codecs.replace_errors)
self.assertIs(codec_lookup_error('xmlcharrefreplace'), codecs.xmlcharrefreplace_errors)
self.assertIs(codec_lookup_error('namereplace'), codecs.namereplace_errors)
self.assertRaises(LookupError, codec_lookup_error, 'unknown')

def test_codec_error_handlers(self):
exceptions = [
# A UnicodeError with an empty message currently crashes:
# See: https://github.com/python/cpython/issues/123378
# UnicodeEncodeError('bad', '', 0, 1, 'reason'),
UnicodeEncodeError('bad', 'x', 0, 1, 'reason'),
UnicodeEncodeError('bad', 'xyz123', 0, 1, 'reason'),
UnicodeEncodeError('bad', 'xyz123', 1, 4, 'reason'),
]

strict_handler = _testcapi.codec_strict_errors
for exc in exceptions:
with self.subTest(handler=strict_handler, exc=exc):
self.assertRaises(UnicodeEncodeError, strict_handler, exc)

for handler in [
_testcapi.codec_ignore_errors,
_testcapi.codec_replace_errors,
_testcapi.codec_xmlcharrefreplace_errors,
_testlimitedcapi.codec_namereplace_errors,
]:
for exc in exceptions:
with self.subTest(handler=handler, exc=exc):
self.assertIsInstance(handler(exc), tuple)


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion Modules/Setup.stdlib.in
Original file line numberDiff line numberDiff line change
Expand Up@@ -163,7 +163,7 @@
@MODULE__TESTBUFFER_TRUE@_testbuffer _testbuffer.c
@MODULE__TESTINTERNALCAPI_TRUE@_testinternalcapi _testinternalcapi.c _testinternalcapi/test_lock.c _testinternalcapi/pytime.c _testinternalcapi/set.c _testinternalcapi/test_critical_sections.c
@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/heaptype.c _testcapi/abstract.c _testcapi/unicode.c _testcapi/dict.c _testcapi/set.c _testcapi/list.c _testcapi/tuple.c _testcapi/getargs.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c _testcapi/watchers.c _testcapi/long.c _testcapi/float.c _testcapi/complex.c _testcapi/numbers.c _testcapi/structmember.c _testcapi/exceptions.c _testcapi/code.c _testcapi/buffer.c _testcapi/pyatomic.c _testcapi/run.c _testcapi/file.c _testcapi/codec.c _testcapi/immortal.c _testcapi/gc.c _testcapi/hash.c _testcapi/time.c _testcapi/bytes.c _testcapi/object.c _testcapi/monitoring.c _testcapi/config.c
@MODULE__TESTLIMITEDCAPI_TRUE@_testlimitedcapi _testlimitedcapi.c _testlimitedcapi/abstract.c _testlimitedcapi/bytearray.c _testlimitedcapi/bytes.c _testlimitedcapi/complex.c _testlimitedcapi/dict.c _testlimitedcapi/eval.c _testlimitedcapi/float.c _testlimitedcapi/heaptype_relative.c _testlimitedcapi/list.c _testlimitedcapi/long.c _testlimitedcapi/object.c _testlimitedcapi/pyos.c _testlimitedcapi/set.c _testlimitedcapi/sys.c _testlimitedcapi/tuple.c _testlimitedcapi/unicode.c _testlimitedcapi/vectorcall_limited.c
@MODULE__TESTLIMITEDCAPI_TRUE@_testlimitedcapi _testlimitedcapi.c _testlimitedcapi/abstract.c _testlimitedcapi/bytearray.c _testlimitedcapi/bytes.c _testlimitedcapi/codec.c _testlimitedcapi/complex.c _testlimitedcapi/dict.c _testlimitedcapi/eval.c _testlimitedcapi/float.c _testlimitedcapi/heaptype_relative.c _testlimitedcapi/list.c _testlimitedcapi/long.c _testlimitedcapi/object.c _testlimitedcapi/pyos.c _testlimitedcapi/set.c _testlimitedcapi/sys.c _testlimitedcapi/tuple.c _testlimitedcapi/unicode.c _testlimitedcapi/vectorcall_limited.c
@MODULE__TESTCLINIC_TRUE@_testclinic _testclinic.c
@MODULE__TESTCLINIC_LIMITED_TRUE@_testclinic_limited _testclinic_limited.c

Expand Down
Loading