From b71d12cf2614885e8274b1766a50cda1b00820dc Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 29 Jul 2020 16:52:44 +0200
Subject: [PATCH] refactoring

---
 .../dump/{ => community}/ResultProject.java   |   2 +-
 .../community/SparkDumpCommunityProducts.java |  17 +--
 .../community/SparkPrepareResultProject.java  | 107 ++++++++++--------
 .../community/SparkUpdateProjectInfo.java     |  21 ++--
 4 files changed, 72 insertions(+), 75 deletions(-)
 rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/{ => community}/ResultProject.java (91%)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/ResultProject.java
similarity index 91%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/ResultProject.java
index 99222e84c..300af62f3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/ResultProject.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 
 import java.io.Serializable;
 import java.util.List;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
index 640de8df6..71d3e6bf4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
@@ -4,16 +4,16 @@ package eu.dnetlib.dhp.oa.graph.dump.community;
 import java.io.Serializable;
 import java.util.*;
 
-import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
-import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
-import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 public class SparkDumpCommunityProducts implements Serializable {
 
@@ -48,8 +48,6 @@ public class SparkDumpCommunityProducts implements Serializable {
 		final String isLookUpUrl = parser.get("isLookUpUrl");
 		log.info("isLookUpUrl: {}", isLookUpUrl);
 
-		final Optional<String> cm = Optional.ofNullable(parser.get("communityMap"));
-
 		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
 
 		queryInformationSystem = new QueryInformationSystem();
@@ -58,11 +56,8 @@ public class SparkDumpCommunityProducts implements Serializable {
 
 
 		DumpProducts dump = new DumpProducts();
 
-		dump.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, false);
+		dump.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, CommunityResult.class, false);
 
 	}
-
-
-
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
index e7a09a488..5c2e2ff12 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
@@ -8,9 +8,6 @@ import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
-import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -27,7 +24,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.dump.oaf.Funder;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Funder;
+import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
 import scala.Tuple2;
@@ -71,37 +72,43 @@ public class SparkPrepareResultProject implements Serializable {
 		Dataset<Relation> relation = Utils
 			.readPath(spark, inputPath + "/relation", Relation.class)
 			.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
-		Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
+		Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
+			.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
 
 		projects
 			.joinWith(relation, projects.col("id").equalTo(relation.col("source")))
 			.groupByKey(
-				(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, String>) value -> value._2().getTarget(), Encoders.STRING())
-			.mapGroups((MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, ResultProject>) (s, it) -> {
-				Set<String> projectSet = new HashSet<>();
-				Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
-				ResultProject rp = new ResultProject();
-				rp.setResultId(first._2().getTarget());
-				eu.dnetlib.dhp.schema.oaf.Project p = first._1();
-				projectSet.add(p.getId());
-				Project ps = getProject(p);
+				(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, String>) value -> value
+					._2()
+					.getTarget(),
+				Encoders.STRING())
+			.mapGroups(
+				(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, ResultProject>) (s,
+					it) -> {
+					Set<String> projectSet = new HashSet<>();
+					Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
+					ResultProject rp = new ResultProject();
+					rp.setResultId(first._2().getTarget());
+					eu.dnetlib.dhp.schema.oaf.Project p = first._1();
+					projectSet.add(p.getId());
+					Project ps = getProject(p);
 
-				List<Project> projList = new ArrayList<>();
-				projList.add(ps);
-				rp.setProjectsList(projList);
-				it.forEachRemaining(c -> {
-					eu.dnetlib.dhp.schema.oaf.Project op = c._1();
-					if (!projectSet.contains(op.getId())) {
-						projList
-							.add(getProject(op));
+					List<Project> projList = new ArrayList<>();
+					projList.add(ps);
+					rp.setProjectsList(projList);
+					it.forEachRemaining(c -> {
+						eu.dnetlib.dhp.schema.oaf.Project op = c._1();
+						if (!projectSet.contains(op.getId())) {
+							projList
+								.add(getProject(op));
 
-						projectSet.add(op.getId());
+							projectSet.add(op.getId());
 
-					}
+						}
 
-				});
-				return rp;
-			}, Encoders.bean(ResultProject.class))
+					});
+					return rp;
+				}, Encoders.bean(ResultProject.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
@@ -109,31 +116,31 @@
 	}
 
 	private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op) {
-		Project p = Project
-			.newInstance(
-				op.getId(),
-				op.getCode().getValue(),
-				Optional
-					.ofNullable(op.getAcronym())
-					.map(a -> a.getValue())
-					.orElse(null),
-				Optional
-					.ofNullable(op.getTitle())
-					.map(v -> v.getValue())
-					.orElse(null),
-				Optional
-					.ofNullable(op.getFundingtree())
-					.map(
-						value -> value
-							.stream()
-							.map(ft -> getFunder(ft.getValue()))
-							.collect(Collectors.toList())
-							.get(0))
-					.orElse(null));
+		Project p = Project
+			.newInstance(
+				op.getId(),
+				op.getCode().getValue(),
+				Optional
+					.ofNullable(op.getAcronym())
+					.map(a -> a.getValue())
+					.orElse(null),
+				Optional
+					.ofNullable(op.getTitle())
+					.map(v -> v.getValue())
+					.orElse(null),
+				Optional
+					.ofNullable(op.getFundingtree())
+					.map(
+						value -> value
+							.stream()
+							.map(ft -> getFunder(ft.getValue()))
+							.collect(Collectors.toList())
+							.get(0))
+					.orElse(null));
 
 		Optional<DataInfo> di = Optional.ofNullable(op.getDataInfo());
 		Provenance provenance = new Provenance();
-		if(di.isPresent()){
+		if (di.isPresent()) {
 			provenance.setProvenance(di.get().getProvenanceaction().getClassname());
 			provenance.setTrust(di.get().getTrust());
 			p.setProvenance(provenance);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java
index d87810b79..506601f14 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkUpdateProjectInfo.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
@@ -8,8 +8,6 @@ import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -20,10 +18,9 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.dump.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import scala.Tuple2;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+
+import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
 
 public class SparkUpdateProjectInfo implements Serializable {
 
@@ -70,22 +67,20 @@ public class SparkUpdateProjectInfo implements Serializable {
 		SparkSession spark,
 		String inputPath,
 		String outputPath,
-		String preparedInfoPath) {// ,
-		// Class inputClazz) {
-
-		Dataset<Result> result = Utils.readPath(spark, inputPath, Result.class);
+		String preparedInfoPath) {
+		Dataset<CommunityResult> result = Utils.readPath(spark, inputPath, CommunityResult.class);
 		Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
 		result
 			.joinWith(
 				resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left") .map(value -> { - Result r = value._1(); + CommunityResult r = value._1(); Optional.ofNullable(value._2()).ifPresent(rp -> { r.setProjects(rp.getProjectsList()); }); return r; - }, Encoders.bean(Result.class)) + }, Encoders.bean(CommunityResult.class)) .write() .option("compression", "gzip") .mode(SaveMode.Append)