diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java index cd9613758..abfa03dcf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java @@ -3,19 +3,16 @@ package eu.dnetlib.dhp.oa.graph.dump.graph; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; -import eu.dnetlib.dhp.oa.graph.dump.ResultMapper; import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.ControlledField; import eu.dnetlib.dhp.schema.dump.oaf.Country; -import eu.dnetlib.dhp.schema.dump.oaf.Result; -import eu.dnetlib.dhp.schema.dump.oaf.graph.Organization; +import eu.dnetlib.dhp.schema.dump.oaf.graph.*; import eu.dnetlib.dhp.schema.oaf.OafEntity; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -23,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -64,18 +62,23 @@ public class SparkDumpJob implements Serializable { QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl)); CommunityMap communityMap = queryInformationSystem.getCommunityMap(); - + SparkConf conf = new SparkConf(); switch (ModelSupport.idPrefixMap.get(inputClazz)){ case "50": DumpProducts d = new DumpProducts(); d.run(isSparkSessionManaged,inputPath,outputPath,communityMap, inputClazz, true); break; case "40": + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + projectMap(spark, inputPath, outputPath); + }); break; case "20": - SparkConf conf = new SparkConf(); - runWithSparkSession( conf, isSparkSessionManaged, @@ -92,16 +95,64 @@ public class SparkDumpJob implements Serializable { } - private static void organizationMap(SparkSession spark, String inputPath, String outputPath) { - Utils.readPath(spark, inputPath, eu.dnetlib.dhp.schema.oaf.Organization.class) - .map(o -> map(o), Encoders.bean(Organization.class)) + private static void projectMap(SparkSession spark, String inputPath, String outputPath) { + Utils.readPath(spark, inputPath, eu.dnetlib.dhp.schema.oaf.Project.class) + .map(p -> mapProject(p), Encoders.bean(Project.class)) .write() .mode(SaveMode.Overwrite) .option("compression","gzip") .json(outputPath); } - private static Organization map(eu.dnetlib.dhp.schema.oaf.Organization org){ + private static Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) { + Project project = new Project(); + + Optional.ofNullable(p.getId()) + .ifPresent(id -> project.setId(id)); + + Optional.ofNullable(p.getWebsiteurl()) + .ifPresent(w -> project.setWebsiteurl(w.getValue())); + + Optional.ofNullable(p.getCode()) + .ifPresent(code -> project.setCode(code)); + + +// private String acronym; +// private String title; +// private String startdate; +// +// private String enddate; +// +// private String callidentifier; +// +// private String keywords; +// +// private String duration; +// +// private boolean openaccessmandateforpublications; +// +// private boolean openaccessmandatefordataset; +// private List subject; +// private Funder funding; +// +// private String summary; +// +// private Granted granted; +// +// private Programme programme; + return project; + } + + private static void organizationMap(SparkSession spark, String inputPath, String outputPath) { + Utils.readPath(spark, inputPath, eu.dnetlib.dhp.schema.oaf.Organization.class) + .map(o -> mapOrganization(o), Encoders.bean(Organization.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath); + } + + private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org){ Organization organization = new Organization(); Optional.ofNullable(org.getLegalshortname()) .ifPresent(value -> organization.setLegalshortname(value.getValue()));