diff --git a/repos/gems/src/lib/vfs/pipe/plugin.cc b/repos/gems/src/lib/vfs/pipe/plugin.cc index 5b08dcf73a..97533696f9 100644 --- a/repos/gems/src/lib/vfs/pipe/plugin.cc +++ b/repos/gems/src/lib/vfs/pipe/plugin.cc @@ -83,6 +83,7 @@ struct Vfs_pipe::Pipe_handle : Vfs::Vfs_handle, private Pipe_handle_registry_ele struct Vfs_pipe::Pipe { + Genode::Env &env; Genode::Allocator &alloc; Pipe_space::Element space_elem; Pipe_buffer buffer { }; @@ -92,13 +93,18 @@ struct Vfs_pipe::Pipe unsigned num_writers = 0; bool waiting_for_writers = true; - Genode::Signal_context_capability ¬ify_sigh; + Genode::Io_signal_handler _read_notify_handler + { env.ep(), *this, &Pipe::notify_read }; + + Genode::Io_signal_handler _write_notify_handler + { env.ep(), *this, &Pipe::notify_write }; bool new_handle_active { true }; - Pipe(Genode::Allocator &alloc, Pipe_space &space, - Genode::Signal_context_capability ¬ify_sigh) - : alloc(alloc), space_elem(*this, space), notify_sigh(notify_sigh) { } + Pipe(Genode::Env &env, Genode::Allocator &alloc, Pipe_space &space) + : + env(env), alloc(alloc), space_elem(*this, space) + { } ~Pipe() = default; @@ -108,6 +114,28 @@ struct Vfs_pipe::Pipe return Name(space_elem.id().value); } + void notify_read() + { + read_ready_waiters.dequeue_all([] (Handle_element &elem) { + elem.object().read_ready_response(); }); + } + + void notify_write() + { + io_progress_waiters.dequeue_all([] (Handle_element &elem) { + elem.object().io_progress_response(); }); + } + + void submit_read_signal() + { + Genode::Signal_transmitter(_read_notify_handler).submit(); + } + + void submit_write_signal() + { + Genode::Signal_transmitter(_write_notify_handler).submit(); + } + /** * Check if pipe is referenced, if not, destroy */ @@ -150,11 +178,11 @@ struct Vfs_pipe::Pipe if (0 == num_writers) { /* flush buffer */ - if (!buffer.empty()) { + if (!buffer.empty()) Genode::warning("flushing non-empty buffer. capacity=", buffer.avail_capacity()); - } + buffer.reset(); - io_progress_waiters.dequeue_all([] (Handle_element /*&elem*/) { }); + io_progress_waiters.dequeue_all([] (Handle_element &/*elem*/) { }); } *handle = new (alloc) Pipe_handle(fs, alloc, Directory_service::OPEN_MODE_WRONLY, registry, *this); @@ -176,24 +204,6 @@ struct Vfs_pipe::Pipe return Open_result::OPEN_ERR_UNACCESSIBLE; } - /** - * Use a signal as a hack to defer notifications - * until the "io_progress_handler". - */ - void submit_signal() { - Genode::Signal_transmitter(notify_sigh).submit(); } - - /** - * Notify handles waiting for activity - */ - void notify() - { - io_progress_waiters.dequeue_all([] (Handle_element &elem) { - elem.object().io_progress_response(); }); - read_ready_waiters.dequeue_all([] (Handle_element &elem) { - elem.object().read_ready_response(); }); - } - Write_result write(Pipe_handle &handle, const char *buf, file_size count, file_size &out_count) @@ -201,6 +211,12 @@ struct Vfs_pipe::Pipe file_size out = 0; bool notify = buffer.empty(); + if (buffer.avail_capacity() == 0) { + submit_read_signal(); + out_count = 0; + return Write_result::WRITE_OK; + } + while (out < count && 0 < buffer.avail_capacity()) { buffer.add(*(buf++)); ++out; @@ -211,7 +227,7 @@ struct Vfs_pipe::Pipe io_progress_waiters.enqueue(handle.io_progress_elem); if (notify) - submit_signal(); + submit_read_signal(); return Write_result::WRITE_OK; } @@ -240,7 +256,7 @@ struct Vfs_pipe::Pipe } if (notify) - submit_signal(); + submit_write_signal(); return Read_result::READ_OK; } @@ -284,12 +300,13 @@ struct Vfs_pipe::New_pipe_handle : Vfs::Vfs_handle Pipe &pipe; New_pipe_handle(Vfs::File_system &fs, + Genode::Env &env, Genode::Allocator &alloc, unsigned flags, - Pipe_space &pipe_space, - Genode::Signal_context_capability ¬ify_sigh) - : Vfs::Vfs_handle(fs, fs, alloc, flags), - pipe(*(new (alloc) Pipe(alloc, pipe_space, notify_sigh))) + Pipe_space &pipe_space) + : + Vfs::Vfs_handle(fs, fs, alloc, flags), + pipe(*(new (alloc) Pipe(env, alloc, pipe_space))) { } ~New_pipe_handle() @@ -316,21 +333,10 @@ class Vfs_pipe::File_system : public Vfs::File_system { protected: + Vfs::Env &_env; + Pipe_space _pipe_space { }; - /* - * XXX: a hack to defer cross-thread notifications at - * the libc until the io_progress handler - */ - Genode::Io_signal_handler _notify_handler; - Genode::Signal_context_capability _notify_cap { _notify_handler }; - - void _notify_any() - { - _pipe_space.for_each([] (Pipe &pipe) { - pipe.notify(); }); - } - /* * verifies if a path meets access control requirements */ @@ -349,7 +355,7 @@ class Vfs_pipe::File_system : public Vfs::File_system File_system(Vfs::Env &env) : - _notify_handler(env.env().ep(), *this, &File_system::_notify_any) + _env(env) { } const char* type() override { return "pipe"; } @@ -450,7 +456,10 @@ class Vfs_pipe::File_system : public Vfs::File_system /* trigger reattempt of read to deliver EOF */ if (pipe->num_writers == 0) - pipe->submit_signal(); + pipe->submit_read_signal(); + } else { + /* a close() may arrive before read() - make sure we deliver EOF */ + pipe->waiting_for_writers = false; } } else if (New_pipe_handle *handle = dynamic_cast(vfs_handle)) @@ -651,7 +660,7 @@ class Vfs_pipe::Pipe_file_system : public Vfs_pipe::File_system if ((OPEN_MODE_ACCMODE & mode) == OPEN_MODE_WRONLY) return OPEN_ERR_NO_PERM; *handle = new (alloc) - New_pipe_handle(*this, alloc, mode, _pipe_space, _notify_cap); + New_pipe_handle(*this, _env.env(), alloc, mode, _pipe_space); return OPEN_OK; } @@ -725,7 +734,6 @@ class Vfs_pipe::Fifo_file_system : public Vfs_pipe::File_system { } }; - Vfs::Env &_env; Genode::Registry _items { }; protected: @@ -792,13 +800,13 @@ class Vfs_pipe::Fifo_file_system : public Vfs_pipe::File_system Fifo_file_system(Vfs::Env &env, Genode::Xml_node const &config) : - File_system(env), _env(env) + File_system(env) { config.for_each_sub_node("fifo", [&env, this] (Xml_node const &fifo) { Path const path { fifo.attribute_value("name", String()) }; Pipe &pipe = *new (env.alloc()) - Pipe(env.alloc(), _pipe_space, _notify_cap); + Pipe(env.env(), env.alloc(), _pipe_space); new (env.alloc()) Fifo_item(_items, path, pipe.space_elem.id()); });