From 4f3d6dc3e00284457e1894e63d1b7054c0a34c7b Mon Sep 17 00:00:00 2001 From: Danny Lin Date: Mon, 23 Jun 2025 19:37:11 +0800 Subject: [PATCH 1/2] Add `copy()` to `ZipFile` --- Doc/library/zipfile.rst | 29 ++ Lib/test/test_zipfile/test_core.py | 295 +++++++++++++++++- Lib/zipfile/__init__.py | 69 +++- ...5-05-24-11-17-34.gh-issue-51067.yHOgfy.rst | 3 +- 4 files changed, 384 insertions(+), 12 deletions(-) diff --git a/Doc/library/zipfile.rst b/Doc/library/zipfile.rst index 98d2a5e5cdf00e2..410377994874530 100644 --- a/Doc/library/zipfile.rst +++ b/Doc/library/zipfile.rst @@ -550,6 +550,35 @@ ZipFile objects .. versionadded:: 3.11 +.. method:: ZipFile.copy(zinfo_or_arcname, new_arcname, *[, chunk_size]) + + Copies a member *zinfo_or_arcname* to *new_arcname* in the archive. + *zinfo_or_arcname* may be the full path of the member or a :class:`ZipInfo` + instance. + + *chunk_size* is a keyword-only argument that controls the buffer size when + copying entry data (default is 1 MiB). + + The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``, and the + underlying stream must be seekable. + + Returns the original version of the copied :class:`ZipInfo` instance. + + Calling :meth:`copy` on a closed ZipFile will raise a :exc:`ValueError`. + + .. note:: + Renaming a member in a ZIP file requires rewriting its data, as the + filename is stored within its local file entry. + + To rename a member and reclaim the space occupied by the old entry, + combine :meth:`copy`, :meth:`remove`, and :meth:`repack` like:: + + with ZipFile('spam.zip', 'a') as myzip: + myzip.repack([myzip.remove(myzip.copy('old.txt', 'new.txt'))]) + + .. versionadded:: next + + .. method:: ZipFile.remove(zinfo_or_arcname) Removes a member entry from the archive's central directory. 
diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py index c0f3efaf0921da8..f4a2e5dba458e4f 100644 --- a/Lib/test/test_zipfile/test_core.py +++ b/Lib/test/test_zipfile/test_core.py @@ -1482,6 +1482,289 @@ def _prepare_zip_from_test_files(cls, zfname, test_files, force_zip64=False): fh.write(data) return list(zh.infolist()) +class AbstractCopyTests(RepackHelperMixin): + @classmethod + def setUpClass(cls): + cls.test_files = cls._prepare_test_files() + + def tearDown(self): + unlink(TESTFN) + + def test_copy_by_name(self): + for i in range(3): + with self.subTest(i=i, filename=self.test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, self.test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zi_new = { + **comparable_zinfo(zinfos[i]), + 'filename': 'file.txt', + 'orig_filename': 'file.txt', + 'header_offset': zh.start_dir, + } + zh.copy(self.test_files[i][0], 'file.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + # check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file.txt')), zi_new) + + # check content + self.assertEqual( + zh.read(zi_new['filename']), + zh.read(zinfos[i].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + + def test_copy_by_zinfo(self): + for i in range(3): + with self.subTest(i=i, filename=self.test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, self.test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zi_new = { + **comparable_zinfo(zinfos[i]), + 'filename': 'file.txt', + 'orig_filename': 'file.txt', + 'header_offset': zh.start_dir, + } + zh.copy(zh.infolist()[i], 'file.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + 
# check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file.txt')), zi_new) + + # check content + self.assertEqual( + zh.read(zi_new['filename']), + zh.read(zinfos[i].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + + def test_copy_zip64(self): + for i in range(3): + with self.subTest(i=i, filename=self.test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, self.test_files, force_zip64=True) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zi_new = { + **comparable_zinfo(zinfos[i]), + 'filename': 'file.txt', + 'orig_filename': 'file.txt', + 'header_offset': zh.start_dir, + } + zh.copy(self.test_files[i][0], 'file.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + # check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file.txt')), zi_new) + + # check content + self.assertEqual( + zh.read(zi_new['filename']), + zh.read(zinfos[i].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + + def test_copy_data_descriptor(self): + for i in range(3): + with self.subTest(i=i, filename=self.test_files[i][0]): + with open(TESTFN, 'wb') as fh: + zinfos = self._prepare_zip_from_test_files(Unseekable(fh), self.test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zi_new = { + **comparable_zinfo(zinfos[i]), + 'filename': 'file.txt', + 'orig_filename': 'file.txt', + 'header_offset': zh.start_dir, + } + zh.copy(self.test_files[i][0], 'file.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + # check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file.txt')), zi_new) + + # check content + self.assertEqual( + 
zh.read(zi_new['filename']), + zh.read(zinfos[i].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + + def test_copy_target_exist(self): + for i in (1,): + with self.subTest(i=i, filename=self.test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, self.test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zi_new = { + **comparable_zinfo(zinfos[i]), + 'filename': 'file2.txt', + 'orig_filename': 'file2.txt', + 'header_offset': zh.start_dir, + } + zh.copy(self.test_files[i][0], 'file2.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + # check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file2.txt')), zi_new) + + # check content + self.assertEqual( + zh.read(zi_new['filename']), + zh.read(zinfos[i].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + + @mock.patch.object(zipfile, '_ZipRepacker') + def test_copy_closed(self, m_repack): + self._prepare_zip_from_test_files(TESTFN, self.test_files) + with zipfile.ZipFile(TESTFN, 'a') as zh: + zh.close() + with self.assertRaises(ValueError): + zh.copy(self.test_files[0][0], 'file.txt') + m_repack.assert_not_called() + + @mock.patch.object(zipfile, '_ZipRepacker') + def test_copy_writing(self, m_repack): + self._prepare_zip_from_test_files(TESTFN, self.test_files) + with zipfile.ZipFile(TESTFN, 'a') as zh: + with zh.open('newfile.txt', 'w'): + with self.assertRaises(ValueError): + zh.copy(self.test_files[0][0], 'file.txt') + m_repack.assert_not_called() + + @mock.patch.object(zipfile, '_ZipRepacker') + def test_copy_unseekble(self, m_repack): + with open(TESTFN, 'wb') as fh: + with zipfile.ZipFile(Unseekable(fh), 'w') as zh: + for file, data in self.test_files: + zh.writestr(file, data) + + with 
self.assertRaises(io.UnsupportedOperation): + zh.copy(zh.infolist()[0], 'file.txt') + m_repack.assert_not_called() + + def test_copy_mode_w(self): + with zipfile.ZipFile(TESTFN, 'w') as zh: + for file, data in self.test_files: + zh.writestr(file, data) + zinfos = list(zh.infolist()) + + zi_new = { + **comparable_zinfo(zinfos[0]), + 'filename': 'file.txt', + 'orig_filename': 'file.txt', + 'header_offset': zh.start_dir, + } + zh.copy(zh.infolist()[0], 'file.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + # check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file.txt')), zi_new) + + # check content + self.assertEqual( + zh.read(zi_new['filename']), + zh.read(zinfos[0].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + + def test_copy_mode_x(self): + with zipfile.ZipFile(TESTFN, 'x') as zh: + for file, data in self.test_files: + zh.writestr(file, data) + zinfos = list(zh.infolist()) + + zi_new = { + **comparable_zinfo(zinfos[0]), + 'filename': 'file.txt', + 'orig_filename': 'file.txt', + 'header_offset': zh.start_dir, + } + zh.copy(zh.infolist()[0], 'file.txt') + + # check infolist + self.assertEqual( + [comparable_zinfo(zi) for zi in zh.infolist()], + [*(comparable_zinfo(zi) for zi in zinfos), zi_new], + ) + + # check NameToInfo cache + self.assertEqual(comparable_zinfo(zh.getinfo('file.txt')), zi_new) + + # check content + self.assertEqual( + zh.read(zi_new['filename']), + zh.read(zinfos[0].filename), + ) + + # make sure the zip file is still valid + with zipfile.ZipFile(TESTFN) as zh: + self.assertIsNone(zh.testzip()) + +class StoredCopyTests(AbstractCopyTests, unittest.TestCase): + compression = zipfile.ZIP_STORED + +@requires_zlib() +class DeflateCopyTests(AbstractCopyTests, unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2() +class 
Bzip2CopyTests(AbstractCopyTests, unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma() +class LzmaCopyTests(AbstractCopyTests, unittest.TestCase): + compression = zipfile.ZIP_LZMA + +@requires_zstd() +class ZstdCopyTests(AbstractCopyTests, unittest.TestCase): + compression = zipfile.ZIP_ZSTANDARD + class AbstractRemoveTests(RepackHelperMixin): @classmethod def setUpClass(cls): @@ -3432,7 +3715,7 @@ def test_calc_local_file_entry_size(self): self.assertEqual( repacker._calc_local_file_entry_size(fz, zi), - 43, + (30, 8, 0, 5, 0), ) # data descriptor @@ -3444,7 +3727,7 @@ def test_calc_local_file_entry_size(self): self.assertEqual( repacker._calc_local_file_entry_size(fz, zi), - 59, + (30, 8, 0, 5, 16), ) # data descriptor (unsigned) @@ -3457,7 +3740,7 @@ def test_calc_local_file_entry_size(self): self.assertEqual( repacker._calc_local_file_entry_size(fz, zi), - 55, + (30, 8, 0, 5, 12), ) def test_calc_local_file_entry_size_zip64(self): @@ -3472,7 +3755,7 @@ def test_calc_local_file_entry_size_zip64(self): self.assertEqual( repacker._calc_local_file_entry_size(fz, zi), - 63, + (30, 8, 20, 5, 0), ) # data descriptor + zip64 @@ -3484,7 +3767,7 @@ def test_calc_local_file_entry_size_zip64(self): self.assertEqual( repacker._calc_local_file_entry_size(fz, zi), - 87, + (30, 8, 20, 5, 24), ) # data descriptor (unsigned) + zip64 @@ -3497,7 +3780,7 @@ def test_calc_local_file_entry_size_zip64(self): self.assertEqual( repacker._calc_local_file_entry_size(fz, zi), - 83, + (30, 8, 20, 5, 20), ) def test_copy_bytes(self): diff --git a/Lib/zipfile/__init__.py b/Lib/zipfile/__init__.py index 084a47518a935cc..a87986cc47eb324 100644 --- a/Lib/zipfile/__init__.py +++ b/Lib/zipfile/__init__.py @@ -4,6 +4,7 @@ XXX references to utf-8 need further investigation. 
""" import binascii +import copy import io import os import shutil @@ -1409,6 +1410,31 @@ def _debug(self, level, *msg): if self.debug >= level: print(*msg) + def copy(self, zfile, zinfo, filename): + # make a copy of zinfo + zinfo2 = copy.copy(zinfo) + + # apply sanitized new filename as in `ZipInfo.__init__` + zinfo2.orig_filename = filename + zinfo2.filename = _sanitize_filename(filename) + + zinfo2.header_offset = zfile.start_dir + zinfo2._end_offset = None + + # write to a new local file header + fp = zfile.fp + sizes = self._calc_local_file_entry_size(fp, zinfo) + fp.seek(zinfo2.header_offset) + fp.write(zinfo2.FileHeader()) + self._copy_bytes(fp, zinfo.header_offset + sum(sizes[:3]), fp.tell(), sum(sizes[3:])) + zfile.start_dir = fp.tell() + + # add to filelist + zfile.filelist.append(zinfo2) + zfile.NameToInfo[zinfo2.filename] = zinfo2 + + zfile._didModify = True + def repack(self, zfile, removed=None): """ Repack the ZIP file, stripping unreferenced local file entries. @@ -1508,7 +1534,7 @@ def repack(self, zfile, removed=None): entry_size = offset - zinfo.header_offset # may raise on an invalid local file header - used_entry_size = self._calc_local_file_entry_size(fp, zinfo) + used_entry_size = sum(self._calc_local_file_entry_size(fp, zinfo)) self._debug(3, 'entry:', i, zinfo.orig_filename, zinfo.header_offset, entry_size, used_entry_size) @@ -1842,10 +1868,11 @@ def _calc_local_file_entry_size(self, fp, zinfo): dd_size = 0 return ( - sizeFileHeader + - fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] + - zinfo.compress_size + - dd_size + sizeFileHeader, + fheader[_FH_FILENAME_LENGTH], + fheader[_FH_EXTRA_FIELD_LENGTH], + zinfo.compress_size, + dd_size, ) def _copy_bytes(self, fp, old_offset, new_offset, size): @@ -2350,6 +2377,38 @@ def extractall(self, path=None, members=None, pwd=None): for zipinfo in members: self._extract_member(zipinfo, path, pwd) + def copy(self, zinfo_or_arcname, filename, *, chunk_size=_REPACK_CHUNK_SIZE): + """Copy 
a member in the archive.""" + if self.mode not in ('w', 'x', 'a'): + raise ValueError("copy() requires mode 'w', 'x', or 'a'") + if not self.fp: + raise ValueError( + "Attempt to write to ZIP archive that was already closed") + if self._writing: + raise ValueError( + "Can't write to ZIP archive while an open writing handle exists." + ) + if not self._seekable: + raise io.UnsupportedOperation("copy() requires a seekable stream.") + + with self._lock: + # get the zinfo + # raise KeyError if arcname does not exist + if isinstance(zinfo_or_arcname, ZipInfo): + zinfo = zinfo_or_arcname + if zinfo not in self.filelist: + raise KeyError('There is no item %r in the archive' % zinfo) + else: + zinfo = self.getinfo(zinfo_or_arcname) + + self._writing = True + try: + _ZipRepacker(chunk_size=chunk_size).copy(self, zinfo, filename) + finally: + self._writing = False + + return zinfo + def remove(self, zinfo_or_arcname): """Remove a member from the archive.""" if self.mode not in ('w', 'x', 'a'): diff --git a/Misc/NEWS.d/next/Library/2025-05-24-11-17-34.gh-issue-51067.yHOgfy.rst b/Misc/NEWS.d/next/Library/2025-05-24-11-17-34.gh-issue-51067.yHOgfy.rst index 204213c74de5dfd..ea646a6072872e9 100644 --- a/Misc/NEWS.d/next/Library/2025-05-24-11-17-34.gh-issue-51067.yHOgfy.rst +++ b/Misc/NEWS.d/next/Library/2025-05-24-11-17-34.gh-issue-51067.yHOgfy.rst @@ -1 +1,2 @@ -Add :meth:`~zipfile.ZipFile.remove` and :meth:`~zipfile.ZipFile.repack` to :class:`~zipfile.ZipFile`. +Add :meth:`~zipfile.ZipFile.remove`, :meth:`~zipfile.ZipFile.repack`, and +:meth:`~zipfile.ZipFile.copy` to :class:`~zipfile.ZipFile`. 
From 544f13cdd997022010e108f6e1bd310220eaeaa5 Mon Sep 17 00:00:00 2001 From: Danny Lin Date: Sun, 21 Jun 2026 15:33:41 +0800 Subject: [PATCH 2/2] Add whatsnew entry --- Doc/whatsnew/3.16.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Doc/whatsnew/3.16.rst b/Doc/whatsnew/3.16.rst index ec8e367d938ddb9..38a8872656b94f0 100644 --- a/Doc/whatsnew/3.16.rst +++ b/Doc/whatsnew/3.16.rst @@ -181,6 +181,8 @@ zipfile by the local file entries of removed members. (Contributed by Danny Lin in :gh:`51067`.) +* Add :meth:`ZipFile.copy() <zipfile.ZipFile.copy>` to copy a member. + .. Add improved modules above alphabetically, not here at the end. Optimizations