Java: Multithreading ganz einfach!

von Hubert Schmid vom 2014-01-19

Parallelprogrammierung mit Threads und Shared-Memory ist schwierig. Solche Aussagen sind wie selbsterfüllende Prophezeiungen. Sie reduzieren die Erwartungshaltung der Entwickler bezüglich der Qualität des Codes. Daraus resultieren viele kleine Nachlässigkeiten während der Entwicklung, die sich im Laufe des Projekts verstärken und zu ernsthaften Problemen werden.

Dabei könnte es doch so einfach sein. Eine klare Strukturierung der Anwendung vorausgesetzt, ist Multithreading in Java mindestens so einfach wie jedes andere Modell paralleler Programmierung – zumindest für 90 Prozent aller Einsatzszenarien. Insbesondere ist sie viel einfacher, als einem häufig glauben gemacht wird.

Beispiel

Ein Beispiel: Vor einigen Wochen war ich bei einem Vortrag zur Actor-basierten Parallelprogrammierung in Java. Als Beispiel wurde ein minimaler Webcrawler verwendet, der einmal sequentiell und einmal nebenläufig mit Akka implementiert wurde. Dabei war die Laufzeit der zweiten Variante wenig überzeugend, was auch der Vortragende zugeben musste. Der Grund für die schlechte Performance blieb zumindest mir unklar. Überrascht war ich hingegen nicht. Denn bereits auf den ersten Folien konnte ich der Struktur der Implementierung nicht mehr folgen.

Nun ist die Implementierung eines Webcrawlers zwar kein typisches Beispiel für Parallelprogrammierung, doch ich halte daran fest. Im Folgenden zeige ich eine sequentielle und eine parallele Implementierung, wobei Letztere auf Multithreading basiert. Wie bereits erwähnt ist eine klare Struktur des Codes hilfreich. Einen wesentlichen Teil davon machen die beiden folgenden Schnittstellen-Definitionen aus.

public interface Page { List<URI> getAbsoluteLinks(); }public interface PageLoader { Page loadPage(URI uri); }

Die Schnittstelle Page repräsentiert eine gecrawlte Seite. Normalerweise würde sie eine ganze Reihe von Funktionen bereitstellen, für das Beispiel wird jedoch nur die Methode getAbsoluteLinks benötigt, die Verweise auf referenzierte URIs liefert. Die Schnittstelle PageLoader wird zum Laden von Seiten verwendet – eine sogenannte abstrakte Fabrik. Im Gegensatz zu Page müssen die Methoden in PageLoader threadsicher sein. Das heißt sie können aus mehreren Threads gleichzeitig verwendet werden, so wie das für Fabrikmethoden üblich ist.

Sequentiell

Die sequentielle Version ist sehr einfach: Beginnend mit der Start-URI werden die Seiten geladen und rekursiv den Verweisen gefolgt, die zur gleichen Site gehören und noch nicht gesehen wurden. Die Implementierung beschränkt sich auf das Minimale. Sie gibt lediglich die Menge aller transitiv gefundenen URIs zurück.

public Set<URI> crawl(PageLoader loader, URI baseUri) { Set<URI> uris = new HashSet<>(); new Object() { void process(URI uri) { loader.loadPage(uri).getAbsoluteLinks() .stream() .filter(this::belongsToBaseUri) .filter(uris::add) .forEach(this::process); } boolean belongsToBaseUri(URI uri) { /* ... */ } }.process(baseUri); return uris; }

Ein wenig merkwürdig wirkt möglicherweise die Verwendung der anonymen Klasse. Leider unterstützen die Lambda-Ausdrücke aus Java 8 noch keine Rekursion. Die einfachste Lösung dafür sind anonyme Klassen. Alternativ hätte man auch die komplette crawl-Methode in eine Klasse verpacken können – müsste dabei jedoch auf die implizite Closure über loader, baseUri und uris verzichten.

Parallel

Die parallele Ausführung kleiner Tasks erfolgt in Java üblicherweise über eine der beiden Schnittstellen Executor oder ExecutorService aus dem Paket java.util.concurrent. Die Klasse Executors enthält zahlreiche Fabrikmethoden für Implementierungen mit unterschiedlichen Strategien. Ein Executor ist vergleichsweise schwergewichtig und sollte daher nach Möglichkeit wiederverwendet werden. Aus diesem Grund wird er in der parallelen Implementierung als zusätzlicher Parameter übergeben. Dabei spielt es keine Rolle, ob er auch gleichzeitig an anderer Stelle verwendet wird.

Abgesehen davon ist die Implementierung sehr ähnlich zur sequentiellen Variante. Der wichtigste Unterschied ist, dass die Verweise nicht rekursiv abgearbeitet sondern als neue Tasks dem Executor übergeben werden. Außerdem wird für die Datenstruktur uris eine synchronisierte Sicht verwendet, da sie von den unterschiedlichen Tasks gemeinsam benutzt wird.

public Set<URI> crawl(Executor executor, PageLoader loader, URI baseUri) { Set<URI> uris = new HashSet<>(); Set<URI> synchronizedUris = Collections.synchronizedSet(uris); new Object() { void submit(URI uri) { executor.execute(() -> loader.loadPage(uri).getAbsoluteLinks() .stream() .filter(this::belongsToBaseUri) .filter(synchronizedUris::add) .forEach(this::submit)); } boolean belongsToBaseUri(URI uri) { /* ... */ } }.submit(baseUri); return uris; }

Die Implementierung hat noch ein gravierendes Problem: Die Methode wartet nicht bis alle Tasks abgearbeitet sind, die zurückgegebene Liste ist sehr wahrscheinlich unvollständig, und der Aufrufer hat keine Möglichkeit die Vollständigkeit zu erkennen.

Ist die Liste abzuarbeitender Tasks a priori bekannt, so gibt es eine sehr einfache Lösung: Die Methode ExcecutorService.invokeAll führt mehrere Tasks parallel aus und wartet bis alle fertig sind. Beim Webcrawler kommen jedoch laufend neue Tasks hinzu, was eine andere Lösung erfordert. Die Standardbibliothek liefert dazu leider keine klare Antwort. Allerdings gibt es ein häufig verwendetes Pattern: Der Executor wird mit einer leichtgewichtigen Klasse dekoriert, die zusätzlich Closeable implementiert, und deren close-Methode wartet, bis alle über sie eingestellten Tasks fertig sind. Dann wird der komplette Rumpf in eine try-Anweisung verpackt, und jede Verwendung des Executors durch die dekorierte Instanz ersetzt. Die Umsetzung ist im folgenden Listing skizziert. Die Implementierung des ScopedExecutors besteht nur aus wenigen Zeilen.

public Set<URI> crawl(Executor executor, PageLoader loader, URI baseUri) { try (ScopedExecutor scopedExecutor = new ScopedExecutor(executor)) { Set<URI> uris = new HashSet<>(); ... return uris; } // scopedExecutor.close() blocks until all tasks are complete }
class ScopedExecutor implements Executor, Closeable {
    private final Executor delegate;
    private final AtomicInteger pending = new AtomicInteger();
    private final Object condition = new Object();
    public ScopedExecutor(Executor delegate) {
        this.delegate = delegate;
    }
    public void execute(Runnable runnable) {
        delegate.execute(() -> {
            try {
                runnable.run();
            } finally {
                if (pending.addAndGet(-1) == 0) {
                    synchronized (condition) {
                        condition.notify();
                    }
                }
            }
        });
        pending.addAndGet(1);
    }
    public void close() {
        try {
            synchronized (condition) {
                while (pending.get() > 0) {
                    condition.wait();
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("wrapped exception", e);
        }
    }
}

Ergebnis

Da das Netzwerk beim Webcrawler den Flaschenhals darstellt, verwundert es nicht, dass der Durchsatz der parallelen Variante linear mit der Anzahl der Threads im Excecutor steigt. Interessant ist jedoch der Unterschied zwischen den beiden Varianten auf Quelltextebene: Der ist minimal und die Parallelisierung trivial. Also ist doch alles ganz einfach!