diff --git a/repos/os/include/block/request.h b/repos/os/include/block/request.h
new file mode 100644
index 0000000000..1bb0e5e32c
--- /dev/null
+++ b/repos/os/include/block/request.h
@@ -0,0 +1,43 @@
+/*
+ * \brief Block request
+ * \author Norman Feske
+ * \date 2018-12-17
+ */
+
+/*
+ * Copyright (C) 2018 Genode Labs GmbH
+ *
+ * This file is part of the Genode OS framework, which is distributed
+ * under the terms of the GNU Affero General Public License version 3.
+ */
+
+#ifndef _INCLUDE__BLOCK__REQUEST_H_
+#define _INCLUDE__BLOCK__REQUEST_H_
+
+/* Genode includes */
+#include
+
+namespace Block { struct Request; }
+
+
+struct Block::Request
+{
+ enum class Operation : Genode::uint32_t { INVALID, READ, WRITE, SYNC };
+ enum class Success : Genode::uint32_t { FALSE, TRUE };
+
+ Operation operation;
+ Success success;
+ Genode::uint64_t block_number;
+ Genode::uint64_t offset;
+ Genode::uint32_t count;
+
+ bool operation_defined() const
+ {
+ return operation == Operation::READ
+ || operation == Operation::WRITE
+ || operation == Operation::SYNC;
+ }
+
+} __attribute__((packed));
+
+#endif /* _INCLUDE__BLOCK__REQUEST_H_ */
diff --git a/repos/os/include/block/request_stream.h b/repos/os/include/block/request_stream.h
new file mode 100644
index 0000000000..7b799e28d2
--- /dev/null
+++ b/repos/os/include/block/request_stream.h
@@ -0,0 +1,317 @@
+/*
+ * \brief Stream of block-operation requests
+ * \author Norman Feske
+ * \date 2018-12-17
+ */
+
+/*
+ * Copyright (C) 2018 Genode Labs GmbH
+ *
+ * This file is part of the Genode OS framework, which is distributed
+ * under the terms of the GNU Affero General Public License version 3.
+ */
+
+#ifndef _INCLUDE__BLOCK__REQUEST_STREAM_H_
+#define _INCLUDE__BLOCK__REQUEST_STREAM_H_
+
+/* Genode includes */
+#include
+#include
+#include
+
+namespace Block { struct Request_stream; }
+
+
+class Block::Request_stream : Genode::Noncopyable
+{
+ public:
+
+ /**
+ * Interface for accessing the content of a 'Request'
+ *
+ * The 'Payload' is separate from 'Request_stream' to allow its use
+ * as argument without exposing the entirely of the 'Request_stream'
+ * to the called code.
+ */
+ class Payload
+ {
+ private:
+
+ friend class Request_stream;
+
+ Genode::addr_t const _base;
+ Genode::size_t const _size;
+ Genode::uint32_t const _block_size;
+
+ /**
+ * Return pointer to the first byte of the request content
+ */
+ void *_request_ptr(Block::Request request) const
+ {
+ return (void *)(_base + request.offset);
+ }
+
+ /**
+ * Return request size in bytes
+ */
+ Genode::size_t _request_size(Block::Request const &request) const
+ {
+ return request.count * _block_size;
+ }
+
+ bool _valid_range(Block::Request const &request) const
+ {
+ /* local address of the last byte of the request */
+ Genode::addr_t const request_end = _base + request.offset
+ + _request_size(request) - 1;
+
+ /* check for integer overflow and zero-sized request */
+ if (request_end <= _base)
+ return false;
+
+ /* check for upper bound */
+ if (request_end > _base + _size - 1)
+ return false;
+
+ return true;
+ }
+
+ Payload(Genode::addr_t base, Genode::size_t size,
+ Genode::uint32_t block_size)
+ :
+ _base(base), _size(size), _block_size(block_size)
+ { }
+
+ public:
+
+ /**
+ * Call functor 'fn' with the pointer and size of the 'request'
+ * content as arguments
+ *
+ * If the request does not carry any payload, 'fn' is not
+ * called.
+ */
+ template
+ void with_content(Block::Request request, FN const &fn) const
+ {
+ if (_valid_range(request))
+ fn(_request_ptr(request), _request_size(request));
+ }
+ };
+
+ private:
+
+ Packet_stream_tx::Rpc_object _tx;
+
+ typedef Genode::Packet_stream_sink Tx_sink;
+
+ Payload const _payload;
+
+ public:
+
+ Request_stream(Genode::Region_map &rm,
+ Genode::Dataspace_capability ds,
+ Genode::Entrypoint &ep,
+ Genode::Signal_context_capability sigh,
+ Genode::uint32_t block_size)
+ :
+ _tx(ds, rm, ep.rpc_ep()),
+ _payload(_tx.sink()->ds_local_base(), _tx.sink()->ds_size(), block_size)
+ {
+ _tx.sigh_ready_to_ack(sigh);
+ _tx.sigh_packet_avail(sigh);
+ }
+
+ virtual ~Request_stream()
+ {
+ _tx.sigh_ready_to_ack(Genode::Signal_context_capability());
+ _tx.sigh_packet_avail(Genode::Signal_context_capability());
+ }
+
+ Genode::Capability tx_cap() { return _tx.cap(); }
+
+ /**
+ * Call functor 'fn' with 'Payload' interface as argument
+ *
+ * The 'Payload' interface allows the functor to access the content
+ * of a request by calling 'Payload::with_content'.
+ */
+ template
+ void with_payload(FN const &fn) const { fn(_payload); }
+
+ /**
+ * Call functor 'fn' with the pointer and size to the 'request' content
+ *
+ * This is a wrapper for 'Payload::with_content'. It is convenient
+ * in situations where the 'Payload' interface does not need to be
+ * propagated as argument.
+ */
+ template
+ void with_content(Request const &request, FN const &fn) const
+ {
+ _payload.with_content(request, fn);
+ }
+
+ enum class Response { ACCEPTED, REJECTED, RETRY };
+
+ /**
+ * Call functor 'fn' for each pending request, with its packet as argument
+ *
+ * The boolean return value of the functor expresses whether the request
+ * was accepted or not. If it was accepted, the request is removed from the
+ * packet stream. If the request could not be accepted, the iteration
+ * aborts and the request packet stays in the packet stream.
+ */
+ template
+ void with_requests(FN const &fn)
+ {
+ Tx_sink &tx_sink = *_tx.sink();
+
+ using namespace Genode;
+
+ for (;;) {
+
+ if (!tx_sink.packet_avail())
+ return;
+
+ typedef Block::Packet_descriptor Packet_descriptor;
+
+ Packet_descriptor const packet = tx_sink.peek_packet();
+
+ auto operation = [] (Packet_descriptor::Opcode op)
+ {
+ switch (op) {
+ case Packet_descriptor::READ: return Request::Operation::READ;
+ case Packet_descriptor::WRITE: return Request::Operation::WRITE;
+ case Packet_descriptor::END: return Request::Operation::INVALID;
+ };
+ return Request::Operation::INVALID;
+ };
+
+ bool const packet_valid = (tx_sink.packet_valid(packet)
+ && (packet.offset() >= 0)
+ && (packet.size() <= (size_t)((uint32_t)~0UL)));
+
+ Request request { .operation = operation(packet.operation()),
+ .success = Request::Success::FALSE,
+ .block_number = (uint64_t)packet.block_number(),
+ .offset = (uint64_t)packet.offset(),
+ .count = (uint32_t)packet.block_count() };
+
+ Response const response = packet_valid
+ ? fn(request)
+ : Response::REJECTED;
+ bool progress = false;
+
+ switch (response) {
+
+ case Response::REJECTED:
+
+ /*
+ * Acknowledge rejected packet if there is enough room in
+ * the acknowledgement queue. Otherwise, the rejected
+ * packet stays in the request queue and is evaluated
+ * again.
+ */
+ if (tx_sink.ack_slots_free()) {
+ (void)tx_sink.try_get_packet();
+ tx_sink.try_ack_packet(packet);
+ progress = true;
+ }
+ break;
+
+ case Response::ACCEPTED:
+ (void)tx_sink.try_get_packet();
+ progress = true;
+ break;
+
+ case Response::RETRY:
+ break;
+ }
+
+ /*
+ * Stop iterating of no request-queue elements can be consumed.
+ */
+ if (!progress)
+ break;
+ }
+ }
+
+ class Ack
+ {
+ private:
+
+ friend class Request_stream;
+
+ Tx_sink &_tx_sink;
+
+ bool _submitted = false;
+
+ Genode::uint32_t const _block_size;
+
+ Ack(Tx_sink &tx_sink, Genode::uint32_t block_size)
+ : _tx_sink(tx_sink), _block_size(block_size) { }
+
+ public:
+
+ void submit(Block::Request request)
+ {
+ if (_submitted) {
+ Genode::warning("attempt to ack the same packet twice");
+ return;
+ }
+
+ typedef Block::Packet_descriptor Packet_descriptor;
+ Packet_descriptor packet { (Genode::off_t)request.offset,
+ request.count * _block_size };
+
+ auto opcode = [] (Request::Operation operation)
+ {
+ switch (operation) {
+ case Request::Operation::READ: return Packet_descriptor::READ;
+ case Request::Operation::WRITE: return Packet_descriptor::WRITE;
+ case Request::Operation::SYNC: return Packet_descriptor::END;
+ case Request::Operation::INVALID: return Packet_descriptor::END;
+ };
+ return Packet_descriptor::END;
+ };
+
+ packet = Packet_descriptor(packet, opcode(request.operation),
+ request.block_number, request.count);
+
+ packet.succeeded(request.success == Request::Success::TRUE);
+
+ _tx_sink.try_ack_packet(packet);
+ _submitted = true;
+ }
+ };
+
+ /**
+ * Try to submit acknowledgement packets
+ *
+ * The method repeatedly calls the functor 'fn' with an 'Ack' reference,
+ * which provides an interface to 'submit' one acknowledgement. The
+ * iteration stops when the acknowledgement queue is fully populated or if
+ * the functor does not call 'Ack::submit'.
+ */
+ template
+ void try_acknowledge(FN const &fn)
+ {
+ Tx_sink &tx_sink = *_tx.sink();
+
+ while (tx_sink.ack_slots_free()) {
+
+ Ack ack(tx_sink, _payload._block_size);
+
+ fn(ack);
+
+ if (!ack._submitted)
+ break;
+ }
+ }
+
+ void wakeup_client() { _tx.sink()->wakeup(); }
+};
+
+
+#endif /* _INCLUDE__BLOCK__REQUEST_STREAM_H_ */
diff --git a/repos/os/run/block_request_stream.run b/repos/os/run/block_request_stream.run
new file mode 100644
index 0000000000..2b4d8b44e6
--- /dev/null
+++ b/repos/os/run/block_request_stream.run
@@ -0,0 +1,62 @@
+build { core init timer test/block_request_stream app/block_tester }
+
+create_boot_directory
+
+install_config {
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+}
+
+
+build_boot_image { core init timer block_tester test-block_request_stream ld.lib.so }
+
+
+append qemu_args " -nographic -m 512 "
+
+run_genode_until {.*child "block_tester" exited with exit value 0.*\n} 360
diff --git a/repos/os/src/test/block_request_stream/main.cc b/repos/os/src/test/block_request_stream/main.cc
new file mode 100644
index 0000000000..9996e0dd03
--- /dev/null
+++ b/repos/os/src/test/block_request_stream/main.cc
@@ -0,0 +1,247 @@
+/*
+ * \brief Example block service
+ * \author Norman Feske
+ * \date 2018-12-06
+ */
+
+/*
+ * Copyright (C) 2018 Genode Labs GmbH
+ *
+ * This file is part of the Genode OS framework, which is distributed
+ * under the terms of the GNU Affero General Public License version 3.
+ */
+
+/* Genode includes */
+#include
+#include
+#include
+#include
+
+namespace Test {
+
+ struct Block_session_component;
+ template struct Jobs;
+
+ struct Main;
+
+ using namespace Genode;
+}
+
+
+struct Test::Block_session_component : Rpc_object,
+ Block::Request_stream
+{
+ Entrypoint &_ep;
+
+ static constexpr size_t BLOCK_SIZE = 4096;
+ static constexpr size_t NUM_BLOCKS = 16;
+
+ Block_session_component(Region_map &rm,
+ Dataspace_capability ds,
+ Entrypoint &ep,
+ Signal_context_capability sigh)
+ :
+ Request_stream(rm, ds, ep, sigh, BLOCK_SIZE), _ep(ep)
+ {
+ _ep.manage(*this);
+ }
+
+ ~Block_session_component() { _ep.dissolve(*this); }
+
+ void info(Block::sector_t *count, size_t *block_size, Operations *ops) override
+ {
+ *count = NUM_BLOCKS;
+ *block_size = BLOCK_SIZE;
+ *ops = Operations();
+
+ ops->set_operation(Block::Packet_descriptor::Opcode::READ);
+ ops->set_operation(Block::Packet_descriptor::Opcode::WRITE);
+ }
+
+ void sync() override { }
+
+ Capability tx_cap() override { return Request_stream::tx_cap(); }
+};
+
+
+template
+struct Test::Jobs : Noncopyable
+{
+ struct Entry
+ {
+ Block::Request request;
+
+ enum State { UNUSED, IN_PROGRESS, COMPLETE } state;
+ };
+
+ Entry _entries[N] { };
+
+ bool acceptable(Block::Request) const
+ {
+ for (unsigned i = 0; i < N; i++)
+ if (_entries[i].state == Entry::UNUSED)
+ return true;
+
+ return false;
+ }
+
+ void submit(Block::Request request)
+ {
+ for (unsigned i = 0; i < N; i++) {
+ if (_entries[i].state == Entry::UNUSED) {
+ _entries[i] = Entry { .request = request,
+ .state = Entry::IN_PROGRESS };
+ return;
+ }
+ }
+
+ error("failed to accept request");
+ }
+
+ bool execute()
+ {
+ bool progress = false;
+ for (unsigned i = 0; i < N; i++) {
+ if (_entries[i].state == Entry::IN_PROGRESS) {
+ _entries[i].state = Entry::COMPLETE;
+ _entries[i].request.success = Block::Request::Success::TRUE;
+ progress = true;
+ }
+ }
+
+ return progress;
+ }
+
+ void completed_job(Block::Request &out)
+ {
+ out = Block::Request { };
+
+ for (unsigned i = 0; i < N; i++) {
+ if (_entries[i].state == Entry::COMPLETE) {
+ out = _entries[i].request;
+ _entries[i].state = Entry::UNUSED;
+ return;
+ }
+ }
+ }
+
+ /**
+ * Apply 'fn' with completed job, reset job
+ */
+ template
+ void with_any_completed_job(FN const &fn)
+ {
+ Block::Request request { };
+
+ completed_job(request);
+
+ if (request.operation_defined())
+ fn(request);
+ }
+};
+
+
+struct Test::Main : Rpc_object >
+{
+ Env &_env;
+
+ Constructible _block_ds { };
+
+ Constructible _block_session { };
+
+ Signal_handler _request_handler { _env.ep(), *this, &Main::_handle_requests };
+
+ Jobs<10> _jobs { };
+
+ void _handle_requests()
+ {
+ if (!_block_session.constructed())
+ return;
+
+ Block_session_component &block_session = *_block_session;
+
+ for (;;) {
+
+ bool progress = false;
+
+ /* import new requests */
+ block_session.with_requests([&] (Block::Request request) {
+
+ if (!_jobs.acceptable(request))
+ return Block_session_component::Response::RETRY;
+
+ /* access content of the request */
+ block_session.with_content(request, [&] (void *ptr, size_t size) {
+ (void)ptr;
+ (void)size;
+ });
+
+ _jobs.submit(request);
+
+ progress = true;
+
+ return Block_session_component::Response::ACCEPTED;
+ });
+
+ /* process I/O */
+ progress |= _jobs.execute();
+
+ /* acknowledge finished jobs */
+ block_session.try_acknowledge([&] (Block_session_component::Ack &ack) {
+
+ _jobs.with_any_completed_job([&] (Block::Request request) {
+ progress |= true;
+ ack.submit(request);
+ });
+ });
+
+ if (!progress)
+ break;
+ }
+
+ block_session.wakeup_client();
+ }
+
+
+ /*
+ * Root interface
+ */
+
+ Capability session(Root::Session_args const &args,
+ Affinity const &) override
+ {
+ log("new block session: ", args.string());
+
+ size_t const ds_size =
+ Arg_string::find_arg(args.string(), "tx_buf_size").ulong_value(0);
+
+ Ram_quota const ram_quota = ram_quota_from_args(args.string());
+
+ if (ds_size >= ram_quota.value) {
+ warning("communication buffer size exceeds session quota");
+ throw Insufficient_ram_quota();
+ }
+
+ _block_ds.construct(_env.ram(), _env.rm(), ds_size);
+ _block_session.construct(_env.rm(), _block_ds->cap(), _env.ep(),
+ _request_handler);
+
+ return _block_session->cap();
+ }
+
+ void upgrade(Capability, Root::Upgrade_args const &) override { }
+
+ void close(Capability) override
+ {
+ _block_session.destruct();
+ _block_ds.destruct();
+ }
+
+ Main(Env &env) : _env(env)
+ {
+ _env.parent().announce(_env.ep().manage(*this));
+ }
+};
+
+
+void Component::construct(Genode::Env &env) { static Test::Main inst(env); }
diff --git a/repos/os/src/test/block_request_stream/target.mk b/repos/os/src/test/block_request_stream/target.mk
new file mode 100644
index 0000000000..daddb1853f
--- /dev/null
+++ b/repos/os/src/test/block_request_stream/target.mk
@@ -0,0 +1,3 @@
+TARGET := test-block_request_stream
+SRC_CC := main.cc
+LIBS += base