2022-02-04 10:12:15 +01:00
|
|
|
package eu.dnetlib.openaire.dsm.dao;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.Date;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Queue;
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
import javax.annotation.PreDestroy;
|
|
|
|
|
|
|
|
import com.google.common.collect.Iterables;
|
|
|
|
import com.google.common.collect.Lists;
|
|
|
|
import com.google.common.util.concurrent.*;
|
2022-02-07 10:09:18 +01:00
|
|
|
import eu.dnetlib.DnetOpenaireExporterProperties;
|
2022-02-04 10:12:15 +01:00
|
|
|
import eu.dnetlib.enabling.datasources.common.DsmException;
|
|
|
|
import eu.dnetlib.miscutils.functional.hash.Hashing;
|
|
|
|
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
|
|
|
|
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
|
|
|
|
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
import org.apache.commons.lang3.time.DateFormatUtils;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
import org.apache.solr.client.solrj.SolrQuery;
|
|
|
|
import org.apache.solr.client.solrj.SolrServerException;
|
|
|
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
|
|
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
|
|
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
|
|
|
import org.apache.solr.common.SolrDocument;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
|
import org.springframework.http.HttpStatus;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Created by claudio on 20/10/2016.
|
|
|
|
*/
|
|
|
|
@Component
|
|
|
|
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
|
|
|
|
public class DatasourceIndexClientImpl implements DatasourceIndexClient {
|
|
|
|
|
|
|
|
private static final Log log = LogFactory.getLog(DatasourceIndexClientImpl.class);
|
|
|
|
|
|
|
|
public static final String SEPARATOR = "::";
|
|
|
|
public static final String DSVERSION = "__dsversion";
|
|
|
|
|
|
|
|
@Autowired
|
2022-02-07 10:09:18 +01:00
|
|
|
private DnetOpenaireExporterProperties config;
|
2022-02-04 10:12:15 +01:00
|
|
|
|
|
|
|
private ListeningExecutorService executor;
|
|
|
|
|
|
|
|
private static final Map<String, CloudSolrClient> indexClientMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
@PostConstruct
|
|
|
|
public void init() {
|
|
|
|
executor = MoreExecutors.listeningDecorator(
|
|
|
|
new ScheduledThreadPoolExecutor(5,
|
|
|
|
new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build()));
|
|
|
|
}
|
|
|
|
|
|
|
|
@PreDestroy
|
|
|
|
public void tearDown() {
|
|
|
|
indexClientMap.forEach((name, client) -> {
|
|
|
|
try {
|
|
|
|
client.close();
|
|
|
|
} catch (IOException e) {
|
|
|
|
log.warn(String.format("unable to gracefully shutdown client for index %s", name));
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws DsmException {
|
|
|
|
try {
|
|
|
|
final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
|
|
|
|
final CloudSolrClient indexClient = getIndexClient(info);
|
|
|
|
final CountDownLatch latch = new CountDownLatch(2);
|
|
|
|
final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
|
|
|
|
|
|
|
|
Futures.addCallback(
|
|
|
|
executor.submit(() -> setDateAndTotal(dsId, collectedFrom, indexClient)),
|
|
|
|
new FutureCallback<IndexRecordsInfo>() {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onSuccess(final IndexRecordsInfo info) {
|
|
|
|
indexRecordInfo
|
|
|
|
.setTotal(info.getTotal())
|
|
|
|
.setDate(info.getDate());
|
|
|
|
latch.countDown();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onFailure(final Throwable e) {
|
|
|
|
errors.offer(e);
|
|
|
|
latch.countDown();
|
|
|
|
}
|
|
|
|
}, executor);
|
|
|
|
|
|
|
|
Futures.addCallback(
|
|
|
|
executor.submit(() -> setFunded(dsId, collectedFrom, indexClient)),
|
|
|
|
new FutureCallback<Long>() {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onSuccess(final Long numFound) {
|
|
|
|
indexRecordInfo.setFunded(numFound);
|
|
|
|
latch.countDown();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onFailure(final Throwable e) {
|
|
|
|
errors.offer(e);
|
|
|
|
latch.countDown();
|
|
|
|
}
|
|
|
|
}, executor);
|
|
|
|
|
|
|
|
waitLatch(latch, errors, config.getRequestTimeout());
|
|
|
|
return indexRecordInfo;
|
|
|
|
} catch (final Throwable e) {
|
|
|
|
throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("error reading index info", dsId), e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getLastIndexingDate(final IndexDsInfo info) throws DsmException {
|
|
|
|
try {
|
|
|
|
final SolrQuery query = new SolrQuery("oaftype:datasource").setRows(1);
|
|
|
|
final QueryResponse rsp = getIndexClient(info).query(query);
|
|
|
|
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
|
|
|
|
final String dsversion = doc.get("__dsversion").toString();
|
|
|
|
return StringUtils.substringBefore(dsversion, "T");
|
|
|
|
} catch (SolrServerException | IOException e) {
|
|
|
|
throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private Long setFunded(
|
|
|
|
final String dsId,
|
|
|
|
final String collectedFrom,
|
|
|
|
final CloudSolrClient indexClient) throws DsmException {
|
|
|
|
final String query =
|
|
|
|
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom);
|
|
|
|
log.debug(String.format("query: %s", query));
|
|
|
|
try {
|
|
|
|
return indexClient.query(new SolrQuery(query).setRows(0)).getResults().getNumFound();
|
|
|
|
} catch (Throwable e) {
|
|
|
|
throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("Error querying index for funded results '%s'", dsId), e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private IndexRecordsInfo setDateAndTotal(
|
|
|
|
final String dsId,
|
|
|
|
final String collectedFrom,
|
|
|
|
final CloudSolrClient indexClient) throws DsmException {
|
|
|
|
try {
|
|
|
|
final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom);
|
|
|
|
log.debug(String.format("query: %s", query));
|
|
|
|
|
|
|
|
final QueryResponse rsp = indexClient.query(new SolrQuery(query).setRows(1));
|
|
|
|
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
|
|
|
|
}
|
|
|
|
// if (doc.isEmpty()) {
|
|
|
|
// throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching
|
|
|
|
// query: %s", queryTotal));
|
|
|
|
// }
|
|
|
|
return new IndexRecordsInfo()
|
|
|
|
.setDate(getDate(doc))
|
|
|
|
.setTotal(rsp.getResults().getNumFound());
|
|
|
|
} catch (Throwable e) {
|
|
|
|
throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("Error querying index for date and total '%s'", dsId), e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
private String getDate(final SolrDocument doc) throws DsmException {
|
|
|
|
final List<Date> dsversion = (List<Date>) doc.get(DSVERSION);
|
|
|
|
if (dsversion == null || dsversion.isEmpty()) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(),
|
|
|
|
String.format("cannot find %s in matched solr document", DSVERSION)); }
|
|
|
|
final Date date = Iterables.getLast(dsversion);
|
|
|
|
|
|
|
|
return DateFormatUtils.format(date, DsmMappingUtils.DATE_FORMAT);
|
|
|
|
}
|
|
|
|
|
|
|
|
private synchronized CloudSolrClient getIndexClient(final IndexDsInfo info) {
|
|
|
|
if (!indexClientMap.containsKey(info.getColl())) {
|
|
|
|
|
|
|
|
final CloudSolrClient client = new Builder(Lists.newArrayList(info.getIndexBaseUrl())).build();
|
|
|
|
client.setDefaultCollection(info.getColl());
|
|
|
|
|
|
|
|
indexClientMap.put(info.getColl(), client);
|
|
|
|
}
|
|
|
|
return indexClientMap.get(info.getColl());
|
|
|
|
}
|
|
|
|
|
|
|
|
private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
|
|
|
|
try {
|
|
|
|
if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
|
|
|
|
errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
|
|
|
|
}
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
errors.offer(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|