vfs/pipe: refactor read/write synchronization

All pipe-ends were notified at the same time which leads to dead-locks.
This commit mitigates this by having a signal handler for each pipe and
each pipe-end respectively.

Issue #3583
This commit is contained in:
Sid Hussmann 2021-02-03 17:37:32 +01:00 committed by Norman Feske
parent 3ff0efd627
commit 8d13121e84

View File

@ -83,6 +83,7 @@ struct Vfs_pipe::Pipe_handle : Vfs::Vfs_handle, private Pipe_handle_registry_ele
struct Vfs_pipe::Pipe
{
Genode::Env &env;
Genode::Allocator &alloc;
Pipe_space::Element space_elem;
Pipe_buffer buffer { };
@ -92,13 +93,18 @@ struct Vfs_pipe::Pipe
unsigned num_writers = 0;
bool waiting_for_writers = true;
Genode::Signal_context_capability &notify_sigh;
Genode::Io_signal_handler<Pipe> _read_notify_handler
{ env.ep(), *this, &Pipe::notify_read };
Genode::Io_signal_handler<Pipe> _write_notify_handler
{ env.ep(), *this, &Pipe::notify_write };
bool new_handle_active { true };
Pipe(Genode::Allocator &alloc, Pipe_space &space,
Genode::Signal_context_capability &notify_sigh)
: alloc(alloc), space_elem(*this, space), notify_sigh(notify_sigh) { }
Pipe(Genode::Env &env, Genode::Allocator &alloc, Pipe_space &space)
:
env(env), alloc(alloc), space_elem(*this, space)
{ }
~Pipe() = default;
@ -108,6 +114,28 @@ struct Vfs_pipe::Pipe
return Name(space_elem.id().value);
}
void notify_read()
{
read_ready_waiters.dequeue_all([] (Handle_element &elem) {
elem.object().read_ready_response(); });
}
void notify_write()
{
io_progress_waiters.dequeue_all([] (Handle_element &elem) {
elem.object().io_progress_response(); });
}
void submit_read_signal()
{
Genode::Signal_transmitter(_read_notify_handler).submit();
}
void submit_write_signal()
{
Genode::Signal_transmitter(_write_notify_handler).submit();
}
/**
* Check if pipe is referenced, if not, destroy
*/
@ -150,11 +178,11 @@ struct Vfs_pipe::Pipe
if (0 == num_writers) {
/* flush buffer */
if (!buffer.empty()) {
if (!buffer.empty())
Genode::warning("flushing non-empty buffer. capacity=", buffer.avail_capacity());
}
buffer.reset();
io_progress_waiters.dequeue_all([] (Handle_element /*&elem*/) { });
io_progress_waiters.dequeue_all([] (Handle_element &/*elem*/) { });
}
*handle = new (alloc)
Pipe_handle(fs, alloc, Directory_service::OPEN_MODE_WRONLY, registry, *this);
@ -176,24 +204,6 @@ struct Vfs_pipe::Pipe
return Open_result::OPEN_ERR_UNACCESSIBLE;
}
/**
* Use a signal as a hack to defer notifications
* until the "io_progress_handler".
*/
void submit_signal() {
Genode::Signal_transmitter(notify_sigh).submit(); }
/**
* Notify handles waiting for activity
*/
void notify()
{
io_progress_waiters.dequeue_all([] (Handle_element &elem) {
elem.object().io_progress_response(); });
read_ready_waiters.dequeue_all([] (Handle_element &elem) {
elem.object().read_ready_response(); });
}
Write_result write(Pipe_handle &handle,
const char *buf, file_size count,
file_size &out_count)
@ -201,6 +211,12 @@ struct Vfs_pipe::Pipe
file_size out = 0;
bool notify = buffer.empty();
if (buffer.avail_capacity() == 0) {
submit_read_signal();
out_count = 0;
return Write_result::WRITE_OK;
}
while (out < count && 0 < buffer.avail_capacity()) {
buffer.add(*(buf++));
++out;
@ -211,7 +227,7 @@ struct Vfs_pipe::Pipe
io_progress_waiters.enqueue(handle.io_progress_elem);
if (notify)
submit_signal();
submit_read_signal();
return Write_result::WRITE_OK;
}
@ -240,7 +256,7 @@ struct Vfs_pipe::Pipe
}
if (notify)
submit_signal();
submit_write_signal();
return Read_result::READ_OK;
}
@ -284,12 +300,13 @@ struct Vfs_pipe::New_pipe_handle : Vfs::Vfs_handle
Pipe &pipe;
New_pipe_handle(Vfs::File_system &fs,
Genode::Env &env,
Genode::Allocator &alloc,
unsigned flags,
Pipe_space &pipe_space,
Genode::Signal_context_capability &notify_sigh)
: Vfs::Vfs_handle(fs, fs, alloc, flags),
pipe(*(new (alloc) Pipe(alloc, pipe_space, notify_sigh)))
Pipe_space &pipe_space)
:
Vfs::Vfs_handle(fs, fs, alloc, flags),
pipe(*(new (alloc) Pipe(env, alloc, pipe_space)))
{ }
~New_pipe_handle()
@ -316,21 +333,10 @@ class Vfs_pipe::File_system : public Vfs::File_system
{
protected:
Vfs::Env &_env;
Pipe_space _pipe_space { };
/*
* XXX: a hack to defer cross-thread notifications at
* the libc until the io_progress handler
*/
Genode::Io_signal_handler<File_system> _notify_handler;
Genode::Signal_context_capability _notify_cap { _notify_handler };
void _notify_any()
{
_pipe_space.for_each<Pipe&>([] (Pipe &pipe) {
pipe.notify(); });
}
/*
* verifies if a path meets access control requirements
*/
@ -349,7 +355,7 @@ class Vfs_pipe::File_system : public Vfs::File_system
File_system(Vfs::Env &env)
:
_notify_handler(env.env().ep(), *this, &File_system::_notify_any)
_env(env)
{ }
const char* type() override { return "pipe"; }
@ -450,7 +456,10 @@ class Vfs_pipe::File_system : public Vfs::File_system
/* trigger reattempt of read to deliver EOF */
if (pipe->num_writers == 0)
pipe->submit_signal();
pipe->submit_read_signal();
} else {
/* a close() may arrive before read() - make sure we deliver EOF */
pipe->waiting_for_writers = false;
}
} else
if (New_pipe_handle *handle = dynamic_cast<New_pipe_handle*>(vfs_handle))
@ -651,7 +660,7 @@ class Vfs_pipe::Pipe_file_system : public Vfs_pipe::File_system
if ((OPEN_MODE_ACCMODE & mode) == OPEN_MODE_WRONLY)
return OPEN_ERR_NO_PERM;
*handle = new (alloc)
New_pipe_handle(*this, alloc, mode, _pipe_space, _notify_cap);
New_pipe_handle(*this, _env.env(), alloc, mode, _pipe_space);
return OPEN_OK;
}
@ -725,7 +734,6 @@ class Vfs_pipe::Fifo_file_system : public Vfs_pipe::File_system
{ }
};
Vfs::Env &_env;
Genode::Registry<Fifo_item> _items { };
protected:
@ -792,13 +800,13 @@ class Vfs_pipe::Fifo_file_system : public Vfs_pipe::File_system
Fifo_file_system(Vfs::Env &env, Genode::Xml_node const &config)
:
File_system(env), _env(env)
File_system(env)
{
config.for_each_sub_node("fifo", [&env, this] (Xml_node const &fifo) {
Path const path { fifo.attribute_value("name", String<MAX_PATH_LEN>()) };
Pipe &pipe = *new (env.alloc())
Pipe(env.alloc(), _pipe_space, _notify_cap);
Pipe(env.env(), env.alloc(), _pipe_space);
new (env.alloc())
Fifo_item(_items, path, pipe.space_elem.id());
});