Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 50 additions & 44 deletions io.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include "ifuncs.h"
#include "inums.h"

/** If no timeout is specified then use a 60 second select timeout */
#include <poll.h>

/** If no timeout is specified then use a 60 second I/O timeout */
#define SELECT_TIMEOUT 60

extern int bwlimit;
Expand Down Expand Up @@ -243,31 +245,26 @@ static size_t safe_read(int fd, char *buf, size_t len)
assert(fd != iobuf.in_fd);

while (1) {
struct timeval tv;
fd_set r_fds, e_fds;
struct pollfd pfd;
int cnt;

FD_ZERO(&r_fds);
FD_SET(fd, &r_fds);
FD_ZERO(&e_fds);
FD_SET(fd, &e_fds);
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
/* We use poll() rather than select() so that a high-numbered fd
* (>= FD_SETSIZE) cannot overflow an fd_set bitmap. */
pfd.fd = fd;
pfd.events = POLLIN;
pfd.revents = 0;

cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv);
cnt = poll(&pfd, 1, select_timeout * 1000);
if (cnt <= 0) {
if (cnt < 0 && errno == EBADF) {
rsyserr(FERROR, errno, "safe_read select failed");
rsyserr(FERROR, errno, "safe_read poll failed");
exit_cleanup(RERR_FILEIO);
}
check_timeout(1, MSK_ALLOW_FLUSH);
continue;
}

/*if (FD_ISSET(fd, &e_fds))
rprintf(FINFO, "select exception on fd %d\n", fd); */

if (FD_ISSET(fd, &r_fds)) {
if (pfd.revents) {
ssize_t n = read(fd, buf + got, len - got);
if (DEBUG_GTE(IO, 2)) {
rprintf(FINFO, "[%s] safe_read(%d)=%" SIZE_T_FMT_MOD "d\n",
Expand Down Expand Up @@ -332,27 +329,26 @@ static void safe_write(int fd, const char *buf, size_t len)
}

while (len) {
struct timeval tv;
fd_set w_fds;
struct pollfd pfd;
int cnt;

FD_ZERO(&w_fds);
FD_SET(fd, &w_fds);
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
/* poll() avoids the FD_SETSIZE limit that select() imposes. */
pfd.fd = fd;
pfd.events = POLLOUT;
pfd.revents = 0;

cnt = select(fd + 1, NULL, &w_fds, NULL, &tv);
cnt = poll(&pfd, 1, select_timeout * 1000);
if (cnt <= 0) {
if (cnt < 0 && errno == EBADF) {
rsyserr(FERROR, errno, "safe_write select failed on %s", what_fd_is(fd));
rsyserr(FERROR, errno, "safe_write poll failed on %s", what_fd_is(fd));
exit_cleanup(RERR_FILEIO);
}
if (io_timeout)
maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH);
continue;
}

if (FD_ISSET(fd, &w_fds)) {
if (pfd.revents) {
n = write(fd, buf, len);
if (n < 0) {
if (errno == EINTR)
Expand Down Expand Up @@ -561,9 +557,8 @@ static void handle_kill_signal(BOOL flush_ok)
* unused raw data in the buf would prevent the reading of socket data. */
static char *perform_io(size_t needed, int flags)
{
fd_set r_fds, e_fds, w_fds;
struct timeval tv;
int cnt, max_fd;
struct pollfd pfds[3];
int cnt, max_fd, npfds, poll_timeout, in_pollpos, out_pollpos, ff_pollpos;
size_t empty_buf_len = 0;
xbuf *out;
char *data;
Expand Down Expand Up @@ -656,26 +651,30 @@ static char *perform_io(size_t needed, int flags)
}

max_fd = -1;
npfds = 0;
in_pollpos = out_pollpos = ff_pollpos = -1;

FD_ZERO(&r_fds);
FD_ZERO(&e_fds);
if (iobuf.in_fd >= 0 && iobuf.in.size - iobuf.in.len) {
if (!read_batch || batch_fd >= 0) {
FD_SET(iobuf.in_fd, &r_fds);
FD_SET(iobuf.in_fd, &e_fds);
pfds[npfds].fd = iobuf.in_fd;
pfds[npfds].events = POLLIN;
pfds[npfds].revents = 0;
in_pollpos = npfds++;
}
if (iobuf.in_fd > max_fd)
max_fd = iobuf.in_fd;
}

/* Only do more filesfrom processing if there is enough room in the out buffer. */
if (ff_forward_fd >= 0 && iobuf.out.size - iobuf.out.len > FILESFROM_BUFLEN*2) {
FD_SET(ff_forward_fd, &r_fds);
pfds[npfds].fd = ff_forward_fd;
pfds[npfds].events = POLLIN;
pfds[npfds].revents = 0;
ff_pollpos = npfds++;
if (ff_forward_fd > max_fd)
max_fd = ff_forward_fd;
}

FD_ZERO(&w_fds);
if (iobuf.out_fd >= 0) {
if (iobuf.raw_flushing_ends_before
|| (!iobuf.msg.len && iobuf.out.len > iobuf.out_empty_len && !(flags & PIO_NEED_MSGROOM))) {
Expand Down Expand Up @@ -715,7 +714,10 @@ static char *perform_io(size_t needed, int flags)
} else
out = NULL;
if (out) {
FD_SET(iobuf.out_fd, &w_fds);
pfds[npfds].fd = iobuf.out_fd;
pfds[npfds].events = POLLOUT;
pfds[npfds].revents = 0;
out_pollpos = npfds++;
if (iobuf.out_fd > max_fd)
max_fd = iobuf.out_fd;
}
Expand Down Expand Up @@ -752,16 +754,15 @@ static char *perform_io(size_t needed, int flags)

if (extra_flist_sending_enabled) {
if (file_total - file_old_total < MAX_FILECNT_LOOKAHEAD && IN_MULTIPLEXED_AND_READY)
tv.tv_sec = 0;
poll_timeout = 0;
else {
extra_flist_sending_enabled = False;
tv.tv_sec = select_timeout;
poll_timeout = select_timeout * 1000;
}
} else
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
poll_timeout = select_timeout * 1000;

cnt = select(max_fd + 1, &r_fds, &w_fds, &e_fds, &tv);
cnt = poll(pfds, npfds, poll_timeout);

if (cnt <= 0) {
if (cnt < 0 && errno == EBADF) {
Expand All @@ -774,11 +775,16 @@ static char *perform_io(size_t needed, int flags)
extra_flist_sending_enabled = !flist_eof;
} else
check_timeout((flags & PIO_NEED_INPUT) != 0, 0);
FD_ZERO(&r_fds); /* Just in case... */
FD_ZERO(&w_fds);
/* Just in case... */
if (in_pollpos >= 0)
pfds[in_pollpos].revents = 0;
if (ff_pollpos >= 0)
pfds[ff_pollpos].revents = 0;
if (out_pollpos >= 0)
pfds[out_pollpos].revents = 0;
}

if (iobuf.in_fd >= 0 && FD_ISSET(iobuf.in_fd, &r_fds)) {
if (iobuf.in_fd >= 0 && in_pollpos >= 0 && pfds[in_pollpos].revents) {
size_t len, pos = iobuf.in.pos + iobuf.in.len;
ssize_t n;
if (pos >= iobuf.in.size) {
Expand Down Expand Up @@ -827,7 +833,7 @@ static char *perform_io(size_t needed, int flags)
exit_cleanup(RERR_TIMEOUT);
}

if (out && FD_ISSET(iobuf.out_fd, &w_fds)) {
if (out && out_pollpos >= 0 && pfds[out_pollpos].revents) {
size_t len = iobuf.raw_flushing_ends_before ? iobuf.raw_flushing_ends_before - out->pos : out->len;
ssize_t n;

Expand Down Expand Up @@ -888,7 +894,7 @@ static char *perform_io(size_t needed, int flags)
wait_for_receiver(); /* generator only */
}

if (ff_forward_fd >= 0 && FD_ISSET(ff_forward_fd, &r_fds)) {
if (ff_forward_fd >= 0 && ff_pollpos >= 0 && pfds[ff_pollpos].revents) {
/* This can potentially flush all output and enable
* multiplexed output, so keep this last in the loop
* and be sure to not cache anything that would break
Expand Down
84 changes: 84 additions & 0 deletions testsuite/highfd-hang_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""Regression test for issue #231: rsync must not hang when its I/O file
descriptors are numbered at/above FD_SETSIZE.

rsync's I/O loops in io.c used to call select() with fd_set bitmaps, which can
only represent descriptors below FD_SETSIZE (1024 with glibc). If rsync is
started with many descriptors already open -- e.g. inherited from a parent that
leaked fds -- its own socket/pipe fds get allocated above 1024. FD_SET() and
FD_ISSET() then index past the end of the fixed-size fd_set, which is undefined
behavior: select() reports the fd ready, but FD_ISSET() reads the out-of-bounds
bit as 0, so the read/write never happens and rsync spins at 100% CPU forever
("rsync hangs, 100% of one CPU, no progress").

We reproduce that deterministically by opening enough inheritable dummy fds to
push rsync's descriptors past FD_SETSIZE, then running an ordinary transfer
with close_fds=False so the child inherits them. With the poll()-based io.c
the transfer finishes instantly; with the old select()-based code it never
finishes and is caught by the timeout. The cross-over is binary (instant vs.
infinite), so the timeout is a robust signal rather than a timing race.
"""

import os
import resource
import subprocess

from rsyncfns import (
FROMDIR, TODIR, rmtree, rsync_argv, test_fail, test_skipped,
)

# select()'s fd_set holds FD_SETSIZE bits; glibc and most libcs use 1024.
FD_SETSIZE = 1024
NDUMMY = FD_SETSIZE + 80 # force rsync's fds comfortably past the limit
TIMEOUT = 30 # poll build: ~instant; select build: hangs forever

# We must be able to open enough descriptors to cross FD_SETSIZE.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
want = NDUMMY + 64
if soft < want:
if hard != resource.RLIM_INFINITY and hard < want:
test_skipped(f"RLIMIT_NOFILE hard cap {hard} < {want}; cannot place "
"fds above FD_SETSIZE to exercise issue #231")
resource.setrlimit(resource.RLIMIT_NOFILE, (want, hard))

# A small tree to transfer.
rmtree(FROMDIR)
rmtree(TODIR)
FROMDIR.mkdir(parents=True, exist_ok=True)
payload = {f"f{i}": os.urandom(1000) for i in range(20)}
for name, data in payload.items():
(FROMDIR / name).write_bytes(data)

# Occupy the low fd numbers with inheritable dummy descriptors so that the
# rsync child's socket/pipe fds are forced above FD_SETSIZE. Python opens fds
# O_CLOEXEC by default, so mark each inheritable and use close_fds=False.
dummies = []
try:
while True:
fd = os.open(os.devnull, os.O_RDONLY)
os.set_inheritable(fd, True)
dummies.append(fd)
if fd >= NDUMMY:
break

argv = rsync_argv('-a', f'{FROMDIR}/', f'{TODIR}/')
try:
proc = subprocess.run(argv, timeout=TIMEOUT, close_fds=False)
except subprocess.TimeoutExpired:
test_fail("rsync hung with high-numbered fds -- select()/fd_set "
"overflow (issue #231 regression)")
finally:
for fd in dummies:
os.close(fd)

if proc.returncode != 0:
test_fail(f"rsync exited {proc.returncode}: {' '.join(argv)}")

# The cap only matters if it stayed correct: verify every file transferred.
for name, data in payload.items():
got = (TODIR / name).read_bytes()
if got != data:
test_fail(f"{name} differs after a high-fd transfer")

print(f"issue #231: transfer with fds >= {NDUMMY} (above FD_SETSIZE) "
"completed correctly")