From 6bbe27587f0c94860c2317220aa18ee23d807c19 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 9 Jun 2020 15:39:03 +0200
Subject: [PATCH] new classes to execute the dump of products associated to a
 community, enrich each result with project information and assign each
 result to the communities it belongs to

---
 .../dhp/oa/graph/dump/CommunityMap.java       |   5 +-
 .../dnetlib/dhp/oa/graph/dump/Constants.java  |  14 ++
 .../eu/dnetlib/dhp/oa/graph/dump/Mapper.java  | 221 +++++++++++++++++-
 .../oa/graph/dump/QueryInformationSystem.java |  13 +-
 .../dhp/oa/graph/dump/ResultProject.java      |  25 +-
 .../dump/SparkDumpCommunityProducts.java      |  10 +-
 .../graph/dump/SparkPrepareResultProject.java |  97 +++++++-
 .../oa/graph/dump/SparkSplitForCommunity.java |  96 +++++++-
 .../oa/graph/dump/SparkUpdateProjectInfo.java |  52 ++---
 .../eu/dnetlib/dhp/oa/graph/dump/Utils.java   |  19 ++
 10 files changed, 510 insertions(+), 42 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java
index 8b184f7dc..bee20afe6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java
@@ -1,4 +1,7 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
-public class CommunityMap {
+
+import java.util.HashMap;
+
+public class CommunityMap extends HashMap<String, String> {
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java
index a37147aef..71e0767e1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java
@@ -1,4 +1,18 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
 public class Constants {
+
+    public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
+
+    static {
+        accessRightsCoarMap.put("OPEN", "http://purl.org/coar/access_right/c_abf2");
+        accessRightsCoarMap.put("RESTRICTED", "http://purl.org/coar/access_right/c_16ec");
+        accessRightsCoarMap.put("OPEN SOURCE", "http://purl.org/coar/access_right/c_abf2");
+        accessRightsCoarMap.put("CLOSED", "http://purl.org/coar/access_right/c_14cb"); // metadata only for COAR
+        accessRightsCoarMap.put("EMBARGO", "http://purl.org/coar/access_right/c_f1cf");
+    }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java
index 346881568..d171b240a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java
@@ -1,4 +1,223 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
-public class Mapper {
+import eu.dnetlib.dhp.schema.dump.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import scala.collection.immutable.Stream;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class Mapper implements Serializable {
+
+    public static <I extends eu.dnetlib.dhp.schema.oaf.Result, O extends Result> O map(
+        I input, Map<String, String> communityMap) {
+
+        O out = null;
+        switch (input.getResulttype().getClassid()) {
+            case "publication":
+                out = (O) new Publication();
+                Optional<Journal> journal = Optional
+                    .ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal());
+                if (journal.isPresent()) {
+                    Journal j = journal.get();
+                    Container c = new Container();
+                    c.setConferencedate(j.getConferencedate());
+                    c.setConferenceplace(j.getConferenceplace());
+                    c.setEdition(j.getEdition());
+                    c.setEp(j.getEp());
+                    c.setIss(j.getIss());
+                    c.setIssnLinking(j.getIssnLinking());
+                    c.setIssnOnline(j.getIssnOnline());
+                    c.setIssnPrinted(j.getIssnPrinted());
+                    c.setName(j.getName());
+                    c.setSp(j.getSp());
+                    c.setVol(j.getVol());
+                    out.setContainer(c);
+                }
+                break;
+            case "dataset":
+                Dataset d = new Dataset();
+                eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset) input;
+                d.setSize(id.getSize().getValue());
+                d.setVersion(id.getVersion().getValue());
+
+                List<eu.dnetlib.dhp.schema.oaf.GeoLocation> igl = id.getGeolocation();
+                d.setGeolocation(igl.stream()
+                    .filter(Objects::nonNull)
+                    .map(gli -> {
+                        GeoLocation gl = new GeoLocation();
+                        gl.setBox(gli.getBox());
+                        gl.setPlace(gli.getPlace());
+                        gl.setPoint(gli.getPoint());
+                        return gl;
+                    }).collect(Collectors.toList()));
+                out = (O) d;
+
+                break;
+            case "software":
+                Software s = new Software();
+                eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software) input;
+                s.setCodeRepositoryUrl(is.getCodeRepositoryUrl().getValue());
+                s.setDocumentationUrl(is.getDocumentationUrl()
+                    .stream()
+                    .map(du -> du.getValue()).collect(Collectors.toList()));
+                s.setProgrammingLanguage(is.getProgrammingLanguage().getClassid());
+
+                out = (O) s;
+                break;
+            case "otherresearchproduct":
+                OtherResearchProduct or = new OtherResearchProduct();
+                eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct) input;
+                or.setContactgroup(ir.getContactgroup().stream().map(cg -> cg.getValue()).collect(Collectors.toList()));
+                or.setContactperson(ir.getContactperson().stream().map(cp -> cp.getValue()).collect(Collectors.toList()));
+                or.setTool(ir.getTool().stream().map(t -> t.getValue()).collect(Collectors.toList()));
+                out = (O) or;
+                break;
+        }
+        out.setAuthor(input.getAuthor()
+            .stream()
+            .map(oa -> {
+                Author a = new Author();
+                a.setAffiliation(oa.getAffiliation().stream().map(aff -> aff.getValue()).collect(Collectors.toList()));
+                a.setFullname(oa.getFullname());
+                a.setName(oa.getName());
+                a.setSurname(oa.getSurname());
+                a.setRank(oa.getRank());
+                a.setPid(oa.getPid().stream().map(p -> {
+                    ControlledField cf = new ControlledField();
+                    cf.setScheme(p.getQualifier().getClassid());
+                    cf.setValue(p.getValue());
+                    return cf;
+                }).collect(Collectors.toList()));
+                return a;
+            }).collect(Collectors.toList()));
+        // I do not map Access Right UNKNOWN or OTHER
+        if (Constants.accessRightsCoarMap.containsKey(input.getBestaccessright().getClassid())) {
+            AccessRight ar = new AccessRight();
+            ar.setSchema(Constants.accessRightsCoarMap.get(input.getBestaccessright().getClassid()));
+            ar.setCode(input.getBestaccessright().getClassid());
+            ar.setLabel(input.getBestaccessright().getClassname());
+            out.setBestaccessright(ar);
+        }
+
+        out.setCollectedfrom(input.getCollectedfrom().stream().map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
+            .collect(Collectors.toList()));
+
+        Set<String> communities = communityMap.keySet();
+        List<Context> contextList = input.getContext()
+            .stream()
+            .map(c -> {
+                if (communities.contains(c.getId())) {
+                    Context context = new Context();
+                    context.setCode(c.getId());
+                    context.setLabel(communityMap.get(c.getId()));
+                    Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo());
+                    if (dataInfo.isPresent()) {
+                        context.setProvenance(dataInfo.get().stream()
+                            .map(di -> {
+                                if (di.getInferred()) {
+                                    return di.getProvenanceaction().getClassid();
+                                }
+                                return null;
+                            }).filter(Objects::nonNull)
+                            .collect(Collectors.toList()));
+                    }
+                    return context;
+                }
+                return null;
+            }).filter(Objects::nonNull)
+            .collect(Collectors.toList());
+        if (contextList.size() > 0) {
+            out.setContext(contextList);
+        }
+        out.setContributor(input.getContributor()
+            .stream()
+            .map(c -> c.getValue()).collect(Collectors.toList()));
+        out.setCountry(input.getCountry()
+            .stream()
+            .map(c -> {
+                Country country = new Country();
+                country.setCode(c.getClassid());
+                country.setLabel(c.getClassname());
+                Optional<DataInfo> dataInfo = Optional.ofNullable(c.getDataInfo());
+                if (dataInfo.isPresent()) {
+                    country.setProvenance(dataInfo.get().getProvenanceaction().getClassid());
+                }
+                return country;
+            }).collect(Collectors.toList()));
+        out.setCoverage(input.getCoverage().stream().map(c -> c.getValue()).collect(Collectors.toList()));
+
+        out.setDateofcollection(input.getDateofcollection());
+        out.setDescription(input.getDescription().stream().map(d -> d.getValue()).collect(Collectors.toList()));
+        out.setEmbargoenddate(input.getEmbargoenddate().getValue());
+        out.setFormat(input.getFormat().stream().map(f -> f.getValue()).collect(Collectors.toList()));
+        out.setId(input.getId());
+        out.setOriginalId(input.getOriginalId());
+        out.setInstance(input.getInstance()
+            .stream()
+            .map(i -> {
+                Instance instance = new Instance();
+                AccessRight ar = new AccessRight();
+                ar.setCode(i.getAccessright().getClassid());
+                ar.setLabel(i.getAccessright().getClassname());
+                if (Constants.accessRightsCoarMap.containsKey(i.getAccessright().getClassid())) {
+                    ar.setSchema(Constants.accessRightsCoarMap.get(i.getAccessright().getClassid()));
+                }
+                instance.setAccessright(ar);
+                instance.setCollectedfrom(KeyValue.newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue()));
+                instance.setHostedby(KeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue()));
+                instance.setLicense(i.getLicense().getValue());
+                instance.setPublicationdata(i.getDateofacceptance().getValue());
+                instance.setRefereed(i.getRefereed().getValue());
+                instance.setType(i.getInstancetype().getClassid());
+                instance.setUrl(i.getUrl());
+                return instance;
+            }).collect(Collectors.toList()));
+
+        out.setLanguage(Qualifier.newInstance(input.getLanguage().getClassid(), input.getLanguage().getClassname()));
+        out.setLastupdatetimestamp(input.getLastupdatetimestamp());
+
+        Optional<List<StructuredProperty>> otitle = Optional.ofNullable(input.getTitle());
+        if (otitle.isPresent()) {
+            List<StructuredProperty> iTitle = otitle.get()
+                .stream()
+                .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title"))
+                .collect(Collectors.toList());
+            if (iTitle.size() > 0) {
+                out.setMaintitle(iTitle.get(0).getValue());
+            }
+
+            iTitle = otitle.get()
+                .stream()
+                .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle"))
+                .collect(Collectors.toList());
+            if (iTitle.size() > 0) {
+                out.setSubtitle(iTitle.get(0).getValue());
+            }
+
+        }
+
+        out.setPid(input.getPid().stream().map(p -> {
+            ControlledField pid = new ControlledField();
+            pid.setScheme(p.getQualifier().getClassid());
+            pid.setValue(p.getValue());
+            return pid;
+        }).collect(Collectors.toList()));
+        out.setPublicationdata(input.getDateofacceptance().getValue());
+        out.setPublisher(input.getPublisher().getValue());
+        out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList()));
+        out.setSubject(input.getSubject().stream().map(s -> {
+            ControlledField subject = new ControlledField();
+            subject.setScheme(s.getQualifier().getClassid());
+            subject.setValue(s.getValue());
+            return subject;
+        }).collect(Collectors.toList()));
+        out.setType(input.getResulttype().getClassid());
+
+        return out;
+    }
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java
index 9014bd99d..7245e2021 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java
@@ -3,9 +3,14 @@ package eu.dnetlib.dhp.oa.graph.dump;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
 import org.dom4j.Element;
@@ -17,7 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class PrepareCommunityMap {
+public class QueryInformationSystem {
     private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " +
         " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
         " return " +
@@ -26,6 +31,8 @@
         "{$x//CONFIGURATION/context/@label}" +
         "";
 
+
+
     public static Map<String, String> getCommunityMap(final String isLookupUrl) throws ISLookUpException, DocumentException {
         ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@@ -51,4 +58,8 @@
     }
 
+
+
+
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
index 8dc354972..bf0e046f9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java
@@ -1,4 +1,27 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
-public class ResultProject {
+import eu.dnetlib.dhp.schema.dump.oaf.Projects;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class ResultProject implements Serializable {
+    private String resultId;
+    private List<Projects> projectsList;
+
+    public String getResultId() {
+        return resultId;
+    }
+
+    public void setResultId(String resultId) {
+        this.resultId = resultId;
+    }
+
+    public List<Projects> getProjectsList() {
+        return projectsList;
+    }
+
+    public void setProjectsList(List<Projects> projectsList) {
+        this.projectsList = projectsList;
+    }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java
index 0e84358a1..17bf66ee3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java
@@ -6,11 +6,9 @@ import java.io.Serializable;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -18,22 +16,20 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
-public class DumpCommunityProducts implements Serializable {
+public class SparkDumpCommunityProducts implements Serializable {
 
-    private static final Logger log = LoggerFactory.getLogger(DumpCommunityProducts.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class);
 
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils
             .toString(
-                DumpCommunityProducts.class
+                SparkDumpCommunityProducts.class
                     .getResourceAsStream(
                         "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json"));
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java
index 2626cecd9..b8f076d8e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java
@@ -1,4 +1,99 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
-public class SparkPrepareResultProject {
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.dump.oaf.Projects;
+import eu.dnetlib.dhp.schema.dump.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkPrepareResultProject implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkPrepareResultProject.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/project_prepare_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                prepareResultProjectList(spark, inputPath, outputPath);
+            });
+    }
+
+    private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
+        Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation", Relation.class)
+            .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
+        Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project", Project.class);
+
+        projects.joinWith(relation, projects.col("id").equalTo(relation.col("source")))
+            .groupByKey(
+                (MapFunction<Tuple2<Project, Relation>, String>) value -> value._2().getTarget(),
+                Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Tuple2<Project, Relation>, ResultProject>) (s, it) -> {
+                Tuple2<Project, Relation> first = it.next();
+                ResultProject rp = new ResultProject();
+                rp.setResultId(first._2().getTarget());
+                Project p = first._1();
+                Projects ps = Projects.newInstance(p.getId(), p.getCode().getValue(), p.getAcronym().getValue(),
+                    p.getTitle().getValue(), p.getFundingtree()
+                        .stream()
+                        .map(ft -> ft.getValue()).collect(Collectors.toList()));
+                List<Projects> projList = new ArrayList<>(Arrays.asList(ps)); // mutable copy: more projects may be added below
+                rp.setProjectsList(projList);
+                it.forEachRemaining(c -> {
+                    Project op = c._1();
+                    projList.add(Projects.newInstance(op.getId(), op.getCode().getValue(),
+                        op.getAcronym().getValue(), op.getTitle().getValue(),
+                        op.getFundingtree().stream().map(ft -> ft.getValue()).collect(Collectors.toList())));
+                });
+                return rp;
+            }, Encoders.bean(ResultProject.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java
index ec1c23a2e..7bc4d5f80 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java
@@ -1,4 +1,98 @@
 package eu.dnetlib.dhp.oa.graph.dump;
 
-public class SparkSplitForCommunity {
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+import eu.dnetlib.dhp.schema.dump.oaf.Result;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkSplitForCommunity implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunity.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkSplitForCommunity.class
+                    .getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); + + + Class inputClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + + Map + communityMap = QueryInformationSystem.getCommunityMap(isLookUpUrl); + + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + execSplit(spark, inputPath, outputPath , communityMap.keySet(), inputClazz); + }); + } + + private static void execSplit(SparkSession spark, String inputPath, String outputPath, Set communities + , Class inputClazz) { + + Dataset result = Utils.readPath(spark, inputPath, inputClazz); + + communities.stream() + .forEach(c -> printResult(c, result, outputPath)); + + } + + private static void printResult(String c, Dataset result, String outputPath) { + result.filter(r -> containsCommunity(r, c)) + .write() + .option("compression","gzip") + .mode(SaveMode.Append) + .json(outputPath + "/" + c); + } + + private static boolean containsCommunity(R r, String c) { + if(Optional.ofNullable(r.getContext()).isPresent()) { + return r.getContext().stream().filter(con -> con.getCode().equals(c)).collect(Collectors.toList()).size() > 0; + } + return false; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java index 31fb951fe..b5031f8dd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java @@ -12,26 +12,26 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; import java.io.Serializable; -import java.util.Map; import java.util.Optional; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -public class UpdateProjectInfo implements Serializable { +public class SparkUpdateProjectInfo implements Serializable { - private static final Logger log = LoggerFactory.getLogger(UpdateProjectInfo.class); + private static final Logger log = LoggerFactory.getLogger(SparkUpdateProjectInfo.class); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - UpdateProjectInfo.class + SparkUpdateProjectInfo.class .getResourceAsStream( 
"/eu/dnetlib/dhp/oa/graph/dump/project_input_parameters.json")); @@ -53,8 +53,9 @@ public class UpdateProjectInfo implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final String resultType = parser.get("resultType"); - log.info("resultType: {}", resultType); + + final String preparedInfoPath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", preparedInfoPath); Class inputClazz = (Class) Class.forName(resultClassName); @@ -65,7 +66,7 @@ public class UpdateProjectInfo implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - extend(spark, inputPath, outputPath , resultType, inputClazz); + extend(spark, inputPath, outputPath , preparedInfoPath, inputClazz); }); } @@ -73,31 +74,24 @@ public class UpdateProjectInfo implements Serializable { SparkSession spark, String inputPath, String outputPath, - String resultType, + String preparedInfoPath, Class inputClazz) { - Dataset result = Utils.readPath(spark, inputPath + "/" + resultType, inputClazz); - Dataset relation = Utils.readPath(spark, inputPath + "/relation", Relation.class) - .filter("dataInfo.deletedbyinference = false and relClass = 'produces'"); - Dataset project = Utils.readPath(spark,inputPath + "/project", Project.class); - relation.joinWith(project, relation.col("source").equalTo(project.col("id"))) - result.joinWith(relation, result.col("id").equalTo(relation.col("target")), "left") - .groupByKey( - (MapFunction, String>) p -> p._1().getId(), - Encoders.STRING()) - .mapGroups((MapGroupsFunction, R>)(c, it) -> { - Tuple2 first = it.next(); - - - }, Encoders.bean(inputClazz)); - .mapGroups((MapGroupsFunction) (s, it) -> { - Project first = it.next(); - it.forEachRemaining(p -> { - first.mergeFrom(p); + Dataset result = Utils.readPath(spark, inputPath , inputClazz); + Dataset resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class); + result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")), + "left") + .map(value -> { + R r = value._1(); + Optional.ofNullable(value._2()).ifPresent(rp -> { + r.setProjects(rp.getProjectsList()); }); - return first; - } - + return r; + },Encoders.bean(inputClazz)) + .write() + .option("compression","gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java index eae3194c5..9b2aa7d79 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java @@ -1,4 +1,23 @@ package eu.dnetlib.dhp.oa.graph.dump; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; + public class Utils { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, 
clazz), Encoders.bean(clazz)); + } }