trace_buffer: partition trace buffer

Split the trace buffer into two partitions in order to prevent overwriting
of entries when the consumer is too slow. See file comment in buffer.h.

genodelabs/genode#4434
Johannes Schlatow 2022-02-18 16:43:03 +01:00 committed by Norman Feske
parent edc46d15f8
commit c763890f04
6 changed files with 417 additions and 128 deletions
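The key idea of the commit, keeping both partition selectors in a single word so that either side can switch with one compare-and-swap, is documented in buffer.h below. As a rough editorial illustration, the following stand-alone sketch models the switching policy; it substitutes std::atomic for Genode's cmpxchg() and Register bitfields, and all names are hypothetical.

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    /* bit 0 holds the producer's partition, bit 16 the consumer's */
    static std::atomic<uint32_t> state { 1u << 16 };  /* producer: 0, consumer: 1 */
    static uint64_t              lost  { 0 };

    static unsigned producer_of(uint32_t s) { return  s        & 1u; }
    static unsigned consumer_of(uint32_t s) { return (s >> 16) & 1u; }

    /* the consumer may always switch: a single CAS toggles its bit */
    static unsigned switch_consumer()
    {
        uint32_t old = state.load();
        while (!state.compare_exchange_weak(old, old ^ (1u << 16))) { }
        return consumer_of(old) ^ 1u;
    }

    /*
     * The producer switches only if the consumer reads the same partition;
     * otherwise it stays put and the unconsumed entries count as lost.
     */
    static unsigned switch_producer(uint64_t entries_in_partition)
    {
        uint32_t old = state.load();
        for (;;) {
            if (producer_of(old) != consumer_of(old)) {
                lost += entries_in_partition;   /* overwrite own partition */
                return producer_of(old);
            }
            if (state.compare_exchange_weak(old, old ^ 1u))
                return producer_of(old) ^ 1u;   /* moved to the other half */
        }
    }

    int main()
    {
        switch_producer(10);   /* consumer is in partition 1: stay, 10 entries lost */
        switch_consumer();     /* consumer moves to partition 0 (the foreground) */
        switch_producer(10);   /* consumer is in the same half: switch to partition 1 */
        printf("lost entries: %llu\n", (unsigned long long)lost);   /* prints 10 */
    }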

View File

@@ -64,8 +64,6 @@
* case, however, occurs when the tail pointer resides at the very beginning
* of the buffer. In this case, newly produced entries must be dropped.
*
* XXX: What follows is only a sketch of ideas that have not been implemented.
*
* Another approach is to split the buffer into two equal
partitions. The foreground partition is the one currently written to, so that
* the background partition can be read without memory corruption. When the
@@ -79,13 +77,14 @@
* overwrites the very first entry. This way the consumer is able to detect if
* it lost some events.
*
* XXX
* The consumer is also able to lock the foreground partition so that it does
* not need to wait until the producer has filled it and switched partitions.
* Yet, it must never lock both partitions as this would stall the producer.
* We ensure this by making the unlock-background-lock-foreground operation
* atomic. In case the consumer crashes while holding a lock, the producer is
* still able
* to use half of the buffer.
* to use half of the buffer. Care must be taken, however, to eliminate a race
* between the producer wrapping and the consumer switching to the foreground
* buffer.
*/
/*
@@ -100,21 +99,29 @@
#include <base/stdint.h>
#include <cpu_session/cpu_session.h>
#include <util/register.h>
namespace Genode { namespace Trace { class Buffer; } }
namespace Genode::Trace {
class Simple_buffer;
class Partitioned_buffer;
using Buffer = Partitioned_buffer;
}
/**
* Buffer shared between CPU client thread and TRACE client
*/
class Genode::Trace::Buffer
class Genode::Trace::Simple_buffer
{
private:
unsigned volatile _wrapped; /* count of buffer wraps */
friend class Partitioned_buffer;
size_t _head_offset; /* in bytes, relative to 'entries' */
size_t _size; /* in bytes */
unsigned volatile _num_entries; /* number of entries currently in buffer */
struct _Entry
{
@@ -134,21 +141,63 @@ class Genode::Trace::Buffer
_Entry _entries[0];
/*
* The 'entries' member marks the beginning of the trace buffer
* entries. No other member variables may follow.
*/
_Entry *_head_entry() { return (_Entry *)((addr_t)_entries + _head_offset); }
void _buffer_wrapped()
{
if (_num_entries == 1)
error("trace buffer is dangerously small");
_num_entries = 0;
_head_offset = 0;
_wrapped++;
/* mark first entry as head */
_head_entry()->mark(_Entry::HEAD);
}
/*
* The 'entries' member marks the beginning of the trace buffer
* entries. No other member variables may follow.
*/
template <typename WRAP_FUNC>
char *_reserve(size_t len, WRAP_FUNC && wrap)
{
if (_head_offset + sizeof(_Entry) + len <= _size)
return _head_entry()->data;
/* mark unused space as padding */
if (_head_offset + sizeof(_Entry) <= _size)
_head_entry()->mark(_Entry::PADDING);
return wrap();
}
template <typename WRAP_FUNC>
void _commit(size_t len, WRAP_FUNC && wrap)
{
/* omit empty entries */
if (len == 0)
return;
/*
* remember current length field so that we can write it after we set
* the new head
*/
size_t *old_head_len = &_head_entry()->len;
_num_entries++;
/* advance head offset, wrap when next entry does not fit into buffer */
_head_offset += sizeof(_Entry) + len;
if (_head_offset + sizeof(_Entry) > _size)
wrap();
/* mark the entry following the new one as head */
else if (_head_offset + sizeof(_Entry) <= _size)
_head_entry()->mark(_Entry::HEAD);
*old_head_len = len;
}
public:
@@ -165,48 +214,23 @@ class Genode::Trace::Buffer
_size = size - header_size;
_wrapped = 0;
/* mark first entry as head */
_head_entry()->mark(_Entry::HEAD);
_num_entries = 0;
}
char *reserve(size_t len)
{
if (_head_offset + sizeof(_Entry) + len <= _size)
return _head_entry()->data;
/* mark unused space as padding */
if (_head_offset + sizeof(_Entry) <= _size)
_head_entry()->mark(_Entry::PADDING);
_buffer_wrapped();
return _head_entry()->data;
}
void commit(size_t len)
{
/* omit empty entries */
if (len == 0)
return;
/**
* remember current length field so that we can write it after we set
* the new head
*/
size_t *old_head_len = &_head_entry()->len;
/* advance head offset, wrap when next entry does not fit into buffer */
_head_offset += sizeof(_Entry) + len;
if (_head_offset + sizeof(_Entry) > _size)
return _reserve(len, [&] () -> char* {
_buffer_wrapped();
/* mark entry next to new entry as head */
else if (_head_offset + sizeof(_Entry) <= _size)
_head_entry()->mark(_Entry::HEAD);
*old_head_len = len;
return _head_entry()->data;
});
}
size_t wrapped() const { return _wrapped; }
void commit(size_t len) {
return _commit(len, [&] () { _buffer_wrapped(); }); }
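/*
 * Illustration (not part of the commit): a producer, i.e., the traced
 * CPU-client thread, uses this pair of operations as follows:
 *
 *   char *dst = buffer.reserve(len);   // may wrap to the buffer start
 *   memcpy(dst, payload, len);
 *   buffer.commit(len);                // publish the entry
 */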
/********************************************
@@ -218,7 +242,7 @@ class Genode::Trace::Buffer
private:
_Entry const *_entry;
friend class Buffer;
friend class Simple_buffer;
Entry(_Entry const *entry) : _entry(entry) { }
@@ -267,4 +291,119 @@ class Genode::Trace::Buffer
}
};
class Genode::Trace::Partitioned_buffer
{
public:
using Entry = Simple_buffer::Entry;
private:
enum { PRIMARY = 0, SECONDARY = 1 };
/* place consumer and producer state into single word to make switches atomic */
struct State : Register<32>
{
struct Producer : Bitfield<0, 1> { };
struct Consumer : Bitfield<16,1> { };
static int toggle_consumer(int old) {
return Producer::masked(old) | Consumer::bits(~Consumer::get(old)); }
static int toggle_producer(int old) {
return Consumer::masked(old) | Producer::bits(~Producer::get(old)); }
};
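/*
 * Worked example (illustration): with Producer = PRIMARY and Consumer =
 * SECONDARY, the state word reads 0x10000. toggle_producer(0x10000)
 * keeps the consumer bit and flips the producer bit, yielding 0x10001,
 * whereas toggle_consumer(0x10001) yields 0x00001. Because both
 * positions share one word, each switch is a single cmpxchg.
 */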
/************
** Member **
************/
unsigned long long volatile _lost_entries;
unsigned volatile _wrapped;
int volatile _state;
int volatile _consumer_lock;
size_t _secondary_offset;
Simple_buffer _primary[0];
/*
* The '_primary' member marks the beginning of the trace buffers.
* No other member variables may follow.
*/
Simple_buffer *_secondary() const {
return reinterpret_cast<Simple_buffer*>((addr_t)_primary + _secondary_offset); }
Simple_buffer &_producer()
{
if (State::Producer::get(_state) == PRIMARY)
return *_primary;
return *_secondary();
}
Simple_buffer const &_consumer() const
{
if (State::Consumer::get(_state) == PRIMARY)
return *_primary;
return *_secondary();
}
/**
* Switch consumer's partition
*
* The consumer can always switch but must wait for the producer if the
* latter is currently switching into the same partition.
*/
Simple_buffer const &_switch_consumer();
/**
* Switch producer's partition
*
* The producer switches partitions only if the consumer is currently
* in the same partition. Otherwise, it wraps and discards all entries
* in the current partition.
*/
Simple_buffer &_switch_producer();
public:
/******************************************
** Functions called from the CPU client **
******************************************/
void init(size_t len);
char *reserve(size_t len);
void commit(size_t len);
/********************************************
** Functions called from the TRACE client **
********************************************/
unsigned wrapped() { return _wrapped; }
unsigned long long lost_entries() { return _lost_entries; }
Entry first() const { return _consumer().first(); }
/**
* Return the entry that follows the given entry.
* Automatically switches between the partitions if the end of the buffer
* was reached. Stops at the head of the buffer.
*
* Before dereferencing a valid entry, the reader must check whether it is
* the head of the buffer (i.e., not yet written).
*/
Entry next(Entry entry)
{
Entry e = _consumer().next(entry);
if (e.last())
return _switch_consumer().first();
return e;
}
};
#endif /* _INCLUDE__BASE__TRACE__BUFFER_H_ */
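On the TRACE-client side, the Trace_buffer helper (adapted later in this commit) drives this interface. A hypothetical consumer loop, assuming for_each_new_entry() keeps iterating as long as the functor returns true:

    void dump_new_entries(Trace_buffer &buffer)
    {
        buffer.for_each_new_entry([&] (Genode::Trace::Buffer::Entry &entry) {
            Genode::log("entry of ", entry.length(), " bytes");
            return true;   /* returning false aborts the traversal early */
        });
    }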

View File

@@ -29,6 +29,7 @@ SRC_CC += region_map_client.cc
SRC_CC += rm_session_client.cc
SRC_CC += stack_allocator.cc
SRC_CC += trace.cc
SRC_CC += trace_buffer.cc
SRC_CC += root_proxy.cc
SRC_CC += env_session_id_space.cc
SRC_CC += stack_protector.cc

View File

@@ -247,6 +247,10 @@ _ZN6Genode5Trace6Logger17_evaluate_controlEv T
_ZN6Genode5Trace6Logger3logEPKcm T
_ZN6Genode5Trace6LoggerC1Ev T
_ZN6Genode5Trace6LoggerC2Ev T
_ZN6Genode5Trace18Partitioned_buffer4initEm T
_ZN6Genode5Trace18Partitioned_buffer6commitEm T
_ZN6Genode5Trace18Partitioned_buffer7reserveEm T
_ZN6Genode5Trace18Partitioned_buffer16_switch_consumerEv T
_ZN6Genode5printERNS_6OutputEPKc T
_ZN6Genode5printERNS_6OutputEPKv T
_ZN6Genode5printERNS_6OutputEd T
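For reference, the new ABI symbols demangle to Genode::Trace::Partitioned_buffer::init(unsigned long), ::commit(unsigned long), ::reserve(unsigned long), and ::_switch_consumer(). _switch_consumer() must be exported because it is called from the inline next() in buffer.h, whereas _switch_producer() is only called from within trace_buffer.cc and needs no entry.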

View File

@@ -0,0 +1,113 @@
/*
* \brief Trace::Buffer implementation
* \author Johannes Schlatow
* \date 2022-02-18
*/
/*
* Copyright (C) 2022 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
*/
/* Genode includes */
#include <base/trace/buffer.h>
#include <cpu/memory_barrier.h>
/* base-internal includes */
#include <base/internal/spin_lock.h>
using namespace Genode;
/*******************************
** Trace::Partitioned_buffer **
*******************************/
void Trace::Partitioned_buffer::init(size_t size)
{
/* compute number of bytes available for partitions */
size_t const header_size = (addr_t)&_primary - (addr_t)this;
size_t const avail_size = size - header_size;
_secondary_offset = align_natural(avail_size / 2);
_primary ->init(_secondary_offset);
_secondary()->init(avail_size - _secondary_offset);
/* mark first entry in secondary partition as padding instead of head */
_secondary()->_head_entry()->mark(Simple_buffer::_Entry::PADDING);
_state = State::Producer::bits(PRIMARY) | State::Consumer::bits(SECONDARY);
_consumer_lock = SPINLOCK_UNLOCKED;
_lost_entries = 0;
_wrapped = 0;
}
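/*
 * Numeric illustration with made-up values, assuming align_natural()
 * rounds up to machine-word alignment: for size = 4096 and a header_size
 * of 64 bytes, avail_size is 4032 and _secondary_offset becomes
 * align_natural(2016) = 2016, so each partition receives 2016 bytes.
 */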
Trace::Simple_buffer const &Trace::Partitioned_buffer::_switch_consumer()
{
/* first switch atomically */
bool switched = false;
while (!switched)
{
int const old_state = _state;
switched = cmpxchg(&_state, old_state, State::toggle_consumer(old_state));
}
/* use the spin lock to wait in case the producer is currently wrapping */
spinlock_lock(&_consumer_lock);
spinlock_unlock(&_consumer_lock);
return _consumer();
}
Trace::Simple_buffer &Trace::Partitioned_buffer::_switch_producer()
{
/* block the consumer from reading right after it switched partitions */
_consumer_lock = SPINLOCK_LOCKED;
bool switched = false;
while (!switched) {
int const old_state = _state;
if (State::Producer::get(old_state) == State::Consumer::get(old_state))
switched = cmpxchg(&_state, old_state, State::toggle_producer(old_state));
else {
/*
* consumer may still switch partitions at this point but not continue
* reading until we set the new head entry
*/
_lost_entries += _producer()._num_entries;
switched = true;
}
}
Trace::Simple_buffer &current = _producer();
current._buffer_wrapped();
/* XXX _wrapped only needed for testing */
if (State::Producer::get(_state) == PRIMARY)
_wrapped++;
Genode::memory_barrier();
_consumer_lock = SPINLOCK_UNLOCKED;
return current;
}
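/*
 * Summary of the protocol above: _consumer_lock is set first so that a
 * concurrent _switch_consumer() blocks until the producer has published
 * the new head entry. If the consumer reads the other partition, the
 * producer stays where it is and accounts the overwritten entries as
 * lost. The memory barrier ensures that the wrapped partition's state
 * is visible before the consumer lock is released.
 */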
char *Trace::Partitioned_buffer::reserve(size_t len)
{
return _producer()._reserve(len, [&] () -> char* {
return _switch_producer()._head_entry()->data;
});
}
void Trace::Partitioned_buffer::commit(size_t len) {
_producer()._commit(len, [&] () { _switch_producer(); }); }

View File

@@ -27,7 +27,7 @@ class Trace_buffer
Genode::Trace::Buffer &_buffer;
Genode::Trace::Buffer::Entry _curr { _buffer.first() };
unsigned _wrapped_count { 0 };
unsigned long long _lost_count { 0 };
public:
@@ -41,14 +41,11 @@
{
using namespace Genode;
bool wrapped = _buffer.wrapped() != _wrapped_count;
if (wrapped) {
if ((_buffer.wrapped() - 1) != _wrapped_count) {
warning("buffer wrapped multiple times; you might want to raise buffer size; curr_count=",
_buffer.wrapped(), " last_count=", _wrapped_count);
_curr = _buffer.first();
}
_wrapped_count = (unsigned)_buffer.wrapped();
bool lost = _buffer.lost_entries() != _lost_count;
if (lost) {
warning("lost ", _buffer.lost_entries() - _lost_count,
", entries; you might want to raise buffer size");
_lost_count = (unsigned)_buffer.lost_entries();
}
Trace::Buffer::Entry entry { _curr };

View File

@@ -51,9 +51,9 @@ struct Generator1
{
Entry const &current { *reinterpret_cast<const Entry*>(entry.data()) };
if (current.value != _next_value) {
if (print_error) {
if (print_error || current.value < _next_value)
error("expected entry: ", _next_value, ", but got: ", current);
}
return false;
}
@@ -61,10 +61,13 @@
return true;
}
void value(Trace::Buffer::Entry const &entry) {
_next_value = reinterpret_cast<const Entry*>(entry.data())->value; }
void print(Output &out) const { Genode::print(out, "constant entry size"); }
void skip_lost(unsigned long long count)
{
for (; count; count--)
_next_value++;
}
};
@@ -75,7 +78,7 @@ struct Generator2
{
unsigned char _next_value { 1 };
size_t _next_length { 10 };
size_t const _max_length { 200 };
size_t const _max_length { 60 };
struct Entry {
unsigned char value[0] { };
@@ -91,6 +94,8 @@
{
_next_value++;
_next_length = (_next_length + 10) % (_max_length+1);
if (_next_length == 0)
_next_length = 10;
}
size_t generate(char *dst)
@@ -106,7 +111,7 @@
{
Entry const &current { *reinterpret_cast<const Entry*>(entry.data()) };
if (current.value[0] != _next_value) {
if (print_error) {
if (print_error || current.value[0] < _next_value) {
error("expected entry: ", _next_value, ", but got: ", current);
}
return false;
@@ -131,10 +136,10 @@
return true;
}
void value(Trace::Buffer::Entry const &entry)
void skip_lost(unsigned long long count)
{
_next_value = reinterpret_cast<const Entry*>(entry.data())->value[0];
_next_length = entry.length();
for (; count; count--)
_next();
}
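/*
 * Illustration: if three entries were lost while _next_value was 5 and
 * _next_length was 10, skip_lost(3) advances the expectation to
 * _next_value = 8 and _next_length = 40, i.e., the exact state the
 * producer has reached.
 */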
void print(Output &out) const { Genode::print(out, "variable entry size"); }
@@ -181,65 +186,85 @@ template <typename T>
struct Trace_buffer_monitor
{
Env &env;
Trace::Buffer &raw_buffer;
Trace_buffer buffer;
unsigned delay;
Timer::Connection timer { env };
T generator { };
unsigned long long lost_count { 0 };
unsigned long long received_count { 0 };
Timer::Connection timer { env };
T generator { };
struct Failed : Genode::Exception { };
Trace_buffer_monitor(Env &env, Trace::Buffer &buffer, unsigned delay)
: env(env),
buffer(buffer),
raw_buffer(buffer),
buffer(raw_buffer),
delay(delay)
{ }
void test_ok()
unsigned long long lost_entries()
{
bool done = false;
while (!done) {
buffer.for_each_new_entry([&] (Trace::Buffer::Entry &entry) {
if (!entry.length() || !entry.data() || entry.length() > generator.max_len()) {
error("Got invalid entry from for_each_new_entry()");
throw Failed();
}
if (!generator.validate(entry))
throw Failed();
done = true;
if (delay)
timer.usleep(delay);
return true;
});
if (lost_count != raw_buffer.lost_entries()) {
unsigned long long current_lost = raw_buffer.lost_entries() - lost_count;
lost_count += current_lost;
return current_lost;
}
return 0;
}
void test_lost()
{
/* read a single entry (which has unexpected value) and stop */
bool recalibrated = false;
unsigned long long consumed() { return received_count; }
while (!recalibrated) {
buffer.for_each_new_entry([&] (Trace::Buffer::Entry &entry) {
if (!entry.length() || !entry.data() || entry.length() > generator.max_len()) {
error("Got invalid entry from for_each_new_entry()");
bool _try_read(bool const lossy, unsigned long long &lost)
{
bool consumed = false;
buffer.for_each_new_entry([&] (Trace::Buffer::Entry &entry) {
if (!entry.length() || !entry.data() || entry.length() > generator.max_len()) {
error("Got invalid entry from for_each_new_entry()");
throw Failed();
}
consumed = true;
if (!generator.validate(entry, !lossy)) {
if (!lossy) throw Failed();
lost = lost_entries();
if (!lost) {
error("Lost entries unexpectedly.");
throw Failed();
}
if (generator.validate(entry, false))
throw Failed();
/* reset generator value */
generator.value(entry);
recalibrated = true;
generator.skip_lost(lost);
/* abort for_each, otherwise we'd never catch up with a faster producer */
return false;
});
}
}
received_count++;
if (delay)
timer.usleep(delay);
return true;
});
return consumed;
}
void consume(bool lossy)
{
unsigned long long lost { 0 };
while (!_try_read(lossy, lost));
}
void recalibrate()
{
unsigned long long lost { 0 };
while (!_try_read(true, lost) || !lost);
}
};
@@ -248,13 +273,15 @@ class Test_tracing
class Test_tracing
{
private:
using Monitor = Constructible<Trace_buffer_monitor<T>>;
size_t _trace_buffer_sz;
Attached_ram_dataspace _buffer_ds;
Trace::Buffer *_buffer { _buffer_ds.local_addr<Trace::Buffer>() };
unsigned long long *_canary { (unsigned long long*)(_buffer_ds.local_addr<char>()
+ _trace_buffer_sz) };
Test_thread<T> _thread;
Trace_buffer_monitor<T> _test_monitor;
Monitor _test_monitor { };
/*
* Noncopyable
@@ -270,9 +297,11 @@ class Test_tracing
Test_tracing(Env &env, size_t buffer_sz, unsigned producer_delay, unsigned consumer_delay)
: _trace_buffer_sz (buffer_sz),
_buffer_ds (env.ram(), env.rm(), _trace_buffer_sz + sizeof(*_canary)),
_thread (env, *_buffer, producer_delay),
_test_monitor(env, *_buffer, consumer_delay)
_thread (env, *_buffer, producer_delay)
{
/* determine whether lost entries are interpreted as error */
bool const lossy = consumer_delay >= producer_delay;
/**
* The canary is placed right after the trace buffer. This allows us
* to detect buffer overflows. By filling the canary with a bogus
@@ -281,32 +310,40 @@ class Test_tracing
*_canary = ~0ULL;
_buffer->init(_trace_buffer_sz);
log("running ", _test_monitor.generator, " test");
_thread.start();
_test_monitor.construct(env, *_buffer, consumer_delay);
log("running ", _test_monitor->generator, " test");
/* read until buffer wrapped once */
while (_buffer->wrapped() < 1)
_test_monitor.test_ok();
/* read until the buffer wrapped a few times and we consumed at least 50 entries */
while (_buffer->wrapped() < 2 ||
_test_monitor->consumed() < 50) {
_test_monitor->consume(lossy);
}
/* make sure to continue reading after buffer wrapped */
_test_monitor.test_ok();
/* sanity check whether the test configuration actually triggered overwriting during reads */
if (lossy && !_buffer->lost_entries())
warning("Haven't lost any buffer entry during lossy test.");
/* wait for buffer to wrap twice */
size_t const wrapped = _buffer->wrapped();
while (_buffer->wrapped() < wrapped + 2);
/* intentionally induce overwriting */
if (!lossy) {
/* wait for buffer overwriting unconsumed entries */
while (!_buffer->lost_entries());
/* read an unexpected value */
_test_monitor.test_lost();
/* read expecting lost entries */
_test_monitor->recalibrate();
/* read some more expected entries */
_test_monitor.test_ok();
/* read some more expected entries */
_test_monitor->consume(false);
}
if (*_canary != ~0ULL) {
error("Buffer overflow, canary was overwritten with ", Hex(*_canary));
throw Overflow();
}
log(_test_monitor.generator, " test succeeded\n");
log(_test_monitor->generator, " test succeeded (",
"read: ", _test_monitor->consumed(),
", lost: ", _buffer->lost_entries(), ")\n");
}
};
@@ -320,19 +357,17 @@ struct Main
{
/* determine buffer size so that Generator1 entries fit perfectly */
enum { ENTRY_SIZE = sizeof(Trace::Buffer::Entry) + sizeof(Generator1::Entry) };
enum { BUFFER_SIZE = 32 * ENTRY_SIZE + sizeof(Trace::Buffer) };
enum { BUFFER_SIZE = 32 * ENTRY_SIZE + 2*sizeof(Trace::Buffer) };
/* consume as fast as possible */
test_1.construct(env, BUFFER_SIZE, 10000, 0);
test_1.destruct();
/* leave a word-sized padding at the end */
test_1.construct(env, BUFFER_SIZE+4, 10000, 0);
/* leave a word-sized padding at the end and make consumer slower than producer */
test_1.construct(env, BUFFER_SIZE+4, 5000, 10000);
test_1.destruct();
/* XXX also test with slower consumer than producer */
/* variable-size test */
/* variable-size test with fast consumer */
test_2.construct(env, BUFFER_SIZE, 10000, 0);
test_2.destruct();