Iterative Codeentwicklung am Beispiel eines Bounded-Counter in C++

von Hubert Schmid vom 2012-02-12

Es gibt viele Möglichkeiten guten Code zu schreiben. Nach meiner Erfahrung entsteht solcher Code aber in den seltensten Fällen beim ersten Versuch. Stattdessen sind aus meiner Sicht mehrere, unmittelbar aufeinanderfolgende Iterationen notwendig, die noch vor der ersten Ausführung durchlaufen werden. Denn erst wenn ich den Code sehe, kann ich ein Gefühl für dessen Eleganz entwickeln. Und was bringt es mir Code auszuführen, der keine weiteren fünf Minuten überlebt.

Ich versuche in diesem Artikel dieses Vorgehen an einem Beispiel zu demonstrieren und starte mit der Motivation der Problemstellung: Um den möglichen Missbrauch eines technischen Dienstes einzuschränken, sollen die innerhalb einer bestimmten Zeitspanne ausführbaren Operationen pro Verursacher eingeschränkt werden. Ein konkreter Anwendungsfall für eine Webanwendung könnte so aussehen, dass von jeder IP‑Adresse maximal zehn Login-Versuche innerhalb von fünf Minuten akzeptiert werden sollen. Die Datenstruktur bietet dafür eine Funktion, die angibt wie viele Versuche innerhalb der letzten fünf Minuten registriert wurden. Außerdem enthält sie eine Funktion um die aktuelle Operation zu erfassen.

Es gibt zwei wichtige Dinge, die ich ergänzend dazu sagen muss: Erstens steht für mich in diesem Artikel nicht der fachliche Anwendungsfall im Vordergrund, sondern technische Aspekte bei der Verwendung der eingesetzten Datenstrukturen. Damit ich mich darauf fokussieren kann, vereinfache ich die Aufgabenstellung weiter und ersetze den zeitlichen Aspekt durch eine Beschränkung auf die Gesamtoperationen. Zweitens werde ich in diesem Artikel keine Alternativen zu den fachlichen Anforderungen diskutieren. Die obige Aufgabenstellung erzwingt, dass jedes Ereignis einzeln erfasst ist, wofür relativ viel Speicher benötigt wird. Ich werde ein anderes Mal eine Variante zeigen, mit der man in der Praxis das gleiche Ziel erreicht, die aber nur einen Bruchteil des Speichers benötigt.

/* * Instances of this class store the last N added strings. And there is a * function to query the number of occurrences for a certain string. * Instances of this class may be used for a simple frequency analysis. */ class bounded_counter { private: // --- fields --- std::size_t _limit; // ... public: // --- con-/destruction --- /* * Construct an instance that will store the last limit strings. * Initially the are no stored strings. */ explicit bounded_counter(std::size_t limit) : _limit{limit} { } public: // --- methods --- /* * Returns the number of occurrences of the given string within the last * limit added strings. */ auto get(const std::string& key) const -> std::size_t; /* * Add the given string to this instance. The instance only needs to * keep the last LIMIT strings. If the limit is already reached, it is * possible to remove the eldest value. */ void add(const std::string& key); };

Ich haben mich bei der Schnittstelle auf die minimal notwendige Funktionalität beschränkt. Die erste Implementierung davon sieht auch sein einfach aus. Ich verwende eine std::deque um die Ereignisse zu speichern. Diese Datenstruktur ist für diesen Zweck sehr geeignet, da das Einfügen und Löschen an beiden Enden sehr effizient ist, und gleichzeitig die Datenstruktur nur einen geringen Speicher-Overhead mit sich bringt.

class bounded_counter { private: // --- fields --- std::size_t _limit; /* * Sequence of strings - sorted by insertion order - with the eldest at * the front and the newest at the back. */ std::deque<std::string> _items; public: // --- con-/destruction --- explicit bounded_counter(std::size_t limit); public: // --- methods --- auto get(const std::string& key) const -> std::size_t { // NOTE: linear time algorithm return std::count(_items.begin(), _items.end(), key); } void add(const std::string& key) { _items.push_back(key); trim_to_limit(); } private: // --- methods --- void trim_to_limit() { while (_items.size() > _limit) { _items.pop_front(); } } };

Diese Implementierung ist aus meiner Sicht sehr verständlich und geradlinig. Dennoch ist sie vollkommen ungeeignet, da die Zeitkomplexität der Abfragefunktion viel zu hoch ist. Ich werde darauf gleich genauer eingehen, aber zunächst möchte ich noch auf ein ganz andere Besonderheit eingehen.

Im obigen Beispiel wird die Zeichenkette beim Einfügen in die Datenstruktur kopiert. Mit der Move-Semantik aus C++11 lässt sich diese Kopieroperation einfach vermeiden. Ein wenig erstaunlich ist, dass diese Eigenschaft durch Call-By-Value erreicht wird. Denn wenn der Aufrufer den Parameter mit einem R‑Value initialisiert, dann wird dessen Speicher bis in den Container verschoben. Initialisiert der Aufrufer den Parameter hingegen mit einem L‑Value, so wird automatisch eine Kopie angelegt, die in diesem Fall aber auch benötigt wird.

// NOTE: call-by-value enables move semantics. void add(std::string key) { // NOTE: avoid copy construction _items.push_back(std::move(key)); trim_to_limit(); }

Zurück zur Performance: Eine (annähernd) konstante Zeitkomplexität lässt sich erreichen, indem die aktuelle Anzahl der Vorkommen pro Zeichenkette in einer Hash-Map redundant gespeichert und inkrementell aktualisiert wird. Da Änderungen der Datenstruktur nur an zwei Stellen stattfinden, lässt sich das auch mit sinnvollem Aufwand realisieren. Man darf dabei nur nicht vergessen, die Einträge aus der Map zu löschen, wenn der Zähler auf Null fällt. Denn andernfalls erzeugt man ein typisches Memory-Leak, bei dem auch eine Garbage-Collection nicht mehr hilft.

class bounded_counter { private: // --- fields --- std::size_t _limit; std::deque<std::string> _items; /* * The hash map contains exactly the number of occurrences within _items * - for all strings that exists at least once. There are no entries for * strings that do no exist in _items, because it is difficult to get * rid of them later. */ std::unordered_map<std::string, std::size_t> _count; public: // --- con-/destruction --- explicit bounded_counter(std::size_t limit); public: // --- methods --- auto get(const std::string& key) const -> std::size_t { // NOTE: constant time algorithm (depending on hash quality) auto p = _count.find(key); return p == _count.end() ? 0 : p->second; } void add(std::string key) { // keep _items and _count in sync ++_count[key]; _items.push_back(std::move(key)); trim_to_limit(); } private: // --- methods --- void trim_to_limit() { while (_items.size() > _limit) { auto p = _count.find(_items.front()); _items.pop_front(); assert(p != _count.end()); // according to class invariant // keep _items and _count in sync --p->second; if (p->second == 0) { _count.erase(p); } } } };

Dieser Code enthält einen gravierenden und nicht ganz offensichtlichen Fehler, auf den ich gleich eingehen werde. Bevor ich dieses Problem korrigiere, möchte ich allerdings noch zwei andere Änderungen vornehmen.

Auch wenn viele Objekte gleich sind, so sind doch manche Objekte gleicher. Das klingt ein wenig merkwürdig, ist aus Performance-Gründen aber praktisch relevant und charakteristisch für std::string. Wenn beispielsweise ein Objekt durch Kopie aus einen anderem Objekt erzeugt wird, da kann es sich nicht-funktional ein wenig anders anfühlen, als ein Objekt mit lediglich dem gleichen Inhalt. Ein Stichwort dazu ist Copy-On-Write.

Ich nenne das Idiom die Bevorzugung gleicherer Objekte. In diesem Beispiel verwende ich die Einträge der Map als Repräsentanten der Äquivalenzklassen, und die Einträge der Deque werden daraus generiert. Für sich genommen ist diese Änderung noch keine Verbesserung (ganz im Gegenteil), aber sie ist ein wichtiger Schritt auf dem Weg zum Ziel.

void add(std::string key) { auto p = _count.emplace(std::move(key), 0).first; /* Use existing string from map instead of function parameter. This * may reduce the memory consumption, if related strings share their * content (e.g. copy-on-write). */ _items.push_back(p->first); ++p->second; trim_to_limit(); }

Durch diese Änderung ist der Funktionsparameter nicht mehr optimal bezüglich der Vermeidung unnötiger Kopieroperationen, da nun bei jedem Aufruf eine Kopie erzeugt wird, die nicht in jedem Fall verwendet wird. Dieses Mal ist es aber auch nicht damit getan, aus dem Parameter wieder eine konstante Referenz zu machen, denn dann würde eine unnötige Kopie angelegt werden, wenn der Aufrufer einen R‑Value verwendet und die Zeichenkette in der Map noch nicht existiert. Will man dieses Problem lösen, so benötigt man entweder für beide Fälle jeweils eine Funktion oder ein Funktionstemplate. Ich habe mich für eine Kombination entschieden. Die öffentliche Schnittstelle enthält zwei konkrete Funktionen, während die eigentliche Implementierung durch privates Funktionstemplate bereitgestellt wird – das bemerkenswerterweise die beiden Fälle nicht explizit unterscheiden muss.

void add(const std::string& key) { add_aux(key); } void add(std::string&& key) { add_aux(std::move(key)); } /* * This template works perfectly for l-value- and r-value-references. * There are no unnecessary copies, and move semantics will be used if * possible. Unfortunately this can not be done without a template (or * duplicating the function). */ template <typename String> void add_aux(String&& key) { auto p = _count.emplace(std::forward<String>(key), 0).first; _items.push_back(p->first); ++p->second; trim_to_limit(); }

Zurück zum Fehler, der sich durch die letzten Beispiele mit kleinen Variationen durchgeschlichen hat, dessen Betrachtung ich aber bewusst zurückgestellt habe, um den Code möglichst lange einfach zu halten. In der letzten Implementierung kann die Klasseninvariante beim Einfügen verletzt werden, wenn die Zeichenkette in der Map noch nicht existiert und das Einfügen in die Deque aufgrund der Speicherbeschränkung fehlschlägt. In den vorherigen Versionen war dieses Problem noch gravierender. Das Objekt hätte sich dann in einem inkonsistenten Zustand befunden, bei dem für ein Schlüssel stets ein zu hoher Wert angenommen wird. Besonders gefährlich daran ist, dass diese Situation vom Objekt selbst nicht erkannt wird und noch nicht einmal dokumentiert ist. Der Aufrufer müsste eigentlich das Objekt bei der ersten Exception zerstören und wieder von Vorne zu zählen beginnen.

Diese Situation ist natürlich unbefriedigend. Man sagt auch, dass die Funktion nur die Minimal-Exception-Safety garantiert. Ich erwarte hingegen von einer Klasse mit dieser Funktionalität bei allen Operationen mindestens die Strong-Exception-Safety (auch als Commit-Or-Rollback-Semantics bekannt). In vielen Fällen lässt sich diese Eigenschaft relativ einfach erreichen. In diesem Fall fällt mir aber nichts Besseres als ein try-catch-Block ein, der im Fehlerfall die vorherige Operation rückgängig macht. Man kann sich davon überzeugen oder mir auch einfach nur glauben, dass alle anderen Stellen in diesem Beispiel bereits diesbezüglich korrekt sind.

template <typename String> void add_aux(String&& key) { auto p = _count.emplace(std::forward<String>(key), 0).first; try { /* The following operation can fail due to out-of-memory. In * this case, we have to rollback the modification of _count. * Otherwise the class invariant will be violated. */ _items.push_back(p->first); ++p->second; } catch (...) { // rollback and rethrow (note that erase does not throw) if (p->second == 0) { _count.erase(p); } throw; } trim_to_limit(); }

Bisher habe ich stets argumentiert, dass die Deque die primäre Datenstruktur ist, und dass die Map lediglich für den schnellen Zugriff verwendet wird und synchron zur Deque gehalten werden muss. Wenn man sich den Code aber nochmals anschaut, dann wird man feststellen, dass diese Aufteilung in primäre und sekundäre Datenstruktur eigentlich nicht sichtbar ist. Man kann sogar umgekehrt argumentieren, dass die Map die primäre Datenstruktur ist, und die Deque lediglich verwendet wird, um zum richtigen Zeitpunkt den Wert in der Map herunter zu zählen.

Diese umgedrehte Sichtweise eröffnet ganz neue Möglichkeiten. Der erste Ansatz könnte sein, in der Deque nur noch die Zeiger auf die in der Map enthaltenen Zeichenketten zu speichern. Dadurch würde der Speicherverbrauch signifikant reduziert werden, aber schön sieht das nicht aus. Wesentlich eleganter und performanter ist es, wenn man in der Deque stattdessen einen Iterator auf die Map speichert, der gleichzeitig auch den Zusammenhang dieser beiden Datenstrukturen deutlich macht. Dadurch werden nicht nur die Kopien der Zeichenketten eingespart, sondern auch die Suchoperationen auf der Map werden durch den direkten Zugriff abgelöst.

Diese Änderung betrifft sowohl die Member-Variablen als auch zwei Funktionen. Da möglicherweise aufgrund der vielen einzelnen Änderungen in der Zwischenzeit die Gesamtsicht ein wenig verloren gegangen ist, füge ich abschließend ein weitgehend vollständiges Fragment ein.

Ich habe die einzelnen Entwicklungsschritte in diesem Artikel relativ ausführlich dargestellt. Mit einer gewissen Erfahrung wird man in der Praxis einige davon überspringen. Meine Erfahrung ist aber, dass man mehrere Schritte bei der Entwicklung hochwertigen Codes fast immer durchläuft. Solcher Code entsteht nicht aus dem Nichts, sondern erfordert eine permanente, kritische Betrachtung während der Entwicklung, bis man ein stimmiges Gesamtbild erreicht.

/* * Instances of this class store the last N added strings. And there is a * function to query the number of occurrences for a certain string. * Instances of this class may be used for a simple frequency analysis. */ class bounded_counter { private: // --- fields --- std::size_t _limit; /* * The hash map contains exactly the number of occurrences for the last * LIMIT strings. This data structure is tightly coupled with _items, * which contains the actual order of the added strings. There are no * entries for strings that do no exist in _items, because it is * difficult to get rid of them later. */ std::unordered_map<std::string, std::size_t> _count; /* * Sequence of added strings - sorted by insertion order - with the * eldest at the front and the newest at the back. Instead of storing * the actual string, the data structure contains iterators to nodes in * _count, which contain the strings. On the one hand that reduces the * memory consumption if strings occur more than once. On the other hand * that improves the performance. */ std::deque<decltype(_count)::iterator> _items; public: // --- con-/destruction --- // ... public: // --- methods --- // ... private: // --- methods --- /* * This template works perfectly for l-value- and r-value-references. * There are no unnecessary copies, and move semantics will be used if * possible. Unfortunately this can not be done without a template (or * duplicating the function). */ template <typename String> void add_aux(String&& key) { // keep _items and _count in sync auto p = _count.emplace(std::forward<String>(key), 0).first; try { /* The following operation can fail due to out-of-memory. In * this case, we have to rollback the modification of _count. * Otherwise the class invariant will be violated. */ _items.push_back(p); ++p->second; } catch (...) { // rollback and rethrow (note that erase does not throw) if (p->second == 0) { _count.erase(p); } throw; } trim_to_limit(); } void trim_to_limit() { while (_items.size() > _limit) { auto p = _items.front(); _items.pop_front(); // keep _items and _count in sync --p->second; if (p->second == 0) { _count.erase(p); } } } };