diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index 119031b064..1a219c5c91 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -57,9 +57,9 @@ - eu.dnetlib + eu.dnetlib.dhp dnet-openaire-broker-common - [3.0.4,4.0.0) + [3.0.0-SNAPSHOT,) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java new file mode 100644 index 0000000000..5ca865e8fc --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -0,0 +1,112 @@ + +package eu.dnetlib.dhp.broker.oa; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.sql.expressions.Aggregator; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import scala.Tuple2; + +public class CheckDuplictedIdsJob { + + private static final Logger log = LoggerFactory.getLogger(CheckDuplictedIdsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + CheckDuplictedIdsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String countPath = parser.get("workingPath") + "/counts"; + log.info("countPath: {}", countPath); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("invaild_event_id"); + + final TypedColumn, Tuple2> agg = new CountAggregator().toColumn(); + + ClusterUtils + .readPath(spark, eventsPath, Event.class) + .map(e -> new Tuple2<>(e.getEventId(), 1l), Encoders.tuple(Encoders.STRING(), Encoders.LONG())) + .groupByKey(t -> t._1, Encoders.STRING()) + .agg(agg) + .map(t -> t._2, Encoders.tuple(Encoders.STRING(), Encoders.LONG())) + .filter(t -> t._2 > 1) + .map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG())) + .write() + .mode(SaveMode.Overwrite) + .json(countPath); + ; + + } + + private static String eventAsJsonString(final Event f) throws JsonProcessingException { + return new ObjectMapper().writeValueAsString(f); + } + +} + +class CountAggregator extends Aggregator, Tuple2, Tuple2> { + + /** + * + */ + private static final long serialVersionUID = 1395935985734672538L; + + @Override + public Encoder> bufferEncoder() { + return Encoders.tuple(Encoders.STRING(), Encoders.LONG()); + } + + @Override + public Tuple2 finish(final Tuple2 arg0) { + return arg0; + } + + @Override + public Tuple2 merge(final Tuple2 arg0, final Tuple2 arg1) { + final String s = StringUtils.defaultIfBlank(arg0._1, arg1._1); + return new Tuple2<>(s, arg0._2 + arg1._2); + } + + @Override + public Encoder> outputEncoder() { + return Encoders.tuple(Encoders.STRING(), Encoders.LONG()); + } + + @Override + public Tuple2 reduce(final Tuple2 arg0, final Tuple2 arg1) { + final String s = StringUtils.defaultIfBlank(arg0._1, arg1._1); + return new Tuple2<>(s, arg0._2 + arg1._2); + } + + @Override + public Tuple2 zero() { + return new Tuple2<>(null, 0l); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index 5d3121aedc..cfee360c57 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -3,8 +3,6 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.Arrays; -import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -20,8 +18,6 @@ import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; @@ -29,9 +25,6 @@ import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; public class GenerateEventsJob { @@ -54,30 +47,20 @@ public class GenerateEventsJob { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - - final String dedupConfigProfileId = parser.get("dedupConfProfile"); - log.info("dedupConfigProfileId: {}", dedupConfigProfileId); - final String eventsPath = workingPath + "/events"; log.info("eventsPath: {}", eventsPath); - final Set dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist"); + final Set dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist"); log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ",")); - final Set dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist"); + final Set dsTypeWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceTypeWhitelist"); log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ",")); - final Set dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist"); + final Set dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist"); log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ",")); final SparkConf conf = new SparkConf(); - // TODO UNCOMMENT - // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); - final DedupConfig dedupConfig = null; - runWithSparkSession(conf, isSparkSessionManaged, spark -> { ClusterUtils.removeDir(spark, eventsPath); @@ -92,7 +75,7 @@ public class GenerateEventsJob { final Dataset dataset = groups .map( g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators), + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); @@ -103,22 +86,6 @@ public class GenerateEventsJob { } - private static Set parseParamAsList(final ArgumentApplicationParser parser, final String key) { - final String s = parser.get(key).trim(); - - final Set res = new HashSet<>(); - - if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list - Arrays - .stream(s.split(",")) - .map(String::trim) - .filter(StringUtils::isNotBlank) - .forEach(res::add); - } - - return res; - } - public static Map prepareAccumulators(final SparkContext sc) { return EventFinder @@ -130,23 +97,4 @@ public class GenerateEventsJob { } - private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { - - final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); - - final String conf = isLookUpService - .getResourceProfileByQuery( - String - .format( - "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", - profId)); - - final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); - dedupConfig.getPace().initModel(); - dedupConfig.getPace().initTranslationMap(); - // dedupConfig.getWf().setConfigurationId("???"); - - return dedupConfig; - } - } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 9124d18e39..806147bdd5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -48,7 +48,7 @@ public class IndexOnESJob { final JavaRDD inputRdd = ClusterUtils .readPath(spark, eventsPath, Event.class) - .limit(10000) // TODO REMOVE + // .limit(10000) // TODO REMOVE .map(IndexOnESJob::eventAsJsonString, Encoders.STRING()) .javaRDD(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index af6ab30a1a..3d688fa1d1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -17,7 +17,6 @@ import org.apache.spark.util.LongAccumulator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.pace.config.DedupConfig; public abstract class UpdateMatcher { @@ -37,7 +36,6 @@ public abstract class UpdateMatcher { public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res, final Collection others, - final DedupConfig dedupConfig, final Map accumulators) { final Map> infoMap = new HashMap<>(); @@ -49,7 +47,7 @@ public abstract class UpdateMatcher { if (topic != null) { final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(), - getHighlightToStringFunction(), dedupConfig); + getHighlightToStringFunction()); final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index 4b563d381a..ab2735f2a8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -15,7 +15,7 @@ public class EnrichMissingProject extends UpdateMatcher { super(20, prj -> Topic.ENRICH_MISSING_PROJECT, (p, prj) -> p.getProjects().add(prj), - prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode()); + prj -> prj.getOpenaireId()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index 6a10f19be5..85086a6df8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -18,11 +18,7 @@ public class EnrichMoreProject extends UpdateMatcher { super(20, prj -> Topic.ENRICH_MORE_PROJECT, (p, prj) -> p.getProjects().add(prj), - prj -> projectAsString(prj)); - } - - private static String projectAsString(final OaBrokerProject prj) { - return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode(); + prj -> prj.getOpenaireId()); } @Override @@ -36,13 +32,13 @@ public class EnrichMoreProject extends UpdateMatcher { final Set existingProjects = target .getProjects() .stream() - .map(EnrichMoreProject::projectAsString) + .map(p -> p.getOpenaireId()) .collect(Collectors.toSet()); return source .getProjects() .stream() - .filter(p -> !existingProjects.contains(projectAsString(p))) + .filter(p -> !existingProjects.contains(p.getOpenaireId())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index 2d0106a7a8..d8b8dd8079 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -1,6 +1,11 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -10,6 +15,7 @@ import org.apache.spark.util.LongAccumulator; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; public class ClusterUtils { @@ -53,7 +59,9 @@ public class ClusterUtils { return o; } - public static void save(final Dataset dataset, final String path, final Class clazz, + public static void save(final Dataset dataset, + final String path, + final Class clazz, final LongAccumulator acc) { dataset .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz)) @@ -62,4 +70,20 @@ public class ClusterUtils { .json(path); } + public static Set parseParamAsList(final ArgumentApplicationParser parser, final String key) { + final String s = parser.get(key).trim(); + + final Set res = new HashSet<>(); + + if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list + Arrays + .stream(s.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .forEach(res::add); + } + + return res; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 1ce84283af..d00c5b817d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -142,6 +142,7 @@ public class ConversionUtils { .filter(pid -> pid.getQualifier().getClassid() != null) .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) .map(pid -> pid.getValue()) + .map(pid -> cleanOrcid(pid)) .filter(StringUtils::isNotBlank) .findFirst() .orElse(null) : null; @@ -149,6 +150,11 @@ public class ConversionUtils { return new OaBrokerAuthor(author.getFullname(), pids); } + private static String cleanOrcid(final String s) { + final String match = "//orcid.org/"; + return s.contains(match) ? StringUtils.substringAfter(s, match) : s; + } + private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) { if (journal == null) { return null; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index e7abae68b0..b6328eb954 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -37,7 +37,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.pace.config.DedupConfig; public class EventFinder { @@ -70,14 +69,12 @@ public class EventFinder { matchers.add(new EnrichMissingDatasetReferences()); matchers.add(new EnrichMissingDatasetIsSupplementedTo()); matchers.add(new EnrichMissingDatasetIsSupplementedBy()); - matchers.add(new EnrichMissingAbstract()); } public static EventGroup generateEvents(final ResultGroup results, final Set dsIdWhitelist, final Set dsIdBlacklist, final Set dsTypeWhitelist, - final DedupConfig dedupConfig, final Map accumulators) { final List> list = new ArrayList<>(); @@ -85,7 +82,7 @@ public class EventFinder { for (final OaBrokerMainEntity target : results.getData()) { if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { for (final UpdateMatcher matcher : matchers) { - list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators)); + list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators)); } } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java index 5338d4f3d7..72fe1b204e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java @@ -1,8 +1,62 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.tree.support.TreeProcessor; +import eu.dnetlib.pace.util.MapDocumentUtil; + public class TrustUtils { + private static final Logger log = LoggerFactory.getLogger(TrustUtils.class); + + private static DedupConfig dedupConfig; + + static { + final ObjectMapper mapper = new ObjectMapper(); + try { + dedupConfig = mapper + .readValue( + DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), + DedupConfig.class); + } catch (final IOException e) { + log.error("Error loading dedupConfig, e"); + } + + } + + protected static float calculateTrust(final OaBrokerMainEntity r1, final OaBrokerMainEntity r2) { + + if (dedupConfig == null) { + return BrokerConstants.MIN_TRUST; + } + + try { + final ObjectMapper objectMapper = new ObjectMapper(); + final MapDocument doc1 = MapDocumentUtil + .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1)); + final MapDocument doc2 = MapDocumentUtil + .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2)); + + final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2); + + final double threshold = dedupConfig.getWf().getThreshold(); + + return TrustUtils.rescale(score, threshold); + } catch (final Exception e) { + log.error("Error computing score between results", e); + return BrokerConstants.MIN_TRUST; + } + } + public static float rescale(final double score, final double threshold) { if (score >= BrokerConstants.MAX_TRUST) { return BrokerConstants.MAX_TRUST; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 0586b681ea..ef8fb240c4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -4,20 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.function.BiConsumer; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.broker.objects.OaBrokerInstance; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerProvenance; import eu.dnetlib.dhp.broker.model.Topic; -import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.model.MapDocument; -import eu.dnetlib.pace.tree.support.TreeProcessor; -import eu.dnetlib.pace.util.MapDocumentUtil; public final class UpdateInfo { @@ -35,20 +26,17 @@ public final class UpdateInfo { private final float trust; - private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class); - public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, final OaBrokerMainEntity target, final BiConsumer compileHighlight, - final Function highlightToString, - final DedupConfig dedupConfig) { + final Function highlightToString) { this.topic = topic; this.highlightValue = highlightValue; this.source = source; this.target = target; this.compileHighlight = compileHighlight; this.highlightToString = highlightToString; - this.trust = calculateTrust(dedupConfig, source, target); + this.trust = TrustUtils.calculateTrust(source, target); } public T getHighlightValue() { @@ -63,31 +51,6 @@ public final class UpdateInfo { return target; } - private float calculateTrust(final DedupConfig dedupConfig, - final OaBrokerMainEntity r1, - final OaBrokerMainEntity r2) { - - if (dedupConfig == null) { - return BrokerConstants.MIN_TRUST; - } - - try { - final ObjectMapper objectMapper = new ObjectMapper(); - final MapDocument doc1 = MapDocumentUtil - .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1)); - final MapDocument doc2 = MapDocumentUtil - .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2)); - - final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2); - final double threshold = dedupConfig.getWf().getThreshold(); - - return TrustUtils.rescale(score, threshold); - } catch (final Exception e) { - log.error("Error computing score between results", e); - return BrokerConstants.MIN_TRUST; - } - } - protected Topic getTopic() { return topic; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json new file mode 100644 index 0000000000..d0319b441f --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json @@ -0,0 +1,122 @@ +{ + "wf": { + + }, + "pace": { + "clustering": [ + { + "name": "wordssuffixprefix", + "fields": [ + "title" + ], + "params": { + "max": "2", + "len": "3" + } + }, + { + "name": "lowercase", + "fields": [ + "doi" + ], + "params": { + + } + } + ], + "decisionTree": { + "start": { + "fields": [ + { + "field": "doi", + "comparator": "exactMatch", + "weight": 1.0, + "countIfUndefined": "false", + "params": { + + } + } + ], + "threshold": 0.5, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "layer1", + "undefined": "layer1", + "ignoreUndefined": "true" + }, + "layer1": { + "fields": [ + { + "field": "title", + "comparator": "titleVersionMatch", + "weight": 0.9, + "countIfUndefined": "false", + "params": { + + } + }, + { + "field": "authors", + "comparator": "sizeMatch", + "weight": 0.9, + "countIfUndefined": "false", + "params": { + + } + } + ], + "threshold": 0.5, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "layer2", + "undefined": "layer2", + "ignoreUndefined": "true" + }, + "layer2": { + "fields": [ + { + "field": "title", + "comparator": "levensteinTitle", + "weight": 1.0, + "countIfUndefined": "true", + "params": { + + } + } + ], + "threshold": 0.99, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "NO_MATCH", + "undefined": "NO_MATCH", + "ignoreUndefined": "true" + } + }, + "model": [ + { + "name": "doi", + "type": "String", + "path": "$.pids[?(@.type == 'doi')].value" + }, + { + "name": "title", + "type": "String", + "path": "$.titles", + "length": 250, + "size": 5 + }, + { + "name": "authors", + "type": "List", + "path": "$.creators[*].fullname", + "size": 200 + } + ], + "blacklists": { + + }, + "synonyms": { + + } + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index a0c7b00dbd..2c728cd981 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -8,14 +8,6 @@ workingPath the path where the the generated data will be stored - - - isLookupUrl - the address of the lookUp service - - - dedupConfProfId - the id of a valid Dedup Configuration Profile datasourceIdWhitelist @@ -427,8 +419,6 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --isLookupUrl${isLookupUrl} - --dedupConfProfile${dedupConfProfId} --datasourceIdWhitelist${datasourceIdWhitelist} --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} @@ -447,7 +437,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="2" + --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index c545884f9e..bab8081932 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -5,18 +5,6 @@ "paramDescription": "the path where the generated events will be stored", "paramRequired": true }, - { - "paramName": "lu", - "paramLongName": "isLookupUrl", - "paramDescription": "the address of the ISLookUpService", - "paramRequired": true - }, - { - "paramName": "d", - "paramLongName": "dedupConfProfile", - "paramDescription": "the id of a valid Dedup Configuration Profile", - "paramRequired": true - }, { "paramName": "datasourceIdWhitelist", "paramLongName": "datasourceIdWhitelist", diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index b382904481..d19ad6c5a0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -9,15 +9,6 @@ workingPath the path where the the generated data will be stored - - isLookupUrl - the address of the lookUp service - - - dedupConfProfId - the id of a valid Dedup Configuration Profile - - sparkDriverMemory memory for driver process @@ -73,33 +64,32 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexOnESJob - eu.dnetlib.dhp.broker.oa.IndexOnESJob + Count + eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob dhp-broker-events-${projectVersion}.jar + --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="2" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} --workingPath${workingPath} - --index${esIndexName} - --esHost${esIndexHost} diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java index 93bc5617fb..82374b335f 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java @@ -30,7 +30,7 @@ class UpdateMatcherTest { final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -46,7 +46,7 @@ class UpdateMatcherTest { res.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -62,7 +62,7 @@ class UpdateMatcherTest { p2.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.size() == 1); } @@ -79,7 +79,7 @@ class UpdateMatcherTest { p2.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -98,7 +98,7 @@ class UpdateMatcherTest { p4.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -117,7 +117,7 @@ class UpdateMatcherTest { p4.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.size() == 1); } diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java index bb23d6085e..974baa28b4 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java @@ -5,6 +5,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Test; +import eu.dnetlib.broker.objects.OaBrokerAuthor; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; + public class TrustUtilsTest { private static final double THRESHOLD = 0.95; @@ -64,6 +68,23 @@ public class TrustUtilsTest { verifyValue(2.00, BrokerConstants.MAX_TRUST); } + @Test + public void test() throws Exception { + final OaBrokerMainEntity r1 = new OaBrokerMainEntity(); + r1.getTitles().add("D-NET Service Package: Data Import"); + r1.getPids().add(new OaBrokerTypedValue("doi", "123")); + r1.getCreators().add(new OaBrokerAuthor("Michele Artini", null)); + r1.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null)); + + final OaBrokerMainEntity r2 = new OaBrokerMainEntity(); + r2.getTitles().add("D-NET Service Package: Data Import"); + // r2.getPids().add(new OaBrokerTypedValue("doi", "123")); + r2.getCreators().add(new OaBrokerAuthor("Michele Artini", null)); + // r2.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null)); + + System.out.println("TRUST: " + TrustUtils.calculateTrust(r1, r2)); + } + private void verifyValue(final double originalScore, final float expectedTrust) { final float trust = TrustUtils.rescale(originalScore, THRESHOLD); System.out.println(trust); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java index c72940deb8..180f9f8460 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java @@ -37,7 +37,7 @@ public class Deduper implements Serializable { public static JavaPairRDD createSortedBlocks( JavaPairRDD mapDocs, DedupConfig config) { final String of = config.getWf().getOrderField(); - final int maxQueueSize = config.getWf().getGroupMaxSize(); + final int maxQueueSize = config.getWf().getQueueMaxSize(); return mapDocs // the reduce is just to be sure that we haven't document with same id diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index b8ccb038d7..513e14f073 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -45,6 +45,16 @@ public class EntityMergerTest implements Serializable { } + @Test + public void softwareMergerTest() throws InstantiationException, IllegalAccessException { + List> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class); + + Software merged = DedupRecordFactory + .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class); + + System.out.println(merged.getBestaccessright().getClassid()); + } + @Test public void publicationMergerTest() throws InstantiationException, IllegalAccessException { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 8dd00be979..88d5f24f90 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -182,7 +182,7 @@ public class SparkDedupTest implements Serializable { .count(); assertEquals(3432, orgs_simrel); - assertEquals(7152, pubs_simrel); + assertEquals(7054, pubs_simrel); assertEquals(344, sw_simrel); assertEquals(458, ds_simrel); assertEquals(6750, orp_simrel); @@ -234,7 +234,7 @@ public class SparkDedupTest implements Serializable { .count(); assertEquals(1276, orgs_mergerel); - assertEquals(1442, pubs_mergerel); + assertEquals(1440, pubs_mergerel); assertEquals(288, sw_mergerel); assertEquals(472, ds_mergerel); assertEquals(718, orp_mergerel); @@ -423,7 +423,7 @@ public class SparkDedupTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4975, relations); + assertEquals(4971, relations); // check deletedbyinference final Dataset mergeRels = spark diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json index 13b18e1c35..2469b2cc03 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType" : "resulttype", "subEntityValue" : "dataset", "orderField" : "title", - "queueMaxSize" : "2000", + "queueMaxSize" : "800", "groupMaxSize" : "100", "maxChildren" : "100", - "slidingWindowSize" : "200", + "slidingWindowSize" : "80", "rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "includeChildren" : "true", "idPath" : "$.id", @@ -17,8 +17,7 @@ }, "pace" : { "clustering" : [ - { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, - { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, + { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree" : { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json index 5fb2a171ac..4adcc0439f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType" : "resulttype", "subEntityValue" : "otherresearchproduct", "orderField" : "title", - "queueMaxSize" : "2000", + "queueMaxSize" : "800", "groupMaxSize" : "100", "maxChildren" : "100", - "slidingWindowSize" : "200", + "slidingWindowSize" : "80", "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "includeChildren" : "true", "idPath" : "$.id", @@ -17,8 +17,7 @@ }, "pace" : { "clustering" : [ - { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, - { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, + { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree" : { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json index d471ccb89d..ef0b26af41 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType": "resulttype", "subEntityValue": "publication", "orderField": "title", - "queueMaxSize": "2000", + "queueMaxSize": "800", "groupMaxSize": "100", "maxChildren": "100", - "slidingWindowSize": "200", + "slidingWindowSize": "80", "rootBuilder": [ "result", "resultProject_outcome_isProducedBy", @@ -29,8 +29,7 @@ }, "pace": { "clustering" : [ - { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, - { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, + { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree": { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json index f4a107c745..623abbf9f4 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType" : "resulttype", "subEntityValue" : "software", "orderField" : "title", - "queueMaxSize" : "2000", + "queueMaxSize" : "800", "groupMaxSize" : "100", "maxChildren" : "100", - "slidingWindowSize" : "200", + "slidingWindowSize" : "80", "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "includeChildren" : "true", "idPath" : "$.id", @@ -17,8 +17,7 @@ }, "pace" : { "clustering" : [ - { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, - { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, + { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } } ], "decisionTree": { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/json/software_merge.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/json/software_merge.json new file mode 100644 index 0000000000..b146d61022 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/json/software_merge.json @@ -0,0 +1,3 @@ +{"context": [], "dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive", "classname": "sysimport:crosswalk:datasetarchive", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "dedup-similarity-result-levenstein", "invisible": false, "trust": "0.95"}, "resourcetype": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "pid": [], "contributor": [], "resulttype": {"classid": "software", "classname": "software", "schemename": "dnet:result_typologies", "schemeid": "dnet:result_typologies"}, "relevantdate": [], "collectedfrom": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "Journal.fi", "key": "10|openaire____::6eef8049d0feedc089ee009abca55e35"}], "id": "50|a89337edbe55::4930db9e954866d70916cbfba9f81f97", "subject": [], "instance": [{"refereed": null, "hostedby": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "Journal.fi", "key": "10|openaire____::6eef8049d0feedc089ee009abca55e35"}, "processingchargeamount": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "license": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "url": [], "distributionlocation": "", "processingchargecurrency": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "dateofacceptance": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "2016-01-01"}, "collectedfrom": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "Journal.fi", "key": "10|openaire____::6eef8049d0feedc089ee009abca55e35"}, "accessright": {"classid": "OPEN", "classname": "Open Access", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "instancetype": {"classid": "0001", "classname": "Article", "schemename": "dnet:dataCite_resource", "schemeid": "dnet:dataCite_resource"}}], "embargoenddate": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "lastupdatetimestamp": 0, "author": [{"surname": "Go\\u0308tz", "name": "Julia", "pid": [], "rank": 5, "affiliation": [], "fullname": "G\\u00f6tz, Julia"}, {"surname": "Wolff", "name": "Stephan", "pid": [], "rank": 6, "affiliation": [], "fullname": "Wolff, Stephan"}, {"surname": "Jansen", "name": "Olav", "pid": [], "rank": 7, "affiliation": [], "fullname": "Jansen, Olav"}, {"surname": "Dressler", "name": "Dirk", "pid": [{"qualifier": {"classid": "ORCID", "classname": "ORCID"}, "value": "0000-0000-0656-9999"},{"qualifier": {"classid": "id", "classname": "id"}, "value": "987654321"}], "rank": 8, "affiliation": [], "fullname": "Dressler, Dirk"}, {"surname": "Schneider", "name": "Susanne A.", "pid": [], "rank": 9, "affiliation": [], "fullname": "Schneider, Susanne A."}], "source": [], "dateofcollection": "2019-11-05T14:49:22.351Z", "fulltext": [], "dateoftransformation": "2019-11-05T16:10:58.988Z", "description": [], "format": [], "coverage": [], "publisher": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "language": {"classid": "eng", "classname": "English", "schemename": "dnet:languages", "schemeid": "dnet:languages"}, "bestaccessright": {"classid": "OPEN SOURCE", "classname": "Open Source", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "country": [], "extraInfo": [], "originalId": [], "dateofacceptance": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "2018-09-30"}, "title": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "qualifier": {"classid": "main title", "classname": "main title", "schemename": "dnet:dataCite_title", "schemeid": "dnet:dataCite_title"}, "value": "Altered brain activation in a reversal learning task unmasks adaptive changes in cognitive control in writer's cramp"}]} +{"context": [], "dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "sysimport:crosswalk:repository", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "dedup-similarity-result-levenstein", "invisible": false, "trust": "0.9"}, "resourcetype": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "pid": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "qualifier": {"classid": "doi", "classname": "doi", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.1016/j.nicl.2015.11.006"}], "contributor": [], "resulttype": {"classid": "software", "classname": "software", "schemename": "dnet:result_typologies", "schemeid": "dnet:result_typologies"}, "relevantdate": [], "collectedfrom": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "BASE (Open Access Aggregator)", "key": "10|openaire____::df45502607927471ecf8a6ae83683ff5"}], "id": "50|base_oa_____::0968af610a356656706657e4f234b340", "subject": [], "instance": [{"refereed": null, "hostedby": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "NeuroImage: Clinical", "key": "10|doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a"}, "processingchargeamount": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "license": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "http://creativecommons.org/licenses/by-nc-nd/4.0/"}, "url": ["http://dx.doi.org/10.1016/j.nicl.2015.11.006"], "distributionlocation": "", "processingchargecurrency": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "dateofacceptance": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "2016-01-01"}, "collectedfrom": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "BASE (Open Access Aggregator)", "key": "10|openaire____::df45502607927471ecf8a6ae83683ff5"}, "accessright": {"classid": "OPEN", "classname": "Open Access", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "instancetype": {"classid": "0001", "classname": "Article", "schemename": "dnet:publication_resource", "schemeid": "dnet:publication_resource"}}], "embargoenddate": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "lastupdatetimestamp": 0, "author": [{"surname": "Zeuner", "name": "Kirsten E.", "pid": [], "rank": 1, "affiliation": [], "fullname": "Zeuner, Kirsten E."}, {"surname": "Knutzen", "name": "Arne", "pid": [], "rank": 2, "affiliation": [], "fullname": "Knutzen, Arne"}, {"surname": "Granert", "name": "Oliver", "pid": [{"qualifier": {"classid": "ORCID", "classname": "ORCID"}, "value": "0000-0002-0656-1023"}], "rank": 3, "affiliation": [], "fullname": "Granert, Oliver"}, {"surname": "Sablowsky", "name": "Simone", "pid": [], "rank": 4, "affiliation": [], "fullname": "Sablowsky, Simone"}, {"surname": "Go\\u0308tz", "name": "Julia", "pid": [], "rank": 5, "affiliation": [], "fullname": "G\\u00f6tz, Julia"}, {"surname": "Wolff", "name": "Stephan", "pid": [], "rank": 6, "affiliation": [], "fullname": "Wolff, Stephan"}, {"surname": "Jansen", "name": "Olav", "pid": [], "rank": 7, "affiliation": [], "fullname": "Jansen, Olav"}, {"surname": "Dressler", "name": "Dirk", "pid": [], "rank": 8, "affiliation": [], "fullname": "Dressler, Dirk"}, {"surname": "Schneider", "name": "Susanne A.", "pid": [], "rank": 9, "affiliation": [], "fullname": "Schneider, Susanne A."}, {"surname": "Klein", "name": "Christine", "pid": [], "rank": 10, "affiliation": [], "fullname": "Klein, Christine"}, {"surname": "Deuschl", "name": "Gu\\u0308nther", "pid": [], "rank": 11, "affiliation": [], "fullname": "Deuschl, G\\u00fcnther"}, {"surname": "Eimeren", "name": "Thilo", "pid": [], "rank": 12, "affiliation": [], "fullname": "van Eimeren, Thilo"}, {"surname": "Witt", "name": "Karsten", "pid": [], "rank": 13, "affiliation": [], "fullname": "Witt, Karsten"}], "source": [], "dateofcollection": "2017-07-27T19:04:09.131Z", "fulltext": [], "dateoftransformation": "2019-01-23T10:15:19.582Z", "description": [], "format": [], "coverage": [], "publisher": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "Elsevier BV"}, "language": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "bestaccessright": {"classid": "OPEN SOURCE", "classname": "Open Source", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "country": [{"classid": "IT", "classname": "Italy", "schemeid": "dnet:countries", "schemename": "dnet:countries"}], "extraInfo": [], "originalId": ["10.1016/j.nicl.2015.11.006"], "dateofacceptance": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "2016-01-01"}, "title": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "qualifier": {"classid": "main title", "classname": "main title", "schemename": "dnet:dataCite_title", "schemeid": "dnet:dataCite_title"}, "value": "Altered brain activation in a reversal learning task unmasks adaptive changes in cognitive control in writer's cramp"}]} +{"context": [], "dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:datasetarchive", "classname": "sysimport:crosswalk:datasetarchive", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "dedup-similarity-result-levenstein", "invisible": false, "trust": "0.9"}, "resourcetype": {"classid": "0004", "classname": "Conference object", "schemename": "dnet:dataCite_resource", "schemeid": "dnet:dataCite_resource"}, "pid": [], "contributor": [], "resulttype": {"classid": "software", "classname": "software", "schemename": "dnet:result_typologies", "schemeid": "dnet:result_typologies"}, "relevantdate": [], "collectedfrom": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "CRIS UNS (Current Research Information System University of Novi Sad)", "key": "10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556"}], "id": "50|CrisUnsNoviS::9f9d014eea45dab432cab636c4c9cf39", "subject": [], "instance": [{"refereed": null, "hostedby": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "CRIS UNS (Current Research Information System University of Novi Sad)", "key": "10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556"}, "processingchargeamount": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "license": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "url": ["https://www.cris.uns.ac.rs/record.jsf?recordId=113444&source=OpenAIRE&language=en"], "distributionlocation": "", "processingchargecurrency": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "dateofacceptance": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "2019-01-01"}, "collectedfrom": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "CRIS UNS (Current Research Information System University of Novi Sad)", "key": "10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556"}, "accessright": {"classid": "UNKNOWN", "classname": "UNKNOWN", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "instancetype": {"classid": "0004", "classname": "Conference object", "schemename": "dnet:dataCite_resource", "schemeid": "dnet:dataCite_resource"}}], "embargoenddate": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "lastupdatetimestamp": 0, "author": [{"surname": "Zeuner", "name": "Kirsten E.", "pid": [], "rank": 1, "affiliation": [], "fullname": "Zeuner, Kirsten E."}, {"surname": "Knutzen", "name": "Arne", "pid": [], "rank": 2, "affiliation": [], "fullname": "Knutzen, Arne"}, {"surname": "Granert", "name": "Oliver", "pid": [{"qualifier": {"classid": "ORCID", "classname": "ORCID"}, "value": "0000-0002-0656-1023"}, {"qualifier": {"classid": "pubmed", "classname": "pubmed"}, "value": "pubmed.it"}], "rank": 3, "affiliation": [], "fullname": "Granert, Oliver"}, {"surname": "Sablowsky", "name": "Simone", "pid": [{"qualifier": {"classid": "id", "classname": "id"}, "value": "12345678"}], "rank": 4, "affiliation": [], "fullname": "Sablowsky, Simone"}, {"surname": "Go\\u0308tz", "name": "Julia", "pid": [], "rank": 5, "affiliation": [], "fullname": "G\\u00f6tz, Julia"}, {"surname": "Wolff", "name": "Stephan", "pid": [], "rank": 6, "affiliation": [], "fullname": "Wolff, Stephan"}, {"surname": "Jansen", "name": "Olav", "pid": [{"qualifier": {"classid": "ORCID", "classname": "ORCID"}, "value": "0000-0000-0656-1023"},{"qualifier": {"classid": "id", "classname": "id"}, "value": "987654321"}], "rank": 7, "affiliation": [], "fullname": "Jansen, Olav"}, {"surname": "Dressler", "name": "Dirk", "pid": [], "rank": 8, "affiliation": [], "fullname": "Dressler, Dirk"}, {"surname": "Schneider", "name": "Susanne A.", "pid": [], "rank": 9, "affiliation": [], "fullname": "Schneider, Susanne A."}], "source": [], "dateofcollection": "2020-03-10T15:05:38.685Z", "fulltext": [], "dateoftransformation": "2020-03-11T20:11:13.15Z", "description": [], "format": [], "coverage": [], "publisher": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": ""}, "language": {"classid": "en", "classname": "en", "schemename": "dnet:languages", "schemeid": "dnet:languages"}, "bestaccessright": {"classid": "UNKNOWN", "classname": "unknown", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "country": [{"classid": "FI", "classname": "Finland", "schemeid": "dnet:countries", "schemename": "dnet:countries"}], "extraInfo": [], "originalId": ["(BISIS)113444", "https://www.cris.uns.ac.rs/record.jsf?recordId=113444&source=OpenAIRE&language=en"], "dateofacceptance": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "value": "2016-01-01"}, "title": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "", "classname": "", "schemename": "", "schemeid": ""}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": ""}, "qualifier": {"classid": "test title", "classname": "test title", "schemename": "dnet:dataCite_title", "schemeid": "dnet:dataCite_title"}, "value": "Antichains of copies of ultrahomogeneous structures"}]} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index e1c4b53b50..fd707e949a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -8,6 +8,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -97,7 +98,7 @@ public class CleanGraphSparkJob { .json(outputPath); } - private static T fixDefaults(T value) { + protected static T fixDefaults(T value) { if (value instanceof Datasource) { // nothing to clean here } else if (value instanceof Project) { @@ -134,11 +135,6 @@ public class CleanGraphSparkJob { .setResourcetype( qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE)); } - if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { - r - .setBestaccessright( - qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); - } if (Objects.nonNull(r.getInstance())) { for (Instance i : r.getInstance()) { if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { @@ -152,6 +148,15 @@ public class CleanGraphSparkJob { } } } + if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { + Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); + if (Objects.isNull(bestaccessrights)) { + r.setBestaccessright( + qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); + } else { + r.setBestaccessright(bestaccessrights); + } + } if (Objects.nonNull(r.getAuthor())) { boolean nullRank = r .getAuthor() diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index fc77950d01..c43ee29fe0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -378,6 +378,10 @@ public abstract class AbstractMdRecordToOafMapper { protected abstract Field prepareDatasetStorageDate(Document doc, DataInfo info); + public static Qualifier createBestAccessRights(final List instanceList) { + return getBestAccessRights(instanceList); + } + protected static Qualifier getBestAccessRights(final List instanceList) { if (instanceList != null) { final Optional min = instanceList diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml new file mode 100644 index 0000000000..66eaeeb263 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml @@ -0,0 +1,157 @@ + + + + reuseContent + false + should import content from the aggregator or reuse a previous version + + + contentPath + path location to store (or reuse) content from the aggregator + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + isLookupUrl + the address of the lookUp service + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_claims + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --isLookupUrl${isLookupUrl} + --actionclaims + --dbschema${dbSchema} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${contentPath}/odf_claims + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -iclaim + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${contentPath}/oaf_claims + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -iclaim + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml deleted file mode 100644 index 1ac456976d..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml +++ /dev/null @@ -1,169 +0,0 @@ - - - - migrationClaimsPathStep1 - the base path to store hdfs file - - - migrationClaimsPathStep2 - the temporary path to store entities before dispatching - - - migrationClaimsPathStep3 - the graph Raw base path - - - postgresURL - the postgres URL to access to the database - - - postgresUser - the user postgres - - - postgresPassword - the password postgres - - - mongoURL - mongoDB url, example: mongodb://[username:password@]host[:port] - - - mongoDb - mongo database - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication - -p${migrationClaimsPathStep1}/db_claims - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - -aclaims - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationClaimsPathStep1}/odf_claims - -mongourl${mongoURL} - -mongodb${mongoDb} - -fODF - -lstore - -iclaim - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationClaimsPathStep1}/oaf_claims - -mongourl${mongoURL} - -mongodb${mongoDb} - -fOAF - -lstore - -iclaim - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - GenerateClaimEntities - eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - -s${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims - -t${migrationClaimsPathStep2}/claim_entities - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - GenerateClaimGraph - eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - -s${migrationClaimsPathStep2}/claim_entities - -g${migrationClaimsPathStep3} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index 4783aa81f6..559a30b1ea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -57,6 +57,8 @@ public class CleaningFunctionTest { String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json")); Publication p_in = MAPPER.readValue(json, Publication.class); + assertNull(p_in.getBestaccessright()); + assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Publication); @@ -84,6 +86,9 @@ public class CleaningFunctionTest { .map(p -> p.getQualifier()) .allMatch(q -> pidTerms.contains(q.getClassid()))); + Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out); + assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid()); + // TODO add more assertions to verity the cleaned values System.out.println(MAPPER.writeValueAsString(p_out)); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json index 2c1d5017d6..5d0c0d1ed8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json @@ -185,12 +185,7 @@ "surname": "" } ], - "bestaccessright": { - "classid": "CLOSED", - "classname": "Closed Access", - "schemeid": "dnet:access_modes", - "schemename": "dnet:access_modes" - }, + "bestaccessright": null, "collectedfrom": [ { "key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747", diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 601cf64499..eb63d4423d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -156,6 +156,7 @@ public class PrepareRelationsJob { .parquet(outputPath); } + // experimental private static void prepareRelationsDataset( SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, int relPartitions) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 697a00a09b..faa81ad644 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -134,6 +134,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation --maxRelations${maxRelations} + --relationFilter${relationFilter} --relPartitions5000 diff --git a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml index 819b3e12d1..4d77883b49 100644 --- a/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml +++ b/dhp-workflows/dhp-worfklow-profiles/src/main/resources/eu/dnetlib/dhp/wf/profiles/graph_construction.xml @@ -11,6 +11,29 @@ Data Provision 30 + + + reuse cached content from the aggregation system + + reuseContent + true + + + + + + + + set the aggregator content path + + contentPath + /tmp/beta_aggregator + + + + + + Set the path containing the AGGREGATOR graph @@ -62,87 +85,94 @@ - - Set the target path to store the CLEANED graph + + Set the target path to store the ORCID enriched graph - cleanedGraphPath - /tmp/beta_provision/graph/05_graph_cleaned + orcidGraphPath + /tmp/beta_provision/graph/05_graph_orcid - - Set the target path to store the ORCID enriched graph - - orcidGraphPath - /tmp/beta_provision/graph/06_graph_orcid - - - - - Set the target path to store the BULK TAGGED graph bulkTaggingGraphPath - /tmp/beta_provision/graph/07_graph_bulktagging + /tmp/beta_provision/graph/06_graph_bulktagging + Set the target path to store the AFFILIATION from INSTITUTIONAL REPOS graph affiliationGraphPath - /tmp/beta_provision/graph/08_graph_affiliation + /tmp/beta_provision/graph/07_graph_affiliation + Set the target path to store the COMMUNITY from SELECTED SOURCES graph communityOrganizationGraphPath - /tmp/beta_provision/graph/09_graph_comunity_organization + /tmp/beta_provision/graph/08_graph_comunity_organization + Set the target path to store the FUNDING from SEMANTIC RELATION graph fundingGraphPath - /tmp/beta_provision/graph/10_graph_funding + /tmp/beta_provision/graph/09_graph_funding + Set the target path to store the COMMUNITY from SEMANTIC RELATION graph communitySemRelGraphPath - /tmp/beta_provision/graph/11_graph_comunity_sem_rel + /tmp/beta_provision/graph/10_graph_comunity_sem_rel + Set the target path to store the COUNTRY enriched graph countryGraphPath - /tmp/beta_provision/graph/12_graph_country + /tmp/beta_provision/graph/11_graph_country + + + Set the target path to store the CLEANED graph + + cleanedGraphPath + /tmp/beta_provision/graph/12_graph_cleaned + + + + + + Set the target path to store the blacklisted graph @@ -153,6 +183,7 @@ + Set the lookup address @@ -163,6 +194,7 @@ + Set the map of paths for the Bulk Tagging @@ -173,6 +205,7 @@ + Set the map of associations organization, community list for the propagation of community to result through organization @@ -185,6 +218,7 @@ + Set the dedup orchestrator name @@ -195,6 +229,7 @@ + declares the ActionSet ids to promote in the RAW graph @@ -205,6 +240,7 @@ + declares the ActionSet ids to promote in the INFERRED graph @@ -215,6 +251,7 @@ + wait configurations @@ -222,6 +259,7 @@ + create the AGGREGATOR graph @@ -230,7 +268,9 @@ { 'graphOutputPath' : 'aggregatorGraphPath', - 'isLookupUrl' : 'isLookUpUrl' + 'isLookupUrl' : 'isLookUpUrl', + 'reuseContent' : 'reuseContent', + 'contentPath' : 'contentPath' } @@ -241,8 +281,6 @@ 'postgresURL' : 'jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus', 'postgresUser' : 'dnet', 'postgresPassword' : '', - 'reuseContent' : 'false', - 'contentPath' : '/tmp/beta_provision/aggregator', 'workingDir' : '/tmp/beta_provision/working_dir/aggregator' } @@ -252,6 +290,7 @@ + create the RAW graph @@ -289,6 +328,7 @@ + search for duplicates in the raw graph @@ -314,6 +354,7 @@ + create the INFERRED graph @@ -351,6 +392,7 @@ + mark duplicates as deleted and redistribute the relationships @@ -375,41 +417,6 @@ - - clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid - - executeOozieJob - IIS - - { - 'graphInputPath' : 'consistentGraphPath', - 'graphOutputPath': 'cleanedGraphPath', - 'isLookupUrl': 'isLookUpUrl' - } - - - { - 'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app', - 'workingPath' : '/tmp/beta_provision/working_dir/clean' - } - - build-report - - - - - - - - Do we skip the graph enrichment steps? (Yes to prepare the graph for the IIS) - - NO - - - - - - propagates ORCID among results linked by allowedsemrels semantic relationships @@ -417,7 +424,7 @@ IIS { - 'sourcePath' : 'cleanedGraphPath', + 'sourcePath' : 'consistentGraphPath', 'outputPath': 'orcidGraphPath' } @@ -435,6 +442,7 @@ + mark results respecting some rules as belonging to communities @@ -460,6 +468,7 @@ + creates relashionships between results and organizations when the organizations are associated to institutional repositories @@ -484,6 +493,7 @@ + marks as belonging to communities the result collected from datasources related to the organizations specified in the organizationCommunityMap @@ -509,6 +519,7 @@ + created relation between projects and results linked to other results trough allowedsemrel semantic relations linked to projects @@ -534,6 +545,7 @@ + tag as belonging to communitites result in in allowedsemrels relation with other result already linked to communities @@ -560,6 +572,7 @@ + associated to results colleced from allowedtypes and those in the whithelist the country of the organization(s) handling the datasource it is collected from @@ -584,10 +597,36 @@ build-report + + + + + + + clean the properties in the graph typed as Qualifier according to the vocabulary indicated in schemeid + + executeOozieJob + IIS + + { + 'graphInputPath' : 'countryGraphPath', + 'graphOutputPath': 'cleanedGraphPath', + 'isLookupUrl': 'isLookUpUrl' + } + + + { + 'oozie.wf.application.path' : '/lib/dnet/oa/graph/clean/oozie_app', + 'workingPath' : '/tmp/beta_provision/working_dir/clean' + } + + build-report + + removes blacklisted relations @@ -595,7 +634,7 @@ IIS { - 'sourcePath' : 'countryGraphPath', + 'sourcePath' : 'cleanedGraphPath', 'outputPath': 'blacklistedGraphPath' } diff --git a/pom.xml b/pom.xml index 89b7e8829c..4619f31749 100644 --- a/pom.xml +++ b/pom.xml @@ -315,7 +315,7 @@ eu.dnetlib dnet-pace-core - 4.0.1 + 4.0.2 eu.dnetlib