Skip to content

Commit 48566c0

Browse files
authored
Merge pull request #150 from victormlg/atomic_copy
CFE-4549: Made copy_file atomic
2 parents 6b1e045 + 8ab0417 commit 48566c0

2 files changed

Lines changed: 95 additions & 7 deletions

File tree

cf_remote/utils.py

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import hashlib
22
import os
33
import shutil
4+
import glob
45
import sys
56
import re
67
import json
78
import getpass
9+
import tempfile
10+
import fcntl
811
from collections import OrderedDict
912
from cf_remote import log
1013
from datetime import datetime
@@ -226,15 +229,56 @@ def print_progress_dot(*args):
226229
sys.stdout.flush() # STDOUT is line-buffered
227230

228231

232+
# atomic copy, see lock-free whack-a-mole algorithm
# https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-697.pdf#page=66
def copy_file(input_path, output_path):
    """Copy input_path to output_path so readers never see a partial file.

    The content is first copied to a uniquely named temporary file in the
    destination directory, which is then renamed to "<tmp>.mole". Every
    concurrent copier removes ("whacks") competing moles so that only one
    survives, and the survivor is renamed onto the destination.

    Parameters:
        input_path: Path of the file to copy. Must not end with "/".
        output_path: Destination path. If it ends with "/" (no filename
            part), the basename of input_path is used inside that directory.

    Raises:
        OSError: If the temporary file cannot be created or the input cannot
            be read/copied. Failures of the final lock-and-rename step are
            deliberately ignored, because a concurrent copier may already
            have consumed our mole and installed the same content.
    """
    assert not input_path.endswith("/")

    output_filename = os.path.basename(output_path) or os.path.basename(input_path)

    # Work with an absolute directory throughout: tempfile.mkstemp() may
    # return an absolute path even when given a relative directory, and the
    # mole comparison below only works if both sides are spelled the same way.
    output_dirname = os.path.abspath(os.path.dirname(output_path))

    # The real destination, also valid when output_path named only a directory.
    final_path = os.path.join(output_dirname, output_filename)

    tmp_fd, tmp_path = tempfile.mkstemp(
        ".tmp", "{}-".format(output_filename), output_dirname
    )
    # Only the unique name is needed; shutil.copy() reopens the file by path.
    os.close(tmp_fd)

    # copy input content to tmp

    try:
        with open(input_path, "rb") as input_file:
            input_fd = input_file.fileno()

            fcntl.flock(input_fd, fcntl.LOCK_SH)
            try:
                shutil.copy(input_path, tmp_path)
            finally:
                fcntl.flock(input_fd, fcntl.LOCK_UN)
    except BaseException:
        # Do not leave a stray temporary file behind on failure.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    # rename tmp to tmp.mole

    my_mole = "{}.mole".format(tmp_path)
    os.rename(tmp_path, my_mole)

    # NOTE: a destination whose name extends this one with "-" (for example
    # "copy" and "copy-extra") also matches this pattern.
    glob_pattern = os.path.join(
        glob.escape(output_dirname),
        "{}-*.tmp.mole".format(glob.escape(output_filename)),
    )
    for mole in glob.glob(glob_pattern):
        # glob already returns paths that include the directory.
        if mole == my_mole:
            continue

        # The mole that sorts last survives; adopt it if it is not ours.
        mole_to_whack, my_mole = sorted((mole, my_mole))
        try:
            os.remove(mole_to_whack)
        except OSError:
            # Someone else already whacked it.
            pass

    try:
        with open(final_path, "a") as output_file:
            output_fd = output_file.fileno()

            fcntl.flock(output_fd, fcntl.LOCK_EX)
            try:
                os.rename(my_mole, final_path)
            finally:
                fcntl.flock(output_fd, fcntl.LOCK_UN)
    except OSError:
        # Best effort: the surviving mole may already have been renamed into
        # place by a concurrent copier.
        pass
238282

239283

240284
def is_different_checksum(checksum, content):

tests/test_utils.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
from cf_remote.utils import has_unescaped_character, parse_envfile
1+
import os
2+
import shutil
3+
from multiprocessing import Pool
4+
from cf_remote.utils import has_unescaped_character, parse_envfile, copy_file
25

36

47
def test_parse_envfile():
@@ -45,3 +48,44 @@ def test_has_unescaped_character():
4548
assert not has_unescaped_character(r"\"test\"", '"')
4649
assert has_unescaped_character(r'hello"world', '"')
4750
assert has_unescaped_character(r'hello\""world', '"')
51+
52+
53+
def copy_file_with_args(args):
    """Call copy_file with a (source, destination) pair packed in one argument.

    Pool.map passes a single item to each worker call, so the two paths
    travel together as a tuple and are unpacked here.
    """
    source, destination = args
    copy_file(source, destination)
56+
57+
58+
def test_copy_file():
    """Run several concurrent copies to one destination and check the result.

    After all workers finish, the destination must hold the complete source
    content and the destination directory must contain no leftover temporary
    or ".mole" files.
    """
    # Imported locally so the test does not depend on a module-level import.
    import tempfile

    # Unique directories per run avoid collisions between parallel test runs
    # and stale files from an earlier, aborted run.
    src_dir = tempfile.mkdtemp(prefix="cf-remote-test-src-")
    dest_dir = tempfile.mkdtemp(prefix="cf-remote-test-dest-")

    try:
        src_file = "myfile.txt"
        dest_file = "copy.txt"

        src = os.path.join(src_dir, src_file)
        dest = os.path.join(dest_dir, dest_file)

        expected = "This is a test file for atomic copy."
        with open(src, "w") as f:
            f.write(expected)

        num_processes = 10

        with Pool(num_processes) as copy_pool:
            copy_pool.map(
                copy_file_with_args, [(src, dest) for _ in range(num_processes)]
            )

        # Let a read failure surface with its real traceback.
        with open(dest, "r") as f:
            content = f.read()

        assert content == expected
        assert os.listdir(dest_dir) == [dest_file]
    finally:
        # Clean up even when an assertion above fails.
        shutil.rmtree(src_dir, ignore_errors=True)
        shutil.rmtree(dest_dir, ignore_errors=True)

0 commit comments

Comments
 (0)