package eu.dnetlib.openaire.dsm.dao; import java.io.IOException; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.*; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; import eu.dnetlib.DnetOpenaireExporterProperties; import eu.dnetlib.enabling.datasources.common.DsmException; import eu.dnetlib.miscutils.functional.hash.Hashing; import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils; import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo; import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; /** * Created by claudio on 20/10/2016. */ @Component @ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true") public class DatasourceIndexClientImpl implements DatasourceIndexClient { private static final Log log = LogFactory.getLog(DatasourceIndexClientImpl.class); public static final String SEPARATOR = "::"; public static final String DSVERSION = "__dsversion"; @Autowired private DnetOpenaireExporterProperties config; private ListeningExecutorService executor; private static final Map indexClientMap = new ConcurrentHashMap<>(); @PostConstruct public void init() { executor = MoreExecutors.listeningDecorator( new ScheduledThreadPoolExecutor(5, new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build())); } @PreDestroy public void tearDown() { indexClientMap.forEach((name, client) -> { try { client.close(); } catch (IOException e) { log.warn(String.format("unable to gracefully shutdown client for index %s", name)); } }); } @Override public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue errors) throws DsmException { try { final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR)); final CloudSolrClient indexClient = getIndexClient(info); final CountDownLatch latch = new CountDownLatch(2); final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo(); Futures.addCallback( executor.submit(() -> setDateAndTotal(dsId, collectedFrom, indexClient)), new FutureCallback() { @Override public void onSuccess(final IndexRecordsInfo info) { indexRecordInfo .setTotal(info.getTotal()) .setDate(info.getDate()); latch.countDown(); } @Override public void onFailure(final Throwable e) { errors.offer(e); latch.countDown(); } }, executor); Futures.addCallback( executor.submit(() -> setFunded(dsId, collectedFrom, indexClient)), new FutureCallback() { @Override public void onSuccess(final Long numFound) { indexRecordInfo.setFunded(numFound); latch.countDown(); } @Override public void onFailure(final Throwable e) { errors.offer(e); latch.countDown(); } }, executor); waitLatch(latch, errors, config.getRequestTimeout()); return indexRecordInfo; } catch (final Throwable e) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("error reading index info", dsId), e); } } @Override public String getLastIndexingDate(final IndexDsInfo info) throws DsmException { try { final SolrQuery query = new SolrQuery("oaftype:datasource").setRows(1); final QueryResponse rsp = getIndexClient(info).query(query); final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null); final String dsversion = doc.get("__dsversion").toString(); return StringUtils.substringBefore(dsversion, "T"); } catch (SolrServerException | IOException e) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e); } } private Long setFunded( final String dsId, final String collectedFrom, final CloudSolrClient indexClient) throws DsmException { final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom); log.debug(String.format("query: %s", query)); try { return indexClient.query(new SolrQuery(query).setRows(0)).getResults().getNumFound(); } catch (Throwable e) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("Error querying index for funded results '%s'", dsId), e); } } private IndexRecordsInfo setDateAndTotal( final String dsId, final String collectedFrom, final CloudSolrClient indexClient) throws DsmException { try { final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom); log.debug(String.format("query: %s", query)); final QueryResponse rsp = indexClient.query(new SolrQuery(query).setRows(1)); final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument()); if (log.isDebugEnabled()) { log.debug(String.format("got document %s", doc.get("__indexrecordidentifier"))); } // if (doc.isEmpty()) { // throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching // query: %s", queryTotal)); // } return new IndexRecordsInfo() .setDate(getDate(doc)) .setTotal(rsp.getResults().getNumFound()); } catch (Throwable e) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("Error querying index for date and total '%s'", dsId), e); } } @SuppressWarnings("unchecked") private String getDate(final SolrDocument doc) throws DsmException { final List dsversion = (List) doc.get(DSVERSION); if (dsversion == null || dsversion.isEmpty()) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find %s in matched solr document", DSVERSION)); } final Date date = Iterables.getLast(dsversion); return DateFormatUtils.format(date, DsmMappingUtils.DATE_FORMAT); } private synchronized CloudSolrClient getIndexClient(final IndexDsInfo info) { if (!indexClientMap.containsKey(info.getColl())) { final CloudSolrClient client = new Builder(Lists.newArrayList(info.getIndexBaseUrl())).build(); client.setDefaultCollection(info.getColl()); indexClientMap.put(info.getColl(), client); } return indexClientMap.get(info.getColl()); } private void waitLatch(final CountDownLatch latch, final Queue errors, final int waitSeconds) { try { if (!latch.await(waitSeconds, TimeUnit.SECONDS)) { errors.offer(new TimeoutException("Waiting for requests to complete has timed out.")); } } catch (final InterruptedException e) { errors.offer(e); } } }