package eu.dnetlib.enabling.resultset; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.xml.ws.wsaddressing.W3CEndpointReference; import eu.dnetlib.miscutils.functional.UnaryFunction; public class ParallelMappedResultSetFactory extends MappedResultSetFactory { private ExecutorService executor; private static final int QUEUE_SIZE = 40; private int queueSize = QUEUE_SIZE; private int cpus = 0; public ParallelMappedResultSetFactory() { super(); if (cpus == 0) cpus = getNumberOfCPUs(); executor = new ThreadPoolExecutor(cpus, cpus, 5, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize), new ThreadPoolExecutor.CallerRunsPolicy()); } @Override public W3CEndpointReference createMappedResultSet(final W3CEndpointReference source, final UnaryFunction mapper) { return getResultSetFactory().createResultSet(new ParallelMappedResultSet(source, mapper, getServiceResolver(), executor)); } private int getNumberOfCPUs() { return Runtime.getRuntime().availableProcessors(); } public int getCpus() { return cpus; } public void setCpus(int cpus) { if(cpus > 0) this.cpus = cpus; } }