Beispiel für Parallelprogrammierung mit C++11
Das Interesse an paralleler Programmierung ist in letzter Zeit gefühlt stark gestiegen – getrieben durch die zunehmende Erkenntnis, dass sich die Performance einzelner CPU-Kerne in den letzten Jahren tatsächlich nicht signifikant entwickelt hat, und die Zukunft auch nichts anderes verspricht. Die augenscheinlich vorherrschende Meinung ist, dass etablierte Programmiersprachen weitere Sprachkonstrukte – wie beispielsweise pure Funktionsausdrücke und unveränderliche Datenstrukturen – benötigen, um die Hardware sinnvoll und effizient ausnutzen zu können.
Diese Bewegung basiert auf der Annahme, dass die bisherige Unterstützung für Parallelität die Entwickler im Allgemeinen überfordert oder zumindest in ihrer Produktivität signifikant beeinträchtigt. Stattdessen sollen die Entwicklungs- und Laufzeitumgebung die Parallelisierung weitgehend automatisch und transparent für den Entwickler durchführen. So stellt man sich beispielsweise vor, dass im folgenden Python-Fragment die filter
-Funktion parallel ausgeführt wird, wobei sich der Code funktional identisch zu einer sequentiellen Ausführung verhält.
def find_primes(bound):
# define pure function
is_prime = lambda value: ...
# will automatically be executed in parallel
return filter(is_prime, range(bound))
Ich finde diese Bestrebungen sehr gut und zielführend. Andererseits bin ich allerdings auch der Ansicht, dass sich die Unterstützung unabhängig davon signifikant verbessert hat, und damit parallele Programme einfach umgesetzt werden können.
Ich möchte das mit dem folgenden Beispiel zeigen. Das fast vollständig angegebene Programm gibt alle Primzahlen bis zu der vorgegebenen Schranke aus. Normalerweise würde man dafür einen anderen Algorithmus (Sieb des Eratosthenes) verwenden, der wesentlich effizienter ist. Mir geht es lediglich darum, ein einfaches und verständliches Beispiel zu haben.
// data type used for prime numbers
using prime_t = unsigned int;
bool is_prime(prime_t value)
{
// Note that this implementation is used only for demonstration
// purposes. The algorithm is inefficient. However, for this
// example, the CPU consumption is quite useful.
for (prime_t i = 2; i < value; ++i) {
if (value % i == 0) {
return false;
}
}
return true;
}
// This functions returns a sorted sequence of all prime number less
// than bound.
auto find_primes(prime_t bound) -> std::vector<prime_t>
{
std::vector<prime_t> result;
for (prime_t i = 2; i < bound; ++i) {
if (is_prime(i)) {
result.push_back(i);
} // else: nothing to do
}
return result;
}
int main()
{
for (auto value : find_primes(100000)) {
std::cout << value << '\n';
}
}
Dieses Programm ist rein sequentiell und in dieser Form typisch für imperative Programmierung. Das Ziel besteht nun darin, die Funktion find_primes
so zu umzuschreiben, dass sie mehrere CPU-Kerne parallel und mit möglichst effizient nutzt. Dafür verwende ich die folgende Hilfsfunktion fork
. Sie führt eine Funktion mit Argumenten parallel aus und liefert die Einzelergebnisse als Futures zurück. Diese werden hauptsächlich verwendet, um sowohl reguläre Ergebnisse als auch Ausnahmen transparent an die Aufrufstelle zu übertragen. Unten gehe ich genauer darauf ein.
template <typename Callable, typename... Args>
auto fork(std::size_t concurrency, Callable&& callable, Args&&... args)
-> std::vector<std::future<typename std::result_of<Callable(Args...)>::type>>;
Auf den ersten Blick fällt auf, dass die Implementierung durch die Parallelisierung deutlich länger geworden ist. Ich finde sie allerdings immer noch verhältnismäßig einfach. Abgesehen von der zusätzlichen Absicherung gegen einen potentiellen Integer-Überlauf besteht sie im wesentlichen aus den folgenden Teilen: Die Atomic-Variable current
wird für die effiziente und fein-granulare Verteilung der Arbeitspakete auf die Worker-Threads verwendet. Die durch die Variable worker
referenzierte Lambda-Funktion übernimmt die eigentliche Arbeit und bestimmt für jede Ausführung ein Teil der Primzahlen. Mit der Hilfsfunktion fork
wird die Berechnung parallel ausgeführt und in einer Schleife die Einzelergebnisse zusammengeführt. Und schließlich erfolgt die Nachbearbeitung und Rückgabe des Gesamtergebnis.
auto find_primes(prime_t bound, std::size_t concurrency)
-> std::vector<prime_t>
{
if (bound + static_cast<prime_t>(concurrency) - 1 < bound) {
// avoid overflow in execution below
throw std::invalid_argument("invalid bound");
}
// used for work distribution
std::atomic<prime_t> current{2};
// use lambda expression for actual worker
auto&& worker = [¤t,bound]() -> std::vector<prime_t> {
std::vector<prime_t> result;
prime_t value;
constexpr auto relaxed = std::memory_order_relaxed;
while ((value = current.fetch_add(1, relaxed)) < bound) {
if (is_prime(value)) {
result.push_back(value);
} // else: nothing to do
}
return result;
};
// launch workers and collect results
std::vector<prime_t> result;
for (auto&& future : fork(concurrency, worker)) {
auto&& values = future.get();
result.insert(result.end(), values.begin(), values.end());
}
// sort result, because the values are most likely not in
// ascending order due to the parallel execution of the workers.
std::sort(result.begin(), result.end());
return result;
}
Die Hilfsfunktion fork
ist konzeptionell eigentlich ganz einfach. Ein erster Wurf könnte wie folgt aussehen.
template <typename Callable, typename... Args>
auto fork(std::size_t concurrency, Callable&& callable, Args&&... args)
-> std::vector<std::future<typename std::result_of<Callable(Args...)>::type>>
{
using result_type = typename std::result_of<Callable(Args...)>::type;
std::vector<std::future<result_type>> futures;
for (std::size_t i = 0; i != concurrency; ++i) {
futures.push_back(std::async(std::launch::async, callable, args...));
}
return futures;
}
Diese Implementierung sieht zwar harmlos aus. Bei genauerer Betrachtung zeigen sich allerdings zwei gravierende Probleme, die beide mit Ausnahmen zu tun haben. Das erste Problem steht in Zusammenhang mit Ausnahmen, die bei der Ausführung der übergebenen Funktion geworfen werden. Diese Ausnahmen werden korrekt im Future verpackt und damit zum Aufrufer zurückgegeben. Für die Funktion fork
ist das zunächst in Ordnung, jedoch ist die Behandlung beim Aufrufer sehr fehleranfällig. Denn wenn der Aufrufer nicht auf jedes Future zugreift – beispielsweise wegen einer Ausnahme, kann die asynchrone Abarbeitung der verbleibenden Tasks noch weiterlaufen, und damit zu sehr unerwarteten Ergebnissen führen. Um dieses Problem zu vermeiden, lasse ich die fork
-Funktion am Ende warten, bis alle gestarteten Tasks abgearbeitet sind.
Das zweite Problem betrifft Ausnahmen, die in der fork
-Funktion selbst geworfen werden – beispielsweise wenn nicht genügend Ressourcen vorhanden sind, um eine weitere asynchrone Abarbeitung zu starten. Meiner Meinung nach sollte der Aufrufer in diesem Fall zumindest die Möglichkeit haben, an die Ergebnisse der erfolgreich gestarteten Ausführungen zu gelangen. Denn diese können aufgrund von Nebeneffekten für die weitere Abarbeitung relevant sein.
Wünschenswert sind zumindest die folgenden beiden Eigenschaften: Wenn die fork
-Funktion eine Ausnahme wirft, dann gibt es keine Nebeneffekte. Und umgekehrt, wenn eine Abarbeitung gestartet wurde, dann gibt die Funktion in jedem Fall die vollständige Anzahl Futures zurück. Das Ganze ist in der folgenden Implementierung umgesetzt.
template <typename Callable, typename... Args>
auto fork(std::size_t concurrency, Callable&& callable, Args&&... args)
-> std::vector<std::future<typename std::result_of<Callable(Args...)>::type>>
{
// Pre-allocate resources, so that beginning with the execution
// of the first worker, all exceptions can be easily handled.
using result_type = typename std::result_of<Callable(Args...)>::type;
std::vector<std::future<result_type>> futures;
futures.reserve(concurrency);
// The promises will only be used, if the async execution of the
// workers fails.
std::vector<std::promise<result_type>> promises{concurrency};
for (std::size_t i = 0; i != concurrency; ++i) {
try {
futures.push_back(std::async(std::launch::async, callable, args...));
} catch (...) {
// The following statements are not supposed to throw
// any exceptions, however, this isn't guaranteed by the
// standard.
promises[i].set_exception(std::current_exception());
futures.push_back(promises[i].get_future());
}
}
// Wait for the completion of all asynchronous taks. This
// simplifies the exception handling on the caller side.
for (auto&& future : futures) {
future.wait();
}
return futures;
}
Auch wenn meine parallele Primzahlberechnung ein wenig länger als der Python-Code geworden ist, den ich initial gezeigt habe, so sollte doch angekommen sein, dass parallele Programmierung mit den aktuell vorhandenen Bibliotheken relativ einfach geworden ist. Die gezeigte Implementierung verhält sich robust – auch bei stark beschränkten Ressourcen, und nutzt die Hardware effizient aus. Und die zu beachtenden Details ergeben sich durch ein wenig Erfahrung.