merge with upstream

This commit is contained in:
Miriam Baglioni 2020-04-27 10:43:59 +02:00
commit 5dccbe13db
38 changed files with 1242 additions and 683 deletions

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.schema.common;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants {
public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
public static final String DATASET_RESULTTYPE_CLASSID = "dataset";
public static final String PUBLICATION_RESULTTYPE_CLASSID = "publication";
public static final String SOFTWARE_RESULTTYPE_CLASSID = "software";
public static final String ORP_RESULTTYPE_CLASSID = "other";
public static Qualifier PUBLICATION_DEFAULT_RESULTTYPE = new Qualifier();
public static Qualifier DATASET_DEFAULT_RESULTTYPE = new Qualifier();
public static Qualifier SOFTWARE_DEFAULT_RESULTTYPE = new Qualifier();
public static Qualifier ORP_DEFAULT_RESULTTYPE = new Qualifier();
static {
PUBLICATION_DEFAULT_RESULTTYPE.setClassid(PUBLICATION_RESULTTYPE_CLASSID);
PUBLICATION_DEFAULT_RESULTTYPE.setClassname(PUBLICATION_RESULTTYPE_CLASSID);
PUBLICATION_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
PUBLICATION_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
DATASET_DEFAULT_RESULTTYPE.setClassid(DATASET_RESULTTYPE_CLASSID);
DATASET_DEFAULT_RESULTTYPE.setClassname(DATASET_RESULTTYPE_CLASSID);
DATASET_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
DATASET_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
SOFTWARE_DEFAULT_RESULTTYPE.setClassid(SOFTWARE_RESULTTYPE_CLASSID);
SOFTWARE_DEFAULT_RESULTTYPE.setClassname(SOFTWARE_RESULTTYPE_CLASSID);
SOFTWARE_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
SOFTWARE_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
ORP_DEFAULT_RESULTTYPE.setClassid(ORP_RESULTTYPE_CLASSID);
ORP_DEFAULT_RESULTTYPE.setClassname(ORP_RESULTTYPE_CLASSID);
ORP_DEFAULT_RESULTTYPE.setSchemeid(DNET_RESULT_TYPOLOGIES);
ORP_DEFAULT_RESULTTYPE.setSchemename(DNET_RESULT_TYPOLOGIES);
}
}

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.schema.common;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/** Oaf model utility methods. */
public class ModelSupport {
@ -146,4 +148,66 @@ public class ModelSupport {
entityMapping.get(EntityType.valueOf(sourceType)).name(),
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(
source ->
Optional.ofNullable(r.getTarget())
.map(
target ->
Optional.ofNullable(r.getRelType())
.map(
relType ->
Optional.ofNullable(
r
.getSubRelType())
.map(
subRelType ->
Optional
.ofNullable(
r
.getRelClass())
.map(
relClass ->
String
.join(
source,
target,
relType,
subRelType,
relClass))
.orElse(
String
.join(
source,
target,
relType,
subRelType)))
.orElse(
String
.join(
source,
target,
relType)))
.orElse(
String.join(
source, target)))
.orElse(source))
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
}

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Dataset extends Result implements Serializable {
@ -20,6 +20,10 @@ public class Dataset extends Result implements Serializable {
private List<GeoLocation> geolocation;
public Dataset() {
setResulttype(ModelConstants.DATASET_DEFAULT_RESULTTYPE);
}
public Field<String> getStoragedate() {
return storagedate;
}
@ -111,32 +115,4 @@ public class Dataset extends Result implements Serializable {
mergeOAFDataInfo(d);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Dataset dataset = (Dataset) o;
return Objects.equals(storagedate, dataset.storagedate)
&& Objects.equals(device, dataset.device)
&& Objects.equals(size, dataset.size)
&& Objects.equals(version, dataset.version)
&& Objects.equals(lastmetadataupdate, dataset.lastmetadataupdate)
&& Objects.equals(metadataversionnumber, dataset.metadataversionnumber)
&& Objects.equals(geolocation, dataset.geolocation);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
storagedate,
device,
size,
version,
lastmetadataupdate,
metadataversionnumber,
geolocation);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Datasource extends OafEntity implements Serializable {
@ -512,88 +511,4 @@ public class Datasource extends OafEntity implements Serializable {
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Datasource that = (Datasource) o;
return Objects.equals(datasourcetype, that.datasourcetype)
&& Objects.equals(openairecompatibility, that.openairecompatibility)
&& Objects.equals(officialname, that.officialname)
&& Objects.equals(englishname, that.englishname)
&& Objects.equals(websiteurl, that.websiteurl)
&& Objects.equals(logourl, that.logourl)
&& Objects.equals(contactemail, that.contactemail)
&& Objects.equals(namespaceprefix, that.namespaceprefix)
&& Objects.equals(latitude, that.latitude)
&& Objects.equals(longitude, that.longitude)
&& Objects.equals(dateofvalidation, that.dateofvalidation)
&& Objects.equals(description, that.description)
&& Objects.equals(subjects, that.subjects)
&& Objects.equals(odnumberofitems, that.odnumberofitems)
&& Objects.equals(odnumberofitemsdate, that.odnumberofitemsdate)
&& Objects.equals(odpolicies, that.odpolicies)
&& Objects.equals(odlanguages, that.odlanguages)
&& Objects.equals(odcontenttypes, that.odcontenttypes)
&& Objects.equals(accessinfopackage, that.accessinfopackage)
&& Objects.equals(releasestartdate, that.releasestartdate)
&& Objects.equals(releaseenddate, that.releaseenddate)
&& Objects.equals(missionstatementurl, that.missionstatementurl)
&& Objects.equals(dataprovider, that.dataprovider)
&& Objects.equals(serviceprovider, that.serviceprovider)
&& Objects.equals(databaseaccesstype, that.databaseaccesstype)
&& Objects.equals(datauploadtype, that.datauploadtype)
&& Objects.equals(databaseaccessrestriction, that.databaseaccessrestriction)
&& Objects.equals(datauploadrestriction, that.datauploadrestriction)
&& Objects.equals(versioning, that.versioning)
&& Objects.equals(citationguidelineurl, that.citationguidelineurl)
&& Objects.equals(qualitymanagementkind, that.qualitymanagementkind)
&& Objects.equals(pidsystems, that.pidsystems)
&& Objects.equals(certificates, that.certificates)
&& Objects.equals(policies, that.policies)
&& Objects.equals(journal, that.journal);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
datasourcetype,
openairecompatibility,
officialname,
englishname,
websiteurl,
logourl,
contactemail,
namespaceprefix,
latitude,
longitude,
dateofvalidation,
description,
subjects,
odnumberofitems,
odnumberofitemsdate,
odpolicies,
odlanguages,
odcontenttypes,
accessinfopackage,
releasestartdate,
releaseenddate,
missionstatementurl,
dataprovider,
serviceprovider,
databaseaccesstype,
datauploadtype,
databaseaccessrestriction,
datauploadrestriction,
versioning,
citationguidelineurl,
qualitymanagementkind,
pidsystems,
certificates,
policies,
journal);
}
}

View File

@ -113,27 +113,11 @@ public abstract class OafEntity extends Oaf implements Serializable {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
OafEntity oafEntity = (OafEntity) o;
return Objects.equals(id, oafEntity.id)
&& Objects.equals(originalId, oafEntity.originalId)
&& Objects.equals(collectedfrom, oafEntity.collectedfrom)
&& Objects.equals(pid, oafEntity.pid)
&& Objects.equals(dateofcollection, oafEntity.dateofcollection)
&& Objects.equals(dateoftransformation, oafEntity.dateoftransformation)
&& Objects.equals(extraInfo, oafEntity.extraInfo)
&& Objects.equals(oaiprovenance, oafEntity.oaiprovenance);
return Objects.equals(id, oafEntity.id);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
id,
originalId,
collectedfrom,
pid,
dateofcollection,
dateoftransformation,
extraInfo,
oaiprovenance);
return Objects.hash(super.hashCode(), id);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Organization extends OafEntity implements Serializable {
@ -233,52 +232,4 @@ public class Organization extends OafEntity implements Serializable {
country = o.getCountry() != null && compareTrust(this, e) < 0 ? o.getCountry() : country;
mergeOAFDataInfo(o);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Organization that = (Organization) o;
return Objects.equals(legalshortname, that.legalshortname)
&& Objects.equals(legalname, that.legalname)
&& Objects.equals(alternativeNames, that.alternativeNames)
&& Objects.equals(websiteurl, that.websiteurl)
&& Objects.equals(logourl, that.logourl)
&& Objects.equals(eclegalbody, that.eclegalbody)
&& Objects.equals(eclegalperson, that.eclegalperson)
&& Objects.equals(ecnonprofit, that.ecnonprofit)
&& Objects.equals(ecresearchorganization, that.ecresearchorganization)
&& Objects.equals(echighereducation, that.echighereducation)
&& Objects.equals(
ecinternationalorganizationeurinterests,
that.ecinternationalorganizationeurinterests)
&& Objects.equals(ecinternationalorganization, that.ecinternationalorganization)
&& Objects.equals(ecenterprise, that.ecenterprise)
&& Objects.equals(ecsmevalidated, that.ecsmevalidated)
&& Objects.equals(ecnutscode, that.ecnutscode)
&& Objects.equals(country, that.country);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
legalshortname,
legalname,
alternativeNames,
websiteurl,
logourl,
eclegalbody,
eclegalperson,
ecnonprofit,
ecresearchorganization,
echighereducation,
ecinternationalorganizationeurinterests,
ecinternationalorganization,
ecenterprise,
ecsmevalidated,
ecnutscode,
country);
}
}

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class OtherResearchProduct extends Result implements Serializable {
@ -12,6 +12,10 @@ public class OtherResearchProduct extends Result implements Serializable {
private List<Field<String>> tool;
public OtherResearchProduct() {
setResulttype(ModelConstants.ORP_DEFAULT_RESULTTYPE);
}
public List<Field<String>> getContactperson() {
return contactperson;
}
@ -51,20 +55,4 @@ public class OtherResearchProduct extends Result implements Serializable {
tool = mergeLists(tool, o.getTool());
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
OtherResearchProduct that = (OtherResearchProduct) o;
return Objects.equals(contactperson, that.contactperson)
&& Objects.equals(contactgroup, that.contactgroup)
&& Objects.equals(tool, that.tool);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), contactperson, contactgroup, tool);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Project extends OafEntity implements Serializable {
@ -352,70 +351,4 @@ public class Project extends OafEntity implements Serializable {
: fundedamount;
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Project project = (Project) o;
return Objects.equals(websiteurl, project.websiteurl)
&& Objects.equals(code, project.code)
&& Objects.equals(acronym, project.acronym)
&& Objects.equals(title, project.title)
&& Objects.equals(startdate, project.startdate)
&& Objects.equals(enddate, project.enddate)
&& Objects.equals(callidentifier, project.callidentifier)
&& Objects.equals(keywords, project.keywords)
&& Objects.equals(duration, project.duration)
&& Objects.equals(ecsc39, project.ecsc39)
&& Objects.equals(oamandatepublications, project.oamandatepublications)
&& Objects.equals(ecarticle29_3, project.ecarticle29_3)
&& Objects.equals(subjects, project.subjects)
&& Objects.equals(fundingtree, project.fundingtree)
&& Objects.equals(contracttype, project.contracttype)
&& Objects.equals(optional1, project.optional1)
&& Objects.equals(optional2, project.optional2)
&& Objects.equals(jsonextrainfo, project.jsonextrainfo)
&& Objects.equals(contactfullname, project.contactfullname)
&& Objects.equals(contactfax, project.contactfax)
&& Objects.equals(contactphone, project.contactphone)
&& Objects.equals(contactemail, project.contactemail)
&& Objects.equals(summary, project.summary)
&& Objects.equals(currency, project.currency)
&& Objects.equals(totalcost, project.totalcost)
&& Objects.equals(fundedamount, project.fundedamount);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
websiteurl,
code,
acronym,
title,
startdate,
enddate,
callidentifier,
keywords,
duration,
ecsc39,
oamandatepublications,
ecarticle29_3,
subjects,
fundingtree,
contracttype,
optional1,
optional2,
jsonextrainfo,
contactfullname,
contactfax,
contactphone,
contactemail,
summary,
currency,
totalcost,
fundedamount);
}
}

View File

@ -1,13 +1,17 @@
package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.Objects;
public class Publication extends Result implements Serializable {
// publication specific
private Journal journal;
public Publication() {
setResulttype(ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE);
}
public Journal getJournal() {
return journal;
}
@ -29,18 +33,4 @@ public class Publication extends Result implements Serializable {
if (p.getJournal() != null && compareTrust(this, e) < 0) journal = p.getJournal();
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Publication that = (Publication) o;
return Objects.equals(journal, that.journal);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), journal);
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class Result extends OafEntity implements Serializable {
@ -231,6 +230,9 @@ public class Result extends OafEntity implements Serializable {
instance = mergeLists(instance, r.getInstance());
if (r.getBestaccessright() != null && compareTrust(this, r) < 0)
bestaccessright = r.getBestaccessright();
if (r.getResulttype() != null && compareTrust(this, r) < 0) resulttype = r.getResulttype();
if (r.getLanguage() != null && compareTrust(this, r) < 0) language = r.getLanguage();
@ -286,60 +288,4 @@ public class Result extends OafEntity implements Serializable {
}
return a.size() > b.size() ? a : b;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Result result = (Result) o;
return Objects.equals(author, result.author)
&& Objects.equals(resulttype, result.resulttype)
&& Objects.equals(language, result.language)
&& Objects.equals(country, result.country)
&& Objects.equals(subject, result.subject)
&& Objects.equals(title, result.title)
&& Objects.equals(relevantdate, result.relevantdate)
&& Objects.equals(description, result.description)
&& Objects.equals(dateofacceptance, result.dateofacceptance)
&& Objects.equals(publisher, result.publisher)
&& Objects.equals(embargoenddate, result.embargoenddate)
&& Objects.equals(source, result.source)
&& Objects.equals(fulltext, result.fulltext)
&& Objects.equals(format, result.format)
&& Objects.equals(contributor, result.contributor)
&& Objects.equals(resourcetype, result.resourcetype)
&& Objects.equals(coverage, result.coverage)
&& Objects.equals(bestaccessright, result.bestaccessright)
&& Objects.equals(context, result.context)
&& Objects.equals(externalReference, result.externalReference)
&& Objects.equals(instance, result.instance);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
author,
resulttype,
language,
country,
subject,
title,
relevantdate,
description,
dateofacceptance,
publisher,
embargoenddate,
source,
fulltext,
format,
contributor,
resourcetype,
coverage,
bestaccessright,
context,
externalReference,
instance);
}
}

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Software extends Result implements Serializable {
@ -14,6 +14,10 @@ public class Software extends Result implements Serializable {
private Qualifier programmingLanguage;
public Software() {
setResulttype(ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE);
}
public List<Field<String>> getDocumentationUrl() {
return documentationUrl;
}
@ -71,26 +75,4 @@ public class Software extends Result implements Serializable {
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Software software = (Software) o;
return Objects.equals(documentationUrl, software.documentationUrl)
&& Objects.equals(license, software.license)
&& Objects.equals(codeRepositoryUrl, software.codeRepositoryUrl)
&& Objects.equals(programmingLanguage, software.programmingLanguage);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
documentationUrl,
license,
codeRepositoryUrl,
programmingLanguage);
}
}

View File

@ -166,10 +166,8 @@ public class PromoteActionPayloadForGraphTableJob {
actionPayloadClazz.getSimpleName(),
rowClazz.getSimpleName());
SerializableSupplier<Function<G, String>> rowIdFn =
PromoteActionPayloadForGraphTableJob::idFn;
SerializableSupplier<Function<A, String>> actionPayloadIdFn =
PromoteActionPayloadForGraphTableJob::idFn;
SerializableSupplier<Function<G, String>> rowIdFn = ModelSupport::idFn;
SerializableSupplier<Function<A, String>> actionPayloadIdFn = ModelSupport::idFn;
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn =
MergeAndGet.functionFor(strategy);
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn =
@ -192,68 +190,6 @@ public class PromoteActionPayloadForGraphTableJob {
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
}
private static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(
source ->
Optional.ofNullable(r.getTarget())
.map(
target ->
Optional.ofNullable(r.getRelType())
.map(
relType ->
Optional.ofNullable(
r
.getSubRelType())
.map(
subRelType ->
Optional
.ofNullable(
r
.getRelClass())
.map(
relClass ->
String
.join(
source,
target,
relType,
subRelType,
relClass))
.orElse(
String
.join(
source,
target,
relType,
subRelType)))
.orElse(
String
.join(
source,
target,
relType)))
.orElse(
String.join(
source, target)))
.orElse(source))
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
private static <T extends Oaf> SerializableSupplier<T> zeroFn(Class<T> clazz) {
switch (clazz.getCanonicalName()) {
case "eu.dnetlib.dhp.schema.oaf.Dataset":

View File

@ -25,6 +25,7 @@ public class DedupRecordFactory {
public static <T extends OafEntity> Dataset<T> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<T> clazz) {
@ -67,41 +68,39 @@ public class DedupRecordFactory {
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<String, T>, T>)
(key, values) -> entityMerger(key, values, ts, clazz),
(key, values) -> entityMerger(key, values, ts, dataInfo),
Encoders.bean(clazz));
}
private static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
try {
T entity = clazz.newInstance();
entity.setId(id);
entity.setDataInfo(new DataInfo());
entity.getDataInfo().setTrust("0.9");
entity.setLastupdatetimestamp(ts);
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
final Collection<String> dates = Lists.newArrayList();
entities.forEachRemaining(
t -> {
T duplicate = t._2();
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Result er = (Result) entity;
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
T entity = entities.next()._2();
if (er.getDateofacceptance() != null) {
dates.add(r1.getDateofacceptance().getValue());
}
final Collection<String> dates = Lists.newArrayList();
entities.forEachRemaining(
t -> {
T duplicate = t._2();
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Result er = (Result) entity;
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
if (r1.getDateofacceptance() != null) {
dates.add(r1.getDateofacceptance().getValue());
}
});
}
});
if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
}
return entity;
} catch (IllegalAccessException | InstantiationException e) {
throw new RuntimeException(e);
if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
}
entity.setId(id);
entity.setLastupdatetimestamp(ts);
entity.setDataInfo(dataInfo);
return entity;
}
}

View File

@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
public static final String ROOT_TRUST = "0.8";
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
final Class<OafEntity> clazz =
ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
final DataInfo dataInfo = getDataInfo(dedupConf);
DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}
private static DataInfo getDataInfo(DedupConfig dedupConf) {
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setTrust(ROOT_TRUST);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenance = new Qualifier();
provenance.setClassid(PROVENANCE_ACTION_CLASS);
provenance.setClassname(PROVENANCE_ACTION_CLASS);
provenance.setSchemeid(PROVENANCE_ACTIONS);
provenance.setSchemename(PROVENANCE_ACTIONS);
info.setProvenanceaction(provenance);
return info;
}
}

View File

@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
if (docIds.size() > 1) {
final String s = getMin();
String prefix = s.split("\\|")[0];
ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
return ccId;
} else {
return docIds.iterator().next();

View File

@ -57,7 +57,6 @@ public class SparkDedupTest implements Serializable {
.toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath =
createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
@ -110,6 +109,22 @@ public class SparkDedupTest implements Serializable {
IOUtils.toString(
SparkDedupTest.class.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
.thenReturn(
IOUtils.toString(
SparkDedupTest.class.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
lenient()
.when(
isLookUpService.getResourceProfileByQuery(
Mockito.contains("otherresearchproduct")))
.thenReturn(
IOUtils.toString(
SparkDedupTest.class.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
}
@Test
@ -144,9 +159,25 @@ public class SparkDedupTest implements Serializable {
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel =
spark.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel =
spark.read()
.load(
testOutputBasePath
+ "/"
+ testActionSetId
+ "/otherresearchproduct_simrel")
.count();
assertEquals(3432, orgs_simrel);
assertEquals(7260, pubs_simrel);
assertEquals(344, sw_simrel);
assertEquals(458, ds_simrel);
assertEquals(6740, orp_simrel);
}
@Test
@ -181,9 +212,25 @@ public class SparkDedupTest implements Serializable {
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count();
long ds_mergerel =
spark.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.count();
long orp_mergerel =
spark.read()
.load(
testOutputBasePath
+ "/"
+ testActionSetId
+ "/otherresearchproduct_mergerel")
.count();
assertEquals(1276, orgs_mergerel);
assertEquals(1460, pubs_mergerel);
assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel);
assertEquals(714, orp_mergerel);
}
@Test
@ -222,10 +269,22 @@ public class SparkDedupTest implements Serializable {
long sw_deduprecord =
jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
.count();
long ds_deduprecord =
jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord")
.count();
long orp_deduprecord =
jsc.textFile(
testOutputBasePath
+ "/"
+ testActionSetId
+ "/otherresearchproduct_deduprecord")
.count();
assertEquals(82, orgs_deduprecord);
assertEquals(66, pubs_deduprecord);
assertEquals(51, sw_deduprecord);
assertEquals(96, ds_deduprecord);
assertEquals(89, orp_deduprecord);
}
@Test
@ -251,6 +310,9 @@ public class SparkDedupTest implements Serializable {
long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
long otherresearchproduct =
jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();
long mergedOrgs =
spark.read()
@ -282,11 +344,37 @@ public class SparkDedupTest implements Serializable {
.distinct()
.count();
long mergedDs =
spark.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class))
.where("relClass=='merges'")
.javaRDD()
.map(Relation::getTarget)
.distinct()
.count();
long mergedOrp =
spark.read()
.load(
testOutputBasePath
+ "/"
+ testActionSetId
+ "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class))
.where("relClass=='merges'")
.javaRDD()
.map(Relation::getTarget)
.distinct()
.count();
assertEquals(897, publications);
assertEquals(835, organizations);
assertEquals(100, projects);
assertEquals(100, datasource);
assertEquals(200, softwares);
assertEquals(388, dataset);
assertEquals(517, otherresearchproduct);
long deletedOrgs =
jsc.textFile(testDedupGraphBasePath + "/organization")
@ -303,9 +391,21 @@ public class SparkDedupTest implements Serializable {
.filter(this::isDeletedByInference)
.count();
long deletedDs =
jsc.textFile(testDedupGraphBasePath + "/dataset")
.filter(this::isDeletedByInference)
.count();
long deletedOrp =
jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct")
.filter(this::isDeletedByInference)
.count();
assertEquals(mergedOrgs, deletedOrgs);
assertEquals(mergedPubs, deletedPubs);
assertEquals(mergedSw, deletedSw);
assertEquals(mergedDs, deletedDs);
assertEquals(mergedOrp, deletedOrp);
}
@Test

View File

@ -11,7 +11,9 @@
"maxChildren" : "100",
"slidingWindowSize" : "200",
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
"includeChildren" : "true",
"idPath" : "$.id",
"maxIterations" : 20
},
"pace" : {
"clustering" : [
@ -70,7 +72,8 @@
"field": "title",
"comparator": "levensteinTitle",
"weight": 1.0,
"countIfUndefined": "true"
"countIfUndefined": "true",
"params": {}
}
],
"threshold": 0.99,
@ -85,7 +88,7 @@
{
"name" : "doi",
"type" : "String",
"path" : "$.pid[@.qualifier.classid = 'doi'].value"
"path" : "$.pid[?(@.qualifier.classid == 'doi')].value"
},
{
"name" : "pid",
@ -96,7 +99,7 @@
{
"name" : "title",
"type" : "String",
"path" : "$.title[@.qualifier.classid = 'main title'].value",
"path" : "$.title[?(@.qualifier.classid == 'main title')].value",
"length" : 250,
"size" : 5
},

View File

@ -11,7 +11,9 @@
"maxChildren" : "100",
"slidingWindowSize" : "200",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
"includeChildren" : "true",
"idPath" : "$.id",
"maxIterations" : 20
},
"pace" : {
"clustering" : [
@ -70,7 +72,8 @@
"field": "title",
"comparator": "levensteinTitle",
"weight": 1.0,
"countIfUndefined": "true"
"countIfUndefined": "true",
"params": {}
}
],
"threshold": 0.99,
@ -85,7 +88,7 @@
{
"name" : "doi",
"type" : "String",
"path" : "$.pid[@.qualifier.classid = 'doi'}].value"
"path" : "$.pid[?(@.qualifier.classid == 'doi')].value"
},
{
"name" : "pid",
@ -96,7 +99,7 @@
{
"name" : "title",
"type" : "String",
"path" : "$.title[@.qualifier.classid = 'main title'].value",
"path" : "$.title[?(@.qualifier.classid == 'main title')].value",
"length" : 250,
"size" : 5
},

View File

@ -11,7 +11,9 @@
"maxChildren" : "100",
"slidingWindowSize" : "200",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true"
"includeChildren" : "true",
"idPath" : "$.id",
"maxIterations" : 20
},
"pace" : {
"clustering" : [

File diff suppressed because one or more lines are too long

View File

@ -15,6 +15,8 @@
<SCAN id="organization"/>
<SCAN id="publication"/>
<SCAN id="software"/>
<SCAN id="dataset"/>
<SCAN id="otherresearchproduct"/>
</SCAN_SEQUENCE>
</DEDUPLICATION>
</CONFIGURATION>

View File

@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
if (docIds.size() > 1) {
final String s = getMin();
String prefix = s.split("\\|")[0];
ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
return ccId;
} else {
return docIds.iterator().next();

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.oa.graph;
package eu.dnetlib.dhp.oa.graph.hive;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
@ -19,6 +19,8 @@ public class GraphHiveImporterJob {
private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser =
@ -37,12 +39,12 @@ public class GraphHiveImporterJob {
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName: {}", hiveDbName);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
@ -58,13 +60,13 @@ public class GraphHiveImporterJob {
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// Read the input file and convert it into RDD of serializable object
ModelSupport.oafTypes.forEach(
(name, clazz) ->
spark.createDataset(
sc.textFile(inputPath + "/" + name)
.map(s -> new ObjectMapper().readValue(s, clazz))
.map(s -> OBJECT_MAPPER.readValue(s, clazz))
.rdd(),
Encoders.bean(clazz))
.write()

View File

@ -10,7 +10,9 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import eu.dnetlib.dhp.oa.graph.raw.common.MigrationConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
@ -46,25 +48,6 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final Qualifier MAIN_TITLE_QUALIFIER =
qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
protected static final Qualifier PUBLICATION_RESULTTYPE_QUALIFIER =
qualifier(
"publication",
"publication",
"dnet:result_typologies",
"dnet:result_typologies");
protected static final Qualifier DATASET_RESULTTYPE_QUALIFIER =
qualifier("dataset", "dataset", "dnet:result_typologies", "dnet:result_typologies");
protected static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER =
qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies");
protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER =
qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies");
protected static final Qualifier REPOSITORY_QUALIFIER =
qualifier(
"sysimport:crosswalk:repository",
"sysimport:crosswalk:repository",
"dnet:provenanceActions",
"dnet:provenanceActions");
protected AbstractMdRecordToOafMapper(final Map<String, String> code2name) {
this.code2name = code2name;
}
@ -123,14 +106,14 @@ public abstract class AbstractMdRecordToOafMapper {
case "publication":
final Publication p = new Publication();
populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
p.setResulttype(PUBLICATION_RESULTTYPE_QUALIFIER);
p.setResulttype(MigrationConstants.PUBLICATION_RESULTTYPE_QUALIFIER);
p.setJournal(prepareJournal(doc, info));
oafs.add(p);
break;
case "dataset":
final Dataset d = new Dataset();
populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
d.setResulttype(DATASET_RESULTTYPE_QUALIFIER);
d.setResulttype(MigrationConstants.DATASET_RESULTTYPE_QUALIFIER);
d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info));
d.setSize(prepareDatasetSize(doc, info));
@ -143,7 +126,7 @@ public abstract class AbstractMdRecordToOafMapper {
case "software":
final Software s = new Software();
populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
s.setResulttype(SOFTWARE_RESULTTYPE_QUALIFIER);
s.setResulttype(MigrationConstants.SOFTWARE_RESULTTYPE_QUALIFIER);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info));
s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
@ -154,7 +137,7 @@ public abstract class AbstractMdRecordToOafMapper {
default:
final OtherResearchProduct o = new OtherResearchProduct();
populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
o.setResulttype(OTHER_RESULTTYPE_QUALIFIER);
o.setResulttype(MigrationConstants.OTHER_RESULTTYPE_QUALIFIER);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
o.setTool(prepareOtherResearchProductTools(doc, info));
@ -255,11 +238,25 @@ public abstract class AbstractMdRecordToOafMapper {
r.setContributor(prepareContributors(doc, info));
r.setResourcetype(prepareResourceType(doc, info));
r.setCoverage(prepareCoverages(doc, info));
r.setContext(new ArrayList<>()); // NOT PRESENT IN MDSTORES
r.setContext(prepareContexts(doc, info));
r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
r.setInstance(prepareInstances(doc, info, collectedFrom, hostedBy));
}
private List<Context> prepareContexts(final Document doc, final DataInfo info) {
final List<Context> list = new ArrayList<>();
for (final Object o : doc.selectNodes("//oaf:concept")) {
final String cid = ((Node) o).valueOf("@id");
if (StringUtils.isNotBlank(cid)) {
final Context c = new Context();
c.setId(cid);
c.setDataInfo(Arrays.asList(info));
list.add(c);
}
}
return list;
}
protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);
protected abstract List<Instance> prepareInstances(
@ -433,7 +430,13 @@ public abstract class AbstractMdRecordToOafMapper {
final Node n = doc.selectSingleNode("//oaf:datainfo");
if (n == null) {
return dataInfo(false, null, false, false, REPOSITORY_QUALIFIER, "0.9");
return dataInfo(
false,
null,
false,
false,
MigrationConstants.REPOSITORY_PROVENANCE_ACTIONS,
"0.9");
}
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");

View File

@ -2,16 +2,18 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,8 +22,6 @@ public class DispatchEntitiesApplication {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser =
new ArgumentApplicationParser(
@ -45,15 +45,9 @@ public class DispatchEntitiesApplication {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, targetPath);
processEntity(spark, Publication.class, sourcePath, targetPath);
processEntity(spark, Dataset.class, sourcePath, targetPath);
processEntity(spark, Software.class, sourcePath, targetPath);
processEntity(spark, OtherResearchProduct.class, sourcePath, targetPath);
processEntity(spark, Datasource.class, sourcePath, targetPath);
processEntity(spark, Organization.class, sourcePath, targetPath);
processEntity(spark, Project.class, sourcePath, targetPath);
processEntity(spark, Relation.class, sourcePath, targetPath);
ModelSupport.oafTypes
.values()
.forEach(clazz -> processEntity(spark, clazz, sourcePath, targetPath));
});
}
@ -64,26 +58,18 @@ public class DispatchEntitiesApplication {
final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase();
log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
log.info("Processing entities ({}) in file: {}", type, sourcePath);
/*
spark.read()
.textFile(sourcePath)
.filter((FilterFunction<String>) value -> isEntityType(value, type))
.map((MapFunction<String, String>) value -> StringUtils.substringAfter(value, "|"), Encoders.STRING())
.map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.parquet(targetPath + "/" + type);
*/
JavaSparkContext.fromSparkContext(spark.sparkContext())
.textFile(sourcePath)
.filter(l -> isEntityType(l, type))
.map(l -> StringUtils.substringAfter(l, "|"))
.saveAsTextFile(
targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
.filter((FilterFunction<String>) value -> isEntityType(value, type))
.map(
(MapFunction<String, String>) l -> StringUtils.substringAfter(l, "|"),
Encoders.STRING())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(targetPath + "/" + type);
}
private static boolean isEntityType(final String line, final String type) {

View File

@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import java.io.IOException;
import java.sql.SQLException;
@ -29,6 +30,8 @@ public class GenerateEntitiesApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser =
new ArgumentApplicationParser(
@ -78,7 +81,7 @@ public class GenerateEntitiesApplication {
log.info("Generate entities from files:");
existingSourcePaths.forEach(log::info);
JavaRDD<String> inputRdd = sc.emptyRDD();
JavaRDD<Oaf> inputRdd = sc.emptyRDD();
for (final String sp : existingSourcePaths) {
inputRdd =
@ -86,15 +89,29 @@ public class GenerateEntitiesApplication {
sc.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> convertToListOaf(k._1(), k._2(), code2name))
.flatMap(list -> list.iterator())
.map(
oaf ->
oaf.getClass().getSimpleName().toLowerCase()
+ "|"
+ convertToJson(oaf)));
.flatMap(list -> list.iterator()));
}
inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
inputRdd.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey((o1, o2) -> merge(o1, o2))
.map(Tuple2::_2)
.map(
oaf ->
oaf.getClass().getSimpleName().toLowerCase()
+ "|"
+ OBJECT_MAPPER.writeValueAsString(oaf))
.saveAsTextFile(targetPath, GzipCodec.class);
}
private static Oaf merge(Oaf o1, Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
((OafEntity) o1).mergeFrom((OafEntity) o2);
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
}
private static List<Oaf> convertToListOaf(
@ -120,9 +137,10 @@ public class GenerateEntitiesApplication {
return Arrays.asList(convertFromJson(s, Dataset.class));
case "software":
return Arrays.asList(convertFromJson(s, Software.class));
case "otherresearchproducts":
default:
case "otherresearchproduct":
return Arrays.asList(convertFromJson(s, OtherResearchProduct.class));
default:
throw new RuntimeException("type not managed: " + type.toLowerCase());
}
}
@ -150,17 +168,9 @@ public class GenerateEntitiesApplication {
return map;
}
private static String convertToJson(final Oaf oaf) {
try {
return new ObjectMapper().writeValueAsString(oaf);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) {
try {
return new ObjectMapper().readValue(s, clazz);
return OBJECT_MAPPER.readValue(s, clazz);
} catch (final Exception e) {
log.error("Error parsing object of class: " + clazz);
log.error(s);

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -10,7 +9,6 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@ -83,7 +81,9 @@ public class MergeClaimsApplication {
readFromPath(spark, rawPath, clazz)
.map(
(MapFunction<T, Tuple2<String, T>>)
value -> new Tuple2<>(idFn().apply(value), value),
value ->
new Tuple2<>(
ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -92,14 +92,11 @@ public class MergeClaimsApplication {
.getValue()
.map(
(MapFunction<T, Tuple2<String, T>>)
value -> new Tuple2<>(idFn().apply(value), value),
value ->
new Tuple2<>(
ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
/*
Dataset<Tuple2<String, T>> claim = readFromPath(spark, claimPath, clazz)
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
*/
raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
.map(
(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>)
@ -131,78 +128,12 @@ public class MergeClaimsApplication {
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
/*
return spark.read()
.load(path)
.as(Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
*/
.filter(
(FilterFunction<T>)
value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(
source ->
Optional.ofNullable(r.getTarget())
.map(
target ->
Optional.ofNullable(r.getRelType())
.map(
relType ->
Optional.ofNullable(
r
.getSubRelType())
.map(
subRelType ->
Optional
.ofNullable(
r
.getRelClass())
.map(
relClass ->
String
.join(
source,
target,
relType,
subRelType,
relClass))
.orElse(
String
.join(
source,
target,
relType,
subRelType)))
.orElse(
String
.join(
source,
target,
relType)))
.orElse(
String.join(
source, target)))
.orElse(source))
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
}

View File

@ -13,6 +13,7 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProper
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.MigrationConstants;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
@ -49,13 +50,6 @@ import org.apache.commons.logging.LogFactory;
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
implements Closeable {
private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION =
qualifier(
"sysimport:crosswalk:entityregistry",
"sysimport:crosswalk:entityregistry",
"dnet:provenance_actions",
"dnet:provenance_actions");
private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class);
private final DbClient dbClient;
@ -402,12 +396,16 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
if (rs.getString("target_type").equals("dataset")) {
r = new Dataset();
r.setResulttype(MigrationConstants.DATASET_RESULTTYPE_QUALIFIER);
} else if (rs.getString("target_type").equals("software")) {
r = new Software();
r.setResulttype(MigrationConstants.SOFTWARE_RESULTTYPE_QUALIFIER);
} else if (rs.getString("target_type").equals("other")) {
r = new OtherResearchProduct();
r.setResulttype(MigrationConstants.OTHER_RESULTTYPE_QUALIFIER);
} else {
r = new Publication();
r.setResulttype(MigrationConstants.PUBLICATION_RESULTTYPE_QUALIFIER);
}
r.setId(createOpenaireId(50, rs.getString("target_id"), false));
r.setLastupdatetimestamp(lastUpdateTimestamp);
@ -484,7 +482,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
inferenceprovenance,
inferred,
false,
ENTITYREGISTRY_PROVENANCE_ACTION,
MigrationConstants.ENTITYREGISTRY_PROVENANCE_ACTION,
trust);
}

View File

@ -1,8 +1,19 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -62,33 +73,44 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final DataInfo info,
final KeyValue collectedfrom,
final KeyValue hostedby) {
final List<Instance> res = new ArrayList<>();
final Instance instance = new Instance();
instance.setUrl(new ArrayList<>());
instance.setInstancetype(
prepareQualifier(
doc,
"//dr:CobjCategory",
"dnet:publication_resource",
"dnet:publication_resource"));
instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
instance.setAccessright(
prepareQualifier(
doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes"));
instance.setLicense(field(doc.valueOf("//oaf:license"), info));
instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
instance.setProcessingchargeamount(
field(doc.valueOf("//oaf:processingchargeamount"), info));
instance.setProcessingchargecurrency(
field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
for (final Object o :
doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) {
final Instance instance = new Instance();
instance.setUrl(Arrays.asList(((Node) o).getText().trim()));
instance.setInstancetype(
prepareQualifier(
doc,
"//dr:CobjCategory",
"dnet:publication_resource",
"dnet:publication_resource"));
instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
instance.setAccessright(
prepareQualifier(
doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes"));
instance.setLicense(field(doc.valueOf("//oaf:license"), info));
instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
instance.setProcessingchargeamount(
field(doc.valueOf("//oaf:processingchargeamount"), info));
instance.setProcessingchargecurrency(
field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
res.add(instance);
instance.getUrl().add(((Node) o).getText().trim());
}
return res;
for (final Object o : doc.selectNodes("//datacite:identifier[@identifierType='URL']")) {
instance.getUrl().add(((Node) o).getText().trim());
}
for (final Object o :
doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='DOI']")) {
instance.getUrl().add("http://dx.doi.org/" + ((Node) o).getText().trim());
}
for (final Object o : doc.selectNodes("//datacite:identifier[@identifierType='DOI']")) {
instance.getUrl().add("http://dx.doi.org/" + ((Node) o).getText().trim());
}
return Arrays.asList(instance);
}
@Override

View File

@ -0,0 +1,33 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class MigrationConstants {
public static final Qualifier PUBLICATION_RESULTTYPE_QUALIFIER =
qualifier(
"publication",
"publication",
"dnet:result_typologies",
"dnet:result_typologies");
public static final Qualifier DATASET_RESULTTYPE_QUALIFIER =
qualifier("dataset", "dataset", "dnet:result_typologies", "dnet:result_typologies");
public static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER =
qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies");
public static final Qualifier OTHER_RESULTTYPE_QUALIFIER =
qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies");
public static final Qualifier REPOSITORY_PROVENANCE_ACTIONS =
qualifier(
"sysimport:crosswalk:repository",
"sysimport:crosswalk:repository",
"dnet:provenanceActions",
"dnet:provenanceActions");
public static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION =
qualifier(
"sysimport:crosswalk:entityregistry",
"sysimport:crosswalk:entityregistry",
"dnet:provenanceActions",
"dnet:provenanceActions");
}

View File

@ -12,19 +12,15 @@
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hive_jdbc_url</name>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hive_db_name</name>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -1,10 +1,10 @@
DROP VIEW IF EXISTS ${hive_db_name}.result;
DROP VIEW IF EXISTS ${hiveDbName}.result;
CREATE VIEW IF NOT EXISTS result as
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.publication p
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.dataset d
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.software s
union all
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;
select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.otherresearchproduct o;

View File

@ -2,13 +2,21 @@
<parameters>
<property>
<name>sourcePath</name>
<name>inputPath</name>
<description>the source path</description>
</property>
<property>
<name>hive_db_name</name>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -75,7 +83,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>MapGraphAsHiveDB</name>
<class>eu.dnetlib.dhp.oa.graph.GraphHiveImporterJob</class>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -87,9 +95,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="PostProcessing"/>
<error to="Kill"/>
@ -102,12 +110,12 @@
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hive_metastore_uris}</value>
<value>${hiveMetastoreUris}</value>
</property>
</configuration>
<jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
<jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
<script>lib/scripts/postprocessing.sql</script>
<param>hive_db_name=${hive_db_name}</param>
<param>hiveDbName=${hiveDbName}</param>
</hive2>
<ok to="End"/>
<error to="Kill"/>

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.oa.graph;
import eu.dnetlib.dhp.oa.graph.hive.GraphHiveImporterJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.io.IOException;
import java.nio.file.Files;

View File

@ -54,6 +54,7 @@ public class MappersTest {
assertTrue(p.getSubject().size() > 0);
assertTrue(StringUtils.isNotBlank(p.getJournal().getIssnOnline()));
assertTrue(StringUtils.isNotBlank(p.getJournal().getName()));
assertTrue(p.getInstance().size() > 0);
assertValidId(r1.getSource());
assertValidId(r1.getTarget());
@ -96,6 +97,9 @@ public class MappersTest {
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertTrue(d.getAuthor().size() > 0);
assertTrue(d.getSubject().size() > 0);
assertTrue(d.getInstance().size() > 0);
assertTrue(d.getContext().size() > 0);
assertTrue(d.getContext().get(0).getId().length() > 0);
assertValidId(r1.getSource());
assertValidId(r1.getTarget());
@ -129,6 +133,7 @@ public class MappersTest {
assertTrue(StringUtils.isNotBlank(s.getTitle().get(0).getValue()));
assertTrue(s.getAuthor().size() > 0);
assertTrue(s.getSubject().size() > 0);
assertTrue(s.getInstance().size() > 0);
}
private void assertValidId(final String id) {

View File

@ -216,6 +216,7 @@ public class CreateRelatedEntitiesJob_phase2 {
(MapFunction<String, E>)
value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz))
.filter("dataInfo.invisible == false")
.map(
(MapFunction<E, TypedRow>)
value ->

View File

@ -292,6 +292,12 @@
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-common</artifactId>
<version>6.0.5</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
@ -307,7 +313,7 @@
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-pace-core</artifactId>
<version>4.0.0</version>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>