diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
index b837a14541..b8781e257f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java
@@ -1,8 +1,11 @@
-package eu.dnetlib.dhp.oa.graph.dump.community;
+package eu.dnetlib.dhp.oa.graph.dump;
 
 import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
 import eu.dnetlib.dhp.schema.oaf.Context;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
@@ -21,7 +24,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 public class DumpProducts implements Serializable {
 
-	public <I extends Result> void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, Class<I> inputClazz, boolean graph) {
+	public <I extends OafEntity> void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap, Class<I> inputClazz, boolean graph) {
 
 		SparkConf conf = new SparkConf();
 
@@ -35,12 +38,12 @@ public class DumpProducts implements Serializable {
 			});
 	}
 
-	public static <I extends Result> void execDump(SparkSession spark,
-		String inputPath,
-		String outputPath,
-		CommunityMap communityMap,
-		Class<I> inputClazz,
-		boolean graph) {
+	public static <I extends OafEntity> void execDump(SparkSession spark,
+		String inputPath,
+		String outputPath,
+		CommunityMap communityMap,
+		Class<I> inputClazz,
+		boolean graph) {
 
 		Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
 
@@ -54,14 +57,14 @@ public class DumpProducts implements Serializable {
 
 	}
 
-	private static <I extends Result> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
+	private static <I extends OafEntity> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
 		CommunityMap communityMap,
 		boolean graph) {
 
 		if (!graph) {
 			Set<String> communities = communityMap.keySet();
-			Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
+			Optional<List<Context>> inputContext = Optional.ofNullable(((eu.dnetlib.dhp.schema.oaf.Result) value).getContext());
 			if (!inputContext.isPresent()) {
 				return null;
 			}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java
index 65dc9d6637..8bd5173b8f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.dump.community;
+package eu.dnetlib.dhp.oa.graph.dump;
 
 import java.io.StringReader;
 import java.util.List;
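With DumpProducts generalized to OafEntity and shared by both dump flavours, a caller only picks the entity class and the graph flag. A minimal driver sketch, assuming the classes from this diff are on the classpath and that CommunityMap is the String-to-String map its HashMap import suggests; paths and the map entry are placeholders:

    // Hypothetical driver for the refactored DumpProducts (values are made up).
    CommunityMap communityMap = new CommunityMap();
    communityMap.put("egi", "EGI Federation"); // community id -> label

    DumpProducts dump = new DumpProducts();
    // graph = false: community dump, entities without a matching context are dropped;
    // graph = true: whole-graph dump, no context filtering.
    dump.run(Boolean.TRUE, "/tmp/graph/publication", "/tmp/dump/publication",
        communityMap, eu.dnetlib.dhp.schema.oaf.Publication.class, false);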
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
index cc0be5c70e..d81098355f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
@@ -12,12 +12,13 @@ import eu.dnetlib.dhp.schema.oaf.Field;
 import eu.dnetlib.dhp.schema.oaf.Journal;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 
-public class Mapper implements Serializable {
+public class ResultMapper implements Serializable {
 
-	public static <I extends eu.dnetlib.dhp.schema.oaf.Result> Result map(
-		I input, Map<String, String> communityMap) {
+	public static <I extends OafEntity> Result map(
+		I in, Map<String, String> communityMap) {
 		final Result out = new Result();
 
+		eu.dnetlib.dhp.schema.oaf.Result input = (eu.dnetlib.dhp.schema.oaf.Result) in;
 		Optional<Qualifier> ort = Optional.ofNullable(input.getResulttype());
 		if (ort.isPresent()) {
 			switch (ort.get().getClassid()) {
@@ -152,43 +153,46 @@ public class Mapper implements Serializable {
 				.map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
 				.collect(Collectors.toList()));
 
-		Set<String> communities = communityMap.keySet();
-		List<Context> contextList = input
-			.getContext()
-			.stream()
-			.map(c -> {
-				String community_id = c.getId();
-				if (community_id.indexOf("::") > 0) {
-					community_id = community_id.substring(0, community_id.indexOf("::"));
-				}
-				if (communities.contains(community_id)) {
-					Context context = new Context();
-					context.setCode(community_id);
-					context.setLabel(communityMap.get(community_id));
-					Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo());
-					if (dataInfo.isPresent()) {
-						List<String> provenance = new ArrayList<>();
-						provenance
-							.addAll(
-								dataInfo
-									.get()
-									.stream()
-									.map(di -> {
-										if (di.getInferred()) {
-											return di.getProvenanceaction().getClassname();
-										}
-										return null;
-									})
-									.filter(Objects::nonNull)
-									.collect(Collectors.toSet()));
-						context.setProvenance(provenance);
-					}
-					return context;
-				}
-				return null;
-			})
-			.filter(Objects::nonNull)
-			.collect(Collectors.toList());
+
+		Set<String> communities = communityMap.keySet();
+		List<Context> contextList = Optional
+			.ofNullable(input.getContext())
+			.map(value -> value
+				.stream()
+				.map(c -> {
+					String community_id = c.getId();
+					if (community_id.indexOf("::") > 0) {
+						community_id = community_id.substring(0, community_id.indexOf("::"));
+					}
+					if (communities.contains(community_id)) {
+						Context context = new Context();
+						context.setCode(community_id);
+						context.setLabel(communityMap.get(community_id));
+						Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo());
+						if (dataInfo.isPresent()) {
+							List<Provenance> provenance = new ArrayList<>();
+							provenance
+								.addAll(
+									dataInfo
+										.get()
+										.stream()
+										.map(di -> {
+											if (di.getInferred()) {
+												return Provenance.newInstance(di.getProvenanceaction().getClassname(), di.getTrust());
+											}
+											return null;
+										})
+										.filter(Objects::nonNull)
+										.collect(Collectors.toSet()));
+							context.setProvenance(provenance);
+						}
+						return context;
+					}
+					return null;
+				})
+				.filter(Objects::nonNull)
+				.collect(Collectors.toList()))
+			.orElse(new ArrayList<>());
+
 		if (contextList.size() > 0) {
 			out.setContext(contextList);
 		}
@@ -214,9 +218,9 @@ public class Mapper implements Serializable {
 					.ifPresent(
 						provenance -> country
 							.setProvenance(
-								provenance
+								Provenance.newInstance(provenance
 									.getProvenanceaction()
-									.getClassname()));
+									.getClassname(), c.getDataInfo().getTrust())));
 				countryList
 					.add(country);
 			}));
@@ -378,9 +382,9 @@ public class Mapper implements Serializable {
 	private static Subject getSubject(StructuredProperty s) {
 		Subject subject = new Subject();
 		subject.setSubject(ControlledField.newInstance(s.getQualifier().getClassid(), s.getValue()));
-		Optional<DataInfo> di = Optional.of(s.getDataInfo());
-		Provenance p = new Provenance();
+		Optional<DataInfo> di = Optional.ofNullable(s.getDataInfo());
 		if (di.isPresent()) {
+			Provenance p = new Provenance();
 			p.setProvenance(di.get().getProvenanceaction().getClassname());
 			p.setTrust(di.get().getTrust());
 			subject.setProvenance(p);
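The reworked context handling above maps over an Optional instead of dereferencing getContext() directly, so a result carrying a null context list now yields an empty list rather than a NullPointerException. A reduced, self-contained illustration of the same pattern:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;
    import java.util.stream.Collectors;

    public class OptionalStreamExample {
        public static void main(String[] args) {
            List<String> context = null; // stands in for input.getContext()
            // The stream pipeline only runs when the list is non-null.
            List<String> mapped = Optional
                .ofNullable(context)
                .map(v -> v.stream().map(String::toUpperCase).collect(Collectors.toList()))
                .orElse(new ArrayList<>());
            System.out.println(mapped); // prints [] instead of throwing
        }
    }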
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
index 61e205bfb1..99222e84c9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
@@ -4,11 +4,11 @@ package eu.dnetlib.dhp.oa.graph.dump;
 
 import java.io.Serializable;
 import java.util.List;
 
-import eu.dnetlib.dhp.schema.dump.oaf.Projects;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
 
 public class ResultProject implements Serializable {
 	private String resultId;
-	private List<Projects> projectsList;
+	private List<Project> projectsList;
 
 	public String getResultId() {
 		return resultId;
@@ -18,11 +18,11 @@ public class ResultProject implements Serializable {
 		this.resultId = resultId;
 	}
 
-	public List<Projects> getProjectsList() {
+	public List<Project> getProjectsList() {
 		return projectsList;
 	}
 
-	public void setProjectsList(List<Projects> projectsList) {
+	public void setProjectsList(List<Project> projectsList) {
 		this.projectsList = projectsList;
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodo.java
index 069e4d28c1..98573d0fb8 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodo.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodo.java
@@ -3,10 +3,8 @@ package eu.dnetlib.dhp.oa.graph.dump;
 
 import java.io.File;
 import java.io.Serializable;
-import java.util.Arrays;
-
-import javax.management.Query;
 
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -16,10 +14,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
-import com.google.gson.Gson;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.graph.dump.zenodo.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 
 public class SendToZenodo implements Serializable {
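ResultProject now pairs a result identifier with the community-model Project. A small sketch of populating the bean (identifiers are invented; Project.newInstance follows the five-argument factory visible in SparkPrepareResultProject below):

    // Hypothetical: one result acknowledging one project.
    ResultProject rp = new ResultProject();
    rp.setResultId("50|doi_________::0123456789abcdef"); // made-up result id
    List<Project> projectsList = new ArrayList<>();
    projectsList.add(
        Project.newInstance(
            "40|corda_______::0123456789abcdef", // made-up project id
            "123456",                            // grant code
            "ACRO",                              // acronym
            "A sample project title",
            null));                              // funder omitted for brevity
    rp.setProjectsList(projectsList);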
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java
index abf1a40d34..e7a09a4882 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java
@@ -8,6 +8,9 @@ import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
+import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -18,20 +21,13 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
-import org.dom4j.Element;
 import org.dom4j.Node;
 import org.dom4j.io.SAXReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.dump.oaf.Funder;
-import eu.dnetlib.dhp.schema.dump.oaf.Projects;
-import eu.dnetlib.dhp.schema.dump.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import scala.Tuple2;
 
@@ -75,68 +71,30 @@ public class SparkPrepareResultProject implements Serializable {
 		Dataset<Relation> relation = Utils
 			.readPath(spark, inputPath + "/relation", Relation.class)
 			.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
-		Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project", Project.class);
+		Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
 
 		projects
 			.joinWith(relation, projects.col("id").equalTo(relation.col("source")))
 			.groupByKey(
-				(MapFunction<Tuple2<Project, Relation>, String>) value -> value._2().getTarget(), Encoders.STRING())
-			.mapGroups((MapGroupsFunction<String, Tuple2<Project, Relation>, ResultProject>) (s, it) -> {
+				(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, String>) value -> value._2().getTarget(), Encoders.STRING())
+			.mapGroups((MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, ResultProject>) (s, it) -> {
 				Set<String> projectSet = new HashSet<>();
-				Tuple2<Project, Relation> first = it.next();
+				Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
 				ResultProject rp = new ResultProject();
 				rp.setResultId(first._2().getTarget());
-				Project p = first._1();
+				eu.dnetlib.dhp.schema.oaf.Project p = first._1();
 				projectSet.add(p.getId());
-				Projects ps = Projects
-					.newInstance(
-						p.getId(), p.getCode().getValue(),
-						Optional
-							.ofNullable(p.getAcronym())
-							.map(a -> a.getValue())
-							.orElse(null),
-						Optional
-							.ofNullable(p.getTitle())
-							.map(v -> v.getValue())
-							.orElse(null),
-						Optional
-							.ofNullable(p.getFundingtree())
-							.map(
-								value -> value
-									.stream()
-									.map(ft -> getFunder(ft.getValue()))
-									.collect(Collectors.toList())
-									.get(0))
-							.orElse(null));
-				List<Projects> projList = new ArrayList<>();
+				Project ps = getProject(p);
+
+				List<Project> projList = new ArrayList<>();
 				projList.add(ps);
 				rp.setProjectsList(projList);
 				it.forEachRemaining(c -> {
-					Project op = c._1();
+					eu.dnetlib.dhp.schema.oaf.Project op = c._1();
 					if (!projectSet.contains(op.getId())) {
 						projList
-							.add(
-								Projects
-									.newInstance(
-										op.getId(),
-										op.getCode().getValue(),
-										Optional
-											.ofNullable(op.getAcronym())
-											.map(a -> a.getValue())
-											.orElse(null),
-										Optional
-											.ofNullable(op.getTitle())
-											.map(v -> v.getValue())
-											.orElse(null),
-										Optional
-											.ofNullable(op.getFundingtree())
-											.map(
-												value -> value
-													.stream()
-													.map(ft -> getFunder(ft.getValue()))
-													.collect(Collectors.toList())
-													.get(0))
-											.orElse(null)));
+							.add(getProject(op));
+
 						projectSet.add(op.getId());
 					}
@@ -150,6 +108,41 @@
 			.json(outputPath);
 	}
 
+	private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op) {
+		Project p = Project
+			.newInstance(
+				op.getId(),
+				op.getCode().getValue(),
+				Optional
+					.ofNullable(op.getAcronym())
+					.map(a -> a.getValue())
+					.orElse(null),
+				Optional
+					.ofNullable(op.getTitle())
+					.map(v -> v.getValue())
+					.orElse(null),
+				Optional
+					.ofNullable(op.getFundingtree())
+					.map(
+						value -> value
+							.stream()
+							.map(ft -> getFunder(ft.getValue()))
+							.collect(Collectors.toList())
+							.get(0))
+					.orElse(null));
+
+		Optional<DataInfo> di = Optional.ofNullable(op.getDataInfo());
+		Provenance provenance = new Provenance();
+		if (di.isPresent()) {
+			provenance.setProvenance(di.get().getProvenanceaction().getClassname());
+			provenance.setTrust(di.get().getTrust());
+			p.setProvenance(provenance);
+		}
+
+		return p;
+
+	}
+
 	private static Funder getFunder(String fundingtree) {
 		// ["<fundingtree><funder><id>nsf_________::NSF</id><shortname>NSF</shortname><name>National Science
 		// Foundation</name><jurisdiction>US</jurisdiction></funder><funding_level_1><id>nsf_________::NSF::CISE/OAD::CISE/CCF</id><description>Division
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java
index 7d43ea3fe2..3e800175aa 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java
@@ -1,6 +1,8 @@
 
 package eu.dnetlib.dhp.oa.graph.dump;
 
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -24,4 +26,8 @@ public class Utils {
 			.textFile(inputPath)
 			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
 	}
+
+	public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
+		return ISLookupClientFactory.getLookUpService(isLookUpUrl);
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunityMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunityMap.java
index dccfcd7667..d459063377 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunityMap.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/CommunityMap.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 
 import java.io.Serializable;
 import java.util.HashMap;
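SparkDumpCommunityProducts below drops its fallback on a serialized community map and always resolves the communities from the information service. The wiring, sketched using only the methods introduced in this diff (the URL is a placeholder):

    // Sketch of the IS-lookup wiring used by SparkDumpCommunityProducts.
    QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
    queryInformationSystem.setIsLookUp(Utils.getIsLookUpService("http://HOST:8280/is/services/isLookUp")); // placeholder URL
    CommunityMap communityMap = queryInformationSystem.getCommunityMap(); // community id -> label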
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
index 62ba1b167f..640de8df6d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkDumpCommunityProducts.java
@@ -1,30 +1,18 @@
-package eu.dnetlib.dhp.oa.graph.dump;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 
 import java.io.Serializable;
 import java.util.*;
-import java.util.stream.Collectors;
-
-import javax.management.Query;
 
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 public class SparkDumpCommunityProducts implements Serializable {
@@ -63,74 +51,18 @@ public class SparkDumpCommunityProducts implements Serializable {
 
 		final Optional<String> cm = Optional.ofNullable(parser.get("communityMap"));
 
 		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
 
-		SparkConf conf = new SparkConf();
-		CommunityMap communityMap;
+		queryInformationSystem = new QueryInformationSystem();
+		queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
+		CommunityMap communityMap = queryInformationSystem.getCommunityMap();
 
-		if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) {
-			queryInformationSystem = new QueryInformationSystem();
-			queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl));
-			communityMap = queryInformationSystem.getCommunityMap();
-		} else {
-			communityMap = new Gson().fromJson(cm.get(), CommunityMap.class);
-		}
+		DumpProducts dump = new DumpProducts();
 
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				Utils.removeOutputDir(spark, outputPath);
-				execDump(spark, inputPath, outputPath, communityMap, inputClazz);// , dumpClazz);
-
-			});
+		dump.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, false);
 	}
 
-	public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
-		return ISLookupClientFactory.getLookUpService(isLookUpUrl);
-	}
-
-	public static <I extends Result> void execDump(SparkSession spark,
-		String inputPath,
-		String outputPath,
-		CommunityMap communityMap,
-		Class<I> inputClazz) {// Class<R> dumpClazz) {
-		// Set<String> communities = communityMap.keySet();
-		Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
-
-		tmp
-			.map(value -> execMap(value, communityMap), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class))
-			.filter(Objects::nonNull)
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath);
-
-	}
-
-	private static <I extends Result> eu.dnetlib.dhp.schema.dump.oaf.Result execMap(I value,
-		CommunityMap communityMap) {
-		{
-			Set<String> communities = communityMap.keySet();
-			Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
-			if (!inputContext.isPresent()) {
-				return null;
-			}
-			List<String> toDumpFor = inputContext.get().stream().map(c -> {
-				if (communities.contains(c.getId())) {
-					return c.getId();
-				}
-				if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
-					return c.getId().substring(0, 3);
-				}
-				return null;
-			}).filter(Objects::nonNull).collect(Collectors.toList());
-			if (toDumpFor.size() == 0) {
-				return null;
-			}
-			return Mapper.map(value, communityMap);
-		}
-	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java
index a1f0cdfdf8..452f6dc543 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkSplitForCommunity.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.dump;
+package eu.dnetlib.dhp.oa.graph.dump.community;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
@@ -8,6 +8,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpOrganization.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpOrganization.java
index 921ae76d91..69a2a38dc7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpOrganization.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpOrganization.java
@@ -1,4 +1,46 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
 
-public class DumpOrganization {
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.Serializable;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class DumpOrganization implements Serializable {
+
+	public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath) {
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				Utils.removeOutputDir(spark, outputPath);
+				execDump(spark, inputPath, outputPath);
+
+			});
+	}
+
+	private void execDump(SparkSession spark, String inputPath, String outputPath) {
+
+		Utils.readPath(spark, inputPath, Organization.class)
+			.map(org -> OrganizationMapper.map(org), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Organization.class))
+			.write()
+			.option("compression", "gzip")
+			.mode(SaveMode.Overwrite)
+			.json(outputPath);
+
+	}
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMapper.java
index 21b019ab04..9b427e899a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMapper.java
@@ -1,4 +1,15 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
 
-public class OrganizationMapper {
+import eu.dnetlib.dhp.schema.dump.oaf.ControlledField;
+import eu.dnetlib.dhp.schema.dump.oaf.Country;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.Organization;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class OrganizationMapper implements Serializable {
+
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java
index d3ee539a7e..cd96137580 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpJob.java
@@ -1,4 +1,133 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
 
-public class SparkDumpJob {
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
+import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.dump.oaf.ControlledField;
+import eu.dnetlib.dhp.schema.dump.oaf.Country;
+import eu.dnetlib.dhp.schema.dump.oaf.Result;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.Organization;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkDumpJob implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				SparkDumpJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/dump/input_graphdump_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		final String resultClassName = parser.get("resultTableName");
+		log.info("resultTableName: {}", resultClassName);
+
+		final String isLookUpUrl = parser.get("isLookUpUrl");
+		log.info("isLookUpUrl: {}", isLookUpUrl);
+
+		Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName);
+
+		QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
+		queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
+		CommunityMap communityMap = queryInformationSystem.getCommunityMap();
+
+		switch (ModelSupport.idPrefixMap.get(inputClazz)) {
+			case "50":
+				DumpProducts d = new DumpProducts();
+				d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, true);
+				break;
+			case "40":
+
+				break;
+			case "20":
+				SparkConf conf = new SparkConf();
+
+				runWithSparkSession(
+					conf,
+					isSparkSessionManaged,
+					spark -> {
+						Utils.removeOutputDir(spark, outputPath);
+						organizationMap(spark, inputPath, outputPath);
+
+					});
+				break;
+		}
+
+	}
+
+	private static void organizationMap(SparkSession spark, String inputPath, String outputPath) {
+		Utils.readPath(spark, inputPath, eu.dnetlib.dhp.schema.oaf.Organization.class)
+			.map(o -> map(o), Encoders.bean(Organization.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
+	}
+
+	private static Organization map(eu.dnetlib.dhp.schema.oaf.Organization org) {
+		Organization organization = new Organization();
+		Optional
+			.ofNullable(org.getLegalshortname())
+			.ifPresent(value -> organization.setLegalshortname(value.getValue()));
+
+		Optional
+			.ofNullable(org.getLegalname())
+			.ifPresent(value -> organization.setLegalname(value.getValue()));
+
+		Optional
+			.ofNullable(org.getWebsiteurl())
+			.ifPresent(value -> organization.setWebsiteurl(value.getValue()));
+
+		Optional
+			.ofNullable(org.getAlternativeNames())
+			.ifPresent(value -> organization.setAlternativenames(value
+				.stream()
+				.map(v -> v.getValue())
+				.collect(Collectors.toList())));
+
+		Optional
+			.ofNullable(org.getCountry())
+			.ifPresent(value -> organization.setCountry(Country.newInstance(value.getClassid(), value.getClassname(), null)));
+
+		Optional
+			.ofNullable(org.getId())
+			.ifPresent(value -> organization.setId(value));
+
+		Optional
+			.ofNullable(org.getPid())
+			.ifPresent(value -> organization.setPid(
+				value
+					.stream()
+					.map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue()))
+					.collect(Collectors.toList())));
+
+		return organization;
+	}
 }
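For orientation, SparkDumpJob dispatches on the OpenAIRE id prefix of the entity class. A reduced sketch of that logic, assuming ModelSupport.idPrefixMap maps model classes to their prefixes as used above; the class literal is an arbitrary example:

    // Reduced sketch of the SparkDumpJob dispatch.
    Class<?> inputClazz = eu.dnetlib.dhp.schema.oaf.Organization.class; // example entity class
    switch (ModelSupport.idPrefixMap.get(inputClazz)) {
        case "50": // results -> DumpProducts.run(..., graph = true)
            break;
        case "40": // projects: intentionally left empty in this changeset
            break;
        case "20": // organizations -> organizationMap(...)
            break;
    }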