added fix for mixing result types, added configuration default to funder subworkflow

This commit is contained in:
Miriam Baglioni 2021-10-13 11:28:28 +02:00
parent fec40bdd95
commit 63933808d4
6 changed files with 389 additions and 338 deletions

View File

@ -11,6 +11,7 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -57,7 +58,7 @@ public class DumpProducts implements Serializable {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map((MapFunction<I, O>) value -> execMap(value, communityMap, dumpType), Encoders.bean(outputClazz)) .map((MapFunction<I, O>) value -> execMap(value, communityMap, dumpType), Encoders.bean(outputClazz))
.filter(Objects::nonNull) .filter((FilterFunction<O>) value -> value != null)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")

View File

@ -62,7 +62,7 @@ public class QueryInformationSystem {
for (String xml : communityMap) { for (String xml : communityMap) {
final Document doc; final Document doc;
final SAXReader reader = new SAXReader(); final SAXReader reader = new SAXReader();
reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); // reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
doc = reader.read(new StringReader(xml)); doc = reader.read(new StringReader(xml));
Element root = doc.getRootElement(); Element root = doc.getRootElement();
map.put(root.attribute("id").getValue(), root.attribute("label").getValue()); map.put(root.attribute("id").getValue(), root.attribute("label").getValue());

View File

@ -40,105 +40,9 @@ public class ResultMapper implements Serializable {
eu.dnetlib.dhp.schema.oaf.Result input = (eu.dnetlib.dhp.schema.oaf.Result) in; eu.dnetlib.dhp.schema.oaf.Result input = (eu.dnetlib.dhp.schema.oaf.Result) in;
Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> ort = Optional.ofNullable(input.getResulttype()); Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> ort = Optional.ofNullable(input.getResulttype());
if (ort.isPresent()) { if (ort.isPresent()) {
switch (ort.get().getClassid()) { try {
case "publication":
Optional<Journal> journal = Optional
.ofNullable(((eu.dnetlib.dhp.schema.oaf.Publication) input).getJournal());
if (journal.isPresent()) {
Journal j = journal.get();
Container c = new Container();
c.setConferencedate(j.getConferencedate());
c.setConferenceplace(j.getConferenceplace());
c.setEdition(j.getEdition());
c.setEp(j.getEp());
c.setIss(j.getIss());
c.setIssnLinking(j.getIssnLinking());
c.setIssnOnline(j.getIssnOnline());
c.setIssnPrinted(j.getIssnPrinted());
c.setName(j.getName());
c.setSp(j.getSp());
c.setVol(j.getVol());
out.setContainer(c);
out.setType(ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE.getClassname());
}
break;
case "dataset":
eu.dnetlib.dhp.schema.oaf.Dataset id = (eu.dnetlib.dhp.schema.oaf.Dataset) input;
Optional.ofNullable(id.getSize()).ifPresent(v -> out.setSize(v.getValue()));
Optional.ofNullable(id.getVersion()).ifPresent(v -> out.setVersion(v.getValue()));
out
.setGeolocation(
Optional
.ofNullable(id.getGeolocation())
.map(
igl -> igl
.stream()
.filter(Objects::nonNull)
.map(gli -> {
GeoLocation gl = new GeoLocation();
gl.setBox(gli.getBox());
gl.setPlace(gli.getPlace());
gl.setPoint(gli.getPoint());
return gl;
})
.collect(Collectors.toList()))
.orElse(null));
out.setType(ModelConstants.DATASET_DEFAULT_RESULTTYPE.getClassname());
break;
case "software":
eu.dnetlib.dhp.schema.oaf.Software is = (eu.dnetlib.dhp.schema.oaf.Software) input;
Optional
.ofNullable(is.getCodeRepositoryUrl())
.ifPresent(value -> out.setCodeRepositoryUrl(value.getValue()));
Optional
.ofNullable(is.getDocumentationUrl())
.ifPresent(
value -> out
.setDocumentationUrl(
value
.stream()
.map(Field::getValue)
.collect(Collectors.toList())));
Optional
.ofNullable(is.getProgrammingLanguage())
.ifPresent(value -> out.setProgrammingLanguage(value.getClassid()));
out.setType(ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE.getClassname());
break;
case "other":
eu.dnetlib.dhp.schema.oaf.OtherResearchProduct ir = (eu.dnetlib.dhp.schema.oaf.OtherResearchProduct) input;
out
.setContactgroup(
Optional
.ofNullable(ir.getContactgroup())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.orElse(null));
out
.setContactperson(
Optional
.ofNullable(ir.getContactperson())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.orElse(null));
out
.setTool(
Optional
.ofNullable(ir.getTool())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.orElse(null));
out.setType(ModelConstants.ORP_DEFAULT_RESULTTYPE.getClassname());
break;
default:
throw new NoAvailableEntityTypeException();
}
addTypeSpecificInformation(out, input, ort);
Optional<List<Measure>> mes = Optional.ofNullable(input.getMeasures()); Optional<List<Measure>> mes = Optional.ofNullable(input.getMeasures());
if (mes.isPresent()) { if (mes.isPresent()) {
List<KeyValue> measure = new ArrayList<>(); List<KeyValue> measure = new ArrayList<>();
@ -242,7 +146,11 @@ public class ResultMapper implements Serializable {
} else { } else {
((CommunityResult) out) ((CommunityResult) out)
.setInstance( .setInstance(
oInst.get().stream().map(ResultMapper::getCommunityInstance).collect(Collectors.toList())); oInst
.get()
.stream()
.map(ResultMapper::getCommunityInstance)
.collect(Collectors.toList()));
} }
} }
@ -300,7 +208,8 @@ public class ResultMapper implements Serializable {
Optional Optional
.ofNullable(input.getSource()) .ofNullable(input.getSource())
.ifPresent(value -> out.setSource(value.stream().map(Field::getValue).collect(Collectors.toList()))); .ifPresent(
value -> out.setSource(value.stream().map(Field::getValue).collect(Collectors.toList())));
List<Subject> subjectList = new ArrayList<>(); List<Subject> subjectList = new ArrayList<>();
Optional Optional
@ -312,7 +221,6 @@ public class ResultMapper implements Serializable {
out.setSubjects(subjectList); out.setSubjects(subjectList);
out.setType(input.getResulttype().getClassid()); out.setType(input.getResulttype().getClassid());
}
if (!Constants.DUMPTYPE.COMPLETE.getType().equals(dumpType)) { if (!Constants.DUMPTYPE.COMPLETE.getType().equals(dumpType)) {
((CommunityResult) out) ((CommunityResult) out)
@ -354,7 +262,8 @@ public class ResultMapper implements Serializable {
.map( .map(
provenanceaction -> Provenance provenanceaction -> Provenance
.newInstance( .newInstance(
provenanceaction.getClassname(), di.getTrust())) provenanceaction.getClassname(),
di.getTrust()))
.orElse(null)) .orElse(null))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
@ -385,10 +294,117 @@ public class ResultMapper implements Serializable {
((CommunityResult) out).setContext(remainigContext); ((CommunityResult) out).setContext(remainigContext);
} }
} }
} catch (ClassCastException cce) {
return out;
}
}
return out; return out;
} }
private static void addTypeSpecificInformation(Result out, eu.dnetlib.dhp.schema.oaf.Result input,
Optional<eu.dnetlib.dhp.schema.oaf.Qualifier> ort) throws NoAvailableEntityTypeException {
switch (ort.get().getClassid()) {
case "publication":
Optional<Journal> journal = Optional
.ofNullable(((Publication) input).getJournal());
if (journal.isPresent()) {
Journal j = journal.get();
Container c = new Container();
c.setConferencedate(j.getConferencedate());
c.setConferenceplace(j.getConferenceplace());
c.setEdition(j.getEdition());
c.setEp(j.getEp());
c.setIss(j.getIss());
c.setIssnLinking(j.getIssnLinking());
c.setIssnOnline(j.getIssnOnline());
c.setIssnPrinted(j.getIssnPrinted());
c.setName(j.getName());
c.setSp(j.getSp());
c.setVol(j.getVol());
out.setContainer(c);
out.setType(ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE.getClassname());
}
break;
case "dataset":
Dataset id = (Dataset) input;
Optional.ofNullable(id.getSize()).ifPresent(v -> out.setSize(v.getValue()));
Optional.ofNullable(id.getVersion()).ifPresent(v -> out.setVersion(v.getValue()));
out
.setGeolocation(
Optional
.ofNullable(id.getGeolocation())
.map(
igl -> igl
.stream()
.filter(Objects::nonNull)
.map(gli -> {
GeoLocation gl = new GeoLocation();
gl.setBox(gli.getBox());
gl.setPlace(gli.getPlace());
gl.setPoint(gli.getPoint());
return gl;
})
.collect(Collectors.toList()))
.orElse(null));
out.setType(ModelConstants.DATASET_DEFAULT_RESULTTYPE.getClassname());
break;
case "software":
Software is = (Software) input;
Optional
.ofNullable(is.getCodeRepositoryUrl())
.ifPresent(value -> out.setCodeRepositoryUrl(value.getValue()));
Optional
.ofNullable(is.getDocumentationUrl())
.ifPresent(
value -> out
.setDocumentationUrl(
value
.stream()
.map(Field::getValue)
.collect(Collectors.toList())));
Optional
.ofNullable(is.getProgrammingLanguage())
.ifPresent(value -> out.setProgrammingLanguage(value.getClassid()));
out.setType(ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE.getClassname());
break;
case "other":
OtherResearchProduct ir = (OtherResearchProduct) input;
out
.setContactgroup(
Optional
.ofNullable(ir.getContactgroup())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.orElse(null));
out
.setContactperson(
Optional
.ofNullable(ir.getContactperson())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.orElse(null));
out
.setTool(
Optional
.ofNullable(ir.getTool())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.orElse(null));
out.setType(ModelConstants.ORP_DEFAULT_RESULTTYPE.getClassname());
break;
default:
throw new NoAvailableEntityTypeException();
}
}
private static Instance getGraphInstance(eu.dnetlib.dhp.schema.oaf.Instance i) { private static Instance getGraphInstance(eu.dnetlib.dhp.schema.oaf.Instance i) {
Instance instance = new Instance(); Instance instance = new Instance();

View File

@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
/** /**
@ -86,10 +87,13 @@ public class SaveCommunityMap implements Serializable {
private void saveCommunityMap(boolean singleCommunity, String communityId) private void saveCommunityMap(boolean singleCommunity, String communityId)
throws ISLookUpException, IOException, DocumentException, SAXException { throws ISLookUpException, IOException, DocumentException, SAXException {
final String communityMapString = Utils.OBJECT_MAPPER
.writeValueAsString(queryInformationSystem.getCommunityMap(singleCommunity, communityId));
log.info("communityMap {} ", communityMapString);
writer writer
.write( .write(
Utils.OBJECT_MAPPER communityMapString);
.writeValueAsString(queryInformationSystem.getCommunityMap(singleCommunity, communityId))); writer.close();
} }
} }

View File

@ -30,7 +30,7 @@
"paramLongName": "communityId", "paramLongName": "communityId",
"paramDescription": "the id of the community for which to create the dump", "paramDescription": "the id of the community for which to create the dump",
"paramRequired": true "paramRequired": true
} } }
] ]

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>