dnet-applications/apps/dnet-exporter-api/src/main/java/eu/dnetlib/openaire/dsm/DsmCore.java

443 lines
15 KiB
Java

package eu.dnetlib.openaire.dsm;
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.asDbEntry;
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.asDetails;
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.copyNonNullProperties;
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.createId;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.domain.Page;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import eu.dnetlib.OpenaireExporterConfig;
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
import eu.dnetlib.enabling.datasources.common.AggregationStage;
import eu.dnetlib.enabling.datasources.common.Datasource;
import eu.dnetlib.enabling.datasources.common.DsmException;
import eu.dnetlib.enabling.datasources.common.DsmForbiddenException;
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
import eu.dnetlib.openaire.common.ISClient;
import eu.dnetlib.openaire.community.CommunityClient;
import eu.dnetlib.openaire.dsm.dao.DatasourceDao;
import eu.dnetlib.openaire.dsm.dao.DatasourceIndexClient;
import eu.dnetlib.openaire.dsm.dao.MongoLoggerClient;
import eu.dnetlib.openaire.dsm.dao.ObjectStoreClient;
import eu.dnetlib.openaire.dsm.dao.ResponseUtils;
import eu.dnetlib.openaire.dsm.dao.VocabularyClient;
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponse;
import eu.dnetlib.openaire.dsm.domain.ApiDetails;
import eu.dnetlib.openaire.dsm.domain.ApiDetailsResponse;
import eu.dnetlib.openaire.dsm.domain.DatasourceDetailResponse;
import eu.dnetlib.openaire.dsm.domain.DatasourceDetails;
import eu.dnetlib.openaire.dsm.domain.DatasourceDetailsUpdate;
import eu.dnetlib.openaire.dsm.domain.DatasourceInfo;
import eu.dnetlib.openaire.dsm.domain.DatasourceSnippetResponse;
import eu.dnetlib.openaire.dsm.domain.RegisteredDatasourceInfo;
import eu.dnetlib.openaire.dsm.domain.RequestFilter;
import eu.dnetlib.openaire.dsm.domain.RequestSort;
import eu.dnetlib.openaire.dsm.domain.RequestSortOrder;
import eu.dnetlib.openaire.dsm.domain.SimpleResponse;
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
import eu.dnetlib.openaire.vocabularies.Country;
@Component
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
public class DsmCore {
private static final Log log = LogFactory.getLog(DsmCore.class);
@Autowired
private MongoLoggerClient mongoLoggerClient;
@Autowired
private ISClient isClient;
@Autowired
private ObjectStoreClient objectStoreClient;
@Autowired
private DatasourceIndexClient datasourceIndexClient;
@Autowired
private VocabularyClient vocabularyClient;
@Autowired
private DatasourceDao dsDao;
@Autowired
private OpenaireExporterConfig config;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private CommunityClient communityClient;
private ListeningExecutorService executor;
@PostConstruct
public void init() {
executor = MoreExecutors.listeningDecorator(new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
}
public List<Country> listCountries() throws DsmException {
try {
return dsDao.listCountries();
} catch (final Throwable e) {
log.error("error listing countries", e);
throw e;
}
}
public DatasourceDetailResponse searchDsDetails(final RequestSort requestSortBy,
final RequestSortOrder order,
final RequestFilter requestFilter,
final int page,
final int size)
throws DsmException {
try {
final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
return ResponseUtils.detailsResponse(dsPage.map(d -> asDetails(d)).getContent(), dsPage.getTotalElements());
} catch (final Throwable e) {
log.error("error searching datasources", e);
throw e;
}
}
public DatasourceSnippetResponse searchSnippet(final RequestSort requestSortBy,
final RequestSortOrder order,
final RequestFilter requestFilter,
final int page,
final int size)
throws DsmException {
try {
final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
return ResponseUtils.snippetResponse(dsPage.map(DsmMappingUtils::asSnippetExtended).getContent(), dsPage.getTotalElements());
} catch (final Throwable e) {
log.error("error searching datasources", e);
throw e;
}
}
public DatasourceSnippetResponse searchRegistered(final RequestSort requestSortBy,
final RequestSortOrder order,
final RequestFilter requestFilter,
final int page,
final int size)
throws DsmException {
try {
final Page<DatasourceDbEntry> dsPage = dsDao.searchRegistered(requestSortBy, order, requestFilter, page, size);
return ResponseUtils.snippetResponse(dsPage.map(DsmMappingUtils::asSnippetExtended).getContent(), dsPage.getTotalElements());
} catch (final Throwable e) {
log.error("error searching datasources", e);
throw e;
}
}
public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
try {
return dsDao.findApiBaseURLs(requestFilter, page, size);
} catch (final Throwable e) {
log.error("error searching datasource base urls", e);
throw e;
}
}
public ApiDetailsResponse getApis(final String dsId) throws DsmException {
try {
final List<ApiDbEntry> apis = dsDao.getApis(dsId);
final List<ApiDetails> api = apis.stream()
.map(DsmMappingUtils::asDetails)
.collect(Collectors.toList());
return ResponseUtils.apiResponse(api, api.size());
} catch (final Throwable e) {
log.error(String.format("error searching datasource api %s", dsId), e);
throw e;
}
}
public void setManaged(final String dsId, final boolean managed) throws DsmException {
log.info(String.format("updated ds '%s' managed with '%s'", dsId, managed));
dsDao.setManaged(dsId, managed);
}
public boolean isManaged(final String dsId) throws DsmException {
return dsDao.isManaged(dsId);
}
public boolean exist(final DatasourceDetails d) throws DsmException {
return dsDao.existDs(d.getId());
}
public void save(final DatasourceDetails d) throws DsmException {
try {
dsDao.saveDs(asDbEntry(d));
} catch (final Throwable e) {
log.error(ExceptionUtils.getStackTrace(e));
throw e;
}
}
public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
try {
// initialize with current values from DB
final Datasource ds = dsDao.getDs(d.getId());
final DatasourceDbEntry dbEntry = (DatasourceDbEntry) ds;
if (dbEntry == null) { throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId())); }
final DatasourceDbEntry update = asDbEntry(d);
if (d.getIdentities() != null) {
final Set<IdentityDbEntry> identities = new HashSet<>(
Stream.of(update.getIdentities(), dbEntry.getIdentities())
.flatMap(Collection::stream)
.collect(Collectors.toMap(i -> i.getIssuertype() + i.getPid(), Function.identity(), (i1, i2) -> i1))
.values());
copyNonNullProperties(update, dbEntry);
dbEntry.setIdentities(identities);
} else {
copyNonNullProperties(update, dbEntry);
}
dsDao.saveDs(dbEntry);
} catch (final Throwable e) {
log.error(ExceptionUtils.getStackTrace(e));
throw e;
}
}
// TODO remove if unused
public void deleteDs(final String dsId) throws DsmException {
log.info(String.format("deleted datasource '%s'", dsId));
dsDao.deleteDs(dsId);
}
// API
public void updateApiOaiSet(final String dsId, final String apiId, final String oaiSet) throws DsmException {
dsDao.upsertApiOaiSet(apiId, oaiSet);
}
public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
dsDao.updateApiBaseUrl(apiId, baseUrl);
}
public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
dsDao.updateCompliance(null, apiId, compliance, override);
}
public void addApi(final ApiDetails api) throws DsmException {
if (StringUtils.isBlank(api.getId())) {
api.setId(createId(api));
log.info(String.format("missing api id, created '%s'", api.getId()));
}
dsDao.addApi(asDbEntry(api));
}
public void deleteApi(final String apiId) throws DsmForbiddenException, DsmNotFoundException {
// TODO handle the api removal in case of associated workflows.
dsDao.deleteApi(null, apiId);
}
public void dropCaches() {
mongoLoggerClient.dropCache();
isClient.dropCache();
vocabularyClient.dropCache();
communityClient.dropCache();
}
// HELPERS //////////////
private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
return dsInfo;
}
private void getAggregationHistory(final String dsId,
final CountDownLatch outerLatch,
final Queue<Throwable> errors,
final DatasourceInfo datasourceInfo) {
Futures.addCallback(executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)), new FutureCallback<List<AggregationInfo>>() {
@Override
public void onSuccess(final List<AggregationInfo> info) {
setAggregationHistory(datasourceInfo, info);
outerLatch.countDown();
}
@Override
public void onFailure(final Throwable e) {
log.error(ExceptionUtils.getStackTrace(e));
errors.offer(e);
outerLatch.countDown();
}
}, executor);
}
private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
datasourceInfo.setAggregationHistory(info);
if (!info.isEmpty()) {
datasourceInfo
.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
}
}
private void getIndexDsInfo(final String dsId,
final CountDownLatch outerLatch,
final Queue<Throwable> errors,
final DatasourceInfo datasourceInfo) {
Futures.addCallback(executor.submit(() -> isClient.calculateCurrentIndexDsInfo()), new FutureCallback<IndexDsInfo>() {
@Override
public void onSuccess(final IndexDsInfo info) {
final CountDownLatch innerLatch = new CountDownLatch(2);
Futures.addCallback(executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)), new FutureCallback<IndexRecordsInfo>() {
@Override
public void onSuccess(final IndexRecordsInfo info) {
datasourceInfo
.setIndexRecords(info.getTotal())
.setFundedContent(info.getFunded())
.setLastIndexingDate(info.getDate());
innerLatch.countDown();
}
@Override
public void onFailure(final Throwable e) {
errors.offer(e);
innerLatch.countDown();
}
}, executor);
Futures.addCallback(executor.submit(() -> objectStoreClient.getObjectStoreSize(isClient.getObjectStoreId(dsId))), new FutureCallback<Long>() {
@Override
public void onSuccess(final Long objectStoreSize) {
datasourceInfo.setFulltexts(objectStoreSize);
innerLatch.countDown();
}
@Override
public void onFailure(final Throwable e) {
errors.offer(e);
innerLatch.countDown();
}
}, executor);
waitLatch(innerLatch, errors, config.getRequestTimeout());
outerLatch.countDown();
}
@Override
public void onFailure(final Throwable e) {
// log.error(ExceptionUtils.getStackTrace(e));
errors.offer(e);
outerLatch.countDown();
}
}, executor);
}
private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
try {
if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
}
} catch (final InterruptedException e) {
errors.offer(e);
}
}
public SimpleResponse searchRecentRegistered(final int size) throws Throwable {
try {
final String sql =
IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/openaire/sql/recent_registered_datasources.sql.st"), Charset.defaultCharset());
final List<RegisteredDatasourceInfo> list = jdbcTemplate.query(sql, BeanPropertyRowMapper.newInstance(RegisteredDatasourceInfo.class), size);
return ResponseUtils.simpleResponse(list);
} catch (final Throwable e) {
log.error("error searching recent datasources", e);
throw e;
}
}
public Long countRegisteredAfter(final String fromDate, final String typologyFilter) throws Throwable {
try {
if (StringUtils.isNotBlank(typologyFilter)) {
final String sql =
IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/openaire/sql/recent_registered_datasources_fromDate_typology.st.sql"), Charset
.defaultCharset());
return jdbcTemplate.queryForObject(sql, new Object[] {
fromDate, typologyFilter + "%"
}, Long.class);
} else {
final String sql =
IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/openaire/sql/recent_registered_datasources_fromDate.st.sql"), Charset
.defaultCharset());
return jdbcTemplate.queryForObject(sql, new Object[] {
fromDate
}, Long.class);
}
} catch (final Throwable e) {
log.error("error searching recent datasources", e);
throw e;
}
}
public AggregationHistoryResponse aggregationhistory(final String dsId) throws DsmException {
final List<AggregationInfo> history = mongoLoggerClient.getAggregationHistory(dsId);
final AggregationHistoryResponse rsp = new AggregationHistoryResponse(history);
rsp.setHeader(ResponseUtils.header(history.size()));
return rsp;
}
}