/* * \brief Connection to block service * \author Norman Feske * \date 2019-04-10 * * The 'Block::Connection' is equipped with an interface for the implementation * of robust block-session clients that perform block I/O in an asynchronous * fashion. * * An application-defined JOB type, inherited from 'Connection::Job', * encapsulates the application's context information associated with a block * operation. * * The life cycle of the jobs is implemented by the 'Connection' and driven by * the application's invocation of 'Connection::update_jobs'. The 'update_jobs' * mechanism takes three hook functions as arguments, which implement the * applications-defined policy for producing and consuming data, and for the * completion of jobs. */ /* * Copyright (C) 2019-2023 Genode Labs GmbH * * This file is part of the Genode OS framework, which is distributed * under the terms of the GNU Affero General Public License version 3. */ #ifndef _INCLUDE__BLOCK_SESSION__CONNECTION_H_ #define _INCLUDE__BLOCK_SESSION__CONNECTION_H_ #include #include #include #include #include namespace Block { template struct Connection; } template struct Block::Connection : Genode::Connection, Session_client { public: class Job; typedef Genode::size_t size_t; private: /* * Define internally used '_JOB' type that corresponds to the 'JOB' * template argument but falls back to 'Job' if no template argument * is given. */ template struct Fallback { typedef T Type; }; template struct Fallback { typedef FB Type; }; typedef typename Fallback::Type _JOB; typedef Genode::Id_space<_JOB> Tag_id_space; typedef Packet_descriptor::Payload Payload; public: class Job : Genode::Noncopyable { private: friend class Connection; Connection &_connection; Operation const _operation; /* * Block offset relative to '_operation.block_number', used * when performing large read/write operations in multiple * steps. */ block_count_t _position = 0; /* * Packet-stream allocation used for read/write operations */ Payload _payload { }; bool _completed = false; /* * A job undergoes three stages. The transition from one * stage to the next happens 'update_jobs'. * * Initially, it resides in the '_pending' fifo queue. * * Once submitted, it is removed from the '_pending' fifo * and added to the '_tags' ID space. * * Once acknowledged and complete, it is removed from the * '_tags' ID space. */ Genode::Constructible _tag { }; Genode::Fifo_element<_JOB> _pending_elem { *static_cast<_JOB *>(this) }; Operation _curr_operation() const { if (!Operation::has_payload(_operation.type)) return _operation; return { .type = _operation.type, .block_number = _operation.block_number + _position, .count = Genode::min(_connection._max_block_count, _operation.count - _position) }; } template static void _with_offset_and_length(Job &job, FN const &fn) { if (!Operation::has_payload(job._operation.type)) return; Operation const operation = job._curr_operation(); size_t const block_size = job._connection._info.block_size; fn(job._position * block_size, Genode::min(job._payload.bytes, operation.count * block_size)); } template void _submit(POLICY &policy, _JOB &job, Tx::Source &tx) { if (!_tag.constructed()) return; Request::Tag const tag { _tag->id().value }; Packet_descriptor const p(_curr_operation(), _payload, tag); if (_operation.type == Operation::Type::WRITE) _with_offset_and_length(job, [&] (off_t offset, size_t length) { policy.produce_write_content(job, offset, tx.packet_content(p), length); }); tx.try_submit_packet(p); } public: Job(Connection &connection, Operation operation) : _connection(connection), _operation(operation) { _connection._pending.enqueue(_pending_elem); } ~Job() { if (pending()) { _connection._pending.remove(_pending_elem); } else if (in_progress()) { Genode::warning("block-session job prematurely destructed"); } } bool in_progress() const { return _tag.constructed(); } bool completed() const { return _completed; } bool pending() const { return !in_progress() && !completed(); } Operation operation() const { return _operation; } }; private: block_count_t const _max_block_count; block_count_t _init_max_block_count(size_t buf_size) const { /* * Number of bytes that may be unusable at the beginning or * and of the buffer because of alignment constraints. */ size_t const align_scrap = 2*(1UL << _info.align_log2); if (buf_size < align_scrap) return 0; buf_size -= align_scrap; return buf_size / _info.block_size; } /** * Tag ID space of submitted jobs */ Tag_id_space _tags { }; /** * Jobs that are pending for submission */ Genode::Fifo > _pending { }; /** * Process a single acknowledgement * * \return true if progress was made */ template bool _try_process_ack(POLICY &, Tx::Source &); /** * Submit next pending job to server, if possible * * \return true if a job was successfully submitted */ template bool _try_submit_pending_job(POLICY &, Tx::Source &); public: /** * Constructor * * \param tx_buffer_alloc allocator used for managing the * transmission buffer * \param tx_buf_size size of transmission buffer in bytes */ Connection(Genode::Env &env, Genode::Range_allocator *tx_block_alloc, Genode::size_t tx_buf_size = 128*1024, Label const &label = Label()) : Genode::Connection(env, label, Ram_quota { 14*1024 + tx_buf_size }, Args("tx_buf_size=", tx_buf_size)), Session_client(cap(), *tx_block_alloc, env.rm()), _max_block_count(_init_max_block_count(_tx.source()->bulk_buffer_size())) { } /** * Register handler for data-flow signals * * The handler is triggered on the arrival of new acknowledgements or * when the server becomes ready for new requests. It is thereby able * to execute 'update_jobs' on these conditions. */ void sigh(Genode::Signal_context_capability sigh) { tx_channel()->sigh_ack_avail(sigh); tx_channel()->sigh_ready_to_submit(sigh); } /** * Handle the submission and completion of block-operation jobs * * \return true if progress was made */ template bool update_jobs(POLICY &policy) { Tx::Source &tx = *_tx.source(); bool overall_progress = false; for (;;) { /* track progress of a single iteration */ bool progress = false; /* process acknowledgements */ while (_try_process_ack(policy, tx)) progress = true; /* try to submit pending requests */ while (_try_submit_pending_job(policy, tx)) progress = true; overall_progress |= progress; if (!progress) break; } if (overall_progress) tx.wakeup(); return overall_progress; } /** * Interface of 'POLICY' argument for 'update_jobs' */ struct Update_jobs_policy { /** * Produce content for write operation * * \param offset position of to-be-written data window in bytes * \param dst destination buffer (located within the I/O * communication buffer shared with the server) * \param length size of 'dst' buffer in bytes */ void produce_write_content(Job &, off_t offset, char *dst, size_t length); /** * Consume data resulting from read operation * * \param offset position of received data window in bytes * \param src pointer to received data * \param length number of bytes received */ void consume_read_result(Job &, off_t offset, char const *src, size_t length); /** * Respond on the completion of the given job */ void completed(Job &, bool success); }; /** * Call 'fn' with each job as argument * * This method is intended for the destruction of the jobs associated * with the connection before destructing the 'Connection' object. */ template void dissolve_all_jobs(FN const &fn) { _pending.dequeue_all([&] (Genode::Fifo_element<_JOB> &elem) { fn(elem.object()); }); auto discard_tag_and_apply_fn = [&] (_JOB &job) { Job &job_base = job; job_base._tag.destruct(); fn(job); }; while (_tags.template apply_any<_JOB>(discard_tag_and_apply_fn)); } }; template template bool Block::Connection::_try_process_ack(POLICY &policy, Tx::Source &tx) { /* * Tx must be ready to accept at least one submission. * This is needed to continue split read/write operations. */ if (!tx.ack_avail() || !tx.ready_to_submit()) return false; Packet_descriptor const p = tx.try_get_acked_packet(); Operation::Type const type = p.operation_type(); bool release_packet = true; typename Tag_id_space::Id const id { p.tag().value }; try { _tags.template apply<_JOB>(id, [&] (_JOB &job) { if (type == Operation::Type::READ) Job::_with_offset_and_length(job, [&] (off_t offset, size_t length) { policy.consume_read_result(job, offset, tx.packet_content(p), length); }); /* needed to access private members of 'Job' (friend) */ Job &job_base = job; bool const partial_read_or_write = p.succeeded() && Operation::has_payload(type) && (job_base._position + p.block_count() < job_base._operation.count); if (partial_read_or_write) { /* * Issue next part of split read/write operation while reusing the * existing payload allocation within the packet stream. */ job_base._position += p.block_count(); job_base._submit(policy, job, tx); release_packet = false; } else { job_base._completed = true; job_base._tag.destruct(); policy.completed(job, p.succeeded()); } }); } catch (typename Tag_id_space::Unknown_id) { Genode::warning("spurious block-operation acknowledgement"); } if (release_packet) tx.release_packet(p); return true; } template template bool Block::Connection::_try_submit_pending_job(POLICY &policy, Tx::Source &tx) { if (_pending.empty()) return false; /* * Preserve one slot for the submission, plus another slot to satisfy the * precondition of '_try_process_ack'. */ if (!tx.ready_to_submit(2)) return false; /* * Allocate space for the payload in the packet-stream buffer. */ Payload payload { }; try { _pending.head([&] (Genode::Fifo_element<_JOB> const &elem) { Job const &job = elem.object(); if (!Operation::has_payload(job._operation.type)) return; size_t const bytes = _info.block_size * job._curr_operation().count; payload = { .offset = tx.alloc_packet(bytes, (unsigned)_info.align_log2).offset(), .bytes = bytes }; }); } catch (Tx::Source::Packet_alloc_failed) { /* the packet-stream buffer is saturated */ return false; } /* * All preconditions for the submission of the job are satisfied. * So the job can go from the pending to the in-progress stage. */ _pending.dequeue([&] (Genode::Fifo_element<_JOB> &elem) { Job &job = elem.object(); /* let the job join the tag ID space, allocating a tag */ job._tag.construct(elem.object(), _tags); job._payload = payload; job._submit(policy, elem.object(), tx); }); return true; } #endif /* _INCLUDE__BLOCK_SESSION__CONNECTION_H_ */