From 12fa5d122ae959842c9d28602144ffe7514eeeaa Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 15 Dec 2020 08:30:26 +0100 Subject: [PATCH] fixed a problem with join --- .../eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java | 2 +- .../dhp/broker/oa/PrepareRelatedDatasetsJob.java | 5 +++-- .../dhp/broker/oa/PrepareRelatedProjectsJob.java | 2 +- .../broker/oa/PrepareRelatedPublicationsJob.java | 5 +++-- .../dhp/broker/oa/PrepareRelatedSoftwaresJob.java | 2 +- .../eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java | 11 +++++++++++ .../dnetlib/dhp/broker/oa/util/ConversionUtils.java | 2 +- .../oa/util/DatasourceRelationsAccumulator.java | 13 +++++++++++-- 8 files changed, 32 insertions(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index a575328e8..dc156cbcf 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -63,7 +63,7 @@ public class PrepareGroupsJob { .readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class); final Dataset mergedRels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); final TypedColumn, ResultGroup> aggr = new ResultAggregator() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index 8c2879b38..9bdf32a64 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -62,7 +62,7 @@ public class PrepareRelatedDatasetsJob { .map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) @@ -72,7 +72,8 @@ public class PrepareRelatedDatasetsJob { final Dataset dataset = rels .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") .map(t -> { - final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2); + final RelatedDataset rel = new RelatedDataset(t._1.getSource(), + t._2); rel.getRelDataset().setRelType(t._1.getRelClass()); return rel; }, Encoders.bean(RelatedDataset.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index 1952b5d9c..9498c0f33 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -64,7 +64,7 @@ public class PrepareRelatedProjectsJob { .map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index d6ac27fc5..8270500fd 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -65,7 +65,7 @@ public class PrepareRelatedPublicationsJob { Encoders.bean(OaBrokerRelatedPublication.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) @@ -75,7 +75,8 @@ public class PrepareRelatedPublicationsJob { final Dataset dataset = rels .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") .map(t -> { - final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2); + final RelatedPublication rel = new RelatedPublication( + t._1.getSource(), t._2); rel.getRelPublication().setRelType(t._1.getRelClass()); return rel; }, Encoders.bean(RelatedPublication.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index c4ccf4af6..16b450733 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -64,7 +64,7 @@ public class PrepareRelatedSoftwaresJob { .map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index 04985a6ab..9ce64f6bd 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; public class ClusterUtils { @@ -30,6 +31,16 @@ public class ClusterUtils { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + public static Dataset loadRelations(final String graphPath, final SparkSession spark) { + return ClusterUtils + .readPath(spark, graphPath + "/relation", Relation.class) + .map(r -> { + r.setSource(ConversionUtils.cleanOpenaireId(r.getSource())); + r.setTarget(ConversionUtils.cleanOpenaireId(r.getTarget())); + return r; + }, Encoders.bean(Relation.class)); + } + public static Dataset readPath( final SparkSession spark, final String inputPath, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 085681488..ecbfd821e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -129,7 +129,7 @@ public class ConversionUtils { return res; } - private static String cleanOpenaireId(final String id) { + public static String cleanOpenaireId(final String id) { return id.contains("|") ? StringUtils.substringAfter(id, "|") : id; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java index 75c4625ce..c693be93c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java @@ -59,9 +59,18 @@ public class DatasourceRelationsAccumulator implements Serializable { final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator(); collectedFromSet .stream() - .map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL)) + .map( + s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s), + BrokerConstants.COLLECTED_FROM_REL)) .forEach(res::addTuple); - hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple); + + hostedBySet + .stream() + .map( + s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s), + BrokerConstants.HOSTED_BY_REL)) + .forEach(res::addTuple); + return res; }