vfs,libc: deferred wakeup of remote peers

This patch facilitates the batching of I/O operations in the VFS library
by replacing the implicit wakeup of remote peer (via the traditional
packet-stream interface like 'submit_packet') by explicit wakeup
signalling.

The wakeup signalling is triggered not before the VFS user settles down.
E.g., for libc-based applications, this is the case if the libc goes
idle, waiting for external I/O.
In the case of a busy writer to a non-blocking file descriptor or socket
(e.g., lighttpd), the remote peers are woken up once a write operation
yields an out-count of 0.

The deferring of wakeup signals is accommodated by the new 'Remote_io'
mechanism (vfs/remote_io.h) that is designated to be used by all VFS
plugins that interact with asynchronous Genode services for I/O.

Issue #4697
This commit is contained in:
Norman Feske 2022-12-02 09:53:16 +01:00 committed by Christian Helmuth
parent 9421a449ab
commit 7d8d4f4532
16 changed files with 383 additions and 124 deletions

View File

@ -13,6 +13,9 @@
#ifndef _INCLUDE__RUMP_FS__FS_H_
#define _INCLUDE__RUMP_FS__FS_H_
#include <util/noncopyable.h>
#include <util/interface.h>
/**
* File to upon the back-end will open a block session
*/
@ -29,7 +32,13 @@
#define GENODE_MOUNT_DIR "/mnt"
void rump_io_backend_init();
struct Rump_fs_user_wakeup : Genode::Interface, Genode::Noncopyable
{
virtual void wakeup_rump_fs_user() = 0;
};
void rump_io_backend_init(Rump_fs_user_wakeup &);
/**
* Sync I/O back-end with underlying Genode subsystems

View File

@ -108,6 +108,8 @@ class Backend
Genode::Allocator_avl _alloc { &Rump::env().heap() };
Genode::Entrypoint &_ep { Rump::env().env().ep() };
Rump_fs_user_wakeup &_user_wakeup;
/*
* The tx_buf_size is chosen such that one I/O request fits into the
* I/O buffer at once. The size of NetBSD's I/O requests is bounded by
@ -121,6 +123,15 @@ class Backend
Io_signal_blockade _io_signal_blockade { _ep,
Genode::Thread::myself() };
Genode::Io_signal_handler<Backend> _io_signal_handler {
_ep, *this, &Backend::_handle_io_signal };
void _handle_io_signal()
{
_io_signal_blockade.dispatch(1);
_user_wakeup.wakeup_rump_fs_user();
}
int _blocked_for_synchronous_io = 0;
void _update_jobs()
@ -190,9 +201,11 @@ class Backend
public:
Backend()
Backend(Rump_fs_user_wakeup &user_wakeup)
:
_user_wakeup(user_wakeup)
{
_session.sigh(_io_signal_blockade);
_session.sigh(_io_signal_handler);
}
uint64_t block_count() const { return _info.block_count; }
@ -230,9 +243,15 @@ class Backend
};
static Rump_fs_user_wakeup *_user_wakeup_ptr = nullptr;
static Backend &backend()
{
static Backend _b;
if (!_user_wakeup_ptr)
Genode::error("rump: missing call of rump_io_backend_init");
static Backend _b { *_user_wakeup_ptr };
return _b;
}
@ -315,8 +334,10 @@ extern "C" void rumpns_modctor_msdos(void);
extern "C" void rumpns_modctor_wapbl(void);
void rump_io_backend_init()
void rump_io_backend_init(Rump_fs_user_wakeup &user_wakeup)
{
_user_wakeup_ptr = &user_wakeup;
/* call init/constructor functions of rump_fs.lib.so (order is important!) */
rumpcompctor_RUMP_COMPONENT_KERN_SYSCALL();
rumpns_modctor_wapbl();

View File

@ -845,14 +845,28 @@ class Vfs::Rump_file_system : public File_system
class Rump_factory : public Vfs::File_system_factory
{
private:
struct Rump_fs_user : Rump_fs_user_wakeup
{
Vfs::Env::User &_vfs_user;
void wakeup_rump_fs_user() override { _vfs_user.wakeup_vfs_user(); }
Rump_fs_user(Vfs::Env::User &vfs_user) : _vfs_user(vfs_user) { }
} _rump_fs_user;
public:
Rump_factory(Genode::Env &env, Genode::Allocator &alloc,
Genode::Xml_node config)
Vfs::Env::User &vfs_user, Genode::Xml_node config)
:
_rump_fs_user(vfs_user)
{
Rump::construct_env(env);
rump_io_backend_init();
rump_io_backend_init(_rump_fs_user);
/* limit RAM consumption */
if (!config.has_attribute("ram")) {
@ -897,7 +911,7 @@ extern "C" Vfs::File_system_factory *vfs_file_system_factory(void)
{
Vfs::File_system *create(Vfs::Env &env, Genode::Xml_node node) override
{
static Rump_factory factory(env.env(), env.alloc(), node);
static Rump_factory factory(env.env(), env.alloc(), env.user(), node);
return factory.create(env, node);
}
};

View File

@ -33,13 +33,13 @@ class Vfs_import::Flush_guard
{
private:
Genode::Entrypoint &_ep;
Vfs_handle &_handle;
Vfs::Env::Io &_io;
Vfs_handle &_handle;
public:
Flush_guard(Vfs::Env &env, Vfs_handle &handle)
: _ep(env.env().ep()), _handle(handle) { }
Flush_guard(Vfs::Env::Io &io, Vfs_handle &handle)
: _io(io), _handle(handle) { }
~Flush_guard()
{
@ -48,7 +48,7 @@ class Vfs_import::Flush_guard
&& (_handle.fs().complete_sync(&_handle)
== Vfs::File_io_service::SYNC_OK))
break;
_ep.wait_and_dispatch_one_io_signal();
_io.commit_and_wait();
}
}
};
@ -89,7 +89,7 @@ class Vfs_import::File_system : public Vfs::File_system
Vfs_handle::Guard guard(dst_handle);
{
Flush_guard flush(env, *dst_handle);
Flush_guard flush(env.io(), *dst_handle);
file_size count = target.length();
for (;;) {
@ -142,7 +142,7 @@ class Vfs_import::File_system : public Vfs::File_system
char buf[4096];
Vfs_handle::Guard guard { dst_handle };
Flush_guard flush { env, *dst_handle };
Flush_guard flush { env.io(), *dst_handle };
Readonly_file::At at { };
while (true) {

View File

@ -483,6 +483,8 @@ struct Libc::Kernel final : Vfs::Io_response_handler,
while (main_blocked_in_monitor() || main_suspended_for_io()) {
wakeup_remote_peers();
/*
* Block for one I/O signal and process all pending ones
* before executing the monitor functions. This avoids
@ -693,6 +695,16 @@ struct Libc::Kernel final : Vfs::Io_response_handler,
return *_kernel_ptr;
}
/******************************
** Vfs::Remote_io mechanism **
******************************/
void wakeup_remote_peers()
{
_libc_env.vfs_env().deferred_wakeups().trigger();
}
};
#endif /* _LIBC__INTERNAL__KERNEL_H_ */

View File

@ -404,6 +404,8 @@ void Libc::Kernel::handle_io_progress()
if (_execute_monitors_pending == Monitor::Pool::State::JOBS_PENDING)
_execute_monitors_pending = _monitors.execute_monitors();
}
wakeup_remote_peers();
}

View File

@ -591,8 +591,10 @@ int Libc::Vfs_plugin::close_from_kernel(File_descriptor *fd)
/* XXX mtime not updated here */
Sync sync { *handle, Update_mtime::NO, _current_real_time };
while (!sync.complete())
while (!sync.complete()) {
Libc::Kernel::kernel().wakeup_remote_peers();
Libc::Kernel::kernel().libc_env().ep().wait_and_dispatch_one_io_signal();
}
}
handle->close();
@ -837,8 +839,10 @@ ssize_t Libc::Vfs_plugin::write(File_descriptor *fd, const void *buf,
Vfs::file_size out_count = 0;
Result out_result = Result::WRITE_OK;
bool const nonblocking = (fd->flags & O_NONBLOCK);
if (nonblocking) {
if (fd->flags & O_NONBLOCK) {
monitor().monitor([&] {
try {
out_result = handle->fs().write(handle, (char const *)buf, count, out_count);
@ -948,6 +952,11 @@ ssize_t Libc::Vfs_plugin::write(File_descriptor *fd, const void *buf,
handle->advance_seek(out_count);
fd->modified = true;
/* notify remote peers once our VFS' local I/O buffers are saturated */
bool const nonblocking_write_stalled = nonblocking && count && !out_count;
if (nonblocking_write_stalled)
Libc::Kernel::kernel().wakeup_remote_peers();
return out_count;
}

View File

@ -117,7 +117,7 @@ struct Genode::Directory : Noncopyable, Interface
Vfs::File_system &_fs;
Entrypoint &_ep;
Vfs::Env::Io &_io;
Allocator &_alloc;
@ -158,8 +158,9 @@ struct Genode::Directory : Noncopyable, Interface
* \throw Open_failed
*/
Directory(Vfs::Env &vfs_env)
: _path(""), _fs(vfs_env.root_dir()),
_ep(vfs_env.env().ep()), _alloc(vfs_env.alloc())
:
_path(""), _fs(vfs_env.root_dir()),
_io(vfs_env.io()), _alloc(vfs_env.alloc())
{
if (_fs.opendir("/", false, &_handle, _alloc) !=
Vfs::Directory_service::OPENDIR_OK)
@ -172,8 +173,9 @@ struct Genode::Directory : Noncopyable, Interface
* \throw Nonexistent_directory
*/
Directory(Directory const &other, Path const &rel_path)
: _path(join(other._path, rel_path)), _fs(other._fs), _ep(other._ep),
_alloc(other._alloc)
:
_path(join(other._path, rel_path)), _fs(other._fs), _io(other._io),
_alloc(other._alloc)
{
if (_fs.opendir(_path.string(), false, &_handle, _alloc) !=
Vfs::Directory_service::OPENDIR_OK)
@ -192,7 +194,7 @@ struct Genode::Directory : Noncopyable, Interface
_handle->seek(i * sizeof(entry._dirent));
while (!_handle->fs().queue_read(_handle, sizeof(entry._dirent)))
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
Vfs::File_io_service::Read_result read_result;
Vfs::file_size out_count = 0;
@ -207,7 +209,7 @@ struct Genode::Directory : Noncopyable, Interface
if (read_result != Vfs::File_io_service::READ_QUEUED)
break;
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
}
if ((read_result != Vfs::File_io_service::READ_OK) ||
@ -296,7 +298,7 @@ struct Genode::Directory : Noncopyable, Interface
Vfs::file_size count = sizeof(buf)-1;
Vfs::file_size out_count = 0;
while (!link_handle->fs().queue_read(link_handle, count)) {
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
}
File_io_service::Read_result result;
@ -308,7 +310,7 @@ struct Genode::Directory : Noncopyable, Interface
if (result != File_io_service::READ_QUEUED)
break;
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
};
if (result != File_io_service::READ_OK)
@ -395,7 +397,7 @@ class Genode::Readonly_file : public File
Vfs::Vfs_handle mutable *_handle = nullptr;
Genode::Entrypoint &_ep;
Vfs::Env::Io &_io;
void _open(Vfs::File_system &fs, Allocator &alloc, Path const path)
{
@ -438,7 +440,8 @@ class Genode::Readonly_file : public File
* \throw File::Open_failed
*/
Readonly_file(Directory const &dir, Path const &rel_path)
: _ep(_mutable(dir)._ep)
:
_io(_mutable(dir)._io)
{
_open(_mutable(dir)._fs, _mutable(dir)._alloc,
Directory::join(dir._path, rel_path));
@ -460,7 +463,7 @@ class Genode::Readonly_file : public File
_handle->seek(at.value);
while (!_handle->fs().queue_read(_handle, bytes))
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
Vfs::File_io_service::Read_result result;
@ -471,7 +474,7 @@ class Genode::Readonly_file : public File
if (result != Vfs::File_io_service::READ_QUEUED)
break;
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
};
/*
@ -714,10 +717,10 @@ class Genode::Writeable_file : Noncopyable
return *handle_ptr;
}
static void _sync(Vfs::Vfs_handle &handle, Entrypoint &ep)
static void _sync(Vfs::Vfs_handle &handle, Vfs::Env::Io &io)
{
while (handle.fs().queue_sync(&handle) == false)
ep.wait_and_dispatch_one_io_signal();
io.progress();
for (bool sync_done = false; !sync_done; ) {
@ -737,11 +740,11 @@ class Genode::Writeable_file : Noncopyable
}
if (!sync_done)
ep.wait_and_dispatch_one_io_signal();
io.progress();
}
}
static Append_result _append(Vfs::Vfs_handle &handle, Entrypoint &ep,
static Append_result _append(Vfs::Vfs_handle &handle, Vfs::Env::Io &io,
char const *src, size_t size)
{
bool write_error = false;
@ -783,7 +786,7 @@ class Genode::Writeable_file : Noncopyable
stalled = true; }
if (stalled)
ep.wait_and_dispatch_one_io_signal();
io.progress();
}
return write_error ? Append_result::WRITE_ERROR
: Append_result::OK;
@ -798,7 +801,7 @@ class Genode::Append_file : public Writeable_file
{
private:
Entrypoint &_ep;
Vfs::Env::Io &_io;
Vfs::Vfs_handle &_handle;
public:
@ -810,7 +813,7 @@ class Genode::Append_file : public Writeable_file
*/
Append_file(Directory &dir, Directory::Path const &path)
:
_ep(dir._ep),
_io(dir._io),
_handle(_init_handle(dir, path))
{
Vfs::Directory_service::Stat stat { };
@ -820,12 +823,12 @@ class Genode::Append_file : public Writeable_file
~Append_file()
{
_sync(_handle, _ep);
_sync(_handle, _io);
_handle.ds().close(&_handle);
}
Append_result append(char const *src, size_t size) {
return _append(_handle, _ep, src, size); }
return _append(_handle, _io, src, size); }
};
@ -836,8 +839,8 @@ class Genode::New_file : public Writeable_file
{
private:
Entrypoint &_ep;
Vfs::Vfs_handle &_handle;
Vfs::Env::Io &_io;
Vfs::Vfs_handle &_handle;
public:
@ -851,18 +854,20 @@ class Genode::New_file : public Writeable_file
*/
New_file(Directory &dir, Directory::Path const &path)
:
_ep(dir._ep),
_io(dir._io),
_handle(_init_handle(dir, path))
{ _handle.fs().ftruncate(&_handle, 0); }
{
_handle.fs().ftruncate(&_handle, 0);
}
~New_file()
{
_sync(_handle, _ep);
_sync(_handle, _io);
_handle.ds().close(&_handle);
}
Append_result append(char const *src, size_t size) {
return _append(_handle, _ep, src, size); }
return _append(_handle, _io, src, size); }
};

View File

@ -1,6 +1,7 @@
/*
* \brief Cross-plugin VFS environment
* \author Emery Hemingway
* \author Norman Feske
* \date 2018-04-02
*/
@ -15,6 +16,7 @@
#define _INCLUDE__VFS__ENV_H_
#include <vfs/file_system.h>
#include <vfs/remote_io.h>
#include <base/allocator.h>
#include <base/env.h>
@ -25,15 +27,29 @@ struct Vfs::Env : Interface
virtual Genode::Env &env() = 0;
/**
* Allocator for creating stuctures shared
* across open VFS handles.
* Allocator for creating stuctures shared across open VFS handles
*/
virtual Genode::Allocator &alloc() = 0;
/**
* VFS root file-system
* VFS root file system
*/
virtual File_system &root_dir() = 0;
/**
* Registry of deferred wakeups for plugins interacting with remote peers
*/
virtual Remote_io::Deferred_wakeups &deferred_wakeups() = 0;
/**
* Interface tailored for triggering and waiting for I/O
*/
struct Io : Interface, Genode::Noncopyable
{
virtual void progress() = 0;
};
virtual Io &io() = 0;
};
#endif /* _INCLUDE__VFS__ENV_H_ */

View File

@ -0,0 +1,104 @@
/*
* \brief Mechanism for waking up remote I/O peers
* \author Norman Feske
* \date 2022-12-01
*/
/*
* Copyright (C) 2022 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
*/
#ifndef _INCLUDE__VFS__REMOTE_IO_H_
#define _INCLUDE__VFS__REMOTE_IO_H_
#include <base/registry.h>
#include <vfs/types.h>
namespace Vfs { class Remote_io; }
struct Vfs::Remote_io : Interface
{
virtual void wakeup_remote_peer() = 0;
class Deferred_wakeups;
class Peer;
};
class Vfs::Remote_io::Peer : Genode::Noncopyable
{
private:
struct Deferred_wakeup;
using Wakeup_registry = Genode::Registry<Deferred_wakeup>;
class Deferred_wakeup : Wakeup_registry::Element, Interface
{
private:
Peer &_peer;
public:
Deferred_wakeup(Wakeup_registry &registry, Peer &peer)
:
Wakeup_registry::Element(registry, *this), _peer(peer)
{ }
void trigger() { _peer._wakeup(); }
};
friend class Deferred_wakeups;
Deferred_wakeups &_deferred_wakeups;
Remote_io &_remote_io;
Genode::Constructible<Deferred_wakeup> _deferred_wakeup { };
void _wakeup()
{
_remote_io.wakeup_remote_peer();
_deferred_wakeup.destruct();
}
public:
Peer(Deferred_wakeups &deferred_wakeups, Remote_io &remote_io)
:
_deferred_wakeups(deferred_wakeups), _remote_io(remote_io)
{ }
inline void schedule_wakeup();
};
class Vfs::Remote_io::Deferred_wakeups : Genode::Noncopyable
{
private:
Peer::Wakeup_registry _registry { };
friend class Peer;
public:
void trigger()
{
_registry.for_each([&] (Peer::Deferred_wakeup &deferred_wakeup) {
deferred_wakeup.trigger(); });
}
};
void Vfs::Remote_io::Peer::schedule_wakeup()
{
_deferred_wakeup.construct(_deferred_wakeups._registry, *this);
}
#endif /* _INCLUDE__VFS__REMOTE_IO_H_ */

View File

@ -1,6 +1,7 @@
/*
* \brief Cross-plugin VFS environment
* \author Emery Hemingway
* \author Norman Feske
* \date 2018-04-04
*/
@ -20,16 +21,21 @@
namespace Vfs { struct Simple_env; }
class Vfs::Simple_env : public Vfs::Env
class Vfs::Simple_env : public Vfs::Env, private Vfs::Env::Io
{
private:
Genode::Env &_env;
Genode::Allocator &_alloc;
Vfs::Global_file_system_factory _fs_factory { _alloc };
Global_file_system_factory _fs_factory { _alloc };
Vfs::Dir_file_system _root_dir;
Dir_file_system _root_dir;
using Deferred_wakeups = Remote_io::Deferred_wakeups;
Deferred_wakeups _deferred_wakeups { };
public:
@ -45,9 +51,20 @@ class Vfs::Simple_env : public Vfs::Env
_root_dir.apply_config(config);
}
Genode::Env &env() override { return _env; }
Genode::Allocator &alloc() override { return _alloc; }
Vfs::File_system &root_dir() override { return _root_dir; }
Genode::Env &env() override { return _env; }
Genode::Allocator &alloc() override { return _alloc; }
Vfs::File_system &root_dir() override { return _root_dir; }
Deferred_wakeups &deferred_wakeups() override { return _deferred_wakeups; }
Vfs::Env::Io &io() override { return *this; }
/**
* Vfs::Env::Io interface
*/
void progress() override
{
_deferred_wakeups.trigger();
_env.ep().wait_and_dispatch_one_io_signal();
}
};
#endif /* _INCLUDE__VFS__SIMPLE_ENV_H_ */

View File

@ -24,7 +24,7 @@
namespace Vfs { class Fs_file_system; }
class Vfs::Fs_file_system : public File_system
class Vfs::Fs_file_system : public File_system, private Remote_io
{
private:
@ -69,6 +69,33 @@ class Vfs::Fs_file_system : public File_system
struct Fs_vfs_handle;
typedef Genode::Fifo<Fs_vfs_handle> Fs_vfs_handle_queue;
Remote_io::Peer _peer { _env.deferred_wakeups(), *this };
/**
* Remote_io interface
*/
void wakeup_remote_peer() override { _fs.tx()->wakeup(); }
/*
* Pass packet to server side
*
* The caller is expected to check 'ready_to_submit' before calling
* this function.
*/
void _submit_packet(::File_system::Packet_descriptor const &packet)
{
/*
* The warning should never occur if the precondition above is
* satisfied.
*/
if (!_fs.tx()->ready_to_submit())
Genode::warning("submit queue of file-system session unexpectedly full");
else
_fs.tx()->try_submit_packet(packet);
_peer.schedule_wakeup();
}
/**
* Convert 'File_system::Node_type' to 'Dirent_type'
*/
@ -129,14 +156,14 @@ class Vfs::Fs_file_system : public File_system
using Handle_state::queued_sync_state;
using Handle_state::read_ready_state;
::File_system::Connection &_fs;
Fs_file_system &_vfs_fs;
bool _queue_read(file_size count, file_size const seek_offset)
{
if (queued_read_state != Handle_state::Queued_state::IDLE)
return false;
::File_system::Session::Tx::Source &source = *_fs.tx();
::File_system::Session::Tx::Source &source = *_vfs_fs._fs.tx();
/* if not ready to submit suggest retry */
if (!source.ready_to_submit())
@ -161,7 +188,7 @@ class Vfs::Fs_file_system : public File_system
queued_read_state = Handle_state::Queued_state::QUEUED;
/* pass packet to server side */
source.submit_packet(packet);
_vfs_fs._submit_packet(packet);
return true;
}
@ -172,7 +199,7 @@ class Vfs::Fs_file_system : public File_system
if (queued_read_state != Handle_state::Queued_state::ACK)
return READ_QUEUED;
::File_system::Session::Tx::Source &source = *_fs.tx();
::File_system::Session::Tx::Source &source = *_vfs_fs._fs.tx();
/* obtain result packet descriptor with updated status info */
::File_system::Packet_descriptor const
@ -199,11 +226,11 @@ class Vfs::Fs_file_system : public File_system
Fs_vfs_handle(File_system &fs, Allocator &alloc,
int status_flags, Handle_space &space,
::File_system::Node_handle node_handle,
::File_system::Connection &fs_connection)
Fs_file_system &vfs_fs)
:
Vfs_handle(fs, fs, alloc, status_flags),
Handle_space::Element(*this, space, node_handle),
_fs(fs_connection)
_vfs_fs(vfs_fs)
{ }
::File_system::File_handle file_handle() const
@ -228,7 +255,7 @@ class Vfs::Fs_file_system : public File_system
if (queued_sync_state != Handle_state::Queued_state::IDLE)
return true;
::File_system::Session::Tx::Source &source = *_fs.tx();
::File_system::Session::Tx::Source &source = *_vfs_fs._fs.tx();
/* if not ready to submit suggest retry */
if (!source.ready_to_submit()) return false;
@ -247,7 +274,7 @@ class Vfs::Fs_file_system : public File_system
queued_sync_state = Handle_state::Queued_state::QUEUED;
/* pass packet to server side */
source.submit_packet(packet);
_vfs_fs._submit_packet(packet);
return true;
}
@ -261,7 +288,7 @@ class Vfs::Fs_file_system : public File_system
::File_system::Packet_descriptor const
packet = queued_sync_packet;
::File_system::Session::Tx::Source &source = *_fs.tx();
::File_system::Session::Tx::Source &source = *_vfs_fs._fs.tx();
Sync_result result = packet.succeeded()
? SYNC_OK : SYNC_ERR_INVALID;
@ -276,7 +303,7 @@ class Vfs::Fs_file_system : public File_system
bool update_modification_timestamp(Vfs::Timestamp time)
{
::File_system::Session::Tx::Source &source = *_fs.tx();
::File_system::Session::Tx::Source &source = *_vfs_fs._fs.tx();
using ::File_system::Packet_descriptor;
if (!source.ready_to_submit()) {
@ -289,8 +316,7 @@ class Vfs::Fs_file_system : public File_system
Packet_descriptor::WRITE_TIMESTAMP,
::File_system::Timestamp { .value = time.value });
/* pass packet to server side */
source.submit_packet(p);
_vfs_fs._submit_packet(p);
} catch (::File_system::Session::Tx::Source::Packet_alloc_failed) {
return false;
}
@ -398,22 +424,17 @@ class Vfs::Fs_file_system : public File_system
*/
struct Fs_handle_guard : Fs_vfs_handle
{
::File_system::Session &_fs_session;
Fs_handle_guard(File_system &fs,
::File_system::Session &fs_session,
::File_system::Node_handle fs_handle,
Handle_space &space,
::File_system::Connection &fs_connection)
Fs_file_system &vfs_fs)
:
Fs_vfs_handle(fs, *(Allocator*)nullptr, 0, space, fs_handle,
fs_connection),
_fs_session(fs_session)
Fs_vfs_handle(fs, *(Allocator*)nullptr, 0, space, fs_handle, vfs_fs)
{ }
~Fs_handle_guard()
{
_fs_session.close(file_handle());
_vfs_fs._fs.close(file_handle());
}
};
@ -444,10 +465,13 @@ class Vfs::Fs_file_system : public File_system
::File_system::Session::Tx::Source &source = *_fs.tx();
using ::File_system::Packet_descriptor;
if (!source.ready_to_submit())
throw ::File_system::Session::Tx::Source::Saturated_submit_queue();
file_size const max_packet_size = source.bulk_buffer_size() / 2;
file_size const clipped_count = min(max_packet_size, count);
/* XXX check if alloc_packet() and submit_packet() will succeed! */
/* XXX check if alloc_packet() will succeed! */
Packet_descriptor const packet_in(source.alloc_packet((size_t)clipped_count),
handle.file_handle(),
@ -458,8 +482,7 @@ class Vfs::Fs_file_system : public File_system
/* wait until packet was acknowledged */
handle.queued_read_state = Handle_state::Queued_state::QUEUED;
/* pass packet to server side */
source.submit_packet(packet_in);
_submit_packet(packet_in);
while (handle.queued_read_state != Handle_state::Queued_state::ACK)
_env.env().ep().wait_and_dispatch_one_io_signal();
@ -522,8 +545,7 @@ class Vfs::Fs_file_system : public File_system
memcpy(source.packet_content(packet_in), buf, (size_t)count);
/* pass packet to server side */
source.submit_packet(packet_in);
_submit_packet(packet_in);
} catch (::File_system::Session::Tx::Source::Packet_alloc_failed) {
if (!handle.enqueued())
_congested_handles.enqueue(handle);
@ -542,7 +564,8 @@ class Vfs::Fs_file_system : public File_system
while (source.ack_avail()) {
Packet_descriptor const packet = source.get_acked_packet();
Packet_descriptor const packet = source.try_get_acked_packet();
_peer.schedule_wakeup();
Handle_space::Id const id(packet.handle());
@ -661,7 +684,7 @@ class Vfs::Fs_file_system : public File_system
try {
::File_system::Node_handle node = _fs.node(path);
Fs_handle_guard node_guard(*this, _fs, node, _handle_space, _fs);
Fs_handle_guard node_guard(*this, node, _handle_space, *this);
status = _fs.status(node);
}
catch (Genode::Out_of_ram) {
@ -696,7 +719,7 @@ class Vfs::Fs_file_system : public File_system
try {
::File_system::Dir_handle dir = _fs.dir(dir_path.base(), false);
Fs_handle_guard dir_guard(*this, _fs, dir, _handle_space, _fs);
Fs_handle_guard dir_guard(*this, dir, _handle_space, *this);
_fs.unlink(dir, file_name.base() + 1);
}
@ -731,13 +754,12 @@ class Vfs::Fs_file_system : public File_system
::File_system::Dir_handle from_dir =
_fs.dir(from_dir_path.base(), false);
Fs_handle_guard from_dir_guard(*this, _fs, from_dir,
_handle_space, _fs);
Fs_handle_guard from_dir_guard(*this, from_dir, _handle_space, *this);
::File_system::Dir_handle to_dir = _fs.dir(to_dir_path.base(),
false);
Fs_handle_guard to_dir_guard(
*this, _fs, to_dir, _handle_space, _fs);
*this, to_dir, _handle_space, *this);
_fs.move(from_dir, from_file_name.base() + 1,
to_dir, to_file_name.base() + 1);
@ -755,8 +777,7 @@ class Vfs::Fs_file_system : public File_system
try {
::File_system::Dir_handle dir = _fs.dir(path, false);
Fs_handle_guard node_guard(*this, _fs, dir,
_handle_space, _fs);
Fs_handle_guard node_guard(*this, dir, _handle_space, *this);
return _fs.num_entries(dir);
}
@ -768,7 +789,7 @@ class Vfs::Fs_file_system : public File_system
{
try {
::File_system::Node_handle node = _fs.node(path);
Fs_handle_guard node_guard(*this, _fs, node, _handle_space, _fs);
Fs_handle_guard node_guard(*this, node, _handle_space, *this);
::File_system::Status status = _fs.status(node);
@ -814,14 +835,14 @@ class Vfs::Fs_file_system : public File_system
try {
::File_system::Dir_handle dir = _fs.dir(dir_path.base(), false);
Fs_handle_guard dir_guard(*this, _fs, dir, _handle_space, _fs);
Fs_handle_guard dir_guard(*this, dir, _handle_space, *this);
::File_system::File_handle file = _fs.file(dir,
file_name.base() + 1,
mode, create);
*out_handle = new (alloc)
Fs_vfs_file_handle(*this, alloc, vfs_mode, _handle_space, file, _fs);
Fs_vfs_file_handle(*this, alloc, vfs_mode, _handle_space, file, *this);
}
catch (::File_system::Lookup_failed) { return OPEN_ERR_UNACCESSIBLE; }
catch (::File_system::Permission_denied) { return OPEN_ERR_NO_PERM; }
@ -849,7 +870,7 @@ class Vfs::Fs_file_system : public File_system
*out_handle = new (alloc)
Fs_vfs_dir_handle(*this, alloc, ::File_system::READ_ONLY,
_handle_space, dir, _fs);
_handle_space, dir, *this);
}
catch (::File_system::Lookup_failed) { return OPENDIR_ERR_LOOKUP_FAILED; }
catch (::File_system::Name_too_long) { return OPENDIR_ERR_NAME_TOO_LONG; }
@ -880,8 +901,7 @@ class Vfs::Fs_file_system : public File_system
::File_system::Dir_handle dir_handle = _fs.dir(abs_path.base(),
false);
Fs_handle_guard from_dir_guard(*this, _fs, dir_handle,
_handle_space, _fs);
Fs_handle_guard from_dir_guard(*this, dir_handle, _handle_space, *this);
::File_system::Symlink_handle symlink_handle =
_fs.symlink(dir_handle, symlink_name.base() + 1, create);
@ -889,7 +909,7 @@ class Vfs::Fs_file_system : public File_system
*out_handle = new (alloc)
Fs_vfs_symlink_handle(*this, alloc,
::File_system::READ_ONLY,
_handle_space, symlink_handle, _fs);
_handle_space, symlink_handle, *this);
return OPENLINK_OK;
}
@ -1030,7 +1050,7 @@ class Vfs::Fs_file_system : public File_system
handle->read_ready_state = Handle_state::Read_ready_state::PENDING;
source.submit_packet(packet);
_submit_packet(packet);
/*
* When the packet is acknowledged the application is notified via

View File

@ -70,9 +70,9 @@ class Fs_report::Session_component : public Genode::Rpc_object<Report::Session>
{
private:
Genode::Entrypoint &_ep;
Genode::Allocator &_alloc;
Vfs::File_system &_vfs;
Genode::Allocator &_alloc;
Vfs::Env::Io &_io;
Vfs::File_system &_vfs;
Attached_ram_dataspace _ds;
Path _path { };
@ -108,10 +108,10 @@ class Fs_report::Session_component : public Genode::Rpc_object<Report::Session>
/* sync file operations before close */
while (!handle->fs().queue_sync(handle))
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
while (handle->fs().complete_sync(handle) == Vfs::File_io_service::SYNC_QUEUED)
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
handle->close();
}
@ -126,11 +126,12 @@ class Fs_report::Session_component : public Genode::Rpc_object<Report::Session>
Session_component(Genode::Env &env,
Genode::Allocator &alloc,
Vfs::Env::Io &io,
Vfs::File_system &vfs,
Genode::Session_label const &label,
size_t buffer_size)
:
_ep(env.ep()), _alloc(alloc), _vfs(vfs),
_alloc(alloc), _io(io), _vfs(vfs),
_ds(env.ram(), env.rm(), buffer_size),
_path(path_from_label<Path>(label.string()))
{
@ -237,7 +238,8 @@ class Fs_report::Root : public Genode::Root_component<Session_component>
}
return new (md_alloc())
Session_component(_env, _heap, _vfs_env.root_dir(), label, buffer_size);
Session_component(_env, _heap, _vfs_env.io(), _vfs_env.root_dir(),
label, buffer_size);
}
public:

View File

@ -90,6 +90,10 @@ class Vfs_server::Session_component : private Session_resources,
Vfs::File_system &_vfs;
using Deferred_wakeups = Vfs::Remote_io::Deferred_wakeups;
Deferred_wakeups &_deferred_wakeups;
Genode::Entrypoint &_ep;
Io_progress_handler &_io_progress_handler;
@ -193,6 +197,7 @@ class Vfs_server::Session_component : private Session_resources,
{
drop_packet_from_submit_queue();
packet.succeeded(false);
Genode::log("consume_and_ack_invalid_packet");
_stream.try_ack_packet(packet);
overall_progress = true;
@ -273,7 +278,12 @@ class Vfs_server::Session_component : private Session_resources,
}
if (node.acknowledgement_pending()) {
_stream.try_ack_packet(node.dequeue_acknowledgement());
auto packet = node.dequeue_acknowledgement();
if (!packet.succeeded())
Genode::warning("_try_acknowledge_jobs failed packet");
_stream.try_ack_packet(packet);
progress = true;
}
@ -378,6 +388,8 @@ class Vfs_server::Session_component : private Session_resources,
*/
if (progress == Process_packets_result::PROGRESS)
_io_progress_handler.handle_io_progress();
_deferred_wakeups.trigger();
}
/**
@ -442,6 +454,7 @@ class Vfs_server::Session_component : private Session_resources,
Genode::Cap_quota cap_quota,
size_t tx_buf_size,
Vfs::File_system &vfs,
Deferred_wakeups &deferred_wakeups,
Session_queue &active_sessions,
Io_progress_handler &io_progress_handler,
char const *root_path,
@ -450,6 +463,7 @@ class Vfs_server::Session_component : private Session_resources,
Session_resources(env.pd(), env.rm(), ram_quota, cap_quota, tx_buf_size),
Session_rpc_object(_packet_ds.cap(), env.rm(), env.ep().rpc_ep()),
_vfs(vfs),
_deferred_wakeups(deferred_wakeups),
_ep(env.ep()),
_io_progress_handler(io_progress_handler),
_active_sessions(active_sessions),
@ -852,6 +866,8 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>,
*/
if (yield)
Genode::Signal_transmitter(_reactivate_handler).submit();
_vfs_env.deferred_wakeups().trigger();
}
protected:
@ -936,6 +952,7 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>,
Genode::Ram_quota{ram_quota},
Genode::Cap_quota{cap_quota},
tx_buf_size, _vfs_env.root_dir(),
_vfs_env.deferred_wakeups(),
_active_sessions, *this,
session_root.base(), writeable);

View File

@ -226,17 +226,22 @@ struct Block_session_component : Rpc_object<Block::Session>,
using Block::Request_stream::try_acknowledge;
using Block::Request_stream::wakeup_client_if_needed;
using Vfs_peers = Vfs::Remote_io::Deferred_wakeups;
Vfs_block::File &_file;
Vfs_peers &_vfs_peers;
Block_session_component(Region_map &rm,
Entrypoint &ep,
Dataspace_capability ds,
Signal_context_capability sigh,
Vfs_block::File &file)
Vfs_block::File &file,
Vfs_peers &vfs_peers)
:
Request_stream { rm, ds, ep, sigh, file.block_info() },
_ep { ep },
_file { file }
_file { file },
_vfs_peers { vfs_peers }
{
_ep.manage(*this);
}
@ -303,6 +308,8 @@ struct Block_session_component : Rpc_object<Block::Session>,
}
}
_vfs_peers.trigger();
wakeup_client_if_needed();
}
};
@ -377,7 +384,8 @@ struct Main : Rpc_object<Typed_root<Block::Session>>
_request_handler, file_info);
_block_session.construct(_env.rm(), _env.ep(),
_block_ds->cap(),
_request_handler, *_block_file);
_request_handler, *_block_file,
_vfs_env.deferred_wakeups());
return _block_session->cap();
} catch (...) {

View File

@ -280,7 +280,7 @@ struct Populate_test : public Stress_test
struct Write_test : public Stress_test
{
Genode::Entrypoint &_ep;
Vfs::Env::Io &_io;
void write(int depth)
{
@ -304,7 +304,7 @@ struct Write_test : public Stress_test
handle->fs().queue_sync(handle);
while (handle->fs().complete_sync(handle) ==
Vfs::File_io_service::SYNC_QUEUED)
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
count += n;
}
@ -328,8 +328,9 @@ struct Write_test : public Stress_test
}
Write_test(Vfs::File_system &vfs, Genode::Allocator &alloc,
char const *parent, Genode::Entrypoint &ep)
: Stress_test(vfs, alloc, parent), _ep(ep)
char const *parent, Vfs::Env::Io &io)
:
Stress_test(vfs, alloc, parent), _io(io)
{
size_t path_len = strlen(path.base());
try {
@ -354,7 +355,7 @@ struct Write_test : public Stress_test
struct Read_test : public Stress_test
{
Genode::Entrypoint &_ep;
Vfs::Env::Io &_io;
void read(int depth)
{
@ -381,7 +382,7 @@ struct Read_test : public Stress_test
while ((read_result =
handle->fs().complete_read(handle, tmp, sizeof(tmp), n)) ==
Vfs::File_io_service::READ_QUEUED)
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
assert_read(read_result);
@ -410,8 +411,9 @@ struct Read_test : public Stress_test
}
Read_test(Vfs::File_system &vfs, Genode::Allocator &alloc, char const *parent,
Genode::Entrypoint &ep)
: Stress_test(vfs, alloc, parent), _ep(ep)
Vfs::Env::Io &io)
:
Stress_test(vfs, alloc, parent), _io(io)
{
size_t path_len = strlen(path.base());
try {
@ -436,7 +438,7 @@ struct Read_test : public Stress_test
struct Unlink_test : public Stress_test
{
Genode::Entrypoint &_ep;
Vfs::Env::Io &_io;
void empty_dir(char const *path)
{
@ -455,7 +457,7 @@ struct Unlink_test : public Stress_test
while (dir_handle->fs().complete_read(dir_handle, (char*)&dirent,
sizeof(dirent), out_count) ==
Vfs::File_io_service::READ_QUEUED)
_ep.wait_and_dispatch_one_io_signal();
_io.progress();
subpath.append(dirent.name.buf);
switch (dirent.type) {
@ -483,8 +485,9 @@ struct Unlink_test : public Stress_test
}
Unlink_test(Vfs::File_system &vfs, Genode::Allocator &alloc,
char const *parent, Genode::Entrypoint &ep)
: Stress_test(vfs, alloc, parent), _ep(ep)
char const *parent, Vfs::Env::Io &io)
:
Stress_test(vfs, alloc, parent), _io(io)
{
typedef Vfs::Directory_service::Unlink_result Result;
try {
@ -537,11 +540,11 @@ void Component::construct(Genode::Env &env)
auto vfs_root_sync = [&] ()
{
while (!vfs_root_handle->fs().queue_sync(vfs_root_handle))
env.ep().wait_and_dispatch_one_io_signal();
vfs_env.io().progress();
while (vfs_root_handle->fs().complete_sync(vfs_root_handle) ==
Vfs::File_io_service::SYNC_QUEUED)
env.ep().wait_and_dispatch_one_io_signal();
vfs_env.io().progress();
};
char path[Vfs::MAX_PATH_LEN];
@ -624,7 +627,7 @@ void Component::construct(Genode::Env &env)
for (int i = 0; i < ROOT_TREE_COUNT; ++i) {
snprintf(path, 3, "/%d", i);
Write_test test(vfs_root, heap, path, env.ep());
Write_test test(vfs_root, heap, path, vfs_env.io());
count += test.wait();
}
@ -660,7 +663,7 @@ void Component::construct(Genode::Env &env)
for (int i = 0; i < ROOT_TREE_COUNT; ++i) {
snprintf(path, 3, "/%d", i);
Read_test test(vfs_root, heap, path, env.ep());
Read_test test(vfs_root, heap, path, vfs_env.io());
count += test.wait();
}
@ -696,7 +699,7 @@ void Component::construct(Genode::Env &env)
for (int i = 0; i < ROOT_TREE_COUNT; ++i) {
snprintf(path, 3, "/%d", i);
Unlink_test test(vfs_root, heap, path, env.ep());
Unlink_test test(vfs_root, heap, path, vfs_env.io());
count += test.wait();
}