diff --git a/CIME/tests/test_unit_copy_over_file.py b/CIME/tests/test_unit_copy_over_file.py new file mode 100644 index 00000000000..89284606de2 --- /dev/null +++ b/CIME/tests/test_unit_copy_over_file.py @@ -0,0 +1,174 @@ +import os +import stat +import shutil +import tempfile +import unittest +from unittest.mock import patch + +from CIME.utils import copy_over_file + + +class TestCopyOverFile(unittest.TestCase): + """Unit tests for copy_over_file.""" + + SRC_CONTENT = "source content" + OLD_CONTENT = "old content" + + def setUp(self): + self._workdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self._workdir, ignore_errors=True) + + def _make_file(self, name, content, mode=None): + path = os.path.join(self._workdir, name) + with open(path, "w", encoding="utf8") as f: + f.write(content) + if mode is not None: + os.chmod(path, mode) + return path + + def _read(self, path): + with open(path, "r", encoding="utf8") as f: + return f.read() + + def test_copies_content(self): + """Content is copied correctly (default preserve_meta).""" + src = self._make_file("src.txt", self.SRC_CONTENT) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT) + + copy_over_file(src, tgt) + + self.assertEqual(self._read(tgt), self.SRC_CONTENT) + + def test_owned_target_preserve_meta_true_copies_permissions(self): + """With preserve_meta=True and owned target, permissions are copied from src.""" + src = self._make_file("src.txt", self.SRC_CONTENT, mode=0o644) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT, mode=0o600) + + copy_over_file(src, tgt, preserve_meta=True) + + self.assertEqual(self._read(tgt), self.SRC_CONTENT) + tgt_mode = stat.S_IMODE(os.stat(tgt).st_mode) + src_mode = stat.S_IMODE(os.stat(src).st_mode) + self.assertEqual(tgt_mode, src_mode) + + def test_readonly_target_owned_overwritten(self): + """A read-only owned target is made writable and then overwritten.""" + src = self._make_file("src.txt", self.SRC_CONTENT) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT, mode=0o444) + + copy_over_file(src, tgt, preserve_meta=True) + + self.assertEqual(self._read(tgt), self.SRC_CONTENT) + + def test_readonly_target_not_owned_raises(self): + """A read-only target that we don't own raises OSError.""" + src = self._make_file("src.txt", self.SRC_CONTENT) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT, mode=0o444) + + # Simulate being a different user (not the file owner) who cannot write to + # the read-only target. We patch both os.getuid (so owner_uid != getuid()) + # and os.access (so the file appears non-writable regardless of whether the + # test runner is root, which would otherwise make os.access return True for + # any file and prevent the OSError path from being reached). + real_uid = os.getuid() + with patch("CIME.utils.os.getuid", return_value=real_uid + 1): + with patch("CIME.utils.os.access", return_value=False): + with self.assertRaises(OSError): + copy_over_file(src, tgt, preserve_meta=True) + + def test_not_owner_content_only(self): + """A non-owned writable target gets content-only copy (no metadata transfer).""" + src = self._make_file("src.txt", self.SRC_CONTENT, mode=0o644) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT, mode=0o666) + + real_stat = os.stat(tgt) + # Build a real os.stat_result with a different uid so copy_over_file thinks + # we are not the owner. Using os.stat_result preserves st_ino/st_dev so that + # shutil.copyfile's internal _samefile check keeps working. + fake_uid = os.getuid() + 1 + fake_stat_result = os.stat_result( + ( + real_stat.st_mode, + real_stat.st_ino, + real_stat.st_dev, + real_stat.st_nlink, + fake_uid, + real_stat.st_gid, + real_stat.st_size, + real_stat.st_atime, + real_stat.st_mtime, + real_stat.st_ctime, + ) + ) + + real_os_stat = os.stat + + def fake_stat_fn(path, *args, **kwargs): + if path == tgt: + return fake_stat_result + return real_os_stat(path, *args, **kwargs) + + with patch("CIME.utils.os.stat", side_effect=fake_stat_fn): + # make target appear writable so we don't hit the read-only branch + with patch("CIME.utils.os.access", return_value=True): + copy_over_file(src, tgt, preserve_meta=True) + + self.assertEqual(self._read(tgt), self.SRC_CONTENT) + # Permissions should NOT have been copied from src (0o644) to tgt (0o666) + tgt_mode = stat.S_IMODE(os.stat(tgt).st_mode) + src_mode = stat.S_IMODE(os.stat(src).st_mode) + self.assertNotEqual(tgt_mode, src_mode) + self.assertEqual(tgt_mode, stat.S_IMODE(real_stat.st_mode)) + + def test_owned_target_preserve_meta_false_does_not_copy_permissions(self): + """With preserve_meta=False and owned target, permissions are NOT copied from src. + Instead, use caller's umask. + """ + # Force a known umask so the post-copy file mode is deterministic and different from the + # original permissions of both source and target. + prev_umask = os.umask(0o022) + try: + # src has restrictive 0o600 permissions; tgt has broader 0o644 + src = self._make_file("src.txt", self.SRC_CONTENT, mode=0o600) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT, mode=0o644) + tgt_mode_orig = stat.S_IMODE(os.stat(tgt).st_mode) + copy_over_file(src, tgt, preserve_meta=False) + # Content must be copied + self.assertEqual(self._read(tgt), self.SRC_CONTENT) + # Permissions must NOT have been taken from src (0o600) + tgt_mode = stat.S_IMODE(os.stat(tgt).st_mode) + src_mode = stat.S_IMODE(os.stat(src).st_mode) + self.assertNotEqual( + tgt_mode, + src_mode, + f"preserve_meta=False should not copy permissions; " + f"src={oct(src_mode)}, tgt={oct(tgt_mode)}", + ) + self.assertEqual( + tgt_mode, + tgt_mode_orig, + ) + finally: + os.umask(prev_umask) + + def test_readonly_owned_target_preserve_meta_false_does_not_copy_permissions(self): + """Read-only owned target with preserve_meta=False: made writable, content copied, + permissions NOT transferred from src. + """ + src = self._make_file("src.txt", self.SRC_CONTENT, mode=0o600) + tgt = self._make_file("tgt.txt", self.OLD_CONTENT, mode=0o444) + + copy_over_file(src, tgt, preserve_meta=False) + + self.assertEqual(self._read(tgt), self.SRC_CONTENT) + + tgt_mode = stat.S_IMODE(os.stat(tgt).st_mode) + src_mode = stat.S_IMODE(os.stat(src).st_mode) + self.assertNotEqual( + tgt_mode, + src_mode, + f"preserve_meta=False should not copy permissions; " + f"src={oct(src_mode)}, tgt={oct(tgt_mode)}", + ) diff --git a/CIME/tests/test_unit_safe_copy.py b/CIME/tests/test_unit_safe_copy.py new file mode 100644 index 00000000000..00d56586222 --- /dev/null +++ b/CIME/tests/test_unit_safe_copy.py @@ -0,0 +1,461 @@ +import os +import stat +import shutil +import tempfile + +import unittest +from CIME.utils import safe_copy, SharedArea + + +class TestSafeCopy(unittest.TestCase): + # Test content constants + TEST_CONTENT = "test content" + NEW_CONTENT = "new content" + OLD_CONTENT = "old content" + CONTENT_1 = "content1" + CONTENT_2 = "content2" + ROOT_CONTENT = "root content" + NESTED_CONTENT = "nested content" + SCRIPT_CONTENT = "#!/bin/bash\necho test" + + # Permission constants + PERM_RW_R_R = 0o644 # rw-r--r-- (owner: read/write, group/others: read) + PERM_RW_ONLY = 0o600 # rw------- (owner: read/write only) + PERM_EXECUTABLE = 0o755 # rwxr-xr-x (owner: rwx, group/others: rx) + PERM_READONLY = 0o444 # r--r--r-- (all: read only) + + # Filename constants + SRC_FILE = "source.txt" + TGT_FILE = "target.txt" + SRC_DIR = "source_dir" + TGT_DIR = "target_dir" + SCRIPT_FILE = "script.sh" + SCRIPT_COPY = "script_copy.sh" + FILE_1 = "file1.txt" + FILE_2 = "file2.txt" + SINGLE_FILE = "file.txt" + SUBDIR = "subdir" + NESTED_FILE = "nested.txt" + + def setUp(self): + self._workdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self._workdir, ignore_errors=True) + + def _create_file(self, path, content): + """Helper method to create a file with given content""" + with open(path, "w", encoding="utf8") as f: + f.write(content) + + def _read_file(self, path): + """Helper method to read file content""" + with open(path, "r", encoding="utf8") as f: + return f.read() + + def test_safe_copy_basic_file(self): + """Test basic file copy to a new location""" + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + self._create_file(src_file, self.TEST_CONTENT) + + safe_copy(src_file, tgt_file) + + self.assertTrue(os.path.isfile(tgt_file)) + self.assertEqual(self._read_file(tgt_file), self.TEST_CONTENT) + + def test_safe_copy_to_directory(self): + """Test copying a file to a directory (should use basename)""" + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_dir = os.path.join(self._workdir, self.TGT_DIR) + os.makedirs(tgt_dir) + + self._create_file(src_file, self.TEST_CONTENT) + + safe_copy(src_file, tgt_dir) + + expected_file = os.path.join(tgt_dir, self.SRC_FILE) + self.assertTrue(os.path.isfile(expected_file)) + self.assertEqual(self._read_file(expected_file), self.TEST_CONTENT) + + def test_safe_copy_overwrite_existing(self): + """Test overwriting an existing file""" + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + self._create_file(src_file, self.NEW_CONTENT) + self._create_file(tgt_file, self.OLD_CONTENT) + + safe_copy(src_file, tgt_file) + + self.assertEqual(self._read_file(tgt_file), self.NEW_CONTENT) + + def test_safe_copy_readonly_file(self): + """Test overwriting a read-only file (when we own it)""" + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + self._create_file(src_file, self.NEW_CONTENT) + self._create_file(tgt_file, self.OLD_CONTENT) + + # Make target read-only + os.chmod(tgt_file, self.PERM_READONLY) + + safe_copy(src_file, tgt_file) + + self.assertEqual(self._read_file(tgt_file), self.NEW_CONTENT) + + def test_safe_copy_preserve_meta_true(self): + """Test that metadata is preserved when preserve_meta=True""" + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + self._create_file(src_file, self.TEST_CONTENT) + + # Set specific permissions on source + os.chmod(src_file, self.PERM_RW_R_R) + src_stat = os.stat(src_file) + + safe_copy(src_file, tgt_file, preserve_meta=True) + + tgt_stat = os.stat(tgt_file) + # Check that permissions are preserved (masking out file type bits) + self.assertEqual( + oct(stat.S_IMODE(src_stat.st_mode)), oct(stat.S_IMODE(tgt_stat.st_mode)) + ) + + def test_safe_copy_preserve_meta_false(self): + """Test that metadata is not preserved when preserve_meta=False.""" + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + self._create_file(src_file, self.TEST_CONTENT) + + # Set specific permissions on source + os.chmod(src_file, self.PERM_RW_ONLY) + src_stat = os.stat(src_file) + + safe_copy(src_file, tgt_file, preserve_meta=False) + + # File should exist with content copied correctly + self.assertTrue(os.path.isfile(tgt_file)) + self.assertEqual(self._read_file(tgt_file), self.TEST_CONTENT) + + # Verify that permissions are NOT preserved (should be different from source): This is the + # intended behavior with preserve_meta=False according to the safe_copy docstring. + tgt_stat = os.stat(tgt_file) + self.assertNotEqual( + oct(stat.S_IMODE(src_stat.st_mode)), oct(stat.S_IMODE(tgt_stat.st_mode)) + ) + + def test_safe_copy_executable_file(self): + """Test that executable bit is preserved for executable files""" + src_file = os.path.join(self._workdir, self.SCRIPT_FILE) + tgt_file = os.path.join(self._workdir, self.SCRIPT_COPY) + + self._create_file(src_file, self.SCRIPT_CONTENT) + + # Make source executable + os.chmod(src_file, self.PERM_EXECUTABLE) + + safe_copy(src_file, tgt_file) + + # Check that target is also executable + self.assertTrue(os.access(tgt_file, os.X_OK)) + + def test_safe_copy_directory(self): + """Test copying a directory""" + src_dir = os.path.join(self._workdir, self.SRC_DIR) + tgt_dir = os.path.join(self._workdir, self.TGT_DIR) + os.makedirs(src_dir) + + # Create some files in source directory + self._create_file(os.path.join(src_dir, self.FILE_1), self.CONTENT_1) + self._create_file(os.path.join(src_dir, self.FILE_2), self.CONTENT_2) + + safe_copy(src_dir, tgt_dir) + + # Check that directory and files were copied + self.assertTrue(os.path.isdir(tgt_dir)) + self.assertTrue(os.path.isfile(os.path.join(tgt_dir, self.FILE_1))) + self.assertTrue(os.path.isfile(os.path.join(tgt_dir, self.FILE_2))) + + self.assertEqual( + self._read_file(os.path.join(tgt_dir, self.FILE_1)), self.CONTENT_1 + ) + + def test_safe_copy_directory_preserve_meta_false(self): + """Test copying a directory with preserve_meta=False.""" + src_dir = os.path.join(self._workdir, self.SRC_DIR) + tgt_dir = os.path.join(self._workdir, self.TGT_DIR) + os.makedirs(os.path.join(src_dir, self.SUBDIR)) + + test_file = os.path.join(src_dir, self.SUBDIR, self.SINGLE_FILE) + self._create_file(test_file, self.TEST_CONTENT) + + # Set specific permissions on the file + os.chmod(test_file, self.PERM_RW_ONLY) + src_stat = os.stat(test_file) + + safe_copy(src_dir, tgt_dir, preserve_meta=False) + + self.assertTrue(os.path.isdir(tgt_dir)) + tgt_file = os.path.join(tgt_dir, self.SUBDIR, self.SINGLE_FILE) + self.assertTrue(os.path.isfile(tgt_file)) + + # Verify that permissions are NOT preserved (should be different from source) + tgt_stat = os.stat(tgt_file) + self.assertNotEqual( + oct(stat.S_IMODE(src_stat.st_mode)), oct(stat.S_IMODE(tgt_stat.st_mode)) + ) + + def test_safe_copy_nested_directory(self): + """Test copying a directory with nested subdirectories""" + src_dir = os.path.join(self._workdir, self.SRC_DIR) + tgt_dir = os.path.join(self._workdir, self.TGT_DIR) + os.makedirs(os.path.join(src_dir, self.SUBDIR)) + + self._create_file( + os.path.join(src_dir, self.SUBDIR, self.SINGLE_FILE), self.ROOT_CONTENT + ) + self._create_file( + os.path.join(src_dir, self.SUBDIR, self.NESTED_FILE), self.NESTED_CONTENT + ) + + safe_copy(src_dir, tgt_dir) + + self.assertTrue(os.path.isdir(tgt_dir)) + self.assertTrue( + os.path.isfile(os.path.join(tgt_dir, self.SUBDIR, self.SINGLE_FILE)) + ) + self.assertTrue(os.path.isdir(os.path.join(tgt_dir, self.SUBDIR))) + self.assertTrue( + os.path.isfile(os.path.join(tgt_dir, self.SUBDIR, self.NESTED_FILE)) + ) + + self.assertEqual( + self._read_file(os.path.join(tgt_dir, self.SUBDIR, self.NESTED_FILE)), + self.NESTED_CONTENT, + ) + + def test_safe_copy_with_shared_area_preserve_meta_false(self): + """ + Test that SharedArea umask is respected when preserve_meta=False. + + When copying within a SharedArea context with preserve_meta=False, the target file + should have permissions based on the SharedArea umask (0o002 by default, making files + group-writable and other-readable), not the source file's permissions. + """ + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + # Create source file with restrictive permissions (no group/other access) + self._create_file(src_file, self.TEST_CONTENT) + os.chmod(src_file, self.PERM_RW_ONLY) # 0o600 + + # Copy within SharedArea with preserve_meta=False + with SharedArea(): + safe_copy(src_file, tgt_file, preserve_meta=False) + + # Target should be group-writable and other-readable (umask 0o002) + tgt_stat = os.stat(tgt_file) + tgt_mode = stat.S_IMODE(tgt_stat.st_mode) + + # With umask 0o002, new files should have group read-write and other read permissions + self.assertTrue( + tgt_mode & stat.S_IWGRP, + f"Expected group write permission, got {oct(tgt_mode)}", + ) + self.assertTrue( + tgt_mode & stat.S_IROTH, + f"Expected other read permission, got {oct(tgt_mode)}", + ) + self.assertFalse( + tgt_mode & stat.S_IWOTH, + "Unexpectedly got other-write permission", + ) + + def test_safe_copy_with_shared_area_preserve_meta_true(self): + """ + Test that SharedArea umask is ignored when preserve_meta=True. + + When copying within a SharedArea context with preserve_meta=True, the target file + should preserve the source file's permissions, ignoring the SharedArea umask. + """ + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + # Create source file with restrictive permissions (no group/other access) + self._create_file(src_file, self.TEST_CONTENT) + os.chmod(src_file, self.PERM_RW_ONLY) # 0o600 + src_stat = os.stat(src_file) + + # Copy within SharedArea with preserve_meta=True + with SharedArea(): + safe_copy(src_file, tgt_file, preserve_meta=True) + + # Target should have same permissions as source (ignoring SharedArea umask) + tgt_stat = os.stat(tgt_file) + self.assertEqual( + oct(stat.S_IMODE(src_stat.st_mode)), oct(stat.S_IMODE(tgt_stat.st_mode)) + ) + + def test_safe_copy_directory_with_shared_area_preserve_meta_false(self): + """ + Test that SharedArea umask is respected for directory copies when preserve_meta=False. + + When copying a directory within a SharedArea context with preserve_meta=False, the files + in the target directory should have permissions based on the SharedArea umask, not the + source files' permissions. + + This test guards against a bug where directory copies incorrectly preserved the source + files' permissions instead of applying the SharedArea umask. + """ + src_dir = os.path.join(self._workdir, self.SRC_DIR) + tgt_dir = os.path.join(self._workdir, self.TGT_DIR) + os.makedirs(src_dir) + + # Create source file with restrictive permissions + src_file = os.path.join(src_dir, self.SINGLE_FILE) + self._create_file(src_file, self.TEST_CONTENT) + os.chmod(src_file, self.PERM_RW_ONLY) # 0o600 + + # Copy within SharedArea with preserve_meta=False + with SharedArea(): + safe_copy(src_dir, tgt_dir, preserve_meta=False) + + # Target file should be group-writable and other-readable (umask 0o002) + tgt_file = os.path.join(tgt_dir, self.SINGLE_FILE) + tgt_stat = os.stat(tgt_file) + tgt_mode = stat.S_IMODE(tgt_stat.st_mode) + + # With umask 0o002, new files should have group write and other read permissions + self.assertTrue( + tgt_mode & stat.S_IWGRP, + f"Expected group write permission, got {oct(tgt_mode)}", + ) + self.assertTrue( + tgt_mode & stat.S_IROTH, + f"Expected other read permission, got {oct(tgt_mode)}", + ) + self.assertFalse( + tgt_mode & stat.S_IWOTH, + "Unexpectedly got other-write permission", + ) + + def test_safe_copy_overwrite_preserve_meta_false(self): + """Overwriting an existing file with preserve_meta=False should not copy permissions. + + Instead, the target should be recreated so the current umask-derived permissions apply, + rather than preserving the existing target's permissions. + """ + # Force a known umask so the post-copy file mode is deterministic and different from the + # original permissions of both source and target. + prev_umask = os.umask(0o022) + try: + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + # Source has restrictive permissions; target already exists with broader permissions + self._create_file(src_file, self.NEW_CONTENT) + os.chmod(src_file, self.PERM_RW_ONLY) # 0o600 + self._create_file(tgt_file, self.OLD_CONTENT) + os.chmod(tgt_file, self.PERM_RW_R_R) # 0o644 + tgt_mode_orig = stat.S_IMODE(os.stat(tgt_file).st_mode) + + safe_copy(src_file, tgt_file, preserve_meta=False) + + # Content must be updated + self.assertEqual(self._read_file(tgt_file), self.NEW_CONTENT) + + # Permissions must NOT have been taken from src (0o600) + tgt_mode = stat.S_IMODE(os.stat(tgt_file).st_mode) + src_mode = stat.S_IMODE(os.stat(src_file).st_mode) + self.assertNotEqual( + tgt_mode, + src_mode, + f"preserve_meta=False should not copy permissions to an existing target; " + f"src={oct(src_mode)}, tgt={oct(tgt_mode)}", + ) + + # Instead, the existing target's perms should have been preserved. + self.assertEqual(tgt_mode, tgt_mode_orig) + finally: + os.umask(prev_umask) + + def test_safe_copy_overwrite_readonly_preserve_meta_false_with_shared_area(self): + """Overwriting an existing read-only owned file with preserve_meta=False inside + SharedArea should apply the SharedArea umask, not the source permissions. + """ + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + self._create_file(src_file, self.NEW_CONTENT) + os.chmod(src_file, self.PERM_RW_ONLY) # 0o600 — restrictive + self._create_file(tgt_file, self.OLD_CONTENT) + os.chmod(tgt_file, self.PERM_READONLY) # 0o444 — read-only + + # Ensure src file not group-writable + src_stat = os.stat(src_file) + src_mode = stat.S_IMODE(src_stat.st_mode) + self.assertFalse( + src_mode & stat.S_IWGRP, + f"Expected group write permission with SharedArea umask, got {oct(src_mode)}", + ) + + with SharedArea(): + safe_copy(src_file, tgt_file, preserve_meta=False) + + self.assertEqual(self._read_file(tgt_file), self.NEW_CONTENT) + + tgt_stat = os.stat(tgt_file) + tgt_mode = stat.S_IMODE(tgt_stat.st_mode) + + # With SharedArea umask (0o002), the file should be group-writable + self.assertTrue( + tgt_mode & stat.S_IWGRP, + f"Expected group write permission with SharedArea umask, got {oct(tgt_mode)}", + ) + + def test_safe_copy_outside_shared_area(self): + """ + Test that safe_copy works normally outside of SharedArea context. + + This is a baseline test to ensure that the SharedArea tests are actually + testing the SharedArea behavior and not just normal file copy behavior. + When copying outside SharedArea with preserve_meta=False, the file should + NOT preserve permissions and should NOT be group-writable (unless the user's + default umask happens to allow it). + """ + src_file = os.path.join(self._workdir, self.SRC_FILE) + tgt_file = os.path.join(self._workdir, self.TGT_FILE) + + # Create source file with restrictive permissions (no group/other access) + self._create_file(src_file, self.TEST_CONTENT) + os.chmod(src_file, self.PERM_RW_ONLY) # 0o600 + src_stat = os.stat(src_file) + + # Copy outside SharedArea with preserve_meta=False + safe_copy(src_file, tgt_file, preserve_meta=False) + + # Verify the file was created with correct content + self.assertTrue(os.path.isfile(tgt_file)) + self.assertEqual(self._read_file(tgt_file), self.TEST_CONTENT) + + # Permissions should NOT be preserved (bug is now fixed) + tgt_stat = os.stat(tgt_file) + tgt_mode = stat.S_IMODE(tgt_stat.st_mode) + + # Verify permissions are different from source + self.assertNotEqual( + stat.S_IMODE(src_stat.st_mode), + tgt_mode, + "Permissions should not be preserved with preserve_meta=False", + ) + + # Outside SharedArea, file should NOT be group-writable (typical umask is 0o022) + self.assertFalse( + tgt_mode & stat.S_IWGRP, + f"File should not be group-writable outside SharedArea, got {oct(tgt_mode)}", + ) diff --git a/CIME/utils.py b/CIME/utils.py index e4a49c4457e..bbf0673d50b 100644 --- a/CIME/utils.py +++ b/CIME/utils.py @@ -1368,9 +1368,16 @@ def copy_globs(globs_to_copy, output_directory, lid=None): ) -def copy_over_file(src_path, tgt_path): +def copy_over_file(src_path, tgt_path, preserve_meta=True): """ - Copy a file over a file that already exists + Copy a file over a file that already exists. + + preserve_meta controls whether file metadata (permissions, timestamps) are + copied from src_path. When True and the caller owns the target, shutil.copy2 + is used (contents + metadata). When False and the caller owns the target, the + contents are written to a fresh temp file (so the caller's umask takes effect) + which is then renamed atomically over the target. In either case, a read-only + owned target is made writable before the copy. """ st = os.stat(tgt_path) owner_uid = st.st_uid @@ -1388,17 +1395,36 @@ def copy_over_file(src_path, tgt_path): ) ) - if owner_uid == os.getuid(): - # I am the owner, copy file contents, permissions, and metadata + if owner_uid == os.getuid() and preserve_meta: + # I am the owner and metadata should be preserved: copy contents, permissions, + # and timestamps. try: shutil.copy2(src_path, tgt_path) # ignore same file error except shutil.SameFileError: pass + elif owner_uid == os.getuid(): + # I am the owner but preserve_meta=False: copy src to a fresh temp file in the + # same directory so the OS applies the caller's umask (e.g. from SharedArea) + # naturally when creating the new file, then atomically replace the target. + tmp_path = tgt_path + f".safe_copy_tmp.{os.getpid()}" + try: + shutil.copyfile(src_path, tmp_path) + os.rename(tmp_path, tgt_path) + except Exception: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + else: - # I am not the owner, just copy file contents - shutil.copyfile(src_path, tgt_path) + # I am not the owner: copy file contents only (cannot change permissions). + try: + shutil.copyfile(src_path, tgt_path) + except shutil.SameFileError: + pass def safe_copy(src_path, tgt_path, preserve_meta=True): @@ -1428,7 +1454,7 @@ def safe_copy(src_path, tgt_path, preserve_meta=True): try: if os.path.isfile(src_path): if os.path.isfile(tgt_path): - copy_over_file(src_path, tgt_path) + copy_over_file(src_path, tgt_path, preserve_meta=preserve_meta) elif preserve_meta: # We are making a new file, copy file contents, permissions, and metadata. @@ -1436,7 +1462,7 @@ def safe_copy(src_path, tgt_path, preserve_meta=True): shutil.copy2(src_path, tgt_path) else: - shutil.copy(src_path, tgt_path) + shutil.copyfile(src_path, tgt_path) else: # Some of the archived "files" are directories, like ADIOS BP output "files" if preserve_meta: