fixed issues while running on cluster

This commit is contained in:
Miriam Baglioni 2020-06-15 11:12:14 +02:00
parent 56e70573c2
commit f96ca900e1
10 changed files with 867 additions and 565 deletions

View File

@ -1,7 +1,8 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
public class CommunityMap extends HashMap<String, String> { public class CommunityMap extends HashMap<String, String> implements Serializable {
} }

View File

@ -1,18 +1,29 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import com.google.common.collect.Maps;
import java.util.Map; import java.util.Map;
import com.google.common.collect.Maps;
public class Constants { public class Constants {
public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap(); public static final Map<String, String> accessRightsCoarMap = Maps.newHashMap();
public static final Map<String, String> coarCodeLabelMap = Maps.newHashMap();
public static String COAR_ACCESS_RIGHT_SCHEMA = "http://vocabularies.coar-repositories.org/documentation/access_rights/";
static { static {
accessRightsCoarMap.put("OPEN", "http://purl.org/coar/access_right/c_abf2"); accessRightsCoarMap.put("OPEN", "c_abf2");
accessRightsCoarMap.put("RESTRICTED", "http://purl.org/coar/access_right/c_16ec"); accessRightsCoarMap.put("RESTRICTED", "c_16ec");
accessRightsCoarMap.put("OPEN SOURCE", "http://purl.org/coar/access_right/c_abf2"); accessRightsCoarMap.put("OPEN SOURCE", "c_abf2");
accessRightsCoarMap.put("CLOSED", "http://purl.org/coar/access_right/c_14cb //metadataonly for coar"); accessRightsCoarMap.put("CLOSED", "c_14cb");
accessRightsCoarMap.put("EMBARGO", "http://purl.org/coar/access_right/c_f1cf"); accessRightsCoarMap.put("EMBARGO", "c_f1cf");
}
static {
coarCodeLabelMap.put("c_abf2", "OPEN");
coarCodeLabelMap.put("c_16ec", "RESTRICTED");
coarCodeLabelMap.put("c_14cb", "CLOSED");
coarCodeLabelMap.put("c_f1cf", "EMBARGO");
} }
} }

View File

@ -1,26 +1,34 @@
package eu.dnetlib.dhp.oa.graph.dump;
import eu.dnetlib.dhp.schema.dump.oaf.*; package eu.dnetlib.dhp.oa.graph.dump;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import scala.collection.immutable.Stream;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.swing.text.html.Option;
import org.apache.avro.generic.GenericData;
import eu.dnetlib.dhp.schema.dump.oaf.*;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class Mapper implements Serializable { public class Mapper implements Serializable {
public static <I extends eu.dnetlib.dhp.schema.oaf.Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> O map( public static <I extends eu.dnetlib.dhp.schema.oaf.Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> O map(
I input, Map<String,String> communityMap){ I input, Map<String, String> communityMap) {
O out = null; O out = null;
switch (input.getResulttype().getClassid()){ Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> ort = Optional.ofNullable(input.getResulttype());
if (ort.isPresent()) {
switch (ort.get().getClassid()) {
case "publication": case "publication":
out = (O)new Publication(); out = (O) new Publication();
Optional<Journal> journal = Optional.ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal()); Optional<Journal> journal = Optional
if(journal.isPresent()){ .ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal());
if (journal.isPresent()) {
Journal j = journal.get(); Journal j = journal.get();
Container c = new Container(); Container c = new Container();
c.setConferencedate(j.getConferencedate()); c.setConferencedate(j.getConferencedate());
@ -39,12 +47,17 @@ public class Mapper implements Serializable {
break; break;
case "dataset": case "dataset":
Dataset d = new Dataset(); Dataset d = new Dataset();
eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset)input; eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset) input;
d.setSize(id.getSize().getValue()); Optional.ofNullable(id.getSize()).ifPresent(v -> d.setSize(v.getValue()));
d.setVersion(id.getVersion().getValue()); Optional.ofNullable(id.getVersion()).ifPresent(v -> d.setVersion(v.getValue()));
List<eu.dnetlib.dhp.schema.oaf.GeoLocation> igl = id.getGeolocation(); d
d.setGeolocation(igl.stream() .setGeolocation(
Optional
.ofNullable(id.getGeolocation())
.map(
igl -> igl
.stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(gli -> { .map(gli -> {
GeoLocation gl = new GeoLocation(); GeoLocation gl = new GeoLocation();
@ -52,172 +65,343 @@ public class Mapper implements Serializable {
gl.setPlace(gli.getPlace()); gl.setPlace(gli.getPlace());
gl.setPoint(gli.getPoint()); gl.setPoint(gli.getPoint());
return gl; return gl;
}).collect(Collectors.toList())); })
out = (O)d; .collect(Collectors.toList()))
.orElse(null));
out = (O) d;
break; break;
case "software": case "software":
Software s = new Software(); Software s = new Software();
eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software)input; eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software) input;
s.setCodeRepositoryUrl(is.getCodeRepositoryUrl().getValue()); Optional
s.setDocumentationUrl(is.getDocumentationUrl() .ofNullable(is.getCodeRepositoryUrl())
.ifPresent(value -> s.setCodeRepositoryUrl(value.getValue()));
Optional
.ofNullable(is.getDocumentationUrl())
.ifPresent(
value -> s
.setDocumentationUrl(
value
.stream() .stream()
.map(du -> du.getValue()).collect(Collectors.toList())); .map(v -> v.getValue())
s.setProgrammingLanguage(is.getProgrammingLanguage().getClassid()); .collect(Collectors.toList())));
Optional
.ofNullable(is.getProgrammingLanguage())
.ifPresent(value -> s.setProgrammingLanguage(value.getClassid()));
out = (O) s; out = (O) s;
break; break;
case "otherresearchproduct": case "other":
OtherResearchProduct or = new OtherResearchProduct(); OtherResearchProduct or = new OtherResearchProduct();
eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct)input; eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct) input;
or.setContactgroup(ir.getContactgroup().stream().map(cg -> cg.getValue()).collect(Collectors.toList())); or
or.setContactperson(ir.getContactperson().stream().map(cp->cp.getValue()).collect(Collectors.toList())); .setContactgroup(
or.setTool(ir.getTool().stream().map(t -> t.getValue()).collect(Collectors.toList())); Optional
.ofNullable(ir.getContactgroup())
.map(value -> value.stream().map(cg -> cg.getValue()).collect(Collectors.toList()))
.orElse(null));
or
.setContactperson(
Optional
.ofNullable(ir.getContactperson())
.map(value -> value.stream().map(cp -> cp.getValue()).collect(Collectors.toList()))
.orElse(null));
or
.setTool(
Optional
.ofNullable(ir.getTool())
.map(value -> value.stream().map(t -> t.getValue()).collect(Collectors.toList()))
.orElse(null));
out = (O) or; out = (O) or;
break; break;
} }
out.setAuthor(input.getAuthor() Optional<List<eu.dnetlib.dhp.schema.oaf.Author>> oAuthor = Optional.ofNullable(input.getAuthor());
if (oAuthor.isPresent()) {
// List<eu.dnetlib.dhp.schema.dump.oaf.Author> authorList = new ArrayList<>();
out
.setAuthor(
oAuthor
.get()
.stream() .stream()
.map(oa -> { .map(oa -> getAuthor(oa))
Author a = new Author(); .collect(Collectors.toList()));
a.setAffiliation(oa.getAffiliation().stream().map(aff -> aff.getValue()).collect(Collectors.toList()));
a.setFullname(oa.getFullname());
a.setName(oa.getName());
a.setSurname(oa.getSurname());
a.setRank(oa.getRank());
a.setPid(oa.getPid().stream().map(p -> {
ControlledField cf = new ControlledField();
cf.setScheme( p.getQualifier().getClassid());
cf.setValue( p.getValue());
return cf;
}).collect(Collectors.toList()));
return a;
}).collect(Collectors.toList()));
//I do not map Access Right UNKNOWN or OTHER
if (Constants.accessRightsCoarMap.containsKey(input.getBestaccessright().getClassid())){
AccessRight ar = new AccessRight();
ar.setSchema(Constants.accessRightsCoarMap.get(input.getBestaccessright().getClassid()));
ar.setCode(input.getBestaccessright().getClassid());
ar.setLabel(input.getBestaccessright().getClassname());
out.setBestaccessright(ar);
} }
out.setCollectedfrom(input.getCollectedfrom().stream().map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue())) // I do not map Access Right UNKNOWN or OTHER
Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> oar = Optional.ofNullable(input.getBestaccessright());
if (oar.isPresent()) {
if (Constants.accessRightsCoarMap.containsKey(oar.get().getClassid())) {
String code = Constants.accessRightsCoarMap.get(oar.get().getClassid());
out
.setBestaccessright(
AccessRight
.newInstance(
code,
Constants.coarCodeLabelMap.get(code),
Constants.COAR_ACCESS_RIGHT_SCHEMA));
}
}
out
.setCollectedfrom(
input
.getCollectedfrom()
.stream()
.map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
.collect(Collectors.toList())); .collect(Collectors.toList()));
Set<String> communities = communityMap.keySet(); Set<String> communities = communityMap.keySet();
List<Context> contextList = input.getContext() List<Context> contextList = input
.getContext()
.stream() .stream()
.map(c -> { .map(c -> {
if(communities.contains(c.getId())){ if (communities.contains(c.getId())
|| communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
Context context = new Context(); Context context = new Context();
if (!communityMap.containsKey(c.getId())) {
context.setCode(c.getId().substring(0, c.getId().indexOf("::")));
context.setLabel(communityMap.get(context.getCode()));
} else {
context.setCode(c.getId()); context.setCode(c.getId());
context.setLabel(communityMap.get(c.getId())); context.setLabel(communityMap.get(c.getId()));
}
Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo()); Optional<List<DataInfo>> dataInfo = Optional.ofNullable(c.getDataInfo());
if(dataInfo.isPresent()){ if (dataInfo.isPresent()) {
context.setProvenance(dataInfo.get().stream() List<String> provenance = new ArrayList<>();
provenance
.addAll(
dataInfo
.get()
.stream()
.map(di -> { .map(di -> {
if (di.getInferred()){ if (di.getInferred()) {
return di.getProvenanceaction().getClassid(); return di.getProvenanceaction().getClassname();
} }
return null; return null;
}).filter(Objects::nonNull) })
.collect(Collectors.toList())); .filter(Objects::nonNull)
.collect(Collectors.toSet()));
context.setProvenance(provenance);
} }
return context; return context;
} }
return null; return null;
} })
).filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList()); .collect(Collectors.toList());
if(contextList.size() > 0){ if (contextList.size() > 0) {
out.setContext(contextList); out.setContext(contextList);
} }
out.setContributor(input.getContributor() final List<String> contributorList = new ArrayList<>();
Optional
.ofNullable(input.getContributor())
.ifPresent(value -> value.stream().forEach(c -> contributorList.add(c.getValue())));
out.setContributor(contributorList);
List<Country> countryList = new ArrayList<>();
Optional
.ofNullable(input.getCountry())
.ifPresent(
value -> value
.stream() .stream()
.map(c -> c.getValue()).collect(Collectors.toList())); .forEach(
out.setCountry(input.getCountry() c -> {
.stream()
.map(c -> {
Country country = new Country(); Country country = new Country();
country.setCode(c.getClassid()); country.setCode(c.getClassid());
country.setLabel(c.getClassname()); country.setLabel(c.getClassname());
Optional<DataInfo> dataInfo = Optional.ofNullable(c.getDataInfo()); Optional
if(dataInfo.isPresent()){ .ofNullable(c.getDataInfo())
country.setProvenance(dataInfo.get().getProvenanceaction().getClassid()); .ifPresent(
} provenance -> country
return country; .setProvenance(
}).collect(Collectors.toList())); provenance
out.setCoverage(input.getCoverage().stream().map(c->c.getValue()).collect(Collectors.toList())); .getProvenanceaction()
.getClassname()));
countryList
.add(country);
}));
out.setCountry(countryList);
final List<String> coverageList = new ArrayList<>();
Optional
.ofNullable(input.getCoverage())
.ifPresent(value -> value.stream().forEach(c -> coverageList.add(c.getValue())));
out.setCoverage(coverageList);
out.setDateofcollection(input.getDateofcollection()); out.setDateofcollection(input.getDateofcollection());
out.setDescription(input.getDescription().stream().map(d->d.getValue()).collect(Collectors.toList()));
out.setEmbargoenddate(input.getEmbargoenddate().getValue()); final List<String> descriptionList = new ArrayList<>();
out.setFormat(input.getFormat().stream().map(f->f.getValue()).collect(Collectors.toList())); Optional
.ofNullable(input.getDescription())
.ifPresent(value -> value.stream().forEach(d -> descriptionList.add(d.getValue())));
out.setDescription(descriptionList);
Optional<Field<String>> oStr = Optional.ofNullable(input.getEmbargoenddate());
if (oStr.isPresent()) {
out.setEmbargoenddate(oStr.get().getValue());
}
final List<String> formatList = new ArrayList<>();
Optional
.ofNullable(input.getFormat())
.ifPresent(value -> value.stream().forEach(f -> formatList.add(f.getValue())));
out.setFormat(formatList);
out.setId(input.getId()); out.setId(input.getId());
out.setOriginalId(input.getOriginalId()); out.setOriginalId(input.getOriginalId());
out.setInstance(input.getInstance()
final List<Instance> instanceList = new ArrayList<>();
Optional
.ofNullable(input.getInstance())
.ifPresent(
inst -> inst
.stream() .stream()
.map(i -> { .forEach(i -> {
Instance instance = new Instance(); Instance instance = new Instance();
AccessRight ar = new AccessRight();
ar.setCode(i.getAccessright().getClassid()); Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> opAr = Optional
ar.setLabel(i.getAccessright().getClassname()); .ofNullable(i.getAccessright());
if(Constants.accessRightsCoarMap.containsKey(i.getAccessright().getClassid())){ if (opAr.isPresent()) {
ar.setSchema(Constants.accessRightsCoarMap.get(i.getAccessright().getClassid())); if (Constants.accessRightsCoarMap.containsKey(opAr.get().getClassid())) {
String code = Constants.accessRightsCoarMap.get(opAr.get().getClassid());
instance
.setAccessright(
AccessRight
.newInstance(
code,
Constants.coarCodeLabelMap.get(code),
Constants.COAR_ACCESS_RIGHT_SCHEMA));
}
} }
instance.setAccessright(ar);
instance.setCollectedfrom(KeyValue.newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue()));
instance.setHostedby(KeyValue.newInstance(i.getHostedby().getKey(),i.getHostedby().getValue()));
instance.setLicense(i.getLicense().getValue());
instance.setPublicationdata(i.getDateofacceptance().getValue());
instance.setRefereed(i.getRefereed().getValue());
instance.setType(i.getInstancetype().getClassid());
instance.setUrl(i.getUrl());
return instance;
}).collect(Collectors.toList()));
out.setLanguage(Qualifier.newInstance(input.getLanguage().getClassid(), input.getLanguage().getClassname())); instance
out.setLastupdatetimestamp(input.getLastupdatetimestamp()); .setCollectedfrom(
KeyValue
.newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue()));
instance
.setHostedby(
KeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue()));
Optional
.ofNullable(i.getLicense())
.ifPresent(value -> instance.setLicense(value.getValue()));
Optional
.ofNullable(i.getDateofacceptance())
.ifPresent(value -> instance.setPublicationdate(value.getValue()));
Optional
.ofNullable(i.getRefereed())
.ifPresent(value -> instance.setRefereed(value.getValue()));
Optional
.ofNullable(i.getInstancetype())
.ifPresent(value -> instance.setType(value.getClassname()));
Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value));
instanceList.add(instance);
}));
out
.setInstance(instanceList);
Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> oL = Optional.ofNullable(input.getLanguage());
if (oL.isPresent()) {
eu.dnetlib.dhp.schema.oaf.Qualifier language = oL.get();
out.setLanguage(Qualifier.newInstance(language.getClassid(), language.getClassname()));
}
Optional<Long> oLong = Optional.ofNullable(input.getLastupdatetimestamp());
if (oLong.isPresent()) {
out.setLastupdatetimestamp(oLong.get());
}
Optional<List<StructuredProperty>> otitle = Optional.ofNullable(input.getTitle()); Optional<List<StructuredProperty>> otitle = Optional.ofNullable(input.getTitle());
if(otitle.isPresent()){ if (otitle.isPresent()) {
List<StructuredProperty> iTitle = otitle.get() List<StructuredProperty> iTitle = otitle
.get()
.stream() .stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title")) .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title"))
.collect(Collectors.toList()); .collect(Collectors.toList());
if(iTitle.size() > 0 ){ if (iTitle.size() > 0) {
out.setMaintitle(iTitle.get(0).getValue()); out.setMaintitle(iTitle.get(0).getValue());
} }
iTitle = otitle.get() iTitle = otitle
.get()
.stream() .stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle")) .filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle"))
.collect(Collectors.toList()); .collect(Collectors.toList());
if(iTitle.size() > 0){ if (iTitle.size() > 0) {
out.setSubtitle(iTitle.get(0).getValue()); out.setSubtitle(iTitle.get(0).getValue());
} }
} }
out.setPid(input.getPid().stream().map(p -> { List<ControlledField> pids = new ArrayList<>();
ControlledField pid = new ControlledField(); Optional
pid.setScheme(p.getQualifier().getClassid()); .ofNullable(input.getPid())
pid.setValue(p.getValue()); .ifPresent(
return pid; value -> value
}).collect(Collectors.toList())); .stream()
out.setPublicationdata(input.getDateofacceptance().getValue()); .forEach(
out.setPublisher(input.getPublisher().getValue()); p -> pids
out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList())); .add(
out.setSubject(input.getSubject().stream().map(s->{ ControlledField
ControlledField subject = new ControlledField(); .newInstance(p.getQualifier().getClassid(), p.getValue()))));
subject.setScheme(s.getQualifier().getClassid()); out.setPid(pids);
subject.setValue(s.getValue()); oStr = Optional.ofNullable(input.getDateofacceptance());
return subject; if (oStr.isPresent()) {
}).collect(Collectors.toList())); out.setPublicationdate(oStr.get().getValue());
}
oStr = Optional.ofNullable(input.getPublisher());
if (oStr.isPresent()) {
out.setPublisher(oStr.get().getValue());
}
List<String> sourceList = new ArrayList<>();
Optional
.ofNullable(input.getSource())
.ifPresent(value -> value.stream().forEach(s -> sourceList.add(s.getValue())));
// out.setSource(input.getSource().stream().map(s -> s.getValue()).collect(Collectors.toList()));
List<ControlledField> subjectList = new ArrayList<>();
Optional
.ofNullable(input.getSubject())
.ifPresent(
value -> value
.stream()
.forEach(
s -> subjectList
.add(ControlledField.newInstance(s.getQualifier().getClassid(), s.getValue()))));
out.setSubject(subjectList);
out.setType(input.getResulttype().getClassid()); out.setType(input.getResulttype().getClassid());
}
return out; return out;
} }
private static Author getAuthor(eu.dnetlib.dhp.schema.oaf.Author oa) {
Author a = new Author();
Optional
.ofNullable(oa.getAffiliation())
.ifPresent(
value -> a
.setAffiliation(
value
.stream()
.map(aff -> aff.getValue())
.collect(Collectors.toList())));
a.setFullname(oa.getFullname());
a.setName(oa.getName());
a.setSurname(oa.getSurname());
a.setRank(oa.getRank());
Optional
.ofNullable(oa.getPid())
.ifPresent(
value -> a
.setPid(
value
.stream()
.map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList())));
return a;
}
} }

View File

@ -1,24 +1,24 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import java.io.StringReader;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import java.util.List;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import java.io.StringReader; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import java.util.HashMap; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import java.util.List; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.util.Map;
public class QueryInformationSystem { public class QueryInformationSystem {
private ISLookUpService isLookUp; private ISLookUpService isLookUp;
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
+
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
" return " + " return " +
"<community> " + "<community> " +
@ -26,9 +26,7 @@ public class QueryInformationSystem {
"{$x//CONFIGURATION/context/@label}" + "{$x//CONFIGURATION/context/@label}" +
"</community>"; "</community>";
public CommunityMap getCommunityMap()
public Map<String,String> getCommunityMap()
throws ISLookUpException { throws ISLookUpException {
return getMap(isLookUp.quickSearchProfile(XQUERY)); return getMap(isLookUp.quickSearchProfile(XQUERY));
@ -42,12 +40,8 @@ public class QueryInformationSystem {
this.isLookUp = isLookUpService; this.isLookUp = isLookUpService;
} }
public void set(String isLookUpUrl){ public static CommunityMap getMap(List<String> communityMap) {
isLookUpUrl = get(isLookUpUrl); final CommunityMap map = new CommunityMap();
}
public ISLookUpService
private static Map<String, String> getMap(List<String> communityMap) {
final Map<String, String> map = new HashMap<>();
communityMap.stream().forEach(xml -> { communityMap.stream().forEach(xml -> {
final Document doc; final Document doc;
@ -59,7 +53,6 @@ public void set(String isLookUpUrl){
e.printStackTrace(); e.printStackTrace();
} }
}); });
return map; return map;

View File

@ -1,10 +1,11 @@
package eu.dnetlib.dhp.oa.graph.dump;
import eu.dnetlib.dhp.schema.dump.oaf.Projects; package eu.dnetlib.dhp.oa.graph.dump;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import eu.dnetlib.dhp.schema.dump.oaf.Projects;
public class ResultProject implements Serializable { public class ResultProject implements Serializable {
private String resultId; private String resultId;
private List<Projects> projectsList; private List<Projects> projectsList;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -6,10 +7,8 @@ import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.Context; import javax.management.Query;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -19,17 +18,19 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import javax.management.Query; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class SparkDumpCommunityProducts implements Serializable { public class SparkDumpCommunityProducts implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class); private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class);
private QueryInformationSystem queryInformationSystem; private static QueryInformationSystem queryInformationSystem;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
@ -47,7 +48,6 @@ public class SparkDumpCommunityProducts implements Serializable {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath"); final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath); log.info("inputPath: {}", inputPath);
@ -63,83 +63,84 @@ public class SparkDumpCommunityProducts implements Serializable {
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl); log.info("isLookUpUrl: {}", isLookUpUrl);
final String resultType = parser.get("resultType"); // final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType); // log.info("resultType: {}", resultType);
final Optional<String> cm = Optional.ofNullable(parser.get("communityMap"));
SparkDumpCommunityProducts sdcp = new SparkDumpCommunityProducts();
sdcp.exec(isLookUpUrl, isSparkSessionManaged, outputPath,
inputPath, resultClassName, dumpClassName);
}
public QueryInformationSystem getQueryInformationSystem() {
return queryInformationSystem;
}
public void setQueryInformationSystem(QueryInformationSystem queryInformationSystem) {
this.queryInformationSystem = queryInformationSystem;
}
public ISLookUpService getIsLookUpService(String isLookUpUrl){
return ISLookupClientFactory.getLookUpService(isLookUpUrl);
}
public void exec(String isLookUpUrl, Boolean isSparkSessionManaged, String outputPath, String inputPath,
String resultClassName, String dumpClassName) throws ISLookUpException, ClassNotFoundException {
SparkConf conf = new SparkConf();
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> dumpClazz = Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> dumpClazz = (Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result>) Class
(Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result>) Class.forName(dumpClassName); .forName(dumpClassName);
SparkConf conf = new SparkConf();
CommunityMap communityMap;
if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) {
queryInformationSystem = new QueryInformationSystem();
queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl)); queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl));
Map<String,String>
communityMap = queryInformationSystem.getCommunityMap(); communityMap = queryInformationSystem.getCommunityMap();
} else {
communityMap = new Gson().fromJson(cm.get(), CommunityMap.class);
}
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath , communityMap, inputClazz, dumpClazz); execDump(spark, inputPath, outputPath, communityMap, inputClazz, dumpClazz);
}); });
} }
public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
return ISLookupClientFactory.getLookUpService(isLookUpUrl);
}
private <I extends Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result > void execDump( public static <I extends Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(SparkSession spark,
SparkSession spark,
String inputPath, String inputPath,
String outputPath, String outputPath,
Map<String,String> communityMap, CommunityMap communityMap,
Class<I> inputClazz, Class<I> inputClazz,
Class<O> dumpClazz) { Class<O> dumpClazz) {
Set<String> communities = communityMap.keySet(); // Set<String> communities = communityMap.keySet();
Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz); Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
tmp.map(value -> {
tmp
.map(value -> execMap(value, communityMap), Encoders.bean(dumpClazz))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <O extends eu.dnetlib.dhp.schema.dump.oaf.Result, I extends Result> O execMap(I value,
CommunityMap communityMap) {
{
Set<String> communities = communityMap.keySet();
Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext()); Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
if(!inputContext.isPresent()){ if (!inputContext.isPresent()) {
return null; return null;
} }
List<String> toDumpFor = inputContext.get().stream().map(c -> { List<String> toDumpFor = inputContext.get().stream().map(c -> {
if (communities.contains(c.getId())) { if (communities.contains(c.getId())) {
return c.getId(); return c.getId();
} }
if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
return c.getId().substring(0, 3);
}
return null; return null;
}).filter(Objects::nonNull).collect(Collectors.toList()); }).filter(Objects::nonNull).collect(Collectors.toList());
if(toDumpFor.size() == 0){ if (toDumpFor.size() == 0) {
return null; return null;
} }
return Mapper.map(value, communityMap); return Mapper.map(value, communityMap);
},Encoders.bean(dumpClazz)) }
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(outputPath);
} }
} }

View File

@ -1,11 +1,13 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import com.fasterxml.jackson.databind.ObjectMapper; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Projects; import java.io.Serializable;
import eu.dnetlib.dhp.schema.dump.oaf.Result; import java.io.StringReader;
import eu.dnetlib.dhp.schema.oaf.Project; import java.util.*;
import eu.dnetlib.dhp.schema.oaf.Relation; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -14,22 +16,28 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Funder;
import eu.dnetlib.dhp.schema.dump.oaf.Projects;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2; import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkPrepareResultProject implements Serializable { public class SparkPrepareResultProject implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class); private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultProject.class);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
@ -52,7 +60,6 @@ public class SparkPrepareResultProject implements Serializable {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -65,35 +72,109 @@ public class SparkPrepareResultProject implements Serializable {
} }
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) { private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation" , Relation.class) Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'"); .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project" , Project.class); Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project", Project.class);
projects.joinWith(relation, projects.col("id").equalTo(relation.col("source"))) projects
.groupByKey((MapFunction<Tuple2<Project,Relation>,String>)value -> value._2().getTarget(), Encoders.STRING()) .joinWith(relation, projects.col("id").equalTo(relation.col("source")))
.mapGroups((MapGroupsFunction<String, Tuple2<Project,Relation>, ResultProject>) (s, it) -> .groupByKey(
{ (MapFunction<Tuple2<Project, Relation>, String>) value -> value._2().getTarget(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<Project, Relation>, ResultProject>) (s, it) -> {
Set<String> projectSet = new HashSet<>();
Tuple2<Project, Relation> first = it.next(); Tuple2<Project, Relation> first = it.next();
ResultProject rp = new ResultProject(); ResultProject rp = new ResultProject();
rp.setResultId(first._2().getTarget()); rp.setResultId(first._2().getTarget());
Project p = first._1(); Project p = first._1();
Projects ps = Projects.newInstance(p.getId(), p.getCode().getValue(), p.getAcronym().getValue(), projectSet.add(p.getId());
p.getTitle().getValue(), p.getFundingtree() Projects ps = Projects
.newInstance(
p.getId(), p.getCode().getValue(),
Optional
.ofNullable(p.getAcronym())
.map(a -> a.getValue())
.orElse(null),
Optional
.ofNullable(p.getTitle())
.map(v -> v.getValue())
.orElse(null),
Optional
.ofNullable(p.getFundingtree())
.map(
value -> value
.stream() .stream()
.map(ft -> ft.getValue()).collect(Collectors.toList())); .map(ft -> getFunder(ft.getValue()))
List<Projects> projList = Arrays.asList(ps); .collect(Collectors.toList())
.get(0))
.orElse(null));
List<Projects> projList = new ArrayList<>();
projList.add(ps);
rp.setProjectsList(projList); rp.setProjectsList(projList);
it.forEachRemaining(c -> { it.forEachRemaining(c -> {
Project op = c._1(); Project op = c._1();
projList.add(Projects.newInstance(op.getId(), op.getCode().getValue(), if (!projectSet.contains(op.getId())) {
op.getAcronym().getValue(), op.getTitle().getValue(), projList
op.getFundingtree().stream().map(ft -> ft.getValue()).collect(Collectors.toList()))); .add(
Projects
.newInstance(
op.getId(),
op.getCode().getValue(),
Optional
.ofNullable(op.getAcronym())
.map(a -> a.getValue())
.orElse(null),
Optional
.ofNullable(op.getTitle())
.map(v -> v.getValue())
.orElse(null),
Optional
.ofNullable(op.getFundingtree())
.map(
value -> value
.stream()
.map(ft -> getFunder(ft.getValue()))
.collect(Collectors.toList())
.get(0))
.orElse(null)));
projectSet.add(op.getId());
}
}); });
return rp; return rp;
} ,Encoders.bean(ResultProject.class)) }, Encoders.bean(ResultProject.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
} }
private static Funder getFunder(String fundingtree) {
// ["<fundingtree><funder><id>nsf_________::NSF</id><shortname>NSF</shortname><name>National Science
// Foundation</name><jurisdiction>US</jurisdiction></funder><funding_level_1><id>nsf_________::NSF::CISE/OAD::CISE/CCF</id><description>Division
// of Computing and Communication Foundations</description><name>Division of Computing and Communication
// Foundations</name><parent><funding_level_0><id>nsf_________::NSF::CISE/OAD</id><description>Directorate for
// Computer &amp; Information Science &amp; Engineering</description><name>Directorate for Computer &amp;
// Information Science &amp;
// Engineering</name><parent/><class>nsf:fundingStream</class></funding_level_0></parent></funding_level_1></fundingtree>"]
Funder f = new Funder();
final Document doc;
try {
doc = new SAXReader().read(new StringReader(fundingtree));
f.setShortName(((Node) (doc.selectNodes("//funder/shortname").get(0))).getText());
f.setName(((Node) (doc.selectNodes("//funder/name").get(0))).getText());
f.setJurisdiction(((Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText());
for (Object o : doc.selectNodes("//funding_level_0")) {
List node = ((Node) o).selectNodes("./name");
f.setFundingStream(((Node) node.get(0)).getText());
}
return f;
} catch (DocumentException e) {
e.printStackTrace();
}
return f;
}
} }

View File

@ -1,29 +1,36 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.io.StringReader;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Serializable; import com.google.gson.Gson;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class SparkSplitForCommunity implements Serializable { public class SparkSplitForCommunity implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunity.class); private static final Logger log = LoggerFactory.getLogger(SparkSplitForCommunity.class);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
@ -40,7 +47,6 @@ public class SparkSplitForCommunity implements Serializable {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath"); final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath); log.info("inputPath: {}", inputPath);
@ -53,45 +59,63 @@ public class SparkSplitForCommunity implements Serializable {
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl); log.info("isLookUpUrl: {}", isLookUpUrl);
final Optional<String> cm = Optional.ofNullable(parser.get("communityMap"));
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
Map<String,String> CommunityMap communityMap;
communityMap = QueryInformationSystem.getCommunityMap(isLookUpUrl);
if (!isLookUpUrl.equals("BASEURL:8280/is/services/isLookUp")) {
QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
queryInformationSystem.setIsLookUp(getIsLookUpService(isLookUpUrl));
communityMap = queryInformationSystem.getCommunityMap();
} else {
communityMap = new Gson().fromJson(cm.get(), CommunityMap.class);
}
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
execSplit(spark, inputPath, outputPath , communityMap.keySet(), inputClazz); execSplit(spark, inputPath, outputPath, communityMap.keySet(), inputClazz);
}); });
} }
private static <R extends Result> void execSplit(SparkSession spark, String inputPath, String outputPath, Set<String> communities public static ISLookUpService getIsLookUpService(String isLookUpUrl) {
, Class<R> inputClazz) { return ISLookupClientFactory.getLookUpService(isLookUpUrl);
}
private static <R extends Result> void execSplit(SparkSession spark, String inputPath, String outputPath,
Set<String> communities, Class<R> inputClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath, inputClazz); Dataset<R> result = Utils.readPath(spark, inputPath, inputClazz);
communities.stream() communities
.stream()
.forEach(c -> printResult(c, result, outputPath)); .forEach(c -> printResult(c, result, outputPath));
} }
private static <R extends Result> void printResult(String c, Dataset<R> result, String outputPath) { private static <R extends Result> void printResult(String c, Dataset<R> result, String outputPath) {
result.filter(r -> containsCommunity(r, c)) result
.filter(r -> containsCommunity(r, c))
.write() .write()
.option("compression","gzip") .option("compression", "gzip")
.mode(SaveMode.Append) .mode(SaveMode.Append)
.json(outputPath + "/" + c); .json(outputPath + "/" + c);
} }
private static <R extends Result> boolean containsCommunity(R r, String c) { private static <R extends Result> boolean containsCommunity(R r, String c) {
if(Optional.ofNullable(r.getContext()).isPresent()) { if (Optional.ofNullable(r.getContext()).isPresent()) {
return r.getContext().stream().filter(con -> con.getCode().equals(c)).collect(Collectors.toList()).size() > 0; return r
.getContext()
.stream()
.filter(con -> con.getCode().equals(c))
.collect(Collectors.toList())
.size() > 0;
} }
return false; return false;
} }

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import com.fasterxml.jackson.databind.ObjectMapper; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import java.io.Serializable;
import java.util.Optional;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -16,13 +16,15 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2; import scala.Tuple2;
import java.io.Serializable;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkUpdateProjectInfo implements Serializable { public class SparkUpdateProjectInfo implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkUpdateProjectInfo.class); private static final Logger log = LoggerFactory.getLogger(SparkUpdateProjectInfo.class);
@ -53,7 +55,6 @@ public class SparkUpdateProjectInfo implements Serializable {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final String preparedInfoPath = parser.get("preparedInfoPath"); final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath); log.info("preparedInfoPath: {}", preparedInfoPath);
@ -66,20 +67,22 @@ public class SparkUpdateProjectInfo implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath); Utils.removeOutputDir(spark, outputPath);
extend(spark, inputPath, outputPath , preparedInfoPath, inputClazz); extend(spark, inputPath, outputPath, preparedInfoPath, inputClazz);
}); });
} }
private static <R extends Result > void extend( private static <R extends Result> void extend(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String outputPath, String outputPath,
String preparedInfoPath, String preparedInfoPath,
Class<R> inputClazz) { Class<R> inputClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath , inputClazz); Dataset<R> result = Utils.readPath(spark, inputPath, inputClazz);
Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class); Dataset<ResultProject> resultProject = Utils.readPath(spark, preparedInfoPath, ResultProject.class);
result.joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")), result
.joinWith(
resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left") "left")
.map(value -> { .map(value -> {
R r = value._1(); R r = value._1();
@ -87,13 +90,12 @@ public class SparkUpdateProjectInfo implements Serializable {
r.setProjects(rp.getProjectsList()); r.setProjects(rp.getProjectsList());
}); });
return r; return r;
},Encoders.bean(inputClazz)) }, Encoders.bean(inputClazz))
.write() .write()
.option("compression","gzip") .option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(outputPath); .json(outputPath);
} }
} }

View File

@ -1,14 +1,18 @@
package eu.dnetlib.dhp.oa.graph.dump; package eu.dnetlib.dhp.oa.graph.dump;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
public class Utils { public class Utils {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void removeOutputDir(SparkSession spark, String path) { public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }