From f96ca900e1ba70288c1cee5433758b453bb7c067 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 15 Jun 2020 11:12:14 +0200
Subject: [PATCH] fixed issues while running on cluster

---
 .../dhp/oa/graph/dump/CommunityMap.java       |   5 +-
 .../dnetlib/dhp/oa/graph/dump/Constants.java  |  31 +-
 .../eu/dnetlib/dhp/oa/graph/dump/Mapper.java  | 584 ++++++++++++------
 .../oa/graph/dump/QueryInformationSystem.java |  41 +-
 .../dhp/oa/graph/dump/ResultProject.java      |  33 +-
 .../dump/SparkDumpCommunityProducts.java      | 197 +++---
 .../graph/dump/SparkPrepareResultProject.java | 223 ++++---
 .../oa/graph/dump/SparkSplitForCommunity.java | 148 +++--
 .../oa/graph/dump/SparkUpdateProjectInfo.java | 138 +++--
 .../eu/dnetlib/dhp/oa/graph/dump/Utils.java   |  32 +-
 10 files changed, 867 insertions(+), 565 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java
index bee20afe6..dccfcd766 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/CommunityMap.java
@@ -1,7 +1,8 @@
+
 package eu.dnetlib.dhp.oa.graph.dump;
 
-
+import java.io.Serializable;
 import java.util.HashMap;
 
-public class CommunityMap extends HashMap<String, String> {
+public class CommunityMap extends HashMap<String, String> implements Serializable {
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java
index 71e0767e1..b77cfdb6b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Constants.java
@@ -1,18 +1,29 @@
-package eu.dnetlib.dhp.oa.graph.dump;
 
-import com.google.common.collect.Maps;
+package eu.dnetlib.dhp.oa.graph.dump;
 
 import java.util.Map;
 
+import com.google.common.collect.Maps;
+
 public class Constants {
 
-	public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
+	public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
+	public static final Map<String, String> coarCodeLabelMap = Maps.newHashMap();
 
-	static {
-		accessRightsCoarMap.put("OPEN", "http://purl.org/coar/access_right/c_abf2");
-		accessRightsCoarMap.put("RESTRICTED", "http://purl.org/coar/access_right/c_16ec");
-		accessRightsCoarMap.put("OPEN SOURCE", "http://purl.org/coar/access_right/c_abf2");
-		accessRightsCoarMap.put("CLOSED", "http://purl.org/coar/access_right/c_14cb //metadataonly for coar");
-		accessRightsCoarMap.put("EMBARGO", "http://purl.org/coar/access_right/c_f1cf");
-	}
+	public static String COAR_ACCESS_RIGHT_SCHEMA = "http://vocabularies.coar-repositories.org/documentation/access_rights/";
+
+	static {
+		accessRightsCoarMap.put("OPEN", "c_abf2");
+		accessRightsCoarMap.put("RESTRICTED", "c_16ec");
+		accessRightsCoarMap.put("OPEN SOURCE", "c_abf2");
+		accessRightsCoarMap.put("CLOSED", "c_14cb");
+		accessRightsCoarMap.put("EMBARGO", "c_f1cf");
+	}
+
+	static {
+		coarCodeLabelMap.put("c_abf2", "OPEN");
+		coarCodeLabelMap.put("c_16ec", "RESTRICTED");
+		coarCodeLabelMap.put("c_14cb", "CLOSED");
+		coarCodeLabelMap.put("c_f1cf", "EMBARGO");
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java
index d171b240a..4693d1ed1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Mapper.java @@ -1,223 +1,407 @@ -package eu.dnetlib.dhp.oa.graph.dump; -import eu.dnetlib.dhp.schema.dump.oaf.*; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Journal; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import scala.collection.immutable.Stream; +package eu.dnetlib.dhp.oa.graph.dump; import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; +import javax.swing.text.html.Option; + +import org.apache.avro.generic.GenericData; + +import eu.dnetlib.dhp.schema.dump.oaf.*; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Journal; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; + public class Mapper implements Serializable { - public static O map( - I input, Map communityMap){ + public static O map( + I input, Map communityMap) { - O out = null; - switch (input.getResulttype().getClassid()){ - case "publication": - out = (O)new Publication(); - Optional journal = Optional.ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal()); - if(journal.isPresent()){ - Journal j = journal.get(); - Container c = new Container(); - c.setConferencedate(j.getConferencedate()); - c.setConferenceplace(j.getConferenceplace()); - c.setEdition(j.getEdition()); - c.setEp(j.getEp()); - c.setIss(j.getIss()); - c.setIssnLinking(j.getIssnLinking()); - c.setIssnOnline(j.getIssnOnline()); - c.setIssnPrinted(j.getIssnPrinted()); - c.setName(j.getName()); - c.setSp(j.getSp()); - c.setVol(j.getVol()); - out.setContainer(c); - } - break; - case "dataset": - Dataset d = new Dataset(); - eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset)input; - d.setSize(id.getSize().getValue()); - d.setVersion(id.getVersion().getValue()); + O out = null; + Optional ort = Optional.ofNullable(input.getResulttype()); + if (ort.isPresent()) { + switch (ort.get().getClassid()) { + case "publication": + out = (O) new Publication(); + Optional journal = Optional + .ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal()); + if (journal.isPresent()) { + Journal j = journal.get(); + Container c = new Container(); + c.setConferencedate(j.getConferencedate()); + c.setConferenceplace(j.getConferenceplace()); + c.setEdition(j.getEdition()); + c.setEp(j.getEp()); + c.setIss(j.getIss()); + c.setIssnLinking(j.getIssnLinking()); + c.setIssnOnline(j.getIssnOnline()); + c.setIssnPrinted(j.getIssnPrinted()); + c.setName(j.getName()); + c.setSp(j.getSp()); + c.setVol(j.getVol()); + out.setContainer(c); + } + break; + case "dataset": + Dataset d = new Dataset(); + eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset) input; + Optional.ofNullable(id.getSize()).ifPresent(v -> d.setSize(v.getValue())); + Optional.ofNullable(id.getVersion()).ifPresent(v -> d.setVersion(v.getValue())); - List igl = id.getGeolocation(); - d.setGeolocation(igl.stream() - .filter(Objects::nonNull) - .map(gli -> { - GeoLocation gl = new GeoLocation(); - gl.setBox(gli.getBox()); - gl.setPlace(gli.getPlace()); - gl.setPoint(gli.getPoint()); - return gl; - }).collect(Collectors.toList())); - out = (O)d; + d + .setGeolocation( + Optional + .ofNullable(id.getGeolocation()) + .map( + igl -> igl + .stream() + .filter(Objects::nonNull) + .map(gli -> { + GeoLocation gl = new GeoLocation(); + 
gl.setBox(gli.getBox()); + gl.setPlace(gli.getPlace()); + gl.setPoint(gli.getPoint()); + return gl; + }) + .collect(Collectors.toList())) + .orElse(null)); - break; - case "software": - Software s = new Software(); - eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software)input; - s.setCodeRepositoryUrl(is.getCodeRepositoryUrl().getValue()); - s.setDocumentationUrl(is.getDocumentationUrl() - .stream() - .map(du -> du.getValue()).collect(Collectors.toList())); - s.setProgrammingLanguage(is.getProgrammingLanguage().getClassid()); + out = (O) d; - out = (O) s; - break; - case "otherresearchproduct": - OtherResearchProduct or = new OtherResearchProduct(); - eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct)input; - or.setContactgroup(ir.getContactgroup().stream().map(cg -> cg.getValue()).collect(Collectors.toList())); - or.setContactperson(ir.getContactperson().stream().map(cp->cp.getValue()).collect(Collectors.toList())); - or.setTool(ir.getTool().stream().map(t -> t.getValue()).collect(Collectors.toList())); - out = (O) or; - break; - } - out.setAuthor(input.getAuthor() - .stream() - .map(oa -> { - Author a = new Author(); - a.setAffiliation(oa.getAffiliation().stream().map(aff -> aff.getValue()).collect(Collectors.toList())); - a.setFullname(oa.getFullname()); - a.setName(oa.getName()); - a.setSurname(oa.getSurname()); - a.setRank(oa.getRank()); - a.setPid(oa.getPid().stream().map(p -> { - ControlledField cf = new ControlledField(); - cf.setScheme( p.getQualifier().getClassid()); - cf.setValue( p.getValue()); - return cf; - }).collect(Collectors.toList())); - return a; - }).collect(Collectors.toList())); - //I do not map Access Right UNKNOWN or OTHER - if (Constants.accessRightsCoarMap.containsKey(input.getBestaccessright().getClassid())){ - AccessRight ar = new AccessRight(); - ar.setSchema(Constants.accessRightsCoarMap.get(input.getBestaccessright().getClassid())); - ar.setCode(input.getBestaccessright().getClassid()); - ar.setLabel(input.getBestaccessright().getClassname()); - out.setBestaccessright(ar); - } + break; + case "software": + Software s = new Software(); + eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software) input; + Optional + .ofNullable(is.getCodeRepositoryUrl()) + .ifPresent(value -> s.setCodeRepositoryUrl(value.getValue())); + Optional + .ofNullable(is.getDocumentationUrl()) + .ifPresent( + value -> s + .setDocumentationUrl( + value + .stream() + .map(v -> v.getValue()) + .collect(Collectors.toList()))); - out.setCollectedfrom(input.getCollectedfrom().stream().map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue())) - .collect(Collectors.toList())); + Optional + .ofNullable(is.getProgrammingLanguage()) + .ifPresent(value -> s.setProgrammingLanguage(value.getClassid())); - Set communities = communityMap.keySet(); - List contextList = input.getContext() - .stream() - .map(c -> { - if(communities.contains(c.getId())){ - Context context = new Context(); - context.setCode(c.getId()); - context.setLabel(communityMap.get(c.getId())); - Optional> dataInfo = Optional.ofNullable(c.getDataInfo()); - if(dataInfo.isPresent()){ - context.setProvenance(dataInfo.get().stream() - .map(di -> { - if (di.getInferred()){ - return di.getProvenanceaction().getClassid(); - } - return null; - }).filter(Objects::nonNull) - .collect(Collectors.toList())); - } - return context; - } - return null; - } - ).filter(Objects::nonNull) - .collect(Collectors.toList()); - if(contextList.size() > 0){ - 
out.setContext(contextList); - } - out.setContributor(input.getContributor() - .stream() - .map(c -> c.getValue()).collect(Collectors.toList())); - out.setCountry(input.getCountry() - .stream() - .map(c -> { - Country country = new Country(); - country.setCode(c.getClassid()); - country.setLabel(c.getClassname()); - Optional dataInfo = Optional.ofNullable(c.getDataInfo()); - if(dataInfo.isPresent()){ - country.setProvenance(dataInfo.get().getProvenanceaction().getClassid()); - } - return country; - }).collect(Collectors.toList())); - out.setCoverage(input.getCoverage().stream().map(c->c.getValue()).collect(Collectors.toList())); + out = (O) s; + break; + case "other": + OtherResearchProduct or = new OtherResearchProduct(); + eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct) input; + or + .setContactgroup( + Optional + .ofNullable(ir.getContactgroup()) + .map(value -> value.stream().map(cg -> cg.getValue()).collect(Collectors.toList())) + .orElse(null)); - out.setDateofcollection(input.getDateofcollection()); - out.setDescription(input.getDescription().stream().map(d->d.getValue()).collect(Collectors.toList())); - out.setEmbargoenddate(input.getEmbargoenddate().getValue()); - out.setFormat(input.getFormat().stream().map(f->f.getValue()).collect(Collectors.toList())); - out.setId(input.getId()); - out.setOriginalId(input.getOriginalId()); - out.setInstance(input.getInstance() - .stream() - .map(i -> { - Instance instance = new Instance(); - AccessRight ar = new AccessRight(); - ar.setCode(i.getAccessright().getClassid()); - ar.setLabel(i.getAccessright().getClassname()); - if(Constants.accessRightsCoarMap.containsKey(i.getAccessright().getClassid())){ - ar.setSchema(Constants.accessRightsCoarMap.get(i.getAccessright().getClassid())); - } - instance.setAccessright(ar); - instance.setCollectedfrom(KeyValue.newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue())); - instance.setHostedby(KeyValue.newInstance(i.getHostedby().getKey(),i.getHostedby().getValue())); - instance.setLicense(i.getLicense().getValue()); - instance.setPublicationdata(i.getDateofacceptance().getValue()); - instance.setRefereed(i.getRefereed().getValue()); - instance.setType(i.getInstancetype().getClassid()); - instance.setUrl(i.getUrl()); - return instance; - }).collect(Collectors.toList())); + or + .setContactperson( + Optional + .ofNullable(ir.getContactperson()) + .map(value -> value.stream().map(cp -> cp.getValue()).collect(Collectors.toList())) + .orElse(null)); + or + .setTool( + Optional + .ofNullable(ir.getTool()) + .map(value -> value.stream().map(t -> t.getValue()).collect(Collectors.toList())) + .orElse(null)); + out = (O) or; + break; + } + Optional> oAuthor = Optional.ofNullable(input.getAuthor()); + if (oAuthor.isPresent()) { + // List authorList = new ArrayList<>(); + out + .setAuthor( + oAuthor + .get() + .stream() + .map(oa -> getAuthor(oa)) + .collect(Collectors.toList())); + } - out.setLanguage(Qualifier.newInstance(input.getLanguage().getClassid(), input.getLanguage().getClassname())); - out.setLastupdatetimestamp(input.getLastupdatetimestamp()); + // I do not map Access Right UNKNOWN or OTHER - Optional> otitle = Optional.ofNullable(input.getTitle()); - if(otitle.isPresent()){ - List iTitle = otitle.get() - .stream() - .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title")) - .collect(Collectors.toList()); - if(iTitle.size() > 0 ){ - out.setMaintitle(iTitle.get(0).getValue()); - } + Optional oar = 
Optional.ofNullable(input.getBestaccessright()); + if (oar.isPresent()) { + if (Constants.accessRightsCoarMap.containsKey(oar.get().getClassid())) { + String code = Constants.accessRightsCoarMap.get(oar.get().getClassid()); + out + .setBestaccessright( + AccessRight + .newInstance( + code, + Constants.coarCodeLabelMap.get(code), + Constants.COAR_ACCESS_RIGHT_SCHEMA)); + } + } - iTitle = otitle.get() - .stream() - .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle")) - .collect(Collectors.toList()); - if(iTitle.size() > 0){ - out.setSubtitle(iTitle.get(0).getValue()); - } + out + .setCollectedfrom( + input + .getCollectedfrom() + .stream() + .map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue())) + .collect(Collectors.toList())); - } + Set communities = communityMap.keySet(); + List contextList = input + .getContext() + .stream() + .map(c -> { + if (communities.contains(c.getId()) + || communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) { + Context context = new Context(); + if (!communityMap.containsKey(c.getId())) { + context.setCode(c.getId().substring(0, c.getId().indexOf("::"))); + context.setLabel(communityMap.get(context.getCode())); + } else { + context.setCode(c.getId()); + context.setLabel(communityMap.get(c.getId())); + } + Optional> dataInfo = Optional.ofNullable(c.getDataInfo()); + if (dataInfo.isPresent()) { + List provenance = new ArrayList<>(); + provenance + .addAll( + dataInfo + .get() + .stream() + .map(di -> { + if (di.getInferred()) { + return di.getProvenanceaction().getClassname(); + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet())); + context.setProvenance(provenance); + } + return context; + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (contextList.size() > 0) { + out.setContext(contextList); + } + final List contributorList = new ArrayList<>(); + Optional + .ofNullable(input.getContributor()) + .ifPresent(value -> value.stream().forEach(c -> contributorList.add(c.getValue()))); + out.setContributor(contributorList); - out.setPid(input.getPid().stream().map(p -> { - ControlledField pid = new ControlledField(); - pid.setScheme(p.getQualifier().getClassid()); - pid.setValue(p.getValue()); - return pid; - }).collect(Collectors.toList())); - out.setPublicationdata(input.getDateofacceptance().getValue()); - out.setPublisher(input.getPublisher().getValue()); - out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList())); - out.setSubject(input.getSubject().stream().map(s->{ - ControlledField subject = new ControlledField(); - subject.setScheme(s.getQualifier().getClassid()); - subject.setValue(s.getValue()); - return subject; - }).collect(Collectors.toList())); - out.setType(input.getResulttype().getClassid()); + List countryList = new ArrayList<>(); + Optional + .ofNullable(input.getCountry()) + .ifPresent( + value -> value + .stream() + .forEach( + c -> { + Country country = new Country(); + country.setCode(c.getClassid()); + country.setLabel(c.getClassname()); + Optional + .ofNullable(c.getDataInfo()) + .ifPresent( + provenance -> country + .setProvenance( + provenance + .getProvenanceaction() + .getClassname())); + countryList + .add(country); + })); - return out; - } + out.setCountry(countryList); + + final List coverageList = new ArrayList<>(); + Optional + .ofNullable(input.getCoverage()) + .ifPresent(value -> value.stream().forEach(c -> coverageList.add(c.getValue()))); + out.setCoverage(coverageList); + + 
out.setDateofcollection(input.getDateofcollection()); + + final List descriptionList = new ArrayList<>(); + Optional + .ofNullable(input.getDescription()) + .ifPresent(value -> value.stream().forEach(d -> descriptionList.add(d.getValue()))); + out.setDescription(descriptionList); + Optional> oStr = Optional.ofNullable(input.getEmbargoenddate()); + if (oStr.isPresent()) { + out.setEmbargoenddate(oStr.get().getValue()); + } + + final List formatList = new ArrayList<>(); + Optional + .ofNullable(input.getFormat()) + .ifPresent(value -> value.stream().forEach(f -> formatList.add(f.getValue()))); + out.setFormat(formatList); + out.setId(input.getId()); + out.setOriginalId(input.getOriginalId()); + + final List instanceList = new ArrayList<>(); + Optional + .ofNullable(input.getInstance()) + .ifPresent( + inst -> inst + .stream() + .forEach(i -> { + Instance instance = new Instance(); + + Optional opAr = Optional + .ofNullable(i.getAccessright()); + if (opAr.isPresent()) { + if (Constants.accessRightsCoarMap.containsKey(opAr.get().getClassid())) { + String code = Constants.accessRightsCoarMap.get(opAr.get().getClassid()); + instance + .setAccessright( + AccessRight + .newInstance( + code, + Constants.coarCodeLabelMap.get(code), + Constants.COAR_ACCESS_RIGHT_SCHEMA)); + } + } + + instance + .setCollectedfrom( + KeyValue + .newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue())); + instance + .setHostedby( + KeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue())); + Optional + .ofNullable(i.getLicense()) + .ifPresent(value -> instance.setLicense(value.getValue())); + Optional + .ofNullable(i.getDateofacceptance()) + .ifPresent(value -> instance.setPublicationdate(value.getValue())); + Optional + .ofNullable(i.getRefereed()) + .ifPresent(value -> instance.setRefereed(value.getValue())); + Optional + .ofNullable(i.getInstancetype()) + .ifPresent(value -> instance.setType(value.getClassname())); + Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value)); + instanceList.add(instance); + })); + out + .setInstance(instanceList); + + Optional oL = Optional.ofNullable(input.getLanguage()); + if (oL.isPresent()) { + eu.dnetlib.dhp.schema.oaf.Qualifier language = oL.get(); + out.setLanguage(Qualifier.newInstance(language.getClassid(), language.getClassname())); + } + Optional oLong = Optional.ofNullable(input.getLastupdatetimestamp()); + if (oLong.isPresent()) { + out.setLastupdatetimestamp(oLong.get()); + } + Optional> otitle = Optional.ofNullable(input.getTitle()); + if (otitle.isPresent()) { + List iTitle = otitle + .get() + .stream() + .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title")) + .collect(Collectors.toList()); + if (iTitle.size() > 0) { + out.setMaintitle(iTitle.get(0).getValue()); + } + + iTitle = otitle + .get() + .stream() + .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle")) + .collect(Collectors.toList()); + if (iTitle.size() > 0) { + out.setSubtitle(iTitle.get(0).getValue()); + } + + } + + List pids = new ArrayList<>(); + Optional + .ofNullable(input.getPid()) + .ifPresent( + value -> value + .stream() + .forEach( + p -> pids + .add( + ControlledField + .newInstance(p.getQualifier().getClassid(), p.getValue())))); + out.setPid(pids); + oStr = Optional.ofNullable(input.getDateofacceptance()); + if (oStr.isPresent()) { + out.setPublicationdate(oStr.get().getValue()); + } + oStr = Optional.ofNullable(input.getPublisher()); + if (oStr.isPresent()) { + out.setPublisher(oStr.get().getValue()); 
+ } + + List sourceList = new ArrayList<>(); + Optional + .ofNullable(input.getSource()) + .ifPresent(value -> value.stream().forEach(s -> sourceList.add(s.getValue()))); + // out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList())); + List subjectList = new ArrayList<>(); + Optional + .ofNullable(input.getSubject()) + .ifPresent( + value -> value + .stream() + .forEach( + s -> subjectList + .add(ControlledField.newInstance(s.getQualifier().getClassid(), s.getValue())))); + out.setSubject(subjectList); + + out.setType(input.getResulttype().getClassid()); + } + + return out; + } + + private static Author getAuthor(eu.dnetlib.dhp.schema.oaf.Author oa) { + Author a = new Author(); + Optional + .ofNullable(oa.getAffiliation()) + .ifPresent( + value -> a + .setAffiliation( + value + .stream() + .map(aff -> aff.getValue()) + .collect(Collectors.toList()))); + a.setFullname(oa.getFullname()); + a.setName(oa.getName()); + a.setSurname(oa.getSurname()); + a.setRank(oa.getRank()); + Optional + .ofNullable(oa.getPid()) + .ifPresent( + value -> a + .setPid( + value + .stream() + .map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList()))); + return a; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java index ab98335ca..4ff54eee1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystem.java @@ -1,34 +1,32 @@ package eu.dnetlib.dhp.oa.graph.dump; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import java.io.StringReader; +import java.util.List; + import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; -import java.io.StringReader; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class QueryInformationSystem { private ISLookUpService isLookUp; - private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + - " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + - " return " + - " " + - "{$x//CONFIGURATION/context/@id}" + - "{$x//CONFIGURATION/context/@label}" + - ""; + private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + + + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + + " return " + + " " + + "{$x//CONFIGURATION/context/@id}" + + "{$x//CONFIGURATION/context/@label}" + + ""; - - - public Map getCommunityMap() + public CommunityMap getCommunityMap() throws ISLookUpException { return getMap(isLookUp.quickSearchProfile(XQUERY)); @@ -42,12 +40,8 @@ public class QueryInformationSystem { this.isLookUp = isLookUpService; } -public void set(String isLookUpUrl){ - isLookUpUrl = get(isLookUpUrl); -} - public ISLookUpService - private static Map getMap(List communityMap) { - final Map map = new HashMap<>(); + public static CommunityMap 
getMap(List communityMap) { + final CommunityMap map = new CommunityMap(); communityMap.stream().forEach(xml -> { final Document doc; @@ -59,7 +53,6 @@ public void set(String isLookUpUrl){ e.printStackTrace(); } - }); return map; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java index bf0e046f9..61e205bfb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultProject.java @@ -1,27 +1,28 @@ -package eu.dnetlib.dhp.oa.graph.dump; -import eu.dnetlib.dhp.schema.dump.oaf.Projects; +package eu.dnetlib.dhp.oa.graph.dump; import java.io.Serializable; import java.util.List; +import eu.dnetlib.dhp.schema.dump.oaf.Projects; + public class ResultProject implements Serializable { - private String resultId; - private List projectsList; + private String resultId; + private List projectsList; - public String getResultId() { - return resultId; - } + public String getResultId() { + return resultId; + } - public void setResultId(String resultId) { - this.resultId = resultId; - } + public void setResultId(String resultId) { + this.resultId = resultId; + } - public List getProjectsList() { - return projectsList; - } + public List getProjectsList() { + return projectsList; + } - public void setProjectsList(List projectsList) { - this.projectsList = projectsList; - } + public void setProjectsList(List projectsList) { + this.projectsList = projectsList; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java index e7dba7595..b5204382d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkDumpCommunityProducts.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.graph.dump; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; @@ -6,10 +7,8 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.oaf.Context; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import javax.management.Query; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; @@ -19,127 +18,129 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.Gson; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Result; - -import javax.management.Query; - +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class SparkDumpCommunityProducts implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class); - private QueryInformationSystem queryInformationSystem; + private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class); + private 
static QueryInformationSystem queryInformationSystem; + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpCommunityProducts.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json")); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpCommunityProducts.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String dumpClassName = parser.get("dumpTableName"); + log.info("dumpClassName: {}", dumpClassName); - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); - final String dumpClassName = parser.get("dumpTableName"); - log.info("dumpClassName: {}", dumpClassName); +// final String resultType = parser.get("resultType"); +// log.info("resultType: {}", resultType); - final String isLookUpUrl = parser.get("isLookUpUrl"); - log.info("isLookUpUrl: {}", isLookUpUrl); + final Optional cm = Optional.ofNullable(parser.get("communityMap")); - final String resultType = parser.get("resultType"); - log.info("resultType: {}", resultType); + Class inputClazz = (Class) Class.forName(resultClassName); + Class dumpClazz = (Class) Class + .forName(dumpClassName); + SparkConf conf = new SparkConf(); - SparkDumpCommunityProducts sdcp = new SparkDumpCommunityProducts(); + CommunityMap communityMap; - sdcp.exec(isLookUpUrl, isSparkSessionManaged, outputPath, - inputPath, resultClassName, dumpClassName); + if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) { + queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); + communityMap = queryInformationSystem.getCommunityMap(); + } else { + communityMap = new Gson().fromJson(cm.get(), CommunityMap.class); + } - } + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + execDump(spark, inputPath, outputPath, communityMap, inputClazz, dumpClazz); - public QueryInformationSystem getQueryInformationSystem() { - return queryInformationSystem; - } + }); - public void setQueryInformationSystem(QueryInformationSystem queryInformationSystem) { 
- this.queryInformationSystem = queryInformationSystem; - } + } - public ISLookUpService getIsLookUpService(String isLookUpUrl){ - return ISLookupClientFactory.getLookUpService(isLookUpUrl); - } + public static ISLookUpService getIsLookUpService(String isLookUpUrl) { + return ISLookupClientFactory.getLookUpService(isLookUpUrl); + } - public void exec(String isLookUpUrl, Boolean isSparkSessionManaged, String outputPath, String inputPath, - String resultClassName, String dumpClassName) throws ISLookUpException, ClassNotFoundException { - SparkConf conf = new SparkConf(); + public static void execDump(SparkSession spark, + String inputPath, + String outputPath, + CommunityMap communityMap, + Class inputClazz, + Class dumpClazz) { - Class inputClazz = (Class) Class.forName(resultClassName); - Class dumpClazz = - (Class) Class.forName(dumpClassName); + // Set communities = communityMap.keySet(); + Dataset tmp = Utils.readPath(spark, inputPath, inputClazz); - queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); - Map - communityMap = queryInformationSystem.getCommunityMap(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - execDump(spark, inputPath, outputPath , communityMap, inputClazz, dumpClazz); - }); - } + tmp + .map(value -> execMap(value, communityMap), Encoders.bean(dumpClazz)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - private void execDump( - SparkSession spark, - String inputPath, - String outputPath, - Map communityMap, - Class inputClazz, - Class dumpClazz) { - - Set communities = communityMap.keySet(); - Dataset tmp = Utils.readPath(spark, inputPath, inputClazz); - tmp.map(value -> { - Optional> inputContext = Optional.ofNullable(value.getContext()); - if(!inputContext.isPresent()){ - return null; - } - List toDumpFor = inputContext.get().stream().map(c -> { - if (communities.contains(c.getId())) { - return c.getId(); - } - return null; - }).filter(Objects::nonNull).collect(Collectors.toList()); - if(toDumpFor.size() == 0){ - return null; - } - return Mapper.map(value, communityMap); - },Encoders.bean(dumpClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath); - - } - + private static O execMap(I value, + CommunityMap communityMap) { + { + Set communities = communityMap.keySet(); + Optional> inputContext = Optional.ofNullable(value.getContext()); + if (!inputContext.isPresent()) { + return null; + } + List toDumpFor = inputContext.get().stream().map(c -> { + if (communities.contains(c.getId())) { + return c.getId(); + } + if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) { + return c.getId().substring(0, 3); + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + if (toDumpFor.size() == 0) { + return null; + } + return Mapper.map(value, communityMap); + } + } } - diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java index b8f076d8e..abf1a40d3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkPrepareResultProject.java @@ -1,11 +1,13 @@ + package eu.dnetlib.dhp.oa.graph.dump; -import 
com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.dump.oaf.Projects; -import eu.dnetlib.dhp.schema.dump.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.*; +import java.util.stream.Collectors; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -14,86 +16,165 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.Node; +import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.dump.oaf.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.Projects; +import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; +public class SparkPrepareResultProject implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class); -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkPrepareResultProject.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/project_prepare_parameters.json")); -public class SparkPrepareResultProject implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkPrepareResultProject.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/project_prepare_parameters.json")); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + SparkConf conf = new SparkConf(); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + prepareResultProjectList(spark, 
inputPath, outputPath); + }); + } - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) { + Dataset relation = Utils + .readPath(spark, inputPath + "/relation", Relation.class) + .filter("dataInfo.deletedbyinference = false and relClass = 'produces'"); + Dataset projects = Utils.readPath(spark, inputPath + "/project", Project.class); + projects + .joinWith(relation, projects.col("id").equalTo(relation.col("source"))) + .groupByKey( + (MapFunction, String>) value -> value._2().getTarget(), Encoders.STRING()) + .mapGroups((MapGroupsFunction, ResultProject>) (s, it) -> { + Set projectSet = new HashSet<>(); + Tuple2 first = it.next(); + ResultProject rp = new ResultProject(); + rp.setResultId(first._2().getTarget()); + Project p = first._1(); + projectSet.add(p.getId()); + Projects ps = Projects + .newInstance( + p.getId(), p.getCode().getValue(), + Optional + .ofNullable(p.getAcronym()) + .map(a -> a.getValue()) + .orElse(null), + Optional + .ofNullable(p.getTitle()) + .map(v -> v.getValue()) + .orElse(null), + Optional + .ofNullable(p.getFundingtree()) + .map( + value -> value + .stream() + .map(ft -> getFunder(ft.getValue())) + .collect(Collectors.toList()) + .get(0)) + .orElse(null)); + List projList = new ArrayList<>(); + projList.add(ps); + rp.setProjectsList(projList); + it.forEachRemaining(c -> { + Project op = c._1(); + if (!projectSet.contains(op.getId())) { + projList + .add( + Projects + .newInstance( + op.getId(), + op.getCode().getValue(), + Optional + .ofNullable(op.getAcronym()) + .map(a -> a.getValue()) + .orElse(null), + Optional + .ofNullable(op.getTitle()) + .map(v -> v.getValue()) + .orElse(null), + Optional + .ofNullable(op.getFundingtree()) + .map( + value -> value + .stream() + .map(ft -> getFunder(ft.getValue())) + .collect(Collectors.toList()) + .get(0)) + .orElse(null))); + projectSet.add(op.getId()); - SparkConf conf = new SparkConf(); + } - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - prepareResultProjectList(spark, inputPath, outputPath); - }); - } + }); + return rp; + }, Encoders.bean(ResultProject.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) { - Dataset relation = Utils.readPath(spark, inputPath + "/relation" , Relation.class) - .filter("dataInfo.deletedbyinference = false and relClass = 'produces'"); - Dataset projects = Utils.readPath(spark, inputPath + "/project" , Project.class); + private static Funder getFunder(String fundingtree) { + // ["nsf_________::NSFNSFNational Science + // FoundationUSnsf_________::NSF::CISE/OAD::CISE/CCFDivision + // of Computing and Communication FoundationsDivision of Computing and Communication + // Foundationsnsf_________::NSF::CISE/OADDirectorate for + // Computer & Information Science & EngineeringDirectorate for Computer & + // Information Science & + // Engineeringnsf:fundingStream"] + Funder f = new Funder(); + final Document doc; + try { + doc = new SAXReader().read(new StringReader(fundingtree)); + f.setShortName(((Node) (doc.selectNodes("//funder/shortname").get(0))).getText()); + f.setName(((Node) (doc.selectNodes("//funder/name").get(0))).getText()); + f.setJurisdiction(((Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText()); + for 
(Object o : doc.selectNodes("//funding_level_0")) { + List node = ((Node) o).selectNodes("./name"); + f.setFundingStream(((Node) node.get(0)).getText()); - projects.joinWith(relation, projects.col("id").equalTo(relation.col("source"))) - .groupByKey((MapFunction,String>)value -> value._2().getTarget(), Encoders.STRING()) - .mapGroups((MapGroupsFunction, ResultProject>) (s, it) -> - { - Tuple2 first = it.next(); - ResultProject rp = new ResultProject(); - rp.setResultId(first._2().getTarget()); - Project p = first._1(); - Projects ps = Projects.newInstance(p.getId(), p.getCode().getValue(), p.getAcronym().getValue(), - p.getTitle().getValue(), p.getFundingtree() - .stream() - .map(ft -> ft.getValue()).collect(Collectors.toList())); - List projList = Arrays.asList(ps); - rp.setProjectsList(projList); - it.forEachRemaining(c -> { - Project op = c._1(); - projList.add(Projects.newInstance(op.getId(), op.getCode().getValue(), - op.getAcronym().getValue(), op.getTitle().getValue(), - op.getFundingtree().stream().map(ft -> ft.getValue()).collect(Collectors.toList()))); - }); - return rp; - } ,Encoders.bean(ResultProject.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - } + } + + return f; + } catch (DocumentException e) { + e.printStackTrace(); + } + return f; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java index 7bc4d5f80..dd6df59bf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkSplitForCommunity.java @@ -1,98 +1,122 @@ + package eu.dnetlib.dhp.oa.graph.dump; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.*; +import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.dump.oaf.Result; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; +import com.google.gson.Gson; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class SparkSplitForCommunity implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunity.class); + private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunity.class); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkSplitForCommunity.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json")); - public static void main(String[] args) throws Exception { - String 
jsonConfiguration = IOUtils - .toString( - SparkSplitForCommunity.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/split_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); + final Optional cm = Optional.ofNullable(parser.get("communityMap")); - final String isLookUpUrl = parser.get("isLookUpUrl"); - log.info("isLookUpUrl: {}", isLookUpUrl); + Class inputClazz = (Class) Class.forName(resultClassName); + SparkConf conf = new SparkConf(); - Class inputClazz = (Class) Class.forName(resultClassName); + CommunityMap communityMap; - SparkConf conf = new SparkConf(); + if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) { + QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); + communityMap = queryInformationSystem.getCommunityMap(); + } else { + communityMap = new Gson().fromJson(cm.get(), CommunityMap.class); + } - Map - communityMap = QueryInformationSystem.getCommunityMap(isLookUpUrl); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + execSplit(spark, inputPath, outputPath, communityMap.keySet(), inputClazz); + }); + } + public static ISLookUpService getIsLookUpService(String isLookUpUrl) { + return ISLookupClientFactory.getLookUpService(isLookUpUrl); + } - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - execSplit(spark, inputPath, outputPath , communityMap.keySet(), inputClazz); - }); - } + private static void execSplit(SparkSession spark, String inputPath, String outputPath, + Set communities, Class inputClazz) { - private static void execSplit(SparkSession spark, String inputPath, String outputPath, Set communities - , Class inputClazz) { + Dataset result = Utils.readPath(spark, inputPath, inputClazz); - Dataset result = Utils.readPath(spark, inputPath, inputClazz); + communities + .stream() + .forEach(c -> printResult(c, result, outputPath)); - communities.stream() - .forEach(c -> printResult(c, result, outputPath)); + } - } + private static void printResult(String c, Dataset result, String outputPath) { + result + .filter(r -> 
containsCommunity(r, c)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .json(outputPath + "/" + c); + } - private static void printResult(String c, Dataset result, String outputPath) { - result.filter(r -> containsCommunity(r, c)) - .write() - .option("compression","gzip") - .mode(SaveMode.Append) - .json(outputPath + "/" + c); - } - - private static boolean containsCommunity(R r, String c) { - if(Optional.ofNullable(r.getContext()).isPresent()) { - return r.getContext().stream().filter(con -> con.getCode().equals(c)).collect(Collectors.toList()).size() > 0; - } - return false; - } + private static boolean containsCommunity(R r, String c) { + if (Optional.ofNullable(r.getContext()).isPresent()) { + return r + .getContext() + .stream() + .filter(con -> con.getCode().equals(c)) + .collect(Collectors.toList()) + .size() > 0; + } + return false; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java index b5031f8dd..3fce0abe9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SparkUpdateProjectInfo.java @@ -1,11 +1,11 @@ + package eu.dnetlib.dhp.oa.graph.dump; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; -import eu.dnetlib.dhp.schema.dump.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -16,84 +16,86 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; -import java.io.Serializable; -import java.util.Optional; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - public class SparkUpdateProjectInfo implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkUpdateProjectInfo.class); - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger log = LoggerFactory.getLogger(SparkUpdateProjectInfo.class); + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkUpdateProjectInfo.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/project_input_parameters.json")); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkUpdateProjectInfo.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/project_input_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new 
ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + final String preparedInfoPath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", preparedInfoPath); - final String preparedInfoPath = parser.get("preparedInfoPath"); - log.info("preparedInfoPath: {}", preparedInfoPath); + Class inputClazz = (Class) Class.forName(resultClassName); - Class inputClazz = (Class) Class.forName(resultClassName); + SparkConf conf = new SparkConf(); - SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + extend(spark, inputPath, outputPath, preparedInfoPath, inputClazz); + }); + } - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - extend(spark, inputPath, outputPath , preparedInfoPath, inputClazz); - }); - } + private static void extend( + SparkSession spark, + String inputPath, + String outputPath, + String preparedInfoPath, + Class inputClazz) { - private static void extend( - SparkSession spark, - String inputPath, - String outputPath, - String preparedInfoPath, - Class inputClazz) { + Dataset result = Utils.readPath(spark, inputPath, inputClazz); + Dataset resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class); + result + .joinWith( + resultProject, result.col("id").equalTo(resultProject.col("resultId")), + "left") + .map(value -> { + R r = value._1(); + Optional.ofNullable(value._2()).ifPresent(rp -> { + r.setProjects(rp.getProjectsList()); + }); + return r; + }, Encoders.bean(inputClazz)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); - Dataset result = Utils.readPath(spark, inputPath , inputClazz); - Dataset resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class); - result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")), - "left") - .map(value -> { - R r = value._1(); - Optional.ofNullable(value._2()).ifPresent(rp -> { - r.setProjects(rp.getProjectsList()); - }); - return r; - },Encoders.bean(inputClazz)) - .write() - .option("compression","gzip") - .mode(SaveMode.Overwrite) - .json(outputPath); + } - - } - - } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java index 9b2aa7d79..7d43ea3fe 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java
@@ -1,23 +1,27 @@
+
 package eu.dnetlib.dhp.oa.graph.dump;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.common.HdfsSupport;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 
-public class Utils {
-	public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-	public static void removeOutputDir(SparkSession spark, String path) {
-		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
-	}
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-	public static <R> Dataset<R> readPath(
-		SparkSession spark, String inputPath, Class<R> clazz) {
-		return spark
-			.read()
-			.textFile(inputPath)
-			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
-	}
+import eu.dnetlib.dhp.common.HdfsSupport;
+
+public class Utils {
+	public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	public static void removeOutputDir(SparkSession spark, String path) {
+		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+	}
+
+	public static <R> Dataset<R> readPath(
+		SparkSession spark, String inputPath, Class<R> clazz) {
+		return spark
+			.read()
+			.textFile(inputPath)
+			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+	}
 }
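
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the defensive pattern this commit applies throughout Mapper.java and the Spark dump classes — every getter that may return null on real cluster data is wrapped in Optional.ofNullable(...) before being dereferenced, so records with missing fields are mapped instead of failing with a NullPointerException. The Field, Record and Summary classes are hypothetical stand-ins for the eu.dnetlib.dhp schema classes, not code from this repository.

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class NullSafeMappingSketch {

	// Stand-in for eu.dnetlib.dhp.schema.oaf.Field<String>
	static class Field {
		private final String value;

		Field(String value) {
			this.value = value;
		}

		String getValue() {
			return value;
		}
	}

	// Stand-in for an input record whose fields may all be null
	static class Record {
		Field publisher;
		List<Field> description;

		Field getPublisher() {
			return publisher;
		}

		List<Field> getDescription() {
			return description;
		}
	}

	// Stand-in for the dumped (output) representation
	static class Summary {
		String publisher;
		List<String> description;
	}

	static Summary map(Record input) {
		Summary out = new Summary();
		// A plain input.getPublisher().getValue() would throw a NullPointerException
		// when the field is missing; Optional.ofNullable makes the access safe.
		Optional
			.ofNullable(input.getPublisher())
			.ifPresent(p -> out.publisher = p.getValue());
		// Same idea for list-valued fields: map to the plain values only if present.
		out.description = Optional
			.ofNullable(input.getDescription())
			.map(l -> l.stream().map(Field::getValue).collect(Collectors.toList()))
			.orElse(null);
		return out;
	}

	public static void main(String[] args) {
		Summary s = map(new Record()); // record with every field missing
		System.out.println(s.publisher + " / " + s.description); // prints "null / null" without failing
	}
}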