diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
index f94d286e4..18950d98e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
@@ -2,7 +2,6 @@ package eu.dnetlib.dhp.broker.model;

 import java.io.Serializable;
-import java.util.Map;

 public class Event implements Serializable {

@@ -25,7 +24,7 @@ public class Event implements Serializable {

     private boolean instantMessage;

-    private Map<String, Object> map;
+    private MappedFields map;

     public Event() {
     }

@@ -33,7 +32,7 @@ public class Event implements Serializable {
     public Event(final String producerId, final String eventId, final String topic, final String payload,
         final Long creationDate, final Long expiryDate, final boolean instantMessage,
-        final Map<String, Object> map) {
+        final MappedFields map) {
         this.producerId = producerId;
         this.eventId = eventId;
         this.topic = topic;
@@ -100,11 +99,11 @@ public class Event implements Serializable {
         this.instantMessage = instantMessage;
     }

-    public Map<String, Object> getMap() {
+    public MappedFields getMap() {
         return this.map;
     }

-    public void setMap(final Map<String, Object> map) {
+    public void setMap(final MappedFields map) {
         this.map = map;
     }
}

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 6e38f7448..315a054d3 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -3,9 +3,8 @@ package eu.dnetlib.dhp.broker.model;

 import java.text.ParseException;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;

 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -30,10 +29,10 @@ public class EventFactory {

         final Event res = new Event();

-        final Map<String, Object> map = createMapFromResult(updateInfo);
+        final MappedFields map = createMapFromResult(updateInfo);

         final String eventId = calculateEventId(
-            updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId(), updateInfo.getHighlightValueAsString());
+            updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString());

         res.setEventId(eventId);
         res.setProducerId(PRODUCER_ID);
@@ -46,35 +45,35 @@ public class EventFactory {
         return res;
     }

-    private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
-        final Map<String, Object> map = new HashMap<>();
+    private static MappedFields createMapFromResult(final UpdateInfo<?> updateInfo) {
+        final MappedFields map = new MappedFields();

         final OaBrokerMainEntity source = updateInfo.getSource();
         final OaBrokerMainEntity target = updateInfo.getTarget();

-        map.put("target_datasource_id", target.getCollectedFromId());
-        map.put("target_datasource_name", target.getCollectedFromName());
+        map.setTargetDatasourceId(target.getCollectedFromId());
+        map.setTargetDatasourceName(target.getCollectedFromName());

-        map.put("target_publication_id", target.getOriginalId());
+        map.setTargetResultId(target.getOpenaireId());

         final List<String> titles = target.getTitles();
         if (titles.size() > 0) {
-            map.put("target_publication_title", titles.get(0));
+            map.setTargetResultTitle(titles.get(0));
         }
        final long date = parseDateTolong(target.getPublicationdate());
        if (date > 0) {
-            map.put("target_dateofacceptance", date);
+            map.setTargetDateofacceptance(date);
        }

-        map.put("target_publication_subject_list", target.getSubjects());
-        map.put("target_publication_author_list", target.getCreators());
+        map.setTargetSubjects(target.getSubjects().stream().map(s -> s.getValue()).collect(Collectors.toList()));
+        map.setTargetAuthors(target.getCreators().stream().map(a -> a.getFullname()).collect(Collectors.toList()));

        // PROVENANCE INFO
-        map.put("trust", updateInfo.getTrust());
-        map.put("provenance_datasource_id", source.getCollectedFromId());
-        map.put("provenance_datasource_name", source.getCollectedFromName());
-        map.put("provenance_publication_id_list", source.getOriginalId());
+        map.setTrust(updateInfo.getTrust());
+        map.setProvenanceDatasourceId(source.getCollectedFromId());
+        map.setProvenanceDatasourceName(source.getCollectedFromName());
+        map.setProvenanceResultId(source.getOpenaireId());

        return map;
    }
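For orientation, a minimal sketch of how the typed accessors above replace the old string-keyed entries (the values are hypothetical; the MappedFields bean itself is introduced by the new file below):

```java
import java.util.Arrays;

import eu.dnetlib.dhp.broker.model.MappedFields;

// Hypothetical usage sketch: each former map.put("...", value) becomes a typed setter.
public class MappedFieldsExample {

    public static void main(final String[] args) {
        final MappedFields map = new MappedFields();
        map.setTargetDatasourceId("datasource-id"); // was map.put("target_datasource_id", ...)
        map.setTargetResultTitle("A sample title"); // was map.put("target_publication_title", ...)
        map.setTargetSubjects(Arrays.asList("subject-1")); // was map.put("target_publication_subject_list", ...)
        map.setTrust(0.9f); // was map.put("trust", ...)
        System.out.println(map.getTargetResultTitle());
    }
}
```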
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
new file mode 100644
index 000000000..4b0ed171b
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
@@ -0,0 +1,114 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MappedFields implements Serializable {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = -7999704113195802008L;
+
+    private String targetDatasourceId;
+    private String targetDatasourceName;
+    private String targetResultId;
+    private String targetResultTitle;
+    private long targetDateofacceptance;
+    private List<String> targetSubjects;
+    private List<String> targetAuthors;
+    private float trust;
+    private String provenanceDatasourceId;
+    private String provenanceDatasourceName;
+    private String provenanceResultId;
+
+    public String getTargetDatasourceId() {
+        return targetDatasourceId;
+    }
+
+    public void setTargetDatasourceId(final String targetDatasourceId) {
+        this.targetDatasourceId = targetDatasourceId;
+    }
+
+    public String getTargetDatasourceName() {
+        return targetDatasourceName;
+    }
+
+    public void setTargetDatasourceName(final String targetDatasourceName) {
+        this.targetDatasourceName = targetDatasourceName;
+    }
+
+    public String getTargetResultId() {
+        return targetResultId;
+    }
+
+    public void setTargetResultId(final String targetResultId) {
+        this.targetResultId = targetResultId;
+    }
+
+    public String getTargetResultTitle() {
+        return targetResultTitle;
+    }
+
+    public void setTargetResultTitle(final String targetResultTitle) {
+        this.targetResultTitle = targetResultTitle;
+    }
+
+    public long getTargetDateofacceptance() {
+        return targetDateofacceptance;
+    }
+
+    public void setTargetDateofacceptance(final long targetDateofacceptance) {
+        this.targetDateofacceptance = targetDateofacceptance;
+    }
+
+    public List<String> getTargetSubjects() {
+        return targetSubjects;
+    }
+
+    public void setTargetSubjects(final List<String> targetSubjects) {
+        this.targetSubjects = targetSubjects;
+    }
+
+    public List<String> getTargetAuthors() {
+        return targetAuthors;
+    }
+
+    public void setTargetAuthors(final List<String> targetAuthors) {
+        this.targetAuthors = targetAuthors;
+    }
+
+    public float getTrust() {
+        return trust;
+    }
+
+    public void setTrust(final float trust) {
+        this.trust = trust;
+    }
+
+    public String getProvenanceDatasourceId() {
+        return provenanceDatasourceId;
+    }
+
+    public void setProvenanceDatasourceId(final String provenanceDatasourceId) {
+        this.provenanceDatasourceId = provenanceDatasourceId;
+    }
+
+    public String getProvenanceDatasourceName() {
+        return provenanceDatasourceName;
+    }
+
+    public void setProvenanceDatasourceName(final String provenanceDatasourceName) {
+        this.provenanceDatasourceName = provenanceDatasourceName;
+    }
+
+    public String getProvenanceResultId() {
+        return provenanceResultId;
+    }
+
+    public void setProvenanceResultId(final String provenanceResultId) {
+        this.provenanceResultId = provenanceResultId;
+    }
+
+}
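Because Event now nests a concrete Serializable bean instead of a raw Map, the payload can round-trip through Spark's bean encoder; a minimal sketch (the path and helper name are hypothetical):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.broker.model.Event;

public class EventEncodingExample {

    // Hypothetical sketch: MappedFields being a plain bean lets Encoders.bean
    // derive a schema for Event, including the nested "map" field.
    public static Dataset<Event> readEvents(final SparkSession spark, final String path) {
        return spark.read().json(path).as(Encoders.bean(Event.class));
    }
}
```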
"/relatedSoftwares", RelatedSoftware.class), new - // RelatedSoftwareAggregator()); + final Dataset typedRels = ClusterUtils + .readPath(spark, workingPath + "/relatedProjects", RelatedProject.class); - r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath); + final TypedColumn, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() + .toColumn(); + + sources + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), + Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .json(joinedEntitiesPath); }); } - private static Dataset join(final Dataset sources, - final Dataset typedRels, - final Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> aggr) { - - return sources - .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") - .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) - .agg(aggr.toColumn()) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); - - } - } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java new file mode 100644 index 000000000..103d79553 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java @@ -0,0 +1,79 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftwareAggregator; +import scala.Tuple2; + +public class JoinStep2Job { + + private static final Logger log = LoggerFactory.getLogger(JoinStep2Job.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinStep2Job.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String joinedEntitiesPath = workingPath + "/joinedEntities_step2"; + log.info("joinedEntitiesPath: {}", joinedEntitiesPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, joinedEntitiesPath); + + final Dataset sources = ClusterUtils + .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); + + final Dataset typedRels = ClusterUtils + 
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
new file mode 100644
index 000000000..103d79553
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
@@ -0,0 +1,79 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftwareAggregator;
+import scala.Tuple2;
+
+public class JoinStep2Job {
+
+    private static final Logger log = LoggerFactory.getLogger(JoinStep2Job.class);
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    JoinStep2Job.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+        parser.parseArgument(args);
+
+        final Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String workingPath = parser.get("workingPath");
+        log.info("workingPath: {}", workingPath);
+
+        final String joinedEntitiesPath = workingPath + "/joinedEntities_step2";
+        log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
+
+        final SparkConf conf = new SparkConf();
+
+        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+            ClusterUtils.removeDir(spark, joinedEntitiesPath);
+
+            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
+                .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
+
+            final Dataset<RelatedSoftware> typedRels = ClusterUtils
+                .readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class);
+
+            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
+                .toColumn();
+
+            sources
+                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
+                .groupByKey(
+                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
+                    Encoders.STRING())
+                .agg(aggr)
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .json(joinedEntitiesPath);
+
+        });
+
+    }
+
+}

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
new file mode 100644
index 000000000..ceb199dc4
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
@@ -0,0 +1,79 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasetAggregator;
+import scala.Tuple2;
+
+public class JoinStep3Job {
+
+    private static final Logger log = LoggerFactory.getLogger(JoinStep3Job.class);
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    JoinStep3Job.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+        parser.parseArgument(args);
+
+        final Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String workingPath = parser.get("workingPath");
+        log.info("workingPath: {}", workingPath);
+
+        final String joinedEntitiesPath = workingPath + "/joinedEntities_step3";
+        log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
+
+        final SparkConf conf = new SparkConf();
+
+        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+            ClusterUtils.removeDir(spark, joinedEntitiesPath);
+
+            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
+                .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
+
+            final Dataset<RelatedDataset> typedRels = ClusterUtils
+                .readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class);
+
+            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
+                .toColumn();
+
+            sources
+                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
+                .groupByKey(
+                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(),
+                    Encoders.STRING())
+                .agg(aggr)
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .json(joinedEntitiesPath);
+
+        });
+
+    }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
new file mode 100644
index 000000000..3067810dd
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
@@ -0,0 +1,79 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublicationAggregator;
+import scala.Tuple2;
+
+public class JoinStep4Job {
+
+    private static final Logger log = LoggerFactory.getLogger(JoinStep4Job.class);
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    JoinStep4Job.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+        parser.parseArgument(args);
+
+        final Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String workingPath = parser.get("workingPath");
+        log.info("workingPath: {}", workingPath);
+
+        final String joinedEntitiesPath = workingPath + "/joinedEntities_step4";
+        log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
+
+        final SparkConf conf = new SparkConf();
+
+        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+            ClusterUtils.removeDir(spark, joinedEntitiesPath);
+
+            final Dataset<OaBrokerMainEntity> sources = ClusterUtils
+                .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
+
+            final Dataset<RelatedPublication> typedRels = ClusterUtils
+                .readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class);
+
+            final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
+                .toColumn();
+
+            sources
+                .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
+                .groupByKey(
+                    (MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(),
+                    Encoders.STRING())
+                .agg(aggr)
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .json(joinedEntitiesPath);
+
+        });
+
+    }
+
+}
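Taken together, the four join steps form a linear chain over the working directory, each persisting a JSON checkpoint that the next step (and finally PrepareGroupsJob, below) reads back:

```java
// Data flow of the join phase, as encoded in the paths above:
//
//   /simpleEntities       -- JoinStep1 (+ /relatedProjects)     --> /joinedEntities_step1
//   /joinedEntities_step1 -- JoinStep2 (+ /relatedSoftwares)    --> /joinedEntities_step2
//   /joinedEntities_step2 -- JoinStep3 (+ /relatedDatasets)     --> /joinedEntities_step3
//   /joinedEntities_step3 -- JoinStep4 (+ /relatedPublications) --> /joinedEntities_step4
//
// PrepareGroupsJob then consumes /joinedEntities_step4.
```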
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
index 934ddff59..47a9f36c5 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
@@ -32,7 +32,7 @@
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
-                    JoinEntitiesJob.class
+                    PrepareGroupsJob.class
                        .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
        parser.parseArgument(args);
@@ -58,7 +58,7 @@ public class PrepareGroupsJob {

            ClusterUtils.removeDir(spark, groupsPath);

            final Dataset<OaBrokerMainEntity> results = ClusterUtils
-                .readPath(spark, workingPath + "/joinedEntities", OaBrokerMainEntity.class);
+                .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);

            final Dataset<Relation> mergedRels = ClusterUtils
                .readPath(spark, graphPath + "/relation", Relation.class)

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
index fe9c87e87..6e006ccf0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
@@ -61,6 +61,7 @@ public class PrepareRelatedDatasetsJob {

        final Dataset<Relation> rels = ClusterUtils
            .readPath(spark, graphPath + "/relation", Relation.class)
+            .filter(r -> !r.getDataInfo().getDeletedbyinference())
            .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
            .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
            .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
index 3ae240982..0af5d21b7 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
@@ -13,8 +13,6 @@
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
@@ -29,8 +27,6 @@ public class PrepareRelatedProjectsJob {

    private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);

-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
    public static void main(final String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
@@ -67,6 +63,7 @@
        final Dataset<Relation> rels = ClusterUtils
            .readPath(spark, graphPath + "/relation", Relation.class)
+            .filter(r -> !r.getDataInfo().getDeletedbyinference())
            .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
            .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
            .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
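The four PrepareRelated*Jobs repeat near-identical relation pre-filters; if the chains drift apart, the steps disagree on which relations survive. A possible consolidation (hypothetical helper, not part of this changeset; the Relation import path is assumed):

```java
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.schema.oaf.Relation; // assumed import path

public class RelationFilters {

    // Hypothetical helper mirroring the filters above: keep only live
    // (not deleted-by-inference) relations of the wanted type whose
    // source is not a dedup root.
    public static boolean acceptable(final Relation r, final String relType) {
        return !r.getDataInfo().getDeletedbyinference()
            && r.getRelType().equals(relType)
            && !ClusterUtils.isDedupRoot(r.getSource());
    }
}
```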
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
index 8814ef3e0..84752776e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
@@ -13,8 +13,6 @@
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
@@ -28,8 +26,6 @@ public class PrepareRelatedPublicationsJob {

    private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);

-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
    public static void main(final String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
@@ -68,6 +64,7 @@
        final Dataset<Relation> rels = ClusterUtils
            .readPath(spark, graphPath + "/relation", Relation.class)
+            .filter(r -> !r.getDataInfo().getDeletedbyinference())
            .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
            .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
            .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
index 0704fb44a..0ad753a97 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
@@ -13,8 +13,6 @@
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
@@ -29,8 +27,6 @@ public class PrepareRelatedSoftwaresJob {

    private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);

-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
    public static void main(final String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
@@ -67,6 +63,7 @@
        final Dataset<Relation> rels = ClusterUtils
            .readPath(spark, graphPath + "/relation", Relation.class)
+            .filter(r -> !r.getDataInfo().getDeletedbyinference())
            .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
            .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
            .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index 9aa6f5384..c0287bda0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -43,12 +43,15 @@ public abstract class UpdateMatcher<T> {
            if (source != res) {
                for (final T hl : findDifferences(source, res)) {
                    final Topic topic = getTopicFunction().apply(hl);
-                    final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(),
-                        getHighlightToStringFunction(), dedupConfig);
-                    final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
-                    if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
-                    } else {
-
infoMap.put(s, info); + if (topic != null) { + final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res, + getCompileHighlightFunction(), + getHighlightToStringFunction(), dedupConfig); + + final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); + if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { + infoMap.put(s, info); + } } } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java index c197734a3..c8b93596a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java @@ -16,7 +16,7 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher topic, (p, rel) -> p.getDatasets().add(rel), - rel -> rel.getOriginalId()); + rel -> rel.getOpenaireId()); } protected abstract boolean filterByType(String relType); @@ -29,14 +29,14 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher filterByType(rel.getRelType())) - .map(OaBrokerRelatedDataset::getOriginalId) + .map(OaBrokerRelatedDataset::getOpenaireId) .collect(Collectors.toSet()); return source .getDatasets() .stream() .filter(rel -> filterByType(rel.getRelType())) - .filter(d -> !existingDatasets.contains(d.getOriginalId())) + .filter(d -> !existingDatasets.contains(d.getOpenaireId())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index ad6d8263b..cc4f68f87 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -16,7 +16,7 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher topic, (p, rel) -> p.getPublications().add(rel), - rel -> rel.getOriginalId()); + rel -> rel.getOpenaireId()); } @@ -31,14 +31,14 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher filterByType(rel.getRelType())) - .map(OaBrokerRelatedPublication::getOriginalId) + .map(OaBrokerRelatedPublication::getOpenaireId) .collect(Collectors.toSet()); return source .getPublications() .stream() .filter(rel -> filterByType(rel.getRelType())) - .filter(p -> !existingPublications.contains(p.getOriginalId())) + .filter(p -> !existingPublications.contains(p.getOpenaireId())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index 452caa503..d01f0c370 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java +++ 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -16,7 +16,7 @@ public class EnrichMissingSoftware super(true, s -> Topic.ENRICH_MISSING_SOFTWARE, (p, s) -> p.getSoftwares().add(s), - s -> s.getName()); + s -> s.getOpenaireId()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index aaffe1249..a612b6074 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -16,7 +16,7 @@ public class EnrichMoreSoftware extends UpdateMatcher { super(true, s -> Topic.ENRICH_MORE_SOFTWARE, (p, s) -> p.getSoftwares().add(s), - s -> s.getName()); + s -> s.getOpenaireId()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java index 04fb494ef..97b289b69 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java @@ -22,6 +22,7 @@ public class EnrichMoreSubject extends UpdateMatcher { @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + final Set existingSubjects = target .getSubjects() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java index 49c46c7f0..58e41acbb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java @@ -17,6 +17,8 @@ public class BrokerConstants { public static final float MIN_TRUST = 0.25f; public static final float MAX_TRUST = 1.00f; + public static final int MAX_NUMBER_OF_RELS = 20; + public static Class[] getModelClasses() { final Set> list = new HashSet<>(); list.addAll(Arrays.asList(ModelSupport.getOafModelClasses())); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 7451e5891..1a3f514e8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -7,29 +7,7 @@ import java.util.List; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy; -import 
eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences; -import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject; -import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo; -import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences; -import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware; -import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; import eu.dnetlib.pace.config.DedupConfig; @@ -38,31 +16,31 @@ public class EventFinder { private static List> matchers = new ArrayList<>(); static { matchers.add(new EnrichMissingAbstract()); - matchers.add(new EnrichMissingAuthorOrcid()); - matchers.add(new EnrichMissingOpenAccess()); - matchers.add(new EnrichMissingPid()); - matchers.add(new EnrichMissingPublicationDate()); - matchers.add(new EnrichMissingSubject()); - matchers.add(new EnrichMoreOpenAccess()); - matchers.add(new EnrichMorePid()); - matchers.add(new EnrichMoreSubject()); + // matchers.add(new EnrichMissingAuthorOrcid()); + // matchers.add(new EnrichMissingOpenAccess()); + // matchers.add(new EnrichMissingPid()); + // matchers.add(new EnrichMissingPublicationDate()); + // matchers.add(new EnrichMissingSubject()); + // matchers.add(new EnrichMoreOpenAccess()); + // matchers.add(new EnrichMorePid()); + // matchers.add(new EnrichMoreSubject()); - // Advanced matchers - matchers.add(new EnrichMissingProject()); - matchers.add(new EnrichMoreProject()); - matchers.add(new EnrichMissingSoftware()); - matchers.add(new EnrichMoreSoftware()); - matchers.add(new EnrichMissingPublicationIsRelatedTo()); - matchers.add(new EnrichMissingPublicationIsReferencedBy()); - matchers.add(new EnrichMissingPublicationReferences()); - matchers.add(new EnrichMissingPublicationIsSupplementedTo()); - matchers.add(new EnrichMissingPublicationIsSupplementedBy()); - matchers.add(new EnrichMissingDatasetIsRelatedTo()); - matchers.add(new EnrichMissingDatasetIsReferencedBy()); - matchers.add(new EnrichMissingDatasetReferences()); - matchers.add(new EnrichMissingDatasetIsSupplementedTo()); - matchers.add(new EnrichMissingDatasetIsSupplementedBy()); - matchers.add(new EnrichMissingAbstract()); + // // Advanced matchers + // matchers.add(new EnrichMissingProject()); + // 
matchers.add(new EnrichMoreProject()); + // matchers.add(new EnrichMissingSoftware()); + // matchers.add(new EnrichMoreSoftware()); + // matchers.add(new EnrichMissingPublicationIsRelatedTo()); + // matchers.add(new EnrichMissingPublicationIsReferencedBy()); + // matchers.add(new EnrichMissingPublicationReferences()); + // matchers.add(new EnrichMissingPublicationIsSupplementedTo()); + // matchers.add(new EnrichMissingPublicationIsSupplementedBy()); + // matchers.add(new EnrichMissingDatasetIsRelatedTo()); + // matchers.add(new EnrichMissingDatasetIsReferencedBy()); + // matchers.add(new EnrichMissingDatasetReferences()); + // matchers.add(new EnrichMissingDatasetIsSupplementedTo()); + // matchers.add(new EnrichMissingDatasetIsSupplementedBy()); + // matchers.add(new EnrichMissingAbstract()); } public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java index 25c7698a0..503e31ae1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventGroup.java @@ -14,12 +14,16 @@ public class EventGroup implements Serializable { */ private static final long serialVersionUID = 765977943803533130L; - private final List data = new ArrayList<>(); + private List data = new ArrayList<>(); public List getData() { return data; } + public void setData(final List data) { + this.data = data; + } + public EventGroup addElement(final Event elem) { data.add(elem); return this; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 25d0d2bca..048683b50 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -111,7 +111,7 @@ public final class UpdateInfo { final OaBrokerMainEntity hl = new OaBrokerMainEntity(); compileHighlight.accept(hl, getHighlightValue()); - final String provId = getSource().getOriginalId(); + final String provId = getSource().getOpenaireId(); final String provRepo = getSource().getCollectedFromName(); final String provUrl = getSource() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java index a963f073d..45000f6f3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasetAggregator.java @@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import scala.Tuple2; public class RelatedDatasetAggregator @@ -30,7 +31,7 @@ public class RelatedDatasetAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { final OaBrokerMainEntity res = 
StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; - if (t._2 != null) { + if (t._2 != null && res.getDatasets().size() < BrokerConstants.MAX_NUMBER_OF_RELS) { res.getDatasets().add(t._2.getRelDataset()); } return res; @@ -40,7 +41,14 @@ public class RelatedDatasetAggregator @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { if (StringUtils.isNotBlank(g1.getOpenaireId())) { - g1.getDatasets().addAll(g2.getDatasets()); + final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasets().size(); + if (availables > 0) { + if (g2.getDatasets().size() <= availables) { + g1.getDatasets().addAll(g2.getDatasets()); + } else { + g1.getDatasets().addAll(g2.getDatasets().subList(0, availables)); + } + } return g1; } else { return g2; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java index 3fedb1a32..787217837 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProjectAggregator.java @@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import scala.Tuple2; public class RelatedProjectAggregator @@ -30,7 +31,7 @@ public class RelatedProjectAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? 
g : t._1; - if (t._2 != null) { + if (t._2 != null && res.getProjects().size() < BrokerConstants.MAX_NUMBER_OF_RELS) { res.getProjects().add(t._2.getRelProject()); } return res; @@ -40,7 +41,14 @@ public class RelatedProjectAggregator @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { if (StringUtils.isNotBlank(g1.getOpenaireId())) { - g1.getProjects().addAll(g2.getProjects()); + final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getProjects().size(); + if (availables > 0) { + if (g2.getProjects().size() <= availables) { + g1.getProjects().addAll(g2.getProjects()); + } else { + g1.getProjects().addAll(g2.getProjects().subList(0, availables)); + } + } return g1; } else { return g2; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java index b331599ad..2289ebe36 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublicationAggregator.java @@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import scala.Tuple2; public class RelatedPublicationAggregator @@ -31,7 +32,7 @@ public class RelatedPublicationAggregator public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? 
g : t._1; - if (t._2 != null) { + if (t._2 != null && res.getPublications().size() < BrokerConstants.MAX_NUMBER_OF_RELS) { res.getPublications().add(t._2.getRelPublication()); } return res; @@ -41,8 +42,16 @@ public class RelatedPublicationAggregator @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { if (StringUtils.isNotBlank(g1.getOpenaireId())) { - g1.getPublications().addAll(g2.getPublications()); + final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getPublications().size(); + if (availables > 0) { + if (g2.getPublications().size() <= availables) { + g1.getPublications().addAll(g2.getPublications()); + } else { + g1.getPublications().addAll(g2.getPublications().subList(0, availables)); + } + } return g1; + } else { return g2; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java index d3b1c3407..fedb3c9e9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftwareAggregator.java @@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import scala.Tuple2; public class RelatedSoftwareAggregator @@ -30,7 +31,7 @@ public class RelatedSoftwareAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2 t) { final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? 
g : t._1; - if (t._2 != null) { + if (t._2 != null && res.getSoftwares().size() < BrokerConstants.MAX_NUMBER_OF_RELS) { res.getSoftwares().add(t._2.getRelSoftware()); } return res; @@ -40,7 +41,14 @@ public class RelatedSoftwareAggregator @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { if (StringUtils.isNotBlank(g1.getOpenaireId())) { - g1.getSoftwares().addAll(g2.getSoftwares()); + final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getSoftwares().size(); + if (availables > 0) { + if (g2.getSoftwares().size() <= availables) { + g1.getSoftwares().addAll(g2.getSoftwares()); + } else { + g1.getSoftwares().addAll(g2.getSoftwares().subList(0, availables)); + } + } return g1; } else { return g2; diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 18e2eedca..8752200ff 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -216,14 +216,86 @@ - + - + yarn cluster - JoinEntitiesJob - eu.dnetlib.dhp.broker.oa.JoinEntitiesJob + JoinStep1 + eu.dnetlib.dhp.broker.oa.JoinStep1Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep2 + eu.dnetlib.dhp.broker.oa.JoinStep2Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep3 + eu.dnetlib.dhp.broker.oa.JoinStep3Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep4 + eu.dnetlib.dhp.broker.oa.JoinStep4Job dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml 
b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 1ccdef929..fd68bfec2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -73,62 +73,14 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - yarn - cluster - JoinEntitiesJob - eu.dnetlib.dhp.broker.oa.JoinEntitiesJob - dhp-broker-events-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphPath${graphInputPath} - --workingPath${workingPath} - - - - - - - - yarn - cluster - PrepareGroupsJob - eu.dnetlib.dhp.broker.oa.PrepareGroupsJob - dhp-broker-events-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphPath${graphInputPath} - --workingPath${workingPath} - - - - - - + yarn cluster diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index 7b21ecda2..1a45defb0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -271,6 +271,26 @@ object DoiBoostMappingUtil { } + + def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId,className, schemeId, schemeName)) + sp.setValue(value) + sp + + } + + + + def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String, dataInfo: DataInfo): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId,className, schemeId, schemeName)) + sp.setValue(value) + sp.setDataInfo(dataInfo) + sp + + } + def createSP(value: String, classId: String, schemeId: String): StructuredProperty = { val sp = new StructuredProperty sp.setQualifier(createQualifier(classId, schemeId)) @@ -279,6 +299,8 @@ object DoiBoostMappingUtil { } + + def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = { val sp = new StructuredProperty sp.setQualifier(createQualifier(classId, schemeId)) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 2419f86a3..7bb4686cf 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ 
b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -129,16 +129,16 @@ case object ConversionUtil { val fieldOfStudy = item._2 if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { - val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies") + val s1 = createSP(s.DisplayName, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies") val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) var resList: List[StructuredProperty] = List(s1) if (s.MainType.isDefined) { val maintp = s.MainType.get - val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies") + val s2 = createSP(s.MainType.get, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies") s2.setDataInfo(di) resList = resList ::: List(s2) if (maintp.contains(".")) { - val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies") + val s3 = createSP(maintp.split("\\.").head, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies") s3.setDataInfo(di) resList = resList ::: List(s3) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index f3d051bd6..a24f0e6bb 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -43,7 +43,7 @@ object SparkPreProcessMAG { val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") - logger.info("Phase 6) Enrich Publication with description") + logger.info("Phase 0) Enrich Publication with description") val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml index 34ba5d89d..bf91958cf 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml @@ -32,7 +32,7 @@ - + diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala index d31f80248..f62ac2b67 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala @@ -18,6 +18,9 @@ class CrossrefMappingTest { val mapper = new ObjectMapper() + + + @Test def testFunderRelationshipsMapping(): Unit = { val template = 
Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString @@ -58,6 +61,27 @@ class CrossrefMappingTest { } + @Test + def testOrcidID() :Unit = { + val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString + + + assertNotNull(json) + assertFalse(json.isEmpty); + + val resultList: List[Oaf] = Crossref2Oaf.convert(json) + + assertTrue(resultList.nonEmpty) + + val items = resultList.filter(p => p.isInstanceOf[Result]) + + + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) + items.foreach(p => println(mapper.writeValueAsString(p))) + + + } + @Test def testEmptyTitle() :Unit = { val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json new file mode 100644 index 000000000..def546ddb --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json @@ -0,0 +1,271 @@ +{ + "DOI":"10.1016/j.carbpol.2020.115930", + "issued":{ + "date-parts":[ + [ + 2020, + 4 + ] + ] + }, + "published-print":{ + "date-parts":[ + [ + 2020, + 4 + ] + ] + }, + "prefix":"10.1016", + "subject":[ + "Organic Chemistry", + "Materials Chemistry", + "Polymers and Plastics" + ], + "author":[ + { + "affiliation":[ + + ], + "given":"Lei", + "family":"Fang", + "sequence":"first" + }, + { + "affiliation":[ + + ], + "given":"Hua", + "family":"Lin", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Zhenfeng", + "family":"Wu", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Zhen", + "family":"Wang", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Xinxin", + "family":"Fan", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Ziting", + "family":"Cheng", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Xiaoya", + "family":"Hou", + "sequence":"additional" + }, + { + "authenticated-orcid":false, + "given":"Daquan", + "family":"Chen", + "sequence":"additional", + "affiliation":[ + + ], + "ORCID":"http://orcid.org/0000-0002-6796-0204" + } + ], + "reference-count":41, + "ISSN":[ + "0144-8617" + ], + "assertion":[ + { + "name":"publisher", + "value":"Elsevier", + "label":"This article is maintained by" + }, + { + "name":"articletitle", + "value":"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle", + "label":"Article Title" + }, + { + "name":"journaltitle", + "value":"Carbohydrate Polymers", + "label":"Journal Title" + }, + { + "name":"articlelink", + "value":"https://doi.org/10.1016/j.carbpol.2020.115930", + "label":"CrossRef DOI link to publisher maintained version" + }, + { + "name":"content_type", + "value":"article", + "label":"Content Type" + }, + { + "name":"copyright", + "value":"\\u00a9 2020 Elsevier Ltd. 
All rights reserved.", + "label":"Copyright" + } + ], + "member":"78", + "source":"Crossref", + "score":1.0, + "deposited":{ + "timestamp":1584590965000, + "date-time":"2020-03-19T04:09:25Z", + "date-parts":[ + [ + 2020, + 3, + 19 + ] + ] + }, + "indexed":{ + "timestamp":1584592912467, + "date-time":"2020-03-19T04:41:52Z", + "date-parts":[ + [ + 2020, + 3, + 19 + ] + ] + }, + "type":"journal-article", + "URL":"http://dx.doi.org/10.1016/j.carbpol.2020.115930", + "is-referenced-by-count":0, + "volume":"234", + "issn-type":[ + { + "type":"print", + "value":"0144-8617" + } + ], + "link":[ + { + "URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/xml", + "intended-application":"text-mining", + "content-version":"vor", + "content-type":"text/xml" + }, + { + "URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/plain", + "intended-application":"text-mining", + "content-version":"vor", + "content-type":"text/plain" + } + ], + "update-policy":"http://dx.doi.org/10.1016/elsevier_cm_policy", + "references-count":41, + "short-container-title":[ + "Carbohydrate Polymers" + ], + "publisher":"Elsevier BV", + "content-domain":{ + "domain":[ + "elsevier.com", + "sciencedirect.com" + ], + "crossmark-restriction":true + }, + "language":"en", + "license":[ + { + "URL":"https://www.elsevier.com/tdm/userlicense/1.0/", + "start":{ + "timestamp":1585699200000, + "date-time":"2020-04-01T00:00:00Z", + "date-parts":[ + [ + 2020, + 4, + 1 + ] + ] + }, + "content-version":"tdm", + "delay-in-days":0 + } + ], + "created":{ + "timestamp":1581759678000, + "date-time":"2020-02-15T09:41:18Z", + "date-parts":[ + [ + 2020, + 2, + 15 + ] + ] + }, + "title":[ + "In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle" + ], + "alternative-id":[ + "S0144861720301041" + ], + "container-title":[ + "Carbohydrate Polymers" + ], + "funder":[ + { + "doi-asserted-by":"publisher", + "DOI":"10.13039/501100007129", + "name":"Natural Science Foundation of Shandong Province", + "award":[ + "ZR2019ZD24", + "ZR2019YQ30" + ] + }, + { + "doi-asserted-by":"publisher", + "DOI":"10.13039/100010449", + "name":"Ministry of Education, Libya", + "award":[ + + ] + }, + { + "doi-asserted-by":"publisher", + "DOI":"10.13039/501100012249", + "name":"Jiangxi University of Traditional Chinese Medicine", + "award":[ + "TCM-0906" + ] + }, + { + "name":"Taishan Scholar Program", + "award":[ + "qnts20161035" + ] + }, + { + "name":"Open fund project of Key Laboratory of Modern Preparation of TCM", + "award":[ + + ] + } + ], + "page":"115930", + "article-number":"115930" +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 8f43ab1cf..e1c4b53b5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -147,9 +147,23 @@ public class CleanGraphSparkJob { if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); } + if (Objects.isNull(i.getRefereed())) { + i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); + } + } + } + if (Objects.nonNull(r.getAuthor())) { + boolean nullRank = r + 
.getAuthor() + .stream() + .anyMatch(a -> Objects.isNull(a.getRank())); + if (nullRank) { + int i = 1; + for (Author author : r.getAuthor()) { + author.setRank(i++); + } } } - if (value instanceof Publication) { } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 0bad89e9e..da2ba4723 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -50,8 +50,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; @@ -106,6 +104,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final String dbPassword = parser.get("postgresPassword"); log.info("postgresPassword: xxx"); + final String dbSchema = parser.get("dbschema"); + log.info("dbSchema: {}", dbSchema); + final String isLookupUrl = parser.get("isLookupUrl"); log.info("isLookupUrl: {}", isLookupUrl); @@ -125,7 +126,11 @@ smdbe.execute("queryDatasources.sql", smdbe::processDatasource); log.info("Processing projects..."); - smdbe.execute("queryProjects.sql", smdbe::processProject); + if (dbSchema.equalsIgnoreCase("beta")) { + smdbe.execute("queryProjects.sql", smdbe::processProject); + } else { + smdbe.execute("queryProjects_production.sql", smdbe::processProject); + } log.info("Processing orgs..."); smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
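The dbschema parameter introduced above selects which projects query the migration runs. A minimal sketch of that routing in isolation (a hypothetical helper, not part of the patch; note the literal-first equalsIgnoreCase is also null-safe, unlike the branch above):

public class DbSchemaRouting {

	// Hypothetical helper mirroring the branch introduced above: "beta" keeps
	// the original projects query, anything else uses the production variant.
	static String projectsQueryFor(String dbSchema) {
		return "beta".equalsIgnoreCase(dbSchema)
			? "queryProjects.sql"
			: "queryProjects_production.sql";
	}
}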
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java index 58f068943..8ede40773 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java @@ -9,7 +9,15 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.ExtraInfo; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Journal; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OAIProvenance; +import eu.dnetlib.dhp.schema.oaf.OriginDescription; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.utils.DHPUtils; public class OafMapperUtils { @@ -89,7 +97,9 @@ } public static StructuredProperty structuredProperty( - final String value, final Qualifier qualifier, final DataInfo dataInfo) { + final String value, + final Qualifier qualifier, + final DataInfo dataInfo) { if (value == null) { return null; } @@ -192,8 +202,12 @@ } public static String createOpenaireId( - final int prefix, final String originalId, final boolean to_md5) { - if (to_md5) { + final int prefix, + final String originalId, + final boolean to_md5) { + if (StringUtils.isBlank(originalId)) { + return null; + } else if (to_md5) { final String nsPrefix = StringUtils.substringBefore(originalId, "::"); final String rest = StringUtils.substringAfter(originalId, "::"); return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); @@ -203,7 +217,9 @@ } public static String createOpenaireId( - final String type, final String originalId, final boolean to_md5) { + final String type, + final String originalId, + final boolean to_md5) { switch (type) { case "datasource": return createOpenaireId(10, originalId, to_md5);
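For reference, a hedged usage sketch of the reworked createOpenaireId (values are illustrative; the md5 hex digest is elided):

import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;

public class OpenaireIdExample {

	public static void main(String[] args) {
		// Follows the "%s|%s::%s" format above: "10|openaire____::" + md5("0001").
		System.out.println(OafMapperUtils.createOpenaireId(10, "openaire____::0001", true));

		// With the guard added in this patch, a blank originalId now yields null
		// instead of a malformed id of the form "10|::<md5 of empty string>".
		System.out.println(OafMapperUtils.createOpenaireId(10, "", true));
	}
}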
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json index 4e838561d..6dfef32db 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json @@ -34,5 +34,11 @@ "paramLongName": "isLookupUrl", "paramDescription": "the url of the ISLookupService", "paramRequired": true + }, + { + "paramName": "dbschema", + "paramLongName": "dbschema", + "paramDescription": "the database schema according to the D-Net infrastructure (beta or production)", + "paramRequired": true } ] \ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 9a7e36570..d8b61b5ea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -25,6 +25,11 @@ postgresPassword the password postgres + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) mongoURL @@ -125,6 +130,7 @@ --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} --actionclaims + --dbschema${dbSchema} @@ -175,6 +181,7 @@ --postgresUser${postgresUser} --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} + --dbschema${dbSchema}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 4d2633bc5..80b800017 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -109,11 +107,12 @@ Class<E> clazz, String outputPath) { - Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath) + Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath) .filter("dataInfo.deletedbyinference == false") .map( - (MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))) + (MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(), + r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz) @@ -129,7 +128,7 @@ relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .map( - (MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( + (MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( t._1()._2(), t._2()._2()), Encoders.kryo(RelatedEntityWrapper.class)) .write() @@ -232,11 +231,11 @@ * @param relationPath * @return the Dataset containing all the relationships */ - private static Dataset<SortableRelation> readPathRelation( + private static Dataset<Relation> readPathRelation( SparkSession spark, final String relationPath) { log.info("Reading relations from: {}", relationPath); - return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class)); + return spark.read().load(relationPath).as(Encoders.bean(Relation.class)); } private static void removeOutputDir(SparkSession spark, String path) {
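CreateRelatedEntitiesJob_phase1 above keys both sides as a Tuple2 with the join id in _1 and joins on that column. A minimal, self-contained sketch of the same keyed-join pattern on toy data (simplified to String payloads; the class and method names here are illustrative, not part of the patch):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class KeyedJoinSketch {

	// String payloads stand in for Relation / RelatedEntity.
	public static void run(SparkSession spark) {
		Dataset<Tuple2<String, String>> relsByTarget = spark
			.createDataset(
				Arrays.asList(new Tuple2<>("id-1", "rel-a"), new Tuple2<>("id-2", "rel-b")),
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

		Dataset<Tuple2<String, String>> entities = spark
			.createDataset(
				Arrays.asList(new Tuple2<>("id-1", "entity-1")),
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

		// Inner join on the tuple's key column: relations pointing to a missing
		// entity (id-2 here) are dropped, as in the job above.
		relsByTarget
			.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
			.show(false);
	}
}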
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 6b184071a..19823120c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,35 +3,31 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.*; -import java.util.function.Function; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.*; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.clearspring.analytics.util.Lists; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.FunctionalInterfaceSupport; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; -import scala.Function1; +import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; /** @@ -133,22 +129,35 @@ SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations, int relPartitions) { - RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath) - .repartition(relPartitions) - .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) - .filter(rel -> !relationFilter.contains(rel.getRelClass())) - // group by SOURCE and apply limit - .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) - .groupByKey(new RelationPartitioner(relPartitions)) - .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) - // group by TARGET and apply limit - .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) - .groupByKey(new RelationPartitioner(relPartitions)) - .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + // group by SOURCE and apply limit + RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath) + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator) + .map(Tuple2::_2) + .rdd(); + + // group by TARGET and apply limit + RDD<Relation> byTarget = readPathRelationRDD(spark, inputRelationsPath) + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator) + .map(Tuple2::_2) .rdd(); spark - .createDataset(cappedRels, Encoders.bean(SortableRelation.class)) + .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class)) + .repartition(relPartitions) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); @@ -162,10 +171,10 @@ * @param inputPath * @return the JavaRDD containing all the relationships */ - private static JavaRDD<SortableRelation> readPathRelationRDD( + private static JavaRDD<Relation> readPathRelationRDD( SparkSession spark, final String inputPath) { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class)); + return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); } private static void removeOutputDir(SparkSession spark, String path) {
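PrepareRelationsJob above caps the number of relations kept per source id and per target id. A stripped-down sketch of the per-key capping idea (plain groupByKey plus Guava's Iterables.limit; it deliberately omits the SortableRelationKey secondary sort and the custom partitioner the real job uses to keep each group ordered by subRelType weight):

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.google.common.collect.Iterables;

import scala.Tuple2;

public class CapPerKeySketch {

	// Keys toy records by the prefix before ':' and keeps at most maxPerKey
	// records per key, mirroring the maxRelations cap applied above.
	public static JavaRDD<String> capPerKey(JavaSparkContext sc, int maxPerKey) {
		JavaPairRDD<String, String> keyed = sc
			.parallelize(Arrays.asList("a:1", "a:2", "a:3", "b:1"))
			.mapToPair(s -> new Tuple2<>(s.split(":")[0], s));

		return keyed
			.groupByKey()
			.flatMap(group -> Iterables.limit(group._2(), maxPerKey).iterator());
	}
}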
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index f9fde14e5..051fe923d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -19,7 +19,7 @@ RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - SortableRelation.class)); + SortableRelationKey.class)); return modelClasses.toArray(new Class[] {}); } }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index d708b6ed0..cbb143ee2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -5,28 +5,30 @@ import java.io.Serializable; import com.google.common.base.Objects; +import eu.dnetlib.dhp.schema.oaf.Relation; + public class RelatedEntityWrapper implements Serializable { - private SortableRelation relation; + private Relation relation; private RelatedEntity target; public RelatedEntityWrapper() { } - public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(Relation relation, RelatedEntity target) { this(null, relation, target); } - public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) { this.relation = relation; this.target = target; } - public SortableRelation getRelation() { + public Relation getRelation() { return relation; } - public void setRelation(SortableRelation relation) { + public void setRelation(Relation relation) { this.relation = relation; }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java deleted file mode 100644 index b6571b9bf..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java +++ /dev/null @@ -1,38 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.Map; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Maps; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class SortableRelation extends Relation implements Comparable<Relation>, Serializable { - - private static final Map<String, Integer> weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("affiliation", 2); - weights.put("relationship", 3); - weights.put("publicationDataset", 4); - weights.put("similarity", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } - - @Override - public int compareTo(Relation o) { - return ComparisonChain - .start() - .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) - .compare(getSource(), o.getSource()) - .compare(getTarget(), o.getTarget()) - .result(); - } -}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java new file mode 100644 index 000000000..bf7f9330d --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.io.Serializable; +import java.util.Map; +import java.util.Optional; + +import com.google.common.base.Objects; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable { + + private static final Map<String, Integer> weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("review", 2); + weights.put("citation", 3); + weights.put("affiliation", 4); + weights.put("relationship", 5); + weights.put("publicationDataset", 6); + weights.put("similarity", 7); + + weights.put("provision", 8); + weights.put("participation", 9); + weights.put("dedup", 10); + } + + private static final long serialVersionUID = 3232323; + + private String groupingKey; + + private String subRelType; + + public static SortableRelationKey create(Relation r, String groupingKey) { + SortableRelationKey sr = new SortableRelationKey(); + sr.setGroupingKey(groupingKey); + sr.setSubRelType(r.getSubRelType()); + return sr; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + SortableRelationKey that = (SortableRelationKey) o; + return getGroupingKey().equals(that.getGroupingKey()); + } + + @Override + public int hashCode() { + return Objects.hashCode(getGroupingKey()); + } + + @Override + public int compareTo(SortableRelationKey o) { + return ComparisonChain + .start() + .compare(getGroupingKey(), o.getGroupingKey()) + .compare(getWeight(this), getWeight(o)) + .result(); + } + + private Integer getWeight(SortableRelationKey o) { + return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + } + + public String getSubRelType() { + return subRelType; + } + + public void setSubRelType(String subRelType) { + this.subRelType = subRelType; + } + + public String getGroupingKey() { + return groupingKey; + } + + public void setGroupingKey(String groupingKey) { + this.groupingKey = groupingKey; + } + +}
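A small example of the ordering SortableRelationKey defines (assuming only the Relation setters already used elsewhere in this diff; the class name is illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class SortableRelationKeyOrderingSketch {

	private static Relation rel(String source, String subRelType) {
		Relation r = new Relation();
		r.setSource(source);
		r.setSubRelType(subRelType);
		return r;
	}

	public static void main(String[] args) {
		List<SortableRelationKey> keys = new ArrayList<>();
		keys.add(SortableRelationKey.create(rel("s1", "dedup"), "s1"));
		keys.add(SortableRelationKey.create(rel("s1", "outcome"), "s1"));
		keys.add(SortableRelationKey.create(rel("s1", "affiliation"), "s1"));

		// compareTo orders by grouping key first, then by subRelType weight:
		// outcome (0) < affiliation (4) < dedup (10).
		Collections.sort(keys);
		keys.forEach(k -> System.out.println(k.getGroupingKey() + " " + k.getSubRelType()));
	}
}

Note that equals/hashCode consider only the grouping key while compareTo also consults the weight, so the ordering is finer than equality: equality drives the grouping, the weight only drives the within-group sort order.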
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index c7862b48a..7bd8b9217 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.provision.utils; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; + /** * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to * sort relations sharing the same source id by the ordering defined in SortableRelationKey. */ public class RelationPartitioner extends Partitioner { + private static final long serialVersionUID = 343434456L; + private final int numPartitions; public RelationPartitioner(int numPartitions) { @@ -23,8 +27,18 @@ @Override public int getPartition(Object key) { - String partitionKey = (String) key; - return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions()); + SortableRelationKey partitionKey = (SortableRelationKey) key; + return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions()); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RelationPartitioner) { + RelationPartitioner p = (RelationPartitioner) obj; + if (p.numPartitions() == numPartitions()) + return true; + } + return false; } }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index b2aa01dc7..5d8d9fa20 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -276,7 +276,7 @@ pidType, pidValue .toLowerCase() - .replaceAll("orcid", ""))); + .replaceAll("^.*orcid\\.org\\/", ""))); } } });
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java new file mode 100644 index 000000000..72f28fdf2 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java @@ -0,0 +1,42 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class SortableRelationKeyTest { + + @Test + public void doTesSorting() throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json")); + final List<Relation> relations = mapper.readValue(json, new TypeReference<List<Relation>>() { + }); + + relations + .stream() + .map(r -> SortableRelationKey.create(r, r.getSource())) + .sorted() + .forEach( + + it -> { + try { + System.out.println(mapper.writeValueAsString(it)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + }); + + } + +}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json new file mode 100644 index 000000000..3280d0d61 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json @@ -0,0 +1,90 @@ +[ + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested",
+ "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "hasAuthorInstitution", + "relType": "resultOrganization", + "source": "1", + "subRelType": "affiliation", + "target": "2" + }, + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "isAuthorInstitutionOf", + "relType": "resultOrganization", + "source": "2", + "subRelType": "affiliation", + "target": "1" + }, + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "isProducedBy", + "relType": "resultProject", + "source": "1", + "subRelType": "outcome", + "target": "2" + }, + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "produces", + "relType": "resultProject", + "source": "2", + "subRelType": "outcome", + "target": "1" + } +] \ No newline at end of file