Hi,
Although the following code works fine on Windows and Linux (the OS abstraction layer has been stripped from the code), it hangs on macOS. Pseudocode first:
- create a non-blocking socket pair: socketpair(AF_UNIX, SOCK_STREAM, ...); plus a couple of fcntl(fd, ..., flags | O_NONBLOCK) calls
- spawn 128 pairs of threads (as few as 32 also works, but then it takes several iterations to reproduce). Of course, errno is checked to ensure that the only errors are EWOULDBLOCK / EAGAIN
- readers read a byte 10000 times:
for (...) { while (read(fd[1]...) < 1) select(...); r++;}
- writers write a byte 10000 times:
for (...) { while (write(fd[0]...) < 1) select(...); w++;}
- Join writers;
- Join readers;
On Linux/Windows, with the iteration count really cranked up, the socket buffer fills up, so ::write fails with EWOULDBLOCK; I then wait on the socket until it is ready and continue. After joining both sets of threads, bytes-read equals bytes-written, so everything is fine.
However, on macOS I quickly end up in a strange deadlock where the writers are waiting in ::select(..., &write_fds, ...) and the readers in the corresponding ::select(..., &read_fds, ...).
I really have no idea how that could happen, unless read/write is not thread-safe. However, the POSIX docs and man pages appear to state that it is (or at least that it is reentrant).
Could anyone point me in the right direction?
Detailed code below:
// fd[0] / fd[1] are the two ends of the non-blocking socket pair described above.
std::atomic<int> bytes_written(0);
std::atomic<int> bytes_read(0);
static constexpr int k_packets = 10000;
static constexpr int k_threads = 32;
std::vector<std::thread> writers;
std::vector<std::thread> readers;
writers.reserve(k_threads);
readers.reserve(k_threads);
for (int i = 0; i < k_threads; ++i)
{
    // Writer: sends k_packets single bytes, waiting in select() whenever write() fails.
    writers.emplace_back([fd_write = fd[1], &bytes_written]()
    {
        char data = 'x';
        for (int i = 0; i < k_packets; ++i)
        {
            while (::write(fd_write, &data, 1) < 1)
            {
                fd_set writefds;
                FD_ZERO(&writefds);
                FD_SET(fd_write, &writefds);
                // The only acceptable write() failure is a full socket buffer.
                assert(errno == EAGAIN || errno == EWOULDBLOCK);
                int retval = ::select(fd_write + 1, nullptr, &writefds, nullptr, nullptr);
                if (retval < 1)
                    assert(errno == EAGAIN || errno == EWOULDBLOCK);
            }
            ++bytes_written;
        }
    });
    // Reader: receives k_packets single bytes, waiting in select() whenever read() fails.
    readers.emplace_back([fd_read = fd[0], &bytes_read]()
    {
        char data;
        for (int i = 0; i < k_packets; ++i)
        {
            while (::read(fd_read, &data, 1) < 1)
            {
                fd_set readfds;
                FD_ZERO(&readfds);
                FD_SET(fd_read, &readfds);
                // The only acceptable read() failure is an empty socket buffer.
                assert(errno == EAGAIN || errno == EWOULDBLOCK);
                int retval = ::select(fd_read + 1, &readfds, nullptr, nullptr, nullptr);
                if (retval < 1)
                    assert(errno == EAGAIN || errno == EWOULDBLOCK);
            }
            ++bytes_read;
        }
    });
}
for (auto& t : writers)
    t.join();
for (auto& t : readers)
    t.join();
// Every byte written must have been read exactly once.
assert(bytes_written == bytes_read);