1
0
Fork 0

Merge remote-tracking branch 'origin/beta' into beta

This commit is contained in:
Miriam Baglioni 2024-09-24 14:26:29 +02:00
commit 4d3e079590
17 changed files with 989 additions and 155 deletions

View File

@ -30,6 +30,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils { public class MergeUtils {
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) { public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, true); return mergeGroup(s, oafEntityIterator, true);
} }
@ -88,7 +89,7 @@ public class MergeUtils {
private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) { private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) {
if (sameClass(left, right, Result.class)) { if (sameClass(left, right, Result.class)) {
if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) { if (checkDelegatedAuthority) {
return mergeResultsOfDifferentTypes((Result) left, (Result) right); return mergeResultsOfDifferentTypes((Result) left, (Result) right);
} }

View File

@ -0,0 +1,210 @@
/*
* Copyright (c) 2024.
* SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.FunctionalInterfaceSupport.*;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import static org.apache.spark.sql.functions.*;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class PromoteResultWithMeasuresTest {
private static final Logger log = LoggerFactory.getLogger(PromoteResultWithMeasuresTest.class);
private static SparkSession spark;
private static Path tempDir;
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@BeforeAll
public static void beforeAll() throws IOException {
tempDir = Files.createTempDirectory(PromoteResultWithMeasuresTest.class.getSimpleName());
log.info("using work dir {}", tempDir);
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.setAppName(PromoteResultWithMeasuresTest.class.getSimpleName());
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", tempDir.toString());
conf.set("hive.metastore.warehouse.dir", tempDir.resolve("warehouse").toString());
spark = SparkSession.builder().config(conf).getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
spark.stop();
FileUtils.deleteDirectory(tempDir.toFile());
}
@Test
void testPromoteResultWithMeasures_job() throws Exception {
final String inputGraphTablePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/promote/measures/graph")
.getPath();
final String inputActionPayloadPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/promote/measures/actionPayloads")
.getPath();
final String actionPayloadsPath = tempDir.resolve("actionPayloads").toString();
spark
.read()
.text(inputActionPayloadPath)
.withColumn("payload", col("value"))
.select("payload")
.write()
.parquet(actionPayloadsPath);
final Path outputGraphTablePath = tempDir.resolve("outputGraphTablePath");
PromoteActionPayloadForGraphTableJob
.main(new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--graphTableClassName", Publication.class.getCanonicalName(),
"--inputGraphTablePath", inputGraphTablePath,
"--inputActionPayloadPath", actionPayloadsPath,
"--actionPayloadClassName", Result.class.getCanonicalName(),
"--outputGraphTablePath", outputGraphTablePath.toString(),
"--mergeAndGetStrategy", MergeAndGet.Strategy.MERGE_FROM_AND_GET.toString(),
"--promoteActionStrategy", PromoteAction.Strategy.ENRICH.toString(),
"--shouldGroupById", "true"
});
assertFalse(isDirEmpty(outputGraphTablePath));
final Encoder<Publication> pubEncoder = Encoders.bean(Publication.class);
List<Publication> results = spark
.read()
.schema(pubEncoder.schema())
.json(outputGraphTablePath.toString())
.as(pubEncoder)
.collectAsList();
verify(results);
}
@Test
void testPromoteResultWithMeasures_internal() throws JsonProcessingException {
Dataset<Publication> rowDS = spark
.read()
.schema(Encoders.bean(Publication.class).schema())
.json("src/test/resources/eu/dnetlib/dhp/actionmanager/promote/measures/graph")
.as(Encoders.bean(Publication.class));
Dataset<Result> actionPayloadDS = spark
.read()
.schema(Encoders.bean(Result.class).schema())
.json("src/test/resources/eu/dnetlib/dhp/actionmanager/promote/measures/actionPayloads")
.as(Encoders.bean(Result.class));
final MergeAndGet.Strategy mergeFromAndGet = MergeAndGet.Strategy.MERGE_FROM_AND_GET;
final SerializableSupplier<Function<Publication, String>> rowIdFn = ModelSupport::idFn;
final SerializableSupplier<BiFunction<Publication, Result, Publication>> mergeAndGetFn = MergeAndGet
.functionFor(mergeFromAndGet);
final SerializableSupplier<Publication> zeroFn = () -> Publication.class
.cast(new eu.dnetlib.dhp.schema.oaf.Publication());
final SerializableSupplier<Function<Publication, Boolean>> isNotZeroFn = PromoteResultWithMeasuresTest::isNotZeroFnUsingIdOrSourceAndTarget;
Dataset<Publication> joinedResults = PromoteActionPayloadFunctions
.joinGraphTableWithActionPayloadAndMerge(
rowDS,
actionPayloadDS,
rowIdFn,
ModelSupport::idFn,
mergeAndGetFn,
PromoteAction.Strategy.ENRICH,
Publication.class,
Result.class);
SerializableSupplier<BiFunction<Publication, Publication, Publication>> mergeRowsAndGetFn = MergeAndGet
.functionFor(mergeFromAndGet);
Dataset<Publication> mergedResults = PromoteActionPayloadFunctions
.groupGraphTableByIdAndMerge(
joinedResults, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, Publication.class);
verify(mergedResults.collectAsList());
}
private static void verify(List<Publication> results) throws JsonProcessingException {
assertNotNull(results);
assertEquals(1, results.size());
Result r = results.get(0);
log.info(OBJECT_MAPPER.writeValueAsString(r));
assertNotNull(r.getMeasures());
assertFalse(r.getMeasures().isEmpty());
assertTrue(
r
.getMeasures()
.stream()
.map(Measure::getId)
.collect(Collectors.toCollection(HashSet::new))
.containsAll(
Lists
.newArrayList(
"downloads", "views", "influence", "popularity", "influence_alt", "popularity_alt",
"impulse")));
}
private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSourceAndTarget() {
return t -> {
if (isSubClass(t, Relation.class)) {
final Relation rel = (Relation) t;
return StringUtils.isNotBlank(rel.getSource()) && StringUtils.isNotBlank(rel.getTarget());
}
return StringUtils.isNotBlank(((OafEntity) t).getId());
};
}
private static boolean isDirEmpty(final Path directory) throws IOException {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory)) {
return !dirStream.iterator().hasNext();
}
}
}

View File

@ -0,0 +1,3 @@
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":[{"id":"downloads","unit":[{"key":"opendoar____::358aee4cc897452c00244351e4d91f69||ZENODO","value":"125","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:usage_counts","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"views","unit":[{"key":"opendoar____::358aee4cc897452c00244351e4d91f69||ZENODO","value":"35","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:usage_counts","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]}],"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":null,"publiclyFunded":null,"transformativeAgreement":null,"isGreen":null,"isInDiamondJournal":null}
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":[{"id":"influence","unit":[{"key":"score","value":"3.1167566E-9","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"popularity","unit":[{"key":"score","value":"7.335433E-9","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"influence_alt","unit":[{"key":"score","value":"4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"popularity_alt","unit":[{"key":"score","value":"2.96","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"impulse","unit":[{"key":"score","value":"4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]}],"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":null,"publiclyFunded":null,"transformativeAgreement":null,"isGreen":null,"isInDiamondJournal":null}
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":"hybrid","publiclyFunded":false,"transformativeAgreement":null,"isGreen":true,"isInDiamondJournal":false}

View File

@ -22,9 +22,11 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.gtr2.Gtr2PublicationsCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.osf.OsfPreprintsCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
@ -44,11 +46,11 @@ public class CollectorWorker extends ReportingJob {
private final HttpClientParams clientParams; private final HttpClientParams clientParams;
public CollectorWorker( public CollectorWorker(
final ApiDescriptor api, final ApiDescriptor api,
final FileSystem fileSystem, final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion, final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams, final HttpClientParams clientParams,
final AggregatorReport report) { final AggregatorReport report) {
super(report); super(report);
this.api = api; this.api = api;
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
@ -58,7 +60,7 @@ public class CollectorWorker extends ReportingJob {
public void collect() throws UnknownCollectorPluginException, CollectorException, IOException { public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME; final String outputPath = this.mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("outputPath path is {}", outputPath); log.info("outputPath path is {}", outputPath);
final CollectorPlugin plugin = getCollectorPlugin(); final CollectorPlugin plugin = getCollectorPlugin();
@ -67,37 +69,34 @@ public class CollectorWorker extends ReportingJob {
scheduleReport(counter); scheduleReport(counter);
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
fileSystem.getConf(), .keyClass(IntWritable.class), SequenceFile.Writer
SequenceFile.Writer.file(new Path(outputPath)), .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get()); final IntWritable key = new IntWritable(counter.get());
final Text value = new Text(); final Text value = new Text();
plugin plugin
.collect(api, report) .collect(this.api, this.report)
.forEach( .forEach(content -> {
content -> {
key.set(counter.getAndIncrement()); key.set(counter.getAndIncrement());
value.set(content); value.set(content);
try { try {
writer.append(key, value); writer.append(key, value);
} catch (Throwable e) { } catch (final Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} catch (Throwable e) { } catch (final Throwable e) {
report.put(e.getClass().getName(), e.getMessage()); this.report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
} finally { } finally {
shutdown(); shutdown();
report.ongoing(counter.longValue(), counter.longValue()); this.report.ongoing(counter.longValue(), counter.longValue());
} }
} }
private void scheduleReport(AtomicInteger counter) { private void scheduleReport(final AtomicInteger counter) {
schedule(new ReporterCallback() { schedule(new ReporterCallback() {
@Override @Override
public Long getCurrent() { public Long getCurrent() {
return counter.longValue(); return counter.longValue();
@ -112,33 +111,37 @@ public class CollectorWorker extends ReportingJob {
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException { private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) { switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
case oai: case oai:
return new OaiCollectorPlugin(clientParams); return new OaiCollectorPlugin(this.clientParams);
case rest_json2xml: case rest_json2xml:
return new RestCollectorPlugin(clientParams); return new RestCollectorPlugin(this.clientParams);
case file: case file:
return new FileCollectorPlugin(fileSystem); return new FileCollectorPlugin(this.fileSystem);
case fileGzip: case fileGzip:
return new FileGZipCollectorPlugin(fileSystem); return new FileGZipCollectorPlugin(this.fileSystem);
case baseDump: case baseDump:
return new BaseCollectorPlugin(this.fileSystem); return new BaseCollectorPlugin(this.fileSystem);
case other: case gtr2Publications:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional return new Gtr2PublicationsCollectorPlugin(this.clientParams);
.ofNullable(api.getParams().get("other_plugin_type")) case osfPreprints:
return new OsfPreprintsCollectorPlugin(this.clientParams);
case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(this.api.getParams().get("other_plugin_type"))
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf) .map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type")); .orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
switch (plugin) { switch (plugin) {
case mdstore_mongodb_dump: case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(fileSystem); return new MongoDbDumpCollectorPlugin(this.fileSystem);
case mdstore_mongodb: case mdstore_mongodb:
return new MDStoreCollectorPlugin(); return new MDStoreCollectorPlugin();
default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
default: default:
throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol()); throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
default:
throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
} }
} }

View File

@ -11,7 +11,7 @@ public interface CollectorPlugin {
enum NAME { enum NAME {
oai, other, rest_json2xml, file, fileGzip, baseDump; oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications, osfPreprints;
public enum OTHER_NAME { public enum OTHER_NAME {
mdstore_mongodb_dump, mdstore_mongodb mdstore_mongodb_dump, mdstore_mongodb

View File

@ -0,0 +1,43 @@
package eu.dnetlib.dhp.collection.plugin.gtr2;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class Gtr2PublicationsCollectorPlugin implements CollectorPlugin {
private final HttpClientParams clientParams;
public Gtr2PublicationsCollectorPlugin(final HttpClientParams clientParams) {
this.clientParams = clientParams;
}
@Override
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String startPage = api.getParams().get("startPage");
final String endPage = api.getParams().get("endPage");
final String fromDate = api.getParams().get("fromDate");
if ((fromDate != null) && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) {
throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate);
}
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseUrl, fromDate, startPage, endPage,
this.clientParams);
final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
return StreamSupport.stream(spliterator, false);
}
}

View File

@ -0,0 +1,215 @@
package eu.dnetlib.dhp.collection.plugin.gtr2;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class Gtr2PublicationsIterator implements Iterator<String> {
public static final int PAGE_SIZE = 20;
private static final Logger log = LoggerFactory.getLogger(Gtr2PublicationsIterator.class);
private final HttpConnector2 connector;
private static final DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
private static final int MAX_ATTEMPTS = 10;
private final String baseUrl;
private int currPage;
private int endPage;
private boolean incremental = false;
private DateTime fromDate;
private final Map<String, String> cache = new HashMap<>();
private final Queue<String> queue = new LinkedList<>();
private String nextElement;
public Gtr2PublicationsIterator(final String baseUrl, final String fromDate, final String startPage,
final String endPage,
final HttpClientParams clientParams)
throws CollectorException {
this.baseUrl = baseUrl;
this.currPage = NumberUtils.toInt(startPage, 1);
this.endPage = NumberUtils.toInt(endPage, Integer.MAX_VALUE);
this.incremental = StringUtils.isNotBlank(fromDate);
this.connector = new HttpConnector2(clientParams);
if (this.incremental) {
this.fromDate = parseDate(fromDate);
}
prepareNextElement();
}
@Override
public boolean hasNext() {
return this.nextElement != null;
}
@Override
public String next() {
try {
return this.nextElement;
} finally {
prepareNextElement();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
private void prepareNextElement() {
while ((this.currPage <= this.endPage) && this.queue.isEmpty()) {
log.debug("FETCHING PAGE + " + this.currPage + "/" + this.endPage);
this.queue.addAll(fetchPage(this.currPage++));
}
this.nextElement = this.queue.poll();
}
private List<String> fetchPage(final int pageNumber) {
final List<String> res = new ArrayList<>();
try {
final Document doc = loadURL(cleanURL(this.baseUrl + "/outcomes/publications?p=" + pageNumber), 0);
if (this.endPage == Integer.MAX_VALUE) {
this.endPage = NumberUtils.toInt(doc.valueOf("/*/@*[local-name() = 'totalPages']"));
}
for (final Object po : doc.selectNodes("//*[local-name() = 'publication']")) {
final Element mainEntity = (Element) ((Element) po).detach();
if (filterIncremental(mainEntity)) {
res.add(expandMainEntity(mainEntity));
} else {
log.debug("Skipped entity");
}
}
} catch (final Throwable e) {
log.error("Exception fetching page " + pageNumber, e);
throw new RuntimeException("Exception fetching page " + pageNumber, e);
}
return res;
}
private void addLinkedEntities(final Element master, final String relType, final Element newRoot,
final Function<Document, Element> mapper) {
for (final Object o : master.selectNodes(".//*[local-name()='link']")) {
final String rel = ((Element) o).valueOf("@*[local-name()='rel']");
final String href = ((Element) o).valueOf("@*[local-name()='href']");
if (relType.equals(rel) && StringUtils.isNotBlank(href)) {
final String cacheKey = relType + "#" + href;
if (this.cache.containsKey(cacheKey)) {
try {
log.debug(" * from cache (" + relType + "): " + href);
newRoot.add(DocumentHelper.parseText(this.cache.get(cacheKey)).getRootElement());
} catch (final DocumentException e) {
log.error("Error retrieving cache element: " + cacheKey, e);
throw new RuntimeException("Error retrieving cache element: " + cacheKey, e);
}
} else {
final Document doc = loadURL(cleanURL(href), 0);
final Element elem = mapper.apply(doc);
newRoot.add(elem);
this.cache.put(cacheKey, elem.asXML());
}
}
}
}
private boolean filterIncremental(final Element e) {
if (!this.incremental || isAfter(e.valueOf("@*[local-name() = 'created']"), this.fromDate)
|| isAfter(e.valueOf("@*[local-name() = 'updated']"), this.fromDate)) {
return true;
}
return false;
}
private String expandMainEntity(final Element mainEntity) {
final Element newRoot = DocumentHelper.createElement("doc");
newRoot.add(mainEntity);
addLinkedEntities(mainEntity, "PROJECT", newRoot, this::asProjectElement);
return DocumentHelper.createDocument(newRoot).asXML();
}
private Element asProjectElement(final Document doc) {
final Element newOrg = DocumentHelper.createElement("project");
newOrg.addElement("id").setText(doc.valueOf("/*/@*[local-name()='id']"));
newOrg
.addElement("code")
.setText(doc.valueOf("//*[local-name()='identifier' and @*[local-name()='type'] = 'RCUK']"));
newOrg.addElement("title").setText(doc.valueOf("//*[local-name()='title']"));
return newOrg;
}
private static String cleanURL(final String url) {
String cleaned = url;
if (cleaned.contains("gtr.gtr")) {
cleaned = cleaned.replace("gtr.gtr", "gtr");
}
if (cleaned.startsWith("http://")) {
cleaned = cleaned.replaceFirst("http://", "https://");
}
return cleaned;
}
private Document loadURL(final String cleanUrl, final int attempt) {
try {
log.debug(" * Downloading Url: " + cleanUrl);
final byte[] bytes = this.connector.getInputSource(cleanUrl).getBytes("UTF-8");
return DocumentHelper.parseText(new String(bytes));
} catch (final Throwable e) {
log.error("Error dowloading url: " + cleanUrl + ", attempt = " + attempt, e);
if (attempt >= MAX_ATTEMPTS) {
throw new RuntimeException("Error dowloading url: " + cleanUrl, e);
}
try {
Thread.sleep(60000); // I wait for a minute
} catch (final InterruptedException e1) {
throw new RuntimeException("Error dowloading url: " + cleanUrl, e);
}
return loadURL(cleanUrl, attempt + 1);
}
}
private DateTime parseDate(final String s) {
return DateTime.parse(s.contains("T") ? s.substring(0, s.indexOf("T")) : s, simpleDateTimeFormatter);
}
private boolean isAfter(final String d, final DateTime fromDate) {
return StringUtils.isNotBlank(d) && parseDate(d).isAfter(fromDate);
}
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.collection.plugin.osf;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
public static final int PAGE_SIZE_VALUE_DEFAULT = 100;
private final HttpClientParams clientParams;
public OsfPreprintsCollectorPlugin(final HttpClientParams clientParams) {
this.clientParams = clientParams;
}
@Override
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final int pageSize = Optional
.ofNullable(api.getParams().get("pageSize"))
.filter(StringUtils::isNotBlank)
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
.orElse(PAGE_SIZE_VALUE_DEFAULT);
if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); }
final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
}
public HttpClientParams getClientParams() {
return this.clientParams;
}
}

View File

@ -0,0 +1,138 @@
package eu.dnetlib.dhp.collection.plugin.osf;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class OsfPreprintsIterator implements Iterator<String> {
private static final Logger log = LoggerFactory.getLogger(OsfPreprintsIterator.class);
private static final int MAX_ATTEMPTS = 5;
private final HttpClientParams clientParams;
private final String baseUrl;
private final int pageSize;
private String currentUrl;
private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
public OsfPreprintsIterator(
final String baseUrl,
final int pageSize,
final HttpClientParams clientParams) {
this.clientParams = clientParams;
this.baseUrl = baseUrl;
this.pageSize = pageSize;
initQueue();
}
private void initQueue() {
this.currentUrl = this.baseUrl + "?filter:is_published:d=true&format=json&page[size]=" + this.pageSize;
log.info("REST calls starting with {}", this.currentUrl);
}
@Override
public boolean hasNext() {
synchronized (this.recordQueue) {
while (this.recordQueue.isEmpty() && !this.currentUrl.isEmpty()) {
try {
this.currentUrl = downloadPage(this.currentUrl);
} catch (final CollectorException e) {
log.debug("CollectorPlugin.next()-Exception: {}", e);
throw new RuntimeException(e);
}
}
if (!this.recordQueue.isEmpty()) { return true; }
return false;
}
}
@Override
public String next() {
synchronized (this.recordQueue) {
return this.recordQueue.poll();
}
}
private String downloadPage(final String url) throws CollectorException {
final Document doc = downloadUrl(url, 0);
for (final Object o : doc.selectNodes("/*/data")) {
final Element n = (Element) ((Element) o).detach();
final Element group = DocumentHelper.createElement("group");
group.addAttribute("id", n.valueOf(".//data/id"));
group.addElement("preprint").add(n);
for (final Object o1 : n.selectNodes(".//contributors//href")) {
final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
group.addElement("contributors").add(doc1.getRootElement().detach());
}
for (final Object o1 : n.selectNodes(".//primary_file//href")) {
final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
group.addElement("primary_file").add(doc1.getRootElement().detach());
}
this.recordQueue.add(DocumentHelper.createDocument(group).asXML());
}
return doc.valueOf("/*/links/next");
}
private Document downloadUrl(final String url, final int attempt) throws CollectorException {
if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); }
if (attempt > 0) {
final int delay = (attempt * 5000);
log.debug("Attempt {} with delay {}", attempt, delay);
try {
Thread.sleep(delay);
} catch (final InterruptedException e) {
new CollectorException(e);
}
}
try {
log.info("requesting URL [{}]", url);
final HttpConnector2 connector = new HttpConnector2(this.clientParams);
final String json = connector.getInputSource(url);
final String xml = JsonUtils.convertToXML(json);
return DocumentHelper.parseText(xml);
} catch (final Throwable e) {
log.warn(e.getMessage(), e);
if ((e instanceof CollectorException) && e.getMessage().contains("401")) {
final Element root = DocumentHelper.createElement("error_401_authorization_required");
return DocumentHelper.createDocument(root);
}
return downloadUrl(url, attempt + 1);
}
}
}

View File

@ -0,0 +1,103 @@
package eu.dnetlib.dhp.collection.plugin.gtr2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.Iterator;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
class Gtr2PublicationsIteratorTest {
private static final String baseURL = "https://gtr.ukri.org/gtr/api";
private static final HttpClientParams clientParams = new HttpClientParams();
@Test
@Disabled
public void testOne() throws Exception {
System.out.println("one publication");
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, null, null, clientParams);
if (iterator.hasNext()) {
final String res = iterator.next();
assertNotNull(res);
System.out.println(res);
}
}
@Test
@Disabled
public void testPaging() throws Exception {
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, "2", "2", clientParams);
while (iterator.hasNext()) {
Thread.sleep(300);
final String res = iterator.next();
assertNotNull(res);
System.out.println(res);
}
}
@Test
@Disabled
public void testOnePage() throws Exception {
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, "12", "12", clientParams);
final int count = iterateAndCount(iterator);
assertEquals(20, count);
}
@Test
@Disabled
public void testIncrementalHarvestingNoRecords() throws Exception {
System.out.println("incremental Harvesting");
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, "2050-12-12T", "11", "13",
clientParams);
final int count = iterateAndCount(iterator);
assertEquals(0, count);
}
@Test
@Disabled
public void testIncrementalHarvesting() throws Exception {
System.out.println("incremental Harvesting");
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, "2016-11-30", "11", "11", clientParams);
final int count = iterateAndCount(iterator);
assertEquals(20, count);
}
@Test
@Disabled
public void testCompleteHarvesting() throws Exception {
System.out.println("testing complete harvesting");
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, null, null, null, clientParams);
// TryIndentXmlString indenter = new TryIndentXmlString();
// it.setEndAtPage(3);
while (iterator.hasNext()) {
final String res = iterator.next();
assertNotNull(res);
// System.out.println(res);
// Scanner keyboard = new Scanner(System.in);
// System.out.println("press enter for next record");
// keyboard.nextLine();
}
}
private int iterateAndCount(final Iterator<String> iterator) throws Exception {
int i = 0;
while (iterator.hasNext()) {
assertNotNull(iterator.next());
i++;
}
System.out.println("Got " + i + " publications");
return i;
}
}

View File

@ -0,0 +1,113 @@
package eu.dnetlib.dhp.collection.plugin.osf;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.dom4j.DocumentHelper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class OsfPreprintsCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(OsfPreprintsCollectorPlugin.class);
private final String baseUrl = "https://api.osf.io/v2/preprints/";
private final int pageSize = 100;
private final ApiDescriptor api = new ApiDescriptor();
private OsfPreprintsCollectorPlugin plugin;
@BeforeEach
public void setUp() {
final HashMap<String, String> params = new HashMap<>();
params.put("pageSize", "" + this.pageSize);
this.api.setBaseUrl(this.baseUrl);
this.api.setParams(params);
this.plugin = new OsfPreprintsCollectorPlugin(new HttpClientParams());
}
@Test
@Disabled
void test_one() throws CollectorException {
this.plugin.collect(this.api, new AggregatorReport())
.limit(1)
.forEach(log::info);
}
@Test
@Disabled
void test_limited() throws CollectorException {
final AtomicInteger i = new AtomicInteger(0);
final Stream<String> stream = this.plugin.collect(this.api, new AggregatorReport());
stream.limit(2000).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
i.incrementAndGet();
log.info(s);
});
log.info("{}", i.intValue());
Assertions.assertTrue(i.intValue() > 0);
}
@Test
@Disabled
void test_all() throws CollectorException {
final AtomicLong i = new AtomicLong(0);
final Stream<String> stream = this.plugin.collect(this.api, new AggregatorReport());
stream.forEach(s -> {
Assertions.assertTrue(s.length() > 0);
if ((i.incrementAndGet() % 1000) == 0) {
log.info("COLLECTED: {}", i.get());
}
});
log.info("TOTAL: {}", i.get());
Assertions.assertTrue(i.get() > 0);
}
@Test
@Disabled
void test_authentication_required() {
final HttpConnector2 connector = new HttpConnector2();
try {
final String res = connector.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
System.out.println(res);
fail();
} catch (final Throwable e) {
System.out.println("**** ERROR: " + e.getMessage());
if ((e instanceof CollectorException) && e.getMessage().contains("401")) {
System.out.println(" XML: " + DocumentHelper.createDocument().getRootElement().detach());
}
assertTrue(e.getMessage().contains("401"));
}
}
}

View File

@ -1,105 +0,0 @@
package eu.dnetlib.dhp.collection.plugin.rest;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class OsfPreprintCollectorTest {
private static final Logger log = LoggerFactory.getLogger(OsfPreprintCollectorTest.class);
private final String baseUrl = "https://api.osf.io/v2/preprints/";
// private final String requestHeaderMap = "";
// private final String authMethod = "";
// private final String authToken = "";
// private final String resultOutputFormat = "";
private final String queryParams = "filter:is_published:d=true";
private final String entityXpath = "/*/*[local-name()='data']";
private final String resultTotalXpath = "/*/*[local-name()='links']/*[local-name()='meta']/*[local-name()='total']";
private final String resumptionParam = "page";
private final String resumptionType = "scan";
private final String resumptionXpath = "substring-before(substring-after(/*/*[local-name()='links']/*[local-name()='next'], 'page='), '&')";
private final String resultSizeParam = "page[size]";
private final String resultSizeValue = "100";
private final String resultFormatParam = "format";
private final String resultFormatValue = "json";
private final ApiDescriptor api = new ApiDescriptor();
private RestCollectorPlugin rcp;
@BeforeEach
public void setUp() {
final HashMap<String, String> params = new HashMap<>();
params.put("resumptionType", this.resumptionType);
params.put("resumptionParam", this.resumptionParam);
params.put("resumptionXpath", this.resumptionXpath);
params.put("resultTotalXpath", this.resultTotalXpath);
params.put("resultFormatParam", this.resultFormatParam);
params.put("resultFormatValue", this.resultFormatValue);
params.put("resultSizeParam", this.resultSizeParam);
params.put("resultSizeValue", this.resultSizeValue);
params.put("queryParams", this.queryParams);
params.put("entityXpath", this.entityXpath);
this.api.setBaseUrl(this.baseUrl);
this.api.setParams(params);
this.rcp = new RestCollectorPlugin(new HttpClientParams());
}
@Test
@Disabled
void test_limited() throws CollectorException {
final AtomicInteger i = new AtomicInteger(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
stream.limit(2000).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
i.incrementAndGet();
log.info(s);
});
log.info("{}", i.intValue());
Assertions.assertTrue(i.intValue() > 0);
}
@Test
@Disabled
void test_all() throws CollectorException {
final AtomicLong i = new AtomicLong(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
stream.forEach(s -> {
Assertions.assertTrue(s.length() > 0);
if ((i.incrementAndGet() % 1000) == 0) {
log.info("COLLECTED: {}", i.get());
}
});
log.info("TOTAL: {}", i.get());
Assertions.assertTrue(i.get() > 0);
}
}

View File

@ -70,9 +70,8 @@ public class PrepareRelatedProjectsJob {
final Dataset<Relation> rels = ClusterUtils final Dataset<Relation> rels = ClusterUtils
.loadRelations(graphPath, spark) .loadRelations(graphPath, spark)
.filter((FilterFunction<Relation>) r -> r.getDataInfo().getDeletedbyinference()) .filter((FilterFunction<Relation>) r -> ModelConstants.RESULT_PROJECT.equals(r.getRelType()))
.filter((FilterFunction<Relation>) r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) .filter((FilterFunction<Relation>) r -> !BrokerConstants.IS_MERGED_IN_CLASS.equals(r.getRelClass()))
.filter((FilterFunction<Relation>) r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter((FilterFunction<Relation>) r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter((FilterFunction<Relation>) r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter((FilterFunction<Relation>) r -> !ClusterUtils.isDedupRoot(r.getTarget())); .filter((FilterFunction<Relation>) r -> !ClusterUtils.isDedupRoot(r.getTarget()));

View File

@ -53,7 +53,7 @@ public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return source return source
.getPids() .getSubjects()
.stream() .stream()
.filter(s -> !existingSubjects.contains(subjectAsString(s))) .filter(s -> !existingSubjects.contains(subjectAsString(s)))
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
public class EnrichMoreSubjectTest {
final EnrichMoreSubject matcher = new EnrichMoreSubject();
@BeforeEach
void setUp() throws Exception {
}
@Test
void testFindDifferences_1() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_2() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertEquals(1, list.size());
}
@Test
void testFindDifferences_3() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
target.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_4() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
target.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
}

View File

@ -20,7 +20,6 @@ import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource; import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamResult;
import eu.dnetlib.dhp.oa.provision.model.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -42,6 +41,7 @@ import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc; import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag; import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.schema.common.*; import eu.dnetlib.dhp.schema.common.*;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;