Llibreria Java concurrent

La llibreria java.util.concurrent conté classes útils quan fem concurrència:

  • Executors: la interfície Executor permet representar un objecte que executa tasques. ExecutorService permet el processament asíncron, gestionant una cua i executant les tasques enviades segons la disponibilitat dels fils.
  • Cues: ConcurrentLinkedQueue, BlockingQueue.
  • Sincronitzadors: els clàssics semàfors (Semaphore), CountDownLatch.
  • Col·leccions concurrents: per exemple, ConcurrentHashMap, o els mètodes de Collections synchronizedMap(), synchronizedList() i synchronizedSet().
  • Variables que permeten operacions atòmiques sense bloqueig al paquet java.util.concurrent.atomic: AtomicBoolean, AtomicInteger, etc.

Sempre és preferible utilitzar aquestes classes que els mètodes de sincronització wait/notify, perquè simplifiquen la programació. De la mateixa manera que és millor utilitzar executors i tasques que fils directament.

Tasques i executors

La majoria d'aplicacions concurrents s'organitzen mitjançant tasques. Una tasca realitza una feina concreta. D'aquesta forma, podem simplificar el disseny i el funcionament.

Veiem una possible solució per a la gestió de connexions a un servidor. Suposem que tenim un mètode, atendrePeticio(), que atén una petició web.

Execució seqüencial

class ServidorWebUnFil { public static void main(String[] args) throws IOException { ServerSocket socol = new ServerSocket(80); while (true) { Socket connexio = socol.accept(); atendrePeticio(connexio); } } }

Un fil per cada petició

class ServidorWebUnFilPerPeticio { public static void main(String[] args) throws IOException { ServerSocket socol = new ServerSocket(80); while (true) { Socket connexio = socol.accept(); Runnable tasca = new Runnable() { @Override public void run() { atendrePeticio(connexio); } } new Thread(tasca).start(); } } }

Grup compartit de fils

class ServidorWebExecucioTasques { private static final int NFILS = 100; private static final Executor executor = Executors.newFixedThreadPool(NFILS); public static void main(String[] args) throws IOException { ServerSocket socol = new ServerSocket(80); while (true) { final Socket connexio = socol.accept(); Runnable tasca = new Runnable() { public void run() { atendrePeticio(connexio); } }; executor.execute(tasca); } } }

En aquesta solució hem introduït la interfície Executor:

public interface Executor { void execute(Runnable command); }

És un objecte que permet executar Runnables. Internament, el que fa és executar tasques de forma asíncrona, creant un fil per cada tasca en execució, i retornant el control al fil que crida el seu mètode execute. Les tasques poden tenir quatre estats:

  • Creada
  • Enviada
  • Iniciada
  • Completada

Els Executors es poden crear des de la classe amb mètodes estàtics Executors. Aquesta classe retorna una subclasse de Executor, l'ExecutorService. Aquesta subclasse usa el patró Thread Pool, que reutilitza un nombre màxim de fils entre una sèrie de tasques a una cua.

Un ExecutorService ha de parar-se sempre amb el mètode shutdown(), que para tots el fils del pool.

Tasques amb resultats

Algunes tasques retornen resultats. Per implementar-les, podem utilitzar les interfícies Callable i Future:

public interface Callable<V> { V call() throws Exception; } public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException, CancellationException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException; }

Callable<V> permet executar la tasca i retornar un valor del tipus V. Per tal de poder executar-la, necessitem un ExecutorService. En particular, els seus dos mètodes:

  • Future<?> submit(Runnable task)
  • <T> Future<T> submit(Callable<T> task)

Aquests permeten executar un Runnable / Callable i retornen un Future, que és un objecte que permet obtenir el resultat en diferit mitjançant el mètode get() (bloqueig) o get(long timeout, TimeUnit unit) (bloqueig per un temps).

També podem cancel·lar la tasca mitjançant cancel(boolean mayInterruptIfRunning): el paràmetre diu si es vol interrompre també si ja ha començat.

Els ExecutorService poden crear-se mitjançant la mateixa classe que hem vist abans, Executors.

A continuació, un exemple de funcionament. Com canvia l'execució si fem Executors.newFixedThreadPool(2)?

public class SimpleCallableTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(1); Future<String> f1 = executor.submit(new ToUpperCallable("hello")); Future<String> f2 = executor.submit(new ToUpperCallable("world")); try { long millis = System.currentTimeMillis(); System.out.println("main " + f1.get() + " " + f2.get() + " in millis: " + (System.currentTimeMillis() - millis)); } catch (InterruptedException | ExecutionException ex) { ex.printStackTrace(); } executor.shutdown(); } private static final class ToUpperCallable implements Callable<String> { private String word; public ToUpperCallable(String word) { this.word = word; } @Override public String call() throws Exception { String name = Thread.currentThread().getName(); System.out.println(name + " calling for " + word); Thread.sleep(2500); String result = word.toUpperCase(); System.out.println(name + " result " + word + " => " + result); return result; } } }

A Java 7 es va introduir el framework fork/join.

A Java 8 es va introduir el CompletableFuture, que permet combinar futurs i gestionar millor els errors que es produeixen. Un exemple és l'ús del mètode complete per a completar un futur, en un altre fil:

CompletableFuture<String> completableFuture = new CompletableFuture<>(); //... String resultat = completableFuture.get(); // mentre en un altre fil... completableFuture.complete("Hola, món!);

O bé, la possibilitat d'executar directament amb supplyAsync:

Supplier<String> supplier = new Supplier<String>() { @Override public String get() { return "Hola, món!"; } }; Future<String> future = CompletableFuture.supplyAsync(supplier, executor); // executor és opcional System.out.println(future.get());