enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
5 changed files with 217 additions and 205 deletions
Showing only changes of commit 7a7272d9ec - Show all commits

View File

@ -10,10 +10,27 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES; import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.NOT_AVAILABLE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document; import org.dom4j.Document;
@ -47,10 +64,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final String DATACITE_SCHEMA_KERNEL_4 = "http://datacite.org/schema/kernel-4"; protected static final String DATACITE_SCHEMA_KERNEL_4 = "http://datacite.org/schema/kernel-4";
protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3"; protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
protected static final Qualifier ORCID_PID_TYPE = qualifier( protected static final Qualifier ORCID_PID_TYPE = qualifier("ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES);
"ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES); protected static final Qualifier MAG_PID_TYPE = qualifier("MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
protected static final Qualifier MAG_PID_TYPE = qualifier(
"MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
protected static final Map<String, String> nsContext = new HashMap<>(); protected static final Map<String, String> nsContext = new HashMap<>();
@ -64,8 +79,7 @@ public abstract class AbstractMdRecordToOafMapper {
nsContext.put("datacite", DATACITE_SCHEMA_KERNEL_3); nsContext.put("datacite", DATACITE_SCHEMA_KERNEL_3);
} }
protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier( protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
"main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
protected AbstractMdRecordToOafMapper(final Map<String, String> code2name) { protected AbstractMdRecordToOafMapper(final Map<String, String> code2name) {
this.code2name = code2name; this.code2name = code2name;
@ -79,20 +93,15 @@ public abstract class AbstractMdRecordToOafMapper {
.parseText(xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)); .parseText(xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3));
final String type = doc.valueOf("//dr:CobjCategory/@type"); final String type = doc.valueOf("//dr:CobjCategory/@type");
final KeyValue collectedFrom = getProvenanceDatasource( final KeyValue collectedFrom = getProvenanceDatasource(doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
if (collectedFrom == null) { if (collectedFrom == null) { return null; }
return null;
}
final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id"))
? collectedFrom ? collectedFrom
: getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name"); : getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name");
if (hostedBy == null) { if (hostedBy == null) { return null; }
return null;
}
final DataInfo info = prepareDataInfo(doc); final DataInfo info = prepareDataInfo(doc);
final long lastUpdateTimestamp = new Date().getTime(); final long lastUpdateTimestamp = new Date().getTime();
@ -107,9 +116,7 @@ public abstract class AbstractMdRecordToOafMapper {
final String dsId = doc.valueOf(xpathId); final String dsId = doc.valueOf(xpathId);
final String dsName = doc.valueOf(xpathName); final String dsName = doc.valueOf(xpathName);
if (StringUtils.isBlank(dsId) | StringUtils.isBlank(dsName)) { if (StringUtils.isBlank(dsId) | StringUtils.isBlank(dsName)) { return null; }
return null;
}
return keyValue(createOpenaireId(10, dsId, true), dsName); return keyValue(createOpenaireId(10, dsId, true), dsName);
} }
@ -125,47 +132,47 @@ public abstract class AbstractMdRecordToOafMapper {
final List<Oaf> oafs = new ArrayList<>(); final List<Oaf> oafs = new ArrayList<>();
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
case "publication": case "publication":
final Publication p = new Publication(); final Publication p = new Publication();
populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE); p.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE);
p.setJournal(prepareJournal(doc, info)); p.setJournal(prepareJournal(doc, info));
oafs.add(p); oafs.add(p);
break; break;
case "dataset": case "dataset":
final Dataset d = new Dataset(); final Dataset d = new Dataset();
populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
d.setResulttype(DATASET_DEFAULT_RESULTTYPE); d.setResulttype(DATASET_DEFAULT_RESULTTYPE);
d.setStoragedate(prepareDatasetStorageDate(doc, info)); d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info)); d.setDevice(prepareDatasetDevice(doc, info));
d.setSize(prepareDatasetSize(doc, info)); d.setSize(prepareDatasetSize(doc, info));
d.setVersion(prepareDatasetVersion(doc, info)); d.setVersion(prepareDatasetVersion(doc, info));
d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info)); d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info));
d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info)); d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info));
d.setGeolocation(prepareDatasetGeoLocations(doc, info)); d.setGeolocation(prepareDatasetGeoLocations(doc, info));
oafs.add(d); oafs.add(d);
break; break;
case "software": case "software":
final Software s = new Software(); final Software s = new Software();
populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE); s.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info)); s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info)); s.setLicense(prepareSoftwareLicenses(doc, info));
s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info)); s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info)); s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info));
oafs.add(s); oafs.add(s);
break; break;
case "": case "":
case "otherresearchproducts": case "otherresearchproducts":
default: default:
final OtherResearchProduct o = new OtherResearchProduct(); final OtherResearchProduct o = new OtherResearchProduct();
populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
o.setResulttype(ORP_DEFAULT_RESULTTYPE); o.setResulttype(ORP_DEFAULT_RESULTTYPE);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info)); o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info)); o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
o.setTool(prepareOtherResearchProductTools(doc, info)); o.setTool(prepareOtherResearchProductTools(doc, info));
oafs.add(o); oafs.add(o);
break; break;
} }
if (!oafs.isEmpty()) { if (!oafs.isEmpty()) {
@ -194,15 +201,9 @@ public abstract class AbstractMdRecordToOafMapper {
final String projectId = createOpenaireId(40, originalId, true); final String projectId = createOpenaireId(40, originalId, true);
res res
.add( .add(getRelation(docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, collectedFrom, info, lastUpdateTimestamp));
getRelation(
docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, collectedFrom, info,
lastUpdateTimestamp));
res res
.add( .add(getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, collectedFrom, info, lastUpdateTimestamp));
getRelation(
projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, collectedFrom, info,
lastUpdateTimestamp));
} }
} }
@ -247,10 +248,7 @@ public abstract class AbstractMdRecordToOafMapper {
r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false)); r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier"))); r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier")));
r.setCollectedfrom(Arrays.asList(collectedFrom)); r.setCollectedfrom(Arrays.asList(collectedFrom));
r r.setPid(prepareResultPids(doc, info));
.setPid(
prepareListStructProps(
doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info));
r.setDateofcollection(doc.valueOf("//dr:dateOfCollection")); r.setDateofcollection(doc.valueOf("//dr:dateOfCollection"));
r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation")); r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation"));
r.setExtraInfo(new ArrayList<>()); // NOT PRESENT IN MDSTORES r.setExtraInfo(new ArrayList<>()); // NOT PRESENT IN MDSTORES
@ -278,6 +276,8 @@ public abstract class AbstractMdRecordToOafMapper {
r.setBestaccessright(getBestAccessRights(instances)); r.setBestaccessright(getBestAccessRights(instances));
} }
protected abstract List<StructuredProperty> prepareResultPids(Document doc, DataInfo info);
private List<Context> prepareContexts(final Document doc, final DataInfo info) { private List<Context> prepareContexts(final Document doc, final DataInfo info) {
final List<Context> list = new ArrayList<>(); final List<Context> list = new ArrayList<>();
for (final Object o : doc.selectNodes("//oaf:concept")) { for (final Object o : doc.selectNodes("//oaf:concept")) {
@ -358,7 +358,7 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info); protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info);
protected static Qualifier getBestAccessRights(List<Instance> instanceList) { protected static Qualifier getBestAccessRights(final List<Instance> instanceList) {
if (instanceList != null) { if (instanceList != null) {
final Optional<Qualifier> min = instanceList final Optional<Qualifier> min = instanceList
.stream() .stream()
@ -398,9 +398,7 @@ public abstract class AbstractMdRecordToOafMapper {
final String sp = n.valueOf("@sp"); final String sp = n.valueOf("@sp");
final String vol = n.valueOf("@vol"); final String vol = n.valueOf("@vol");
final String edition = n.valueOf("@edition"); final String edition = n.valueOf("@edition");
if (StringUtils.isNotBlank(name)) { if (StringUtils.isNotBlank(name)) { return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info); }
return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info);
}
} }
return null; return null;
} }
@ -453,10 +451,7 @@ public abstract class AbstractMdRecordToOafMapper {
for (final Object o : node.selectNodes(xpath)) { for (final Object o : node.selectNodes(xpath)) {
final Node n = (Node) o; final Node n = (Node) o;
res res
.add( .add(structuredProperty(n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"), n.valueOf("@schemename"), info));
structuredProperty(
n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"),
n.valueOf("@schemename"), info));
} }
return res; return res;
} }
@ -464,9 +459,7 @@ public abstract class AbstractMdRecordToOafMapper {
protected OAIProvenance prepareOAIprovenance(final Document doc) { protected OAIProvenance prepareOAIprovenance(final Document doc) {
final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']"); final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
if (n == null) { if (n == null) { return null; }
return null;
}
final String identifier = n.valueOf("./*[local-name()='identifier']"); final String identifier = n.valueOf("./*[local-name()='identifier']");
final String baseURL = n.valueOf("./*[local-name()='baseURL']"); final String baseURL = n.valueOf("./*[local-name()='baseURL']");
@ -481,9 +474,7 @@ public abstract class AbstractMdRecordToOafMapper {
protected DataInfo prepareDataInfo(final Document doc) { protected DataInfo prepareDataInfo(final Document doc) {
final Node n = doc.selectSingleNode("//oaf:datainfo"); final Node n = doc.selectSingleNode("//oaf:datainfo");
if (n == null) { if (n == null) { return dataInfo(false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9"); }
return dataInfo(false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9");
}
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid"); final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
final String paClassName = n.valueOf("./oaf:provenanceaction/@classname"); final String paClassName = n.valueOf("./oaf:provenanceaction/@classname");
@ -495,9 +486,7 @@ public abstract class AbstractMdRecordToOafMapper {
final Boolean inferred = Boolean.parseBoolean(n.valueOf("./oaf:inferred")); final Boolean inferred = Boolean.parseBoolean(n.valueOf("./oaf:inferred"));
final String trust = n.valueOf("./oaf:trust"); final String trust = n.valueOf("./oaf:trust");
return dataInfo( return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
deletedbyinference, inferenceprovenance, inferred, false,
qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
} }
protected Field<String> prepareField(final Node node, final String xpath, final DataInfo info) { protected Field<String> prepareField(final Node node, final String xpath, final DataInfo info) {

View File

@ -5,7 +5,12 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.*; import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -27,7 +32,19 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2; import scala.Tuple2;
public class GenerateEntitiesApplication { public class GenerateEntitiesApplication {
@ -39,14 +56,12 @@ public class GenerateEntitiesApplication {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(GenerateEntitiesApplication.class
MigrateMongoMdstoresApplication.class .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json")));
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged")) .ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf) .map(Boolean::valueOf)
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
@ -58,17 +73,17 @@ public class GenerateEntitiesApplication {
final String dbUrl = parser.get("postgresUrl"); final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser"); final String dbUser = parser.get("postgresUser");
final String dbPassword = parser.get("postgresPassword"); final String dbPassword = parser.get("postgresPassword");
final String isLookupUrl = parser.get("isLookupUrl");
final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword); final Map<String, String> code2name = loadVocsFromDB(dbUrl, dbUser, dbPassword);
SparkConf conf = new SparkConf(); code2name.putAll(loadVocsFromIS(isLookupUrl));
runWithSparkSession(
conf, final SparkConf conf = new SparkConf();
isSparkSessionManaged, runWithSparkSession(conf, isSparkSessionManaged, spark -> {
spark -> { removeOutputDir(spark, targetPath);
removeOutputDir(spark, targetPath); generateEntities(spark, code2name, sourcePaths, targetPath);
generateEntities(spark, code2name, sourcePaths, targetPath); });
});
} }
private static void generateEntities( private static void generateEntities(
@ -77,7 +92,7 @@ public class GenerateEntitiesApplication {
final String sourcePaths, final String sourcePaths,
final String targetPath) { final String targetPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final List<String> existingSourcePaths = Arrays final List<String> existingSourcePaths = Arrays
.stream(sourcePaths.split(",")) .stream(sourcePaths.split(","))
.filter(p -> exists(sc, p)) .filter(p -> exists(sc, p))
@ -90,27 +105,25 @@ public class GenerateEntitiesApplication {
for (final String sp : existingSourcePaths) { for (final String sp : existingSourcePaths) {
inputRdd = inputRdd inputRdd = inputRdd
.union( .union(sc
sc .sequenceFile(sp, Text.class, Text.class)
.sequenceFile(sp, Text.class, Text.class) .map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) .map(k -> convertToListOaf(k._1(), k._2(), code2name))
.map(k -> convertToListOaf(k._1(), k._2(), code2name)) .filter(Objects::nonNull)
.filter(Objects::nonNull) .flatMap(list -> list.iterator()));
.flatMap(list -> list.iterator()));
} }
inputRdd inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey((o1, o2) -> merge(o1, o2)) .reduceByKey((o1, o2) -> merge(o1, o2))
.map(Tuple2::_2) .map(Tuple2::_2)
.map( .map(oaf -> oaf.getClass().getSimpleName().toLowerCase()
oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|"
+ "|" + OBJECT_MAPPER.writeValueAsString(oaf))
+ OBJECT_MAPPER.writeValueAsString(oaf))
.saveAsTextFile(targetPath, GzipCodec.class); .saveAsTextFile(targetPath, GzipCodec.class);
} }
private static Oaf merge(Oaf o1, Oaf o2) { private static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) { if (ModelSupport.isSubClass(o1, OafEntity.class)) {
((OafEntity) o1).mergeFrom((OafEntity) o2); ((OafEntity) o1).mergeFrom((OafEntity) o2);
} else if (ModelSupport.isSubClass(o1, Relation.class)) { } else if (ModelSupport.isSubClass(o1, Relation.class)) {
@ -122,37 +135,41 @@ public class GenerateEntitiesApplication {
} }
private static List<Oaf> convertToListOaf( private static List<Oaf> convertToListOaf(
final String id, final String s, final Map<String, String> code2name) { final String id,
final String s,
final Map<String, String> code2name) {
final String type = StringUtils.substringAfter(id, ":"); final String type = StringUtils.substringAfter(id, ":");
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
case "native_oaf": case "native_oaf":
return new OafToOafMapper(code2name).processMdRecord(s); return new OafToOafMapper(code2name).processMdRecord(s);
case "native_odf": case "native_odf":
return new OdfToOafMapper(code2name).processMdRecord(s); return new OdfToOafMapper(code2name).processMdRecord(s);
case "datasource": case "datasource":
return Arrays.asList(convertFromJson(s, Datasource.class)); return Arrays.asList(convertFromJson(s, Datasource.class));
case "organization": case "organization":
return Arrays.asList(convertFromJson(s, Organization.class)); return Arrays.asList(convertFromJson(s, Organization.class));
case "project": case "project":
return Arrays.asList(convertFromJson(s, Project.class)); return Arrays.asList(convertFromJson(s, Project.class));
case "relation": case "relation":
return Arrays.asList(convertFromJson(s, Relation.class)); return Arrays.asList(convertFromJson(s, Relation.class));
case "publication": case "publication":
return Arrays.asList(convertFromJson(s, Publication.class)); return Arrays.asList(convertFromJson(s, Publication.class));
case "dataset": case "dataset":
return Arrays.asList(convertFromJson(s, Dataset.class)); return Arrays.asList(convertFromJson(s, Dataset.class));
case "software": case "software":
return Arrays.asList(convertFromJson(s, Software.class)); return Arrays.asList(convertFromJson(s, Software.class));
case "otherresearchproduct": case "otherresearchproduct":
return Arrays.asList(convertFromJson(s, OtherResearchProduct.class)); return Arrays.asList(convertFromJson(s, OtherResearchProduct.class));
default: default:
throw new RuntimeException("type not managed: " + type.toLowerCase()); throw new RuntimeException("type not managed: " + type.toLowerCase());
} }
} }
private static Map<String, String> loadClassNames( private static Map<String, String> loadVocsFromDB(
final String dbUrl, final String dbUser, final String dbPassword) throws IOException { final String dbUrl,
final String dbUser,
final String dbPassword) throws IOException {
log.info("Loading vocabulary terms from db..."); log.info("Loading vocabulary terms from db...");
@ -160,15 +177,13 @@ public class GenerateEntitiesApplication {
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
dbClient dbClient
.processResults( .processResults("select code, name from class", rs -> {
"select code, name from class", try {
rs -> { map.put(rs.getString("code"), rs.getString("name"));
try { } catch (final SQLException e) {
map.put(rs.getString("code"), rs.getString("name")); e.printStackTrace();
} catch (final SQLException e) { }
e.printStackTrace(); });
}
});
} }
log.info("Found " + map.size() + " terms."); log.info("Found " + map.size() + " terms.");
@ -176,6 +191,24 @@ public class GenerateEntitiesApplication {
return map; return map;
} }
private static Map<String, String> loadVocsFromIS(final String isLookupUrl) throws IOException, ISLookUpException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String xquery =
IOUtils.toString(GenerateEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/xquery/load_vocabularies.xquery"));
final Map<String, String> map = new HashMap<>();
for (final String s : isLookUpService.quickSearchProfile(xquery)) {
final String[] arr = s.split("@=@");
if (arr.length == 4) {
map.put(arr[2].trim(), arr[3].trim());
}
}
return map;
}
private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) { private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) {
try { try {
return OBJECT_MAPPER.readValue(s, clazz); return OBJECT_MAPPER.readValue(s, clazz);
@ -196,7 +229,7 @@ public class GenerateEntitiesApplication {
} }
} }
private static void removeOutputDir(SparkSession spark, String path) { private static void removeOutputDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
} }

View File

@ -130,8 +130,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final Instance instance = new Instance(); final Instance instance = new Instance();
instance instance
.setInstancetype( .setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
prepareQualifier(doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
instance.setCollectedfrom(collectedfrom); instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby); instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info)); instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
@ -147,14 +146,13 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final List<Node> nodes = Lists.newArrayList(doc.selectNodes("//dc:identifier")); final List<Node> nodes = Lists.newArrayList(doc.selectNodes("//dc:identifier"));
instance instance
.setUrl( .setUrl(nodes
nodes .stream()
.stream() .filter(n -> StringUtils.isNotBlank(n.getText()))
.filter(n -> StringUtils.isNotBlank(n.getText())) .map(n -> n.getText().trim())
.map(n -> n.getText().trim()) .filter(u -> u.startsWith("http"))
.filter(u -> u.startsWith("http")) .distinct()
.distinct() .collect(Collectors.toCollection(ArrayList::new)));
.collect(Collectors.toCollection(ArrayList::new)));
return Lists.newArrayList(instance); return Lists.newArrayList(instance);
} }
@ -279,15 +277,9 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final String otherId = createOpenaireId(50, originalId, false); final String otherId = createOpenaireId(50, originalId, false);
res res
.add( .add(getRelation(docId, otherId, RESULT_RESULT, PUBLICATION_DATASET, IS_RELATED_TO, collectedFrom, info, lastUpdateTimestamp));
getRelation(
docId, otherId, RESULT_RESULT, PUBLICATION_DATASET, IS_RELATED_TO, collectedFrom, info,
lastUpdateTimestamp));
res res
.add( .add(getRelation(otherId, docId, RESULT_RESULT, PUBLICATION_DATASET, IS_RELATED_TO, collectedFrom, info, lastUpdateTimestamp));
getRelation(
otherId, docId, RESULT_RESULT, PUBLICATION_DATASET, IS_RELATED_TO, collectedFrom, info,
lastUpdateTimestamp));
} }
} }
return res; return res;
@ -297,4 +289,9 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
protected Qualifier prepareResourceType(final Document doc, final DataInfo info) { protected Qualifier prepareResourceType(final Document doc, final DataInfo info) {
return null; // NOT PRESENT IN OAF return null; // NOT PRESENT IN OAF
} }
@Override
protected List<StructuredProperty> prepareResultPids(final Document doc, final DataInfo info) {
return prepareListStructProps(doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info);
}
} }

View File

@ -120,8 +120,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final Instance instance = new Instance(); final Instance instance = new Instance();
instance instance
.setInstancetype( .setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
prepareQualifier(doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
instance.setCollectedfrom(collectedfrom); instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby); instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info)); instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
@ -170,10 +169,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
&& !dateType.equalsIgnoreCase("Updated") && !dateType.equalsIgnoreCase("Updated")
&& !dateType.equalsIgnoreCase("Available")) { && !dateType.equalsIgnoreCase("Available")) {
res res
.add( .add(structuredProperty(((Node) o).getText(), "UNKNOWN", "UNKNOWN", DNET_DATA_CITE_DATE, DNET_DATA_CITE_DATE, info));
structuredProperty(
((Node) o).getText(), "UNKNOWN", "UNKNOWN", DNET_DATA_CITE_DATE, DNET_DATA_CITE_DATE,
info));
} }
} }
return res; return res;
@ -225,16 +221,14 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
protected List<Field<String>> prepareOtherResearchProductContactGroups( protected List<Field<String>> prepareOtherResearchProductContactGroups(
final Document doc, final Document doc,
final DataInfo info) { final DataInfo info) {
return prepareListFields( return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName", info);
doc, "//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName", info);
} }
@Override @Override
protected List<Field<String>> prepareOtherResearchProductContactPersons( protected List<Field<String>> prepareOtherResearchProductContactPersons(
final Document doc, final Document doc,
final DataInfo info) { final DataInfo info) {
return prepareListFields( return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName", info);
doc, "//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName", info);
} }
@Override @Override
@ -260,8 +254,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
protected List<Field<String>> prepareSoftwareDocumentationUrls( protected List<Field<String>> prepareSoftwareDocumentationUrls(
final Document doc, final Document doc,
final DataInfo info) { final DataInfo info) {
return prepareListFields( return prepareListFields(doc, "//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info);
doc, "//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info);
} }
// DATASETS // DATASETS
@ -335,29 +328,16 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
if (type.equalsIgnoreCase("IsSupplementTo")) { if (type.equalsIgnoreCase("IsSupplementTo")) {
res res
.add( .add(getRelation(docId, otherId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENT_TO, collectedFrom, info, lastUpdateTimestamp));
getRelation(
docId, otherId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENT_TO, collectedFrom, info,
lastUpdateTimestamp));
res res
.add( .add(getRelation(otherId, docId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENTED_BY, collectedFrom, info, lastUpdateTimestamp));
getRelation(
otherId, docId, RESULT_RESULT, SUPPLEMENT, IS_SUPPLEMENTED_BY, collectedFrom, info,
lastUpdateTimestamp));
} else if (type.equals("IsPartOf")) { } else if (type.equals("IsPartOf")) {
res res
.add( .add(getRelation(docId, otherId, RESULT_RESULT, PART, IS_PART_OF, collectedFrom, info, lastUpdateTimestamp));
getRelation(
docId, otherId, RESULT_RESULT, PART, IS_PART_OF, collectedFrom, info,
lastUpdateTimestamp));
res res
.add( .add(getRelation(otherId, docId, RESULT_RESULT, PART, HAS_PARTS, collectedFrom, info, lastUpdateTimestamp));
getRelation( } else {}
otherId, docId, RESULT_RESULT, PART, HAS_PARTS, collectedFrom, info,
lastUpdateTimestamp));
} else {
}
} }
} }
return res; return res;
@ -365,8 +345,16 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override @Override
protected Qualifier prepareResourceType(final Document doc, final DataInfo info) { protected Qualifier prepareResourceType(final Document doc, final DataInfo info) {
return prepareQualifier( return prepareQualifier(doc, "//*[local-name() = 'resource']//*[local-name() = 'resourceType']", DNET_DATA_CITE_RESOURCE, DNET_DATA_CITE_RESOURCE);
doc, "//*[local-name() = 'resource']//*[local-name() = 'resourceType']", DNET_DATA_CITE_RESOURCE,
DNET_DATA_CITE_RESOURCE);
} }
@Override
protected List<StructuredProperty> prepareResultPids(final Document doc, final DataInfo info) {
final List<StructuredProperty> res = new ArrayList<>();
res.addAll(prepareListStructProps(doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info));
res.addAll(prepareListStructProps(doc, "//datacite:identifier[@identifierType != 'URL']", "@identifierType", "dnet:pid_types", "dnet:pid_types", info));
res.addAll(prepareListStructProps(doc, "//datacite:alternateIdentifier[@alternateIdentifierType != 'URL']", "@alternateIdentifierType", "dnet:pid_types", "dnet:pid_types", info));
return res;
}
} }

View File

@ -34,6 +34,11 @@
"paramLongName": "postgresPassword", "paramLongName": "postgresPassword",
"paramDescription": "postgres password", "paramDescription": "postgres password",
"paramRequired": false "paramRequired": false
},
{
"paramName": "islookup",
"paramLongName": "isLookupUrl",
"paramDescription": "the url of the ISLookupService",
"paramRequired": true
} }
] ]