URL: http://github.com/python/cpython/pull/125739.patch
lue should be a tuple, ``(user, - password)``, which can be used for basic authentication. - - The implementation prompts for this information on the terminal; an application - should override this method to use an appropriate interaction model in the local - environment. - :mod:`urllib.request` Restrictions ---------------------------------- @@ -1578,8 +1462,7 @@ some point in the future. you try to fetch a file whose read permissions make it inaccessible; the FTP code will try to read it, fail with a 550 error, and then perform a directory listing for the unreadable file. If fine-grained control is needed, consider - using the :mod:`ftplib` module, subclassing :class:`FancyURLopener`, or changing - *_urlopener* to meet your needs. + using the :mod:`ftplib` module. diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index ad841538ccc547..dd2018de6aa290 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -611,6 +611,10 @@ urllib * Remove deprecated :class:`!Quoter` class from :mod:`urllib.parse`. It had previously raised a :exc:`DeprecationWarning` since Python 3.11. (Contributed by Nikita Sobolev in :gh:`118827`.) +* Remove deprecated :class:`!URLopener` and :class:`!FancyURLopener` classes + from :mod:`urllib.request`. They had previously raised a + :exc:`DeprecationWarning` since Python 3.3. + (Contributed by Barney Gale in gh:`84850`.) Others ------ diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index 6bb0fb303fa202..65e05dc30c3dcc 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -7,11 +7,9 @@ import email.message import io import unittest -from unittest.mock import patch from test import support from test.support import os_helper from test.support import socket_helper -from test.support import warnings_helper import os try: import ssl @@ -21,7 +19,6 @@ import tempfile from nturl2path import url2pathname, pathname2url -from base64 import b64encode import collections @@ -36,32 +33,6 @@ def hexescape(char): hex_repr = "0%s" % hex_repr return "%" + hex_repr -# Shortcut for testing FancyURLopener -_urlopener = None - - -def urlopen(url, data=None, proxies=None): - """urlopen(url [, data]) -> open file-like object""" - global _urlopener - if proxies is not None: - opener = urllib.request.FancyURLopener(proxies=proxies) - elif not _urlopener: - opener = FancyURLopener() - _urlopener = opener - else: - opener = _urlopener - if data is None: - return opener.open(url) - else: - return opener.open(url, data) - - -def FancyURLopener(): - with warnings_helper.check_warnings( - ('FancyURLopener style of invoking requests is deprecated.', - DeprecationWarning)): - return urllib.request.FancyURLopener() - def fakehttp(fakedata, mock_close=False): class FakeSocket(io.BytesIO): @@ -159,7 +130,7 @@ def setUp(self): f.close() self.pathname = os_helper.TESTFN self.quoted_pathname = urllib.parse.quote(self.pathname) - self.returned_obj = urlopen("file:%s" % self.quoted_pathname) + self.returned_obj = urllib.request.urlopen("file:%s" % self.quoted_pathname) def tearDown(self): """Shut down the open object""" @@ -206,7 +177,7 @@ def test_headers(self): self.assertIsInstance(self.returned_obj.headers, email.message.Message) def test_url(self): - self.assertEqual(self.returned_obj.url, self.quoted_pathname) + self.assertEqual(self.returned_obj.url, "file://" + self.quoted_pathname) def test_status(self): self.assertIsNone(self.returned_obj.status) @@ -215,7 +186,7 @@ def test_info(self): self.assertIsInstance(self.returned_obj.info(), email.message.Message) def test_geturl(self): - self.assertEqual(self.returned_obj.geturl(), self.quoted_pathname) + self.assertEqual(self.returned_obj.geturl(), "file://" + self.quoted_pathname) def test_getcode(self): self.assertIsNone(self.returned_obj.getcode()) @@ -346,7 +317,7 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin): def check_read(self, ver): self.fakehttp(b"HTTP/" + ver + b" 200 OK\r\n\r\nHello!") try: - fp = urlopen("http://python.org/") + fp = urllib.request.urlopen("http://python.org/") self.assertEqual(fp.readline(), b"Hello!") self.assertEqual(fp.readline(), b"") self.assertEqual(fp.geturl(), 'http://python.org/') @@ -367,8 +338,8 @@ def test_url_fragment(self): def test_willclose(self): self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") try: - resp = urlopen("http://www.python.org") - self.assertTrue(resp.fp.will_close) + resp = urllib.request.urlopen("http://www.python.org") + self.assertTrue(resp.will_close) finally: self.unfakehttp() @@ -393,9 +364,6 @@ def test_url_path_with_control_char_rejected(self): with self.assertRaisesRegex( InvalidURL, f"contain control.*{escaped_char_repr}"): urllib.request.urlopen(f"https:{schemeless_url}") - # This code path quotes the URL so there is no injection. - resp = urlopen(f"http:{schemeless_url}") - self.assertNotIn(char, resp.geturl()) finally: self.unfakehttp() @@ -417,11 +385,6 @@ def test_url_path_with_newline_header_injection_rejected(self): urllib.request.urlopen(f"http:{schemeless_url}") with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"): urllib.request.urlopen(f"https:{schemeless_url}") - # This code path quotes the URL so there is no injection. - resp = urlopen(f"http:{schemeless_url}") - self.assertNotIn(' ', resp.geturl()) - self.assertNotIn('\r', resp.geturl()) - self.assertNotIn('\n', resp.geturl()) finally: self.unfakehttp() @@ -436,9 +399,9 @@ def test_url_host_with_control_char_rejected(self): InvalidURL = http.client.InvalidURL with self.assertRaisesRegex( InvalidURL, f"contain control.*{escaped_char_repr}"): - urlopen(f"http:{schemeless_url}") + urllib.request.urlopen(f"http:{schemeless_url}") with self.assertRaisesRegex(InvalidURL, f"contain control.*{escaped_char_repr}"): - urlopen(f"https:{schemeless_url}") + urllib.request.urlopen(f"https:{schemeless_url}") finally: self.unfakehttp() @@ -451,9 +414,9 @@ def test_url_host_with_newline_header_injection_rejected(self): InvalidURL = http.client.InvalidURL with self.assertRaisesRegex( InvalidURL, r"contain control.*\\r"): - urlopen(f"http:{schemeless_url}") + urllib.request.urlopen(f"http:{schemeless_url}") with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"): - urlopen(f"https:{schemeless_url}") + urllib.request.urlopen(f"https:{schemeless_url}") finally: self.unfakehttp() @@ -477,7 +440,7 @@ def test_read_bogus(self): Content-Type: text/html; charset=iso-8859-1 ''', mock_close=True) try: - self.assertRaises(OSError, urlopen, "http://python.org/") + self.assertRaises(OSError, urllib.request.urlopen, "http://python.org/") finally: self.unfakehttp() @@ -493,20 +456,20 @@ def test_invalid_redirect(self): try: msg = "Redirection to url 'file:" with self.assertRaisesRegex(urllib.error.HTTPError, msg): - urlopen("http://python.org/") + urllib.request.urlopen("http://python.org/") finally: self.unfakehttp() def test_redirect_limit_independent(self): # Ticket #12923: make sure independent requests each use their # own retry limit. - for i in range(FancyURLopener().maxtries): + for i in range(urllib.request.HTTPRedirectHandler.max_redirections): self.fakehttp(b'''HTTP/1.1 302 Found Location: file://guidocomputer.athome.com:/python/license Connection: close ''', mock_close=True) try: - self.assertRaises(urllib.error.HTTPError, urlopen, + self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, "http://something") finally: self.unfakehttp() @@ -516,14 +479,14 @@ def test_empty_socket(self): # data. (#1680230) self.fakehttp(b'') try: - self.assertRaises(OSError, urlopen, "http://something") + self.assertRaises(OSError, urllib.request.urlopen, "http://something") finally: self.unfakehttp() def test_missing_localfile(self): # Test for #10836 with self.assertRaises(urllib.error.URLError) as e: - urlopen('file://localhost/a/file/which/doesnot/exists.py') + urllib.request.urlopen('file://localhost/a/file/which/doesnot/exists.py') self.assertTrue(e.exception.filename) self.assertTrue(e.exception.reason) @@ -532,71 +495,28 @@ def test_file_notexists(self): tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/') try: self.assertTrue(os.path.exists(tmp_file)) - with urlopen(tmp_fileurl) as fobj: + with urllib.request.urlopen(tmp_fileurl) as fobj: self.assertTrue(fobj) finally: os.close(fd) os.unlink(tmp_file) self.assertFalse(os.path.exists(tmp_file)) with self.assertRaises(urllib.error.URLError): - urlopen(tmp_fileurl) + urllib.request.urlopen(tmp_fileurl) def test_ftp_nohost(self): test_ftp_url = 'ftp://github.com/path' with self.assertRaises(urllib.error.URLError) as e: - urlopen(test_ftp_url) + urllib.request.urlopen(test_ftp_url) self.assertFalse(e.exception.filename) self.assertTrue(e.exception.reason) def test_ftp_nonexisting(self): with self.assertRaises(urllib.error.URLError) as e: - urlopen('ftp://localhost/a/file/which/doesnot/exists.py') + urllib.request.urlopen('ftp://localhost/a/file/which/doesnot/exists.py') self.assertFalse(e.exception.filename) self.assertTrue(e.exception.reason) - @patch.object(urllib.request, 'MAXFTPCACHE', 0) - def test_ftp_cache_pruning(self): - self.fakeftp() - try: - urllib.request.ftpcache['test'] = urllib.request.ftpwrapper('user', 'pass', 'localhost', 21, []) - urlopen('ftp://localhost') - finally: - self.unfakeftp() - - def test_userpass_inurl(self): - self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") - try: - fp = urlopen("http://user:pass@python.org/") - self.assertEqual(fp.readline(), b"Hello!") - self.assertEqual(fp.readline(), b"") - self.assertEqual(fp.geturl(), 'http://user:pass@python.org/') - self.assertEqual(fp.getcode(), 200) - finally: - self.unfakehttp() - - def test_userpass_inurl_w_spaces(self): - self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") - try: - userpass = "a b:c d" - url = "http://{}@python.org/".format(userpass) - fakehttp_wrapper = http.client.HTTPConnection - authorization = ("Authorization: Basic %s\r\n" % - b64encode(userpass.encode("ASCII")).decode("ASCII")) - fp = urlopen(url) - # The authorization header must be in place - self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8")) - self.assertEqual(fp.readline(), b"Hello!") - self.assertEqual(fp.readline(), b"") - # the spaces are quoted in URL so no match - self.assertNotEqual(fp.geturl(), url) - self.assertEqual(fp.getcode(), 200) - finally: - self.unfakehttp() - - def test_URLopener_deprecation(self): - with warnings_helper.check_warnings(('',DeprecationWarning)): - urllib.request.URLopener() - class urlopen_DataTests(unittest.TestCase): """Test urlopen() opening a data URL.""" @@ -1562,56 +1482,6 @@ def test_thishost(self): self.assertIsInstance(urllib.request.thishost(), tuple) -class URLopener_Tests(FakeHTTPMixin, unittest.TestCase): - """Testcase to test the open method of URLopener class.""" - - def test_quoted_open(self): - class DummyURLopener(urllib.request.URLopener): - def open_spam(self, url): - return url - with warnings_helper.check_warnings( - ('DummyURLopener style of invoking requests is deprecated.', - DeprecationWarning)): - self.assertEqual(DummyURLopener().open( - 'spam://example/ /'),'//example/%20/') - - # test the safe characters are not quoted by urlopen - self.assertEqual(DummyURLopener().open( - "spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"), - "//c:|windows%/:=&?~#+!$,;'@()*[]|/path/") - - @warnings_helper.ignore_warnings(category=DeprecationWarning) - def test_urlopener_retrieve_file(self): - with os_helper.temp_dir() as tmpdir: - fd, tmpfile = tempfile.mkstemp(dir=tmpdir) - os.close(fd) - fileurl = "file:" + urllib.request.pathname2url(tmpfile) - filename, _ = urllib.request.URLopener().retrieve(fileurl) - # Some buildbots have TEMP folder that uses a lowercase drive letter. - self.assertEqual(os.path.normcase(filename), os.path.normcase(tmpfile)) - - @warnings_helper.ignore_warnings(category=DeprecationWarning) - def test_urlopener_retrieve_remote(self): - url = "http://www.python.org/file.txt" - self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") - self.addCleanup(self.unfakehttp) - filename, _ = urllib.request.URLopener().retrieve(url) - self.assertEqual(os.path.splitext(filename)[1], ".txt") - - @warnings_helper.ignore_warnings(category=DeprecationWarning) - def test_local_file_open(self): - # bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme - class DummyURLopener(urllib.request.URLopener): - def open_local_file(self, url): - return url - for url in ('local_file://example', 'local-file://example'): - self.assertRaises(OSError, urllib.request.urlopen, url) - self.assertRaises(OSError, urllib.request.URLopener().open, url) - self.assertRaises(OSError, urllib.request.URLopener().retrieve, url) - self.assertRaises(OSError, DummyURLopener().open, url) - self.assertRaises(OSError, DummyURLopener().retrieve, url) - - class RequestTests(unittest.TestCase): """Unit tests for urllib.request.Request.""" diff --git a/Lib/test/test_urllibnet.py b/Lib/test/test_urllibnet.py index 49a3b5afdebb2f..f824dddf711761 100644 --- a/Lib/test/test_urllibnet.py +++ b/Lib/test/test_urllibnet.py @@ -5,6 +5,7 @@ import contextlib import socket +import urllib.error import urllib.parse import urllib.request import os @@ -101,13 +102,10 @@ def test_getcode(self): # test getcode() with the fancy opener to get 404 error codes URL = self.url + "XXXinvalidXXX" with socket_helper.transient_internet(URL): - with self.assertWarns(DeprecationWarning): - open_url = urllib.request.FancyURLopener().open(URL) - try: - code = open_url.getcode() - finally: - open_url.close() - self.assertEqual(code, 404) + with self.assertRaises(urllib.error.URLError) as e: + with urllib.request.urlopen(URL): + pass + self.assertEqual(e.exception.code, 404) @support.requires_resource('walltime') def test_bad_address(self): diff --git a/Lib/urllib/request.py b/Lib/urllib/request.py index bc35d8a80e5d03..46eb7f76307121 100644 --- a/Lib/urllib/request.py +++ b/Lib/urllib/request.py @@ -102,7 +102,7 @@ from urllib.parse import ( urlparse, urlsplit, urljoin, unwrap, quote, unquote, _splittype, _splithost, _splitport, _splituser, _splitpasswd, - _splitattr, _splitquery, _splitvalue, _splittag, _to_bytes, + _splitattr, _splitvalue, _splittag, unquote_to_bytes, urlunparse) from urllib.response import addinfourl, addclosehook @@ -128,7 +128,7 @@ 'urlopen', 'install_opener', 'build_opener', 'pathname2url', 'url2pathname', 'getproxies', # Legacy interface - 'urlretrieve', 'urlcleanup', 'URLopener', 'FancyURLopener', + 'urlretrieve', 'urlcleanup', ] # used in User-Agent header sent @@ -165,8 +165,7 @@ def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, the reason phrase returned by the server --- instead of the response headers as it is specified in the documentation for HTTPResponse. - For FTP, file, and data URLs and requests explicitly handled by legacy - URLopener and FancyURLopener classes, this function returns a + For FTP, file, and data URLs, this function returns a urllib.response.addinfourl object. Note that None may be returned if no handler handles the request (though @@ -1495,7 +1494,7 @@ def open_local_file(self, req): origurl = 'file://' + filename return addinfourl(open(localfile, 'rb'), headers, origurl) except OSError as exp: - raise URLError(exp) + raise URLError(exp, exp.filename) raise URLError('file not on local host') def _safe_gethostbyname(host): @@ -1647,8 +1646,6 @@ def data_open(self, req): # Code move from the old urllib module -MAXFTPCACHE = 10 # Trim the ftp cache beyond this size - # Helper for non-unix systems if os.name == 'nt': from nturl2path import url2pathname, pathname2url @@ -1664,678 +1661,6 @@ def pathname2url(pathname): return quote(pathname) -ftpcache = {} - - -class URLopener: - """Class to open URLs. - This is a class rather than just a subroutine because we may need - more than one set of global protocol-specific options. - Note -- this is a base class for those who don't want the - automatic handling of errors type 302 (relocated) and 401 - (authorization needed).""" - - __tempfiles = None - - version = "Python-urllib/%s" % __version__ - - # Constructor - def __init__(self, proxies=None, **x509): - msg = "%(class)s style of invoking requests is deprecated. " \ - "Use newer urlopen functions/methods" % {'class': self.__class__.__name__} - warnings.warn(msg, DeprecationWarning, stacklevel=3) - if proxies is None: - proxies = getproxies() - assert hasattr(proxies, 'keys'), "proxies must be a mapping" - self.proxies = proxies - self.key_file = x509.get('key_file') - self.cert_file = x509.get('cert_file') - self.addheaders = [('User-Agent', self.version), ('Accept', '*/*')] - self.__tempfiles = [] - self.__unlink = os.unlink # See cleanup() - self.tempcache = None - # Undocumented feature: if you assign {} to tempcache, - # it is used to cache files retrieved with - # self.retrieve(). This is not enabled by default - # since it does not work for changing documents (and I - # haven't got the logic to check expiration headers - # yet). - self.ftpcache = ftpcache - # Undocumented feature: you can use a different - # ftp cache by assigning to the .ftpcache member; - # in case you want logically independent URL openers - # XXX This is not threadsafe. Bah. - - def __del__(self): - self.close() - - def close(self): - self.cleanup() - - def cleanup(self): - # This code sometimes runs when the rest of this module - # has already been deleted, so it can't use any globals - # or import anything. - if self.__tempfiles: - for file in self.__tempfiles: - try: - self.__unlink(file) - except OSError: - pass - del self.__tempfiles[:] - if self.tempcache: - self.tempcache.clear() - - def addheader(self, *args): - """Add a header to be used by the HTTP interface only - e.g. u.addheader('Accept', 'sound/basic')""" - self.addheaders.append(args) - - # External interface - def open(self, fullurl, data=None): - """Use URLopener().open(file) instead of open(file, 'r').""" - fullurl = unwrap(_to_bytes(fullurl)) - fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|") - if self.tempcache and fullurl in self.tempcache: - filename, headers = self.tempcache[fullurl] - fp = open(filename, 'rb') - return addinfourl(fp, headers, fullurl) - urltype, url = _splittype(fullurl) - if not urltype: - urltype = 'file' - if urltype in self.proxies: - proxy = self.proxies[urltype] - urltype, proxyhost = _splittype(proxy) - host, selector = _splithost(proxyhost) - url = (host, fullurl) # Signal special case to open_*() - else: - proxy = None - name = 'open_' + urltype - self.type = urltype - name = name.replace('-', '_') - if not hasattr(self, name) or name == 'open_local_file': - if proxy: - return self.open_unknown_proxy(proxy, fullurl, data) - else: - return self.open_unknown(fullurl, data) - try: - if data is None: - return getattr(self, name)(url) - else: - return getattr(self, name)(url, data) - except (HTTPError, URLError): - raise - except OSError as msg: - raise OSError('socket error', msg) from msg - - def open_unknown(self, fullurl, data=None): - """Overridable interface to open unknown URL type.""" - type, url = _splittype(fullurl) - raise OSError('url error', 'unknown url type', type) - - def open_unknown_proxy(self, proxy, fullurl, data=None): - """Overridable interface to open unknown URL type.""" - type, url = _splittype(fullurl) - raise OSError('url error', 'invalid proxy for %s' % type, proxy) - - # External interface - def retrieve(self, url, filename=None, reporthook=None, data=None): - """retrieve(url) returns (filename, headers) for a local object - or (tempfilename, headers) for a remote object.""" - url = unwrap(_to_bytes(url)) - if self.tempcache and url in self.tempcache: - return self.tempcache[url] - type, url1 = _splittype(url) - if filename is None and (not type or type == 'file'): - try: - fp = self.open_local_file(url1) - hdrs = fp.info() - fp.close() - return url2pathname(_splithost(url1)[1]), hdrs - except OSError: - pass - fp = self.open(url, data) - try: - headers = fp.info() - if filename: - tfp = open(filename, 'wb') - else: - garbage, path = _splittype(url) - garbage, path = _splithost(path or "") - path, garbage = _splitquery(path or "") - path, garbage = _splitattr(path or "") - suffix = os.path.splitext(path)[1] - (fd, filename) = tempfile.mkstemp(suffix) - self.__tempfiles.append(filename) - tfp = os.fdopen(fd, 'wb') - try: - result = filename, headers - if self.tempcache is not None: - self.tempcache[url] = result - bs = 1024*8 - size = -1 - read = 0 - blocknum = 0 - if "content-length" in headers: - size = int(headers["Content-Length"]) - if reporthook: - reporthook(blocknum, bs, size) - while block := fp.read(bs): - read += len(block) - tfp.write(block) - blocknum += 1 - if reporthook: - reporthook(blocknum, bs, size) - finally: - tfp.close() - finally: - fp.close() - - # raise exception if actual size does not match content-length header - if size >= 0 and read < size: - raise ContentTooShortError( - "retrieval incomplete: got only %i out of %i bytes" - % (read, size), result) - - return result - - # Each method named open_Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.
Alternative Proxies: