From c3286f4c37d8e828ae00240a45e933fc03d9fabc Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 23 Jun 2020 09:32:32 +0200 Subject: [PATCH] fixed relType --- .../broker/oa/PrepareRelatedDatasetsJob.java | 15 ++++++++----- .../broker/oa/PrepareRelatedProjectsJob.java | 17 +++++++------- .../oa/PrepareRelatedPublicationsJob.java | 22 +++++++++++++------ .../broker/oa/PrepareRelatedSoftwaresJob.java | 14 +++++++----- .../aggregators/withRels/RelatedDataset.java | 13 ++--------- .../aggregators/withRels/RelatedProject.java | 12 +--------- .../withRels/RelatedPublication.java | 13 +---------- .../aggregators/withRels/RelatedSoftware.java | 13 ++--------- 8 files changed, 47 insertions(+), 72 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index bcd333d564..110f5f317f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -15,9 +15,11 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Relation; public class PrepareRelatedDatasetsJob { @@ -60,17 +62,18 @@ public class PrepareRelatedDatasetsJob { final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); rels .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") - .map( - t -> new RelatedDataset( - t._1.getSource(), - t._1.getRelType(), - t._2), - Encoders.bean(RelatedDataset.class)) + .map(t -> { + final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2); + rel.getRelDataset().setRelType(t._1.getRelClass()); + return rel; + }, Encoders.bean(RelatedDataset.class)) .write() .mode(SaveMode.Overwrite) .json(relsPath); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index 0460bfabb2..3ae240982d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -15,7 +15,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; @@ -58,22 +60,21 @@ public class PrepareRelatedProjectsJob { ClusterUtils.removeDir(spark, relsPath); - final Dataset projects = ClusterUtils.readPath(spark, graphPath + "/project", Project.class); + final Dataset projects = ClusterUtils + .readPath(spark, graphPath + "/project", Project.class) + .filter(p -> !ClusterUtils.isDedupRoot(p.getId())) + .map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class)); final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); rels - .joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> new RelatedProject( - t._1.getSource(), - t._1.getRelType(), - ConversionUtils.oafProjectToBrokerProject(t._2)), - Encoders.bean(RelatedProject.class)) + .joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner") + .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class)) .write() .mode(SaveMode.Overwrite) .json(relsPath); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index f3db509bb7..17e078c2c0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -17,9 +17,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -32,8 +34,9 @@ public class PrepareRelatedPublicationsJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(PrepareRelatedPublicationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .toString( + PrepareRelatedPublicationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -60,19 +63,24 @@ public class PrepareRelatedPublicationsJob { final Dataset pubs = ClusterUtils .readPath(spark, graphPath + "/publication", Publication.class) .filter(p -> !ClusterUtils.isDedupRoot(p.getId())) - .map(ConversionUtils::oafPublicationToBrokerPublication, Encoders.bean(OaBrokerRelatedPublication.class)); + .map( + ConversionUtils::oafPublicationToBrokerPublication, + Encoders.bean(OaBrokerRelatedPublication.class)); final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); rels .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") - .map(t -> new RelatedPublication( - t._1.getSource(), - t._1.getRelType(), - t._2), Encoders.bean(RelatedPublication.class)) + .map(t -> { + final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2); + rel.getRelPublication().setRelType(t._1.getRelClass()); + return rel; + }, Encoders.bean(RelatedPublication.class)) .write() .mode(SaveMode.Overwrite) .json(relsPath); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index ffc3a8c658..0704fb44a8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -17,9 +17,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Software; @@ -32,8 +34,9 @@ public class PrepareRelatedSoftwaresJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(PrepareRelatedSoftwaresJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .toString( + PrepareRelatedSoftwaresJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -64,15 +67,14 @@ public class PrepareRelatedSoftwaresJob { final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); rels .joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner") - .map(t -> new RelatedSoftware( - t._1.getSource(), - t._1.getRelType(), - t._2), Encoders.bean(RelatedSoftware.class)) + .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class)) .write() .mode(SaveMode.Overwrite) .json(relsPath); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java index daf75ea2ef..0925e32912 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java @@ -11,16 +11,15 @@ public class RelatedDataset implements Serializable { * */ private static final long serialVersionUID = 774487705184038324L; + private String source; - private String relType; private OaBrokerRelatedDataset relDataset; public RelatedDataset() { } - public RelatedDataset(final String source, final String relType, final OaBrokerRelatedDataset relDataset) { + public RelatedDataset(final String source, final OaBrokerRelatedDataset relDataset) { this.source = source; - this.relType = relType; this.relDataset = relDataset; } @@ -32,14 +31,6 @@ public class RelatedDataset implements Serializable { this.source = source; } - public String getRelType() { - return relType; - } - - public void setRelType(final String relType) { - this.relType = relType; - } - public OaBrokerRelatedDataset getRelDataset() { return relDataset; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java index 4116c8c77b..74d19fe9df 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedProject.java @@ -13,15 +13,13 @@ public class RelatedProject implements Serializable { private static final long serialVersionUID = 4941437626549329870L; private String source; - private String relType; private OaBrokerProject relProject; public RelatedProject() { } - public RelatedProject(final String source, final String relType, final OaBrokerProject relProject) { + public RelatedProject(final String source, final OaBrokerProject relProject) { this.source = source; - this.relType = relType; this.relProject = relProject; } @@ -33,14 +31,6 @@ public class RelatedProject implements Serializable { this.source = source; } - public String getRelType() { - return relType; - } - - public void setRelType(final String relType) { - this.relType = relType; - } - public OaBrokerProject getRelProject() { return relProject; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java index 9e222a9523..ed6aeeab11 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedPublication.java @@ -13,16 +13,13 @@ public class RelatedPublication implements Serializable { private static final long serialVersionUID = 9021609640411395128L; private String source; - private String relType; private OaBrokerRelatedPublication relPublication; public RelatedPublication() { } - public RelatedPublication(final String source, final String relType, - final OaBrokerRelatedPublication relPublication) { + public RelatedPublication(final String source, final OaBrokerRelatedPublication relPublication) { this.source = source; - this.relType = relType; this.relPublication = relPublication; } @@ -34,14 +31,6 @@ public class RelatedPublication implements Serializable { this.source = source; } - public String getRelType() { - return relType; - } - - public void setRelType(final String relType) { - this.relType = relType; - } - public OaBrokerRelatedPublication getRelPublication() { return relPublication; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java index 2f3b8668c2..0aa3a4045d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedSoftware.java @@ -11,16 +11,15 @@ public class RelatedSoftware implements Serializable { * */ private static final long serialVersionUID = 7573383356943300157L; + private String source; - private String relType; private OaBrokerRelatedSoftware relSoftware; public RelatedSoftware() { } - public RelatedSoftware(final String source, final String relType, final OaBrokerRelatedSoftware relSoftware) { + public RelatedSoftware(final String source, final OaBrokerRelatedSoftware relSoftware) { this.source = source; - this.relType = relType; this.relSoftware = relSoftware; } @@ -32,14 +31,6 @@ public class RelatedSoftware implements Serializable { this.source = source; } - public String getRelType() { - return relType; - } - - public void setRelType(final String relType) { - this.relType = relType; - } - public OaBrokerRelatedSoftware getRelSoftware() { return relSoftware; }