Compare commits
23 Commits
50b21c5d82 ... 3e1b444e13
| Author | SHA1 | Date |
|---|---|---|
| Miriam Baglioni | 3e1b444e13 | |
| Miriam Baglioni | 84466d4b39 | |
| Claudio Atzori | cf7d9a32ab | |
| Claudio Atzori | 5f512f510e | |
| Claudio Atzori | b95672b420 | |
| Claudio Atzori | 9e8849b753 | |
| Claudio Atzori | 4a3b173ca2 | |
| Giambattista Bloisi | 5ee8881646 | |
| Miriam Baglioni | fb1f0f8850 | |
| Giambattista Bloisi | 5b4d821bf9 | |
| Giambattista Bloisi | 03c262ccb9 | |
| Claudio Atzori | 07f267bb10 | |
| Claudio Atzori | 8088943399 | |
| Claudio Atzori | 6c5df761e2 | |
| Claudio Atzori | 9f7a606ddd | |
| Miriam Baglioni | 250f101779 | |
| Miriam Baglioni | f1ea9da5bc | |
| Miriam Baglioni | b0283fe94c | |
| Giambattista Bloisi | f31f22801f | |
| Miriam Baglioni | 6fd9ec8566 | |
| Giambattista Bloisi | 8f5171557e | |
| Claudio Atzori | f7bb53fe78 | |
| Claudio Atzori | 973aa7dca6 | |
@@ -28,3 +28,4 @@ spark-warehouse
/**/.scalafmt.conf
/.java-version
/dhp-shade-package/dependency-reduced-pom.xml
/**/job.properties

@@ -2,8 +2,7 @@
package eu.dnetlib.dhp.oa.merge;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.*;

import java.util.Map;
import java.util.Optional;

@@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob {
            .applyCoarVocabularies(entity, vocs),
        OAFENTITY_KRYO_ENC)
        .groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
        .mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeById, OAFENTITY_KRYO_ENC)
        .mapGroups(
            (MapGroupsFunction<String, OafEntity, OafEntity>) (key, group) -> MergeUtils.mergeById(group, vocs),
            OAFENTITY_KRYO_ENC)
        .map(
            (MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
                t.getClass().getName(), t),

@@ -6,14 +6,14 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;

public class HashableKeyValue extends KeyValue {

    public static HashableKeyValue newInstance(String key, String value) {
    public static HashableKeyValue newInstance(String key, String value, DataInfo dataInfo) {
        if (value == null) {
            return null;
        }
        final HashableKeyValue kv = new HashableKeyValue();
        kv.setValue(value);
        kv.setKey(key);

        kv.setDataInfo(dataInfo);
        return kv;
    }

@@ -21,7 +21,7 @@ public class HashableKeyValue extends KeyValue {
        HashableKeyValue hkv = new HashableKeyValue();
        hkv.setKey(kv.getKey());
        hkv.setValue(kv.getValue());

        hkv.setDataInfo(kv.getDataInfo());
        return hkv;
    }

@@ -29,7 +29,7 @@ public class HashableKeyValue extends KeyValue {
        KeyValue kv = new KeyValue();
        kv.setKey(hkv.getKey());
        kv.setValue(hkv.getValue());

        kv.setDataInfo(hkv.getDataInfo());
        return kv;
    }

@@ -2,7 +2,6 @@
package eu.dnetlib.dhp.schema.oaf.utils;

import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OPENAIRE_META_RESOURCE_TYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;

import java.net.MalformedURLException;

@@ -696,6 +695,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
                }
            }

            // set ORCID_PENDING to all orcid values that are not coming from ORCID provenance
            for (Author a : r.getAuthor()) {
                if (Objects.isNull(a.getPid())) {
                    a.setPid(Lists.newArrayList());

@@ -752,6 +752,40 @@ public class GraphCleaningFunctions extends CleaningFunctions {
                        .collect(Collectors.toList()));
                }
            }

            // Identify clashing ORCIDS:that is same ORCID associated to multiple authors in this result
            Map<String, Integer> clashing_orcid = new HashMap<>();

            for (Author a : r.getAuthor()) {
                a
                    .getPid()
                    .stream()
                    .filter(
                        p -> StringUtils
                            .contains(StringUtils.lowerCase(p.getQualifier().getClassid()), ORCID_PENDING))
                    .map(StructuredProperty::getValue)
                    .distinct()
                    .forEach(orcid -> clashing_orcid.compute(orcid, (k, v) -> (v == null) ? 1 : v + 1));
            }

            Set<String> clashing = clashing_orcid
                .entrySet()
                .stream()
                .filter(ee -> ee.getValue() > 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

            // filter out clashing orcids
            for (Author a : r.getAuthor()) {
                a
                    .setPid(
                        a
                            .getPid()
                            .stream()
                            .filter(p -> !clashing.contains(p.getValue()))
                            .collect(Collectors.toList()));
            }

        }
        if (value instanceof Publication) {

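The hunk above counts how many distinct authors of a result carry each ORCID and drops any ORCID attached to more than one author. A minimal standalone sketch of the same idea, using a hypothetical simplified `Author` model rather than the project's OAF classes:

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical, simplified model: each author carries a list of ORCID strings.
class Author {
    List<String> orcids = new ArrayList<>();
}

public class ClashingOrcidSketch {
    public static void main(String[] args) {
        Author a1 = new Author();
        a1.orcids.add("0000-0001-0000-0001");
        Author a2 = new Author();
        a2.orcids.add("0000-0001-0000-0001"); // same ORCID on a second author: a clash
        Author a3 = new Author();
        a3.orcids.add("0000-0002-0000-0002");
        List<Author> authors = Arrays.asList(a1, a2, a3);

        // Count how many distinct authors carry each ORCID value.
        Map<String, Integer> counts = new HashMap<>();
        for (Author a : authors) {
            a.orcids.stream().distinct()
                .forEach(orcid -> counts.compute(orcid, (k, v) -> (v == null) ? 1 : v + 1));
        }

        // An ORCID seen on more than one author is ambiguous and is dropped everywhere.
        Set<String> clashing = counts.entrySet().stream()
            .filter(e -> e.getValue() > 1)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());

        for (Author a : authors) {
            a.orcids = a.orcids.stream()
                .filter(o -> !clashing.contains(o))
                .collect(Collectors.toList());
        }

        System.out.println(a1.orcids); // []
        System.out.println(a3.orcids); // [0000-0002-0000-0002]
    }
}
```
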
@@ -23,24 +23,30 @@ import org.apache.commons.lang3.tuple.Pair;
import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.base.Joiner;

import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;

public class MergeUtils {

    public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
        return mergeGroup(s, oafEntityIterator, true);
    public static <T extends Oaf> T mergeById(Iterator<T> oafEntityIterator, VocabularyGroup vocs) {
        return mergeGroup(oafEntityIterator, true, vocs);
    }

    public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
        return mergeGroup(s, oafEntityIterator, false);
    public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator) {
        return mergeGroup(oafEntityIterator, false);
    }

    public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
        boolean checkDelegateAuthority) {
    public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator, boolean checkDelegateAuthority) {
        return mergeGroup(oafEntityIterator, checkDelegateAuthority, null);
    }

    public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator,
        boolean checkDelegateAuthority, VocabularyGroup vocs) {

        ArrayList<T> sortedEntities = new ArrayList<>();
        oafEntityIterator.forEachRemaining(sortedEntities::add);

@@ -49,13 +55,55 @@ public class MergeUtils {
        Iterator<T> it = sortedEntities.iterator();
        T merged = it.next();

        if (!it.hasNext() && merged instanceof Result && vocs != null) {
            return enforceResultType(vocs, (Result) merged);
        } else {
            while (it.hasNext()) {
                merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
            }

        }
        return merged;
    }

    private static <T extends Oaf> T enforceResultType(VocabularyGroup vocs, Result mergedResult) {
        if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) {
            return (T) mergedResult;
        } else {
            final Instance i = mergedResult.getInstance().get(0);

            if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
                return (T) mergedResult;
            } else {
                final String expectedResultType = Optional
                    .ofNullable(
                        vocs
                            .lookupTermBySynonym(
                                ModelConstants.DNET_RESULT_TYPOLOGIES, i.getInstancetype().getClassid()))
                    .orElse(ModelConstants.ORP_DEFAULT_RESULTTYPE)
                    .getClassid();

                // there is a clash among the result types
                if (!expectedResultType.equals(mergedResult.getResulttype().getClassid())) {

                    Result result = (Result) Optional
                        .ofNullable(ModelSupport.oafTypes.get(expectedResultType))
                        .map(r -> {
                            try {
                                return r.newInstance();
                            } catch (InstantiationException | IllegalAccessException e) {
                                throw new IllegalStateException(e);
                            }
                        })
                        .orElse(new OtherResearchProduct());
                    result.setId(mergedResult.getId());
                    return (T) mergeResultFields(result, mergedResult);
                } else {
                    return (T) mergedResult;
                }
            }
        }
    }

    public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
        return (T) merge(left, right, checkDelegateAuthority);
    }

@@ -106,7 +154,7 @@ public class MergeUtils {
            return mergeSoftware((Software) left, (Software) right);
        }

        return mergeResultFields((Result) left, (Result) right);
        return left;
    } else if (sameClass(left, right, Datasource.class)) {
        // TODO
        final int trust = compareTrust(left, right);

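The new `enforceResultType` above resolves the expected result type from a vocabulary and, on a clash, rebuilds the record under the expected subclass via reflection. A compilable sketch of that decision, with hypothetical stand-in classes instead of the real OAF model and `ModelSupport.oafTypes`:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-ins for the OAF result classes.
class Result { String id; String resulttype; }
class Publication extends Result { { resulttype = "publication"; } }
class OtherResearchProduct extends Result { { resulttype = "other"; } }

public class EnforceResultTypeSketch {
    // Maps an expected result type to a concrete class, mirroring ModelSupport.oafTypes.
    static final Map<String, Class<? extends Result>> TYPES = Map.of(
        "publication", Publication.class,
        "other", OtherResearchProduct.class);

    static Result enforce(String expectedType, Result merged) {
        if (expectedType.equals(merged.resulttype)) {
            return merged; // no clash: keep the merged record as-is
        }
        // Clash: rebuild under the expected subclass, defaulting to "other".
        Result fixed = Optional
            .<Class<? extends Result>> ofNullable(TYPES.get(expectedType))
            .map(c -> {
                try {
                    return (Result) c.getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            })
            .orElseGet(OtherResearchProduct::new);
        fixed.id = merged.id;
        return fixed;
    }

    public static void main(String[] args) {
        Result merged = new Publication();
        merged.id = "50|doi_________::abc";
        Result fixed = enforce("other", merged);
        System.out.println(fixed.getClass().getSimpleName() + " " + fixed.id);
    }
}
```
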
@@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable {
        .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);

    JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
        spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref");
        spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":crossref");

    JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
        spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed");
        spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":pubmed");

    JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel(
        spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc");
        spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":openapc");

    JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelationsNewModel(
        spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite");
        spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":datacite");

    JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelationsNewModel(
        spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff");
        spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":rawaff");

    JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisherNewModel(
        spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl");
        spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":webcrawl");

    crossrefRelations
        .union(pubmedRelations)

@@ -15,8 +15,10 @@ import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

@@ -55,6 +57,60 @@ public class ExtractPerson implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
    private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String OPENAIRE_PREFIX = "openaire____";
    private static final String SEPARATOR = "::";
    private static final String orcidKey = "10|" + OPENAIRE_PREFIX + SEPARATOR
        + DHPUtils.md5(ModelConstants.ORCID.toLowerCase());

    private static final String DOI_PREFIX = "50|doi_________::";

    private static final String PMID_PREFIX = "50|pmid________::";
    private static final String ARXIV_PREFIX = "50|arXiv_______::";

    private static final String PMCID_PREFIX = "50|pmcid_______::";
    private static final String ROR_PREFIX = "20|ror_________::";
    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class)
        + IdentifierFactory.ID_PREFIX_SEPARATOR + ModelConstants.ORCID + "_______";
    private static final String PROJECT_ID_PREFIX = ModelSupport.getIdPrefix(Project.class)
        + IdentifierFactory.ID_PREFIX_SEPARATOR;

    public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
    public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
    public static final String FUNDER_AUTHORS_CLASSID = "sysimport:crosswalk:funderdatabase";
    public static final String FUNDER_AUTHORS_CLASSNAME = "Imported from Funder Database";
    public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556";
    public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE";

    public static List<KeyValue> collectedfromOpenAIRE = OafMapperUtils
        .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);

    public static final DataInfo ORCIDDATAINFO = OafMapperUtils
        .dataInfo(
            false,
            null,
            false,
            false,
            OafMapperUtils
                .qualifier(
                    ORCID_AUTHORS_CLASSID,
                    ORCID_AUTHORS_CLASSNAME,
                    ModelConstants.DNET_PROVENANCE_ACTIONS,
                    ModelConstants.DNET_PROVENANCE_ACTIONS),
            "0.91");

    public static final DataInfo FUNDERDATAINFO = OafMapperUtils
        .dataInfo(
            false,
            null,
            false,
            false,
            OafMapperUtils
                .qualifier(
                    FUNDER_AUTHORS_CLASSID,
                    FUNDER_AUTHORS_CLASSNAME,
                    ModelConstants.DNET_PROVENANCE_ACTIONS,
                    ModelConstants.DNET_PROVENANCE_ACTIONS),
            "0.91");

    public static void main(final String[] args) throws IOException, ParseException {


@@ -200,7 +256,7 @@ public class ExtractPerson implements Serializable {
        return r;
    }

    private static @NotNull Relation getAuthorshipRelation(Row a) {
    private static @NotNull Relation getAuthorshipRelation(Row a) throws JsonProcessingException {
        String target = DOI_PREFIX
            + IdentifierFactory
                .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), a.getAs("DOI")));

@@ -216,6 +272,8 @@ public class ExtractPerson implements Serializable {
            null,
            null);

        final Double trust = a.getAs("trust");

        if (StringUtil.isNotBlank(a.getAs("orgid"))) {
            KeyValue kv = new KeyValue();
            kv.setKey("declared_affiliation");

@@ -226,9 +284,13 @@ public class ExtractPerson implements Serializable {
                .setValue(
                    OPENORGS_PREFIX
                        + IdentifierFactory.md5(PidCleaner.normalizePidValue("OPENORGS", a.getAs("orgid"))));
            kv.setDataInfo(OafMapperUtils.dataInfo(false,"openaire",true,false,null,
                String.valueOf(trust)));

            if (!Optional.ofNullable(relation.getProperties()).isPresent())
                relation.setProperties(new ArrayList<>());
            relation.getProperties().add(kv);
            System.out.println(new ObjectMapper().writeValueAsString(relation));
        }
        if (Optional.ofNullable(a.getAs("corresponding")).isPresent() &&
            a.getAs("corresponding").equals("true")) {

@@ -309,8 +371,9 @@ public class ExtractPerson implements Serializable {
    private static Relation getProjectRelation(String project, String orcid, String role) {

        String source = PERSON_PREFIX + SEPARATOR + IdentifierFactory.md5(orcid);
        String target = PROJECT_ID_PREFIX + project.substring(0, 14)
            + IdentifierFactory.md5(project.substring(15));

        String target = PROJECT_ID_PREFIX + StringUtils.substringBefore(project, "::") + "::"
            + IdentifierFactory.md5(StringUtils.substringAfter(project, "::"));
        List<KeyValue> properties = new ArrayList<>();

        Relation relation = OafMapperUtils

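The `getProjectRelation` hunk above swaps fixed character offsets for delimiter-based splitting when building the project target id. A small sketch of the difference, with a hypothetical `md5` stand-in and an illustrative project identifier (not taken from the diff):

```java
import org.apache.commons.lang3.StringUtils;

public class ProjectIdSketch {
    // Hypothetical md5 stand-in; the real code uses IdentifierFactory.md5.
    static String md5(String s) { return Integer.toHexString(s.hashCode()); }

    public static void main(String[] args) {
        String prefix = "40|"; // illustrative PROJECT_ID_PREFIX stand-in
        String project = "corda_______::101003374"; // illustrative namespace::code pair

        // Old approach: relies on fixed character offsets, which only works when
        // the namespace part has exactly the expected length.
        String oldTarget = prefix + project.substring(0, 14) + md5(project.substring(15));

        // New approach: split on the "::" separator, robust to the prefix length.
        String newTarget = prefix + StringUtils.substringBefore(project, "::") + "::"
            + md5(StringUtils.substringAfter(project, "::"));

        System.out.println(oldTarget);
        System.out.println(newTarget);
    }
}
```
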
@@ -650,25 +713,25 @@ public class ExtractPerson implements Serializable {
            .readValue(value, Person.class),
        Encoders.bean(Person.class));

    Dataset<Relation> relations = getRelations(spark, workingDir + "/coauthorship")
        .union(
            getRelations(spark, workingDir + "/authorship"))
        .union(
            getRelations(spark, workingDir + "/affiliation"))
        .union(
            getRelations(spark, workingDir + "/project"))
        .union(
            getRelations(spark, workingDir + "/publishers"));

    System.out.println(relations.count());


    people
        .toJavaRDD()
        .map(p -> new AtomicAction(p.getClass(), p))
        .union(
            getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
        .union(
            getRelations(spark, workingDir + "/coauthorship")
                .toJavaRDD()
                .map(r -> new AtomicAction(r.getClass(), r)))
        .union(
            getRelations(spark, workingDir + "/affiliation")
                .toJavaRDD()
                .map(r -> new AtomicAction(r.getClass(), r)))
        .union(
            getRelations(spark, workingDir + "/project")
                .toJavaRDD()
                .map(r -> new AtomicAction(r.getClass(), r)))
        .union(
            getRelations(spark, workingDir + "/publishers")
        .union(relations
            .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getRelClass() + r.getTarget(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Relation, Relation>) (k,it) -> mergeRelation(it), Encoders.bean(Relation.class))
            .toJavaRDD()
            .map(r -> new AtomicAction(r.getClass(), r)))
        .mapToPair(

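The reworked pipeline above deduplicates relations by the composite key source + relClass + target before emitting atomic actions, merging the properties of duplicates. A plain-Java sketch of that grouping step, with a hypothetical simplified `Relation` in place of the OAF class and Spark's `groupByKey`/`mapGroups`:

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical, simplified Relation with just the fields needed for the dedup key.
class Relation {
    String source, relClass, target;
    List<String> properties = new ArrayList<>();
    Relation(String s, String rc, String t) { source = s; relClass = rc; target = t; }
    String key() { return source + relClass + target; }
}

public class MergeRelationSketch {
    // Keep one relation per key, unioning the properties of its duplicates,
    // mirroring the groupByKey/mapGroups step in the diff above.
    static Relation merge(Iterator<Relation> it) {
        Relation merged = it.next();
        while (it.hasNext()) {
            merged.properties.addAll(it.next().properties);
        }
        return merged;
    }

    public static void main(String[] args) {
        Relation fromOrcid = new Relation("30|orcid::x", "hasAuthored", "50|doi::y");
        fromOrcid.properties.add("role=writer");
        Relation fromPublisher = new Relation("30|orcid::x", "hasAuthored", "50|doi::y");
        fromPublisher.properties.add("corresponding=true");

        Map<String, List<Relation>> groups = Stream.of(fromOrcid, fromPublisher)
            .collect(Collectors.groupingBy(Relation::key));

        List<Relation> deduped = groups.values().stream()
            .map(g -> merge(g.iterator()))
            .collect(Collectors.toList());

        System.out.println(deduped.size() + " relation(s), properties: " + deduped.get(0).properties);
    }
}
```
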
@@ -673,11 +673,12 @@ case object Crossref2Oaf {
    val doi = input.getString(0)
    val rorId = input.getString(1)

    val pubId = s"50|${PidType.doi.toString.padTo(12, "_")}::${DoiCleaningRule.clean(doi)}"

    val pubId = IdentifierFactory.idFromPid("50", "doi", DoiCleaningRule.clean(doi), true)
    val affId = GenerateRorActionSetJob.calculateOpenaireId(rorId)

    val r: Relation = new Relation
    DoiCleaningRule.clean(doi)

    r.setSource(pubId)
    r.setTarget(affId)
    r.setRelType(ModelConstants.RESULT_ORGANIZATION)

@@ -978,7 +979,26 @@ case object Crossref2Oaf {
        case "10.13039/501100010790" =>
          generateSimpleRelationFromAward(funder, "erasmusplus_", a => a)
        case _ => logger.debug("no match for " + funder.DOI.get)

        //Add for Danish funders
        //Independent Research Fund Denmark (IRFD)
        case "10.13039/501100004836" =>
          generateSimpleRelationFromAward(funder, "irfd________", a => a)
          val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63")
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        //Carlsberg Foundation (CF)
        case "10.13039/501100002808" =>
          generateSimpleRelationFromAward(funder, "cf__________", a => a)
          val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63")
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        //Novo Nordisk Foundation (NNF)
        case "10.13039/501100009708" =>
          generateSimpleRelationFromAward(funder, "nnf___________", a => a)
          val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63")
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        case _ => logger.debug("no match for " + funder.DOI.get)
      }

    } else {

@@ -4,7 +4,11 @@ package eu.dnetlib.dhp.actionmanager.person;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import com.esotericsoftware.kryo.util.ObjectMap;
import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

@@ -155,17 +159,7 @@ public class CreatePersonAS {
                .first()
                .getPid()
                .size());
        System.out
            .println(
                new ObjectMapper()
                    .writeValueAsString(
                        people
                            .filter(
                                p -> p
                                    .getPid()
                                    .stream()
                                    .anyMatch(id -> id.getValue().equalsIgnoreCase("0000-0003-0046-4895")))
                            .first()));

        Assertions
            .assertTrue(
                people

@@ -177,10 +171,15 @@ public class CreatePersonAS {
                    .anyMatch(
                        p -> p.getQualifier().getClassname().equalsIgnoreCase("Scopus Author ID")
                            && p.getValue().equalsIgnoreCase("6603539671")));
        relations
            .filter(
                r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                    && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
            .foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));

        Assertions
            .assertEquals(
                19,
                18,
                relations
                    .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
                    .count());

@@ -190,6 +189,7 @@ public class CreatePersonAS {
                relations
                    .filter(r -> r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
                    .count());
        //four relations are expected: one from publisher, three from works. the same work has two valid pids so two results produce three relations
        Assertions
            .assertEquals(
                3,

@@ -198,6 +198,8 @@ public class CreatePersonAS {
                    r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                        && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED))
                    .count());


        Assertions
            .assertEquals(
                2,

@@ -216,18 +218,49 @@ public class CreatePersonAS {
                        && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
                        && r.getTarget().startsWith("50|arXiv"))
                    .count());
        relations.foreach(r-> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));

        Assertions
            .assertEquals(
                1,
                2,
                relations
                    .filter(
                        r -> r.getSource().equalsIgnoreCase("30|orcid_______::" + DHPUtils.md5("0000-0001-6291-9619"))
                            && r.getRelClass().equalsIgnoreCase(ModelConstants.PERSON_PERSON_HASCOAUTHORED))
                    .count());
        Assertions.assertEquals(38, relations.count());
        Assertions.assertEquals(37, relations.count());
        relations.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));

        //check contribution from publisher papers
        //the relation was merged with the other one already extracted from orcid
        JavaRDD<Relation> filterRelations = relations
            .filter(
                r -> r.getSource().equalsIgnoreCase("30|orcid_______::4e3bfd34079624f293a03e03c243b96b")
                    && r.getRelClass().equalsIgnoreCase(ModelConstants.RESULT_PERSON_HASAUTHORED)
                    && r.getTarget().startsWith("50|doi_________::a69682d48d289d8b5d735a70a5ef00ec"));
        Assertions.assertEquals(1, filterRelations.count());

        List<KeyValue> properties = filterRelations.first().getProperties();
        Assertions.assertFalse(properties.isEmpty());
        Assertions.assertEquals(4, properties.size());
        properties.forEach(p-> {
            try {
                System.out.println(new ObjectMapper().writeValueAsString(p));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        });
        Assertions.assertEquals(1, properties.stream().filter(p->p.getKey().equalsIgnoreCase("corresponding")).count());
        Assertions.assertEquals(1, properties.stream().filter(p->p.getKey().equalsIgnoreCase("corresponding") &&
            p.getValue().equalsIgnoreCase("true")).count());
        Assertions.assertEquals(1, properties.stream().filter(p->p.getKey().equalsIgnoreCase("declared_affiliation")).count());
        Assertions.assertEquals(1, properties.stream().filter(p->p.getKey().equalsIgnoreCase("declared_affiliation") &&
            p.getValue().equalsIgnoreCase("https://ror.org/05582kr93") &&
            p.getDataInfo()!= null && p.getDataInfo().getTrust().equalsIgnoreCase("1.0")
            ).count());
        Assertions.assertEquals(2, properties.stream().filter(p->p.getKey().equalsIgnoreCase("role")).count());



    }

}

@@ -1,5 +1,5 @@
{"DOI":"10.1109\/ETFA46521.2020.9212101","Authors":[{"Name":{"Full":"Chris Paul Iatrou","First":"Chris Paul","Last":"Iatrou"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Chair of Process Control Systems, Technische Universit\u00e4t Dresden, Dresden, Germany"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/042aqky30","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37085798458"}},{"Name":{"Full":"Lukas Ketzel","First":"Lukas","Last":"Ketzel"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Chair of Process Control Systems, Technische Universit\u00e4t Dresden, Dresden, Germany"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/042aqky30","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37088521467"}},{"Name":{"Full":"Markus Graube","First":"Markus","Last":"Graube"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["TraceTronic GmbH, Dresden, Germany"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37889063900"}},{"Name":{"Full":"Martin H\u00e4fner","First":"Martin","Last":"H\u00e4fner"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["B\u00fcrkert Fluidic Control Systems, Ingelfingen, Germay"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37824535500"}},{"Name":{"Full":"Leon Urbas","First":"Leon","Last":"Urbas"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Chair of Process Control Systems, Technische Universit\u00e4t Dresden, Dresden, Germany"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/042aqky30","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37265255100"}}],"Organizations":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/042aqky30","Confidence":1,"Status":"active"}]}
{"DOI":"10.1109\/4.735554","Authors":[{"Name":{"Full":"T. Cummins","First":"T.","Last":"Cummins"},"Corresponding":true,"Contributor_roles":[{"Schema": "credit", "Value": "writer"},{"Schema": "credit","Value": "developer"}],"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"ORCID","Value":"37372617500"}},{"Name":{"Full":"E. Byrne","First":"E.","Last":"Byrne"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"ORCID","Value":"38226811000"}},{"Name":{"Full":"D. Brannick","First":"D.","Last":"Brannick"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37375404700"}},{"Name":{"Full":"D.A. Dempsey","First":"D.A.","Last":"Dempsey"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37341692700"}}],"Organizations":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}]}
{"DOI":"10.1021/acs.langmuir.1c02956","Authors":[{"Name":{"Full":"T. Cummins","First":"T.","Last":"Cummins"},"Corresponding":true,"Contributor_roles":[{"Schema": "credit", "Value": "writer"},{"Schema": "credit","Value": "developer"}],"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"ORCID","Value":"0000-0001-6291-9619"}},{"Name":{"Full":"E. Byrne","First":"E.","Last":"Byrne"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"ORCID","Value":"38226811000"}},{"Name":{"Full":"D. Brannick","First":"D.","Last":"Brannick"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37375404700"}},{"Name":{"Full":"D.A. Dempsey","First":"D.A.","Last":"Dempsey"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Analog Devices, Inc., Limerick, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37341692700"}}],"Organizations":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05582kr93","Confidence":1,"Status":"active"}]}
{"DOI":"10.1109\/AICS60730.2023.10470926","Authors":[{"Name":{"Full":"Gyanendar Manohar","First":"Gyanendar","Last":"Manohar"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Department of Computer Science, Munster Technological University, Cork, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/013xpqh61","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"ORCID","Value":"961565437242602"}},{"Name":{"Full":"Ruairi O'Reilly","First":"Ruairi","Last":"O'Reilly"},"Corresponding":true,"Contributor_roles":[{"Schema": "credit", "Value": "writer"},{"Schema": "credit","Value": "developer"}],"Raw_affiliations":["Department of Computer Science, Munster Technological University, Cork, Ireland"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/013xpqh61","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37547617100"}}],"Organizations":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/013xpqh61","Confidence":1,"Status":"active"}]}
{"DOI":"10.1109\/20.706738","Authors":[{"Name":{"Full":"J.H. Steele","First":"J.H.","Last":"Steele"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Department of Electrical and Computer Engineering, Carnegie Mellon University, Pittsburgh, PA, USA"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05x2bcf33","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37361970600"}},{"Name":{"Full":"W.C. Messner","First":"W.C.","Last":"Messner"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Department of Electrical and Computer Engineering, Carnegie Mellon University, Pittsburgh, PA, USA"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05x2bcf33","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37315619100"}},{"Name":{"Full":"J.A. Bain","First":"J.A.","Last":"Bain"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Department of Electrical and Computer Engineering, Carnegie Mellon University, Pittsburgh, PA, USA"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05x2bcf33","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37266530200"}},{"Name":{"Full":"T.A. Schwarz","First":"T.A.","Last":"Schwarz"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Peregrine Recording Technology, Woodbury, MN, USA"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37334354900"}},{"Name":{"Full":"W.J. O'Kane","First":"W.J.","Last":"O'Kane"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":[],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37369941900"}},{"Name":{"Full":"M.P. Connolly","First":"M.P.","Last":"Connolly"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Springtown Industrial Estate, Seagate Technology Ireland, Londonderry, UK"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37373077300"}}],"Organizations":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/05x2bcf33","Confidence":1,"Status":"active"}]}
{"DOI":"10.1109\/GLOBECOM48099.2022.10001219","Authors":[{"Name":{"Full":"Anestis Dalgkitsis","First":"Anestis","Last":"Dalgkitsis"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Iquadrat Informatica S.L., Barcelona, Spain"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37088665104"}},{"Name":{"Full":"Ashima Chawla","First":"Ashima","Last":"Chawla"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Network Management Research Lab, LM Ericsson, Athlone, Ireland"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37088569286"}},{"Name":{"Full":"Anne-Marie Bosneag","First":"Anne-Marie","Last":"Bosneag"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Network Management Research Lab, LM Ericsson, Athlone, Ireland"],"Matchings":[],"PIDs":{"Schema":"IEEE","Value":"37283409900"}},{"Name":{"Full":"Christos Verikoukis","First":"Christos","Last":"Verikoukis"},"Corresponding":null,"Contributor_roles":null,"Raw_affiliations":["Computer Engineering & Informatics Dept., University of Patras, Greece"],"Matchings":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/017wvtq80","Confidence":1,"Status":"active"}],"PIDs":{"Schema":"IEEE","Value":"37272215400"}}],"Organizations":[{"Provenance":"AffRo","PID":"ROR","Value":"https:\/\/ror.org\/017wvtq80","Confidence":1,"Status":"active"}]}

@@ -135,7 +135,7 @@ public class DedupRecordFactory {
            return Collections.emptyIterator();
        }

        OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
        OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator());
        // dedup records do not have date of transformation attribute
        mergedEntity.setDateoftransformation(null);
        mergedEntity

@@ -69,6 +69,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {

        Dataset<Relation> mergeRels = spark
            .read()
            .schema(REL_BEAN_ENC.schema())
            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
            .as(REL_BEAN_ENC);

@@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable {
    }

    @Test
    void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
        Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator());
    void datasetMergerTest() {
        Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator());

        // verify id
        assertEquals(dedupId, pub_merged.getId());

@@ -566,7 +566,26 @@ case object Crossref2Oaf {
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        case _ => logger.debug("no match for " + funder.DOI.get)

        //Add for Danish funders
        //Independent Research Fund Denmark (IRFD)
        case "10.13039/501100004836" =>
          generateSimpleRelationFromAward(funder, "irfd________", a => a)
          val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63")
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        //Carlsberg Foundation (CF)
        case "10.13039/501100002808" =>
          generateSimpleRelationFromAward(funder, "cf__________", a => a)
          val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63")
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        //Novo Nordisk Foundation (NNF)
        case "10.13039/501100009708" =>
          generateSimpleRelationFromAward(funder, "nnf___________", a => a)
          val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63")
          queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
          queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
        case _ => logger.debug("no match for " + funder.DOI.get)
      }

    } else {

@@ -153,34 +153,40 @@ public abstract class AbstractMdRecordToOafMapper {
            final DataInfo entityInfo = prepareDataInfo(doc, this.invisible);
            final long lastUpdateTimestamp = new Date().getTime();

            final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
            final Instance instance = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);

            final String type = getResultType(doc, instances);
            if (!Optional
                .ofNullable(instance.getInstancetype())
                .map(Qualifier::getClassid)
                .filter(StringUtils::isNotBlank)
                .isPresent()) {
                return Lists.newArrayList();
            }

            return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
            final String type = getResultType(instance);

            return createOafs(doc, type, instance, collectedFrom, entityInfo, lastUpdateTimestamp);
        } catch (final DocumentException e) {
            log.error("Error with record:\n" + xml);
            return Lists.newArrayList();
        }
    }

    protected String getResultType(final Document doc, final List<Instance> instances) {
        final String type = doc.valueOf("//dr:CobjCategory/@type");

        if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
            final String instanceType = instances
                .stream()
                .map(i -> i.getInstancetype().getClassid())
                .findFirst()
                .filter(s -> !UNKNOWN.equalsIgnoreCase(s))
                .orElse("0000"); // Unknown
    protected String getResultType(final Instance instance) {
        if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
            return Optional
                .ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
                .ofNullable(instance.getInstancetype())
                .map(Qualifier::getClassid)
                .map(
                    instanceType -> Optional
                        .ofNullable(
                            this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
                        .map(Qualifier::getClassid)
                        .orElse("0000"))
                .orElse("0000");
        } else {
            throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES);
        }

        return type;
    }

    private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {

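The new `getResultType` above resolves the instance type through the dnet:result_typologies vocabulary and falls back to "0000" (unknown) when either the instance type or its synonym is missing. A self-contained sketch of that nested-Optional lookup, with a hypothetical map standing in for the vocabulary:

```java
import java.util.Map;
import java.util.Optional;

public class ResultTypeLookupSketch {
    // Hypothetical stand-in for the dnet:result_typologies synonym lookup.
    static final Map<String, String> SYNONYMS = Map.of(
        "0001", "publication",
        "0021", "dataset");

    // Resolve an instance type to a result type, falling back to "0000" (unknown)
    // when either the instance type or its vocabulary synonym is missing.
    static String getResultType(String instanceTypeClassid) {
        return Optional
            .ofNullable(instanceTypeClassid)
            .map(t -> Optional.ofNullable(SYNONYMS.get(t)).orElse("0000"))
            .orElse("0000");
    }

    public static void main(String[] args) {
        System.out.println(getResultType("0001")); // publication
        System.out.println(getResultType("9999")); // 0000
        System.out.println(getResultType(null));   // 0000
    }
}
```
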
@@ -197,12 +203,12 @@ public abstract class AbstractMdRecordToOafMapper {
    protected List<Oaf> createOafs(
        final Document doc,
        final String type,
        final List<Instance> instances,
        final Instance instance,
        final KeyValue collectedFrom,
        final DataInfo info,
        final long lastUpdateTimestamp) {

        final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
        final OafEntity entity = createEntity(doc, type, instance, collectedFrom, info, lastUpdateTimestamp);

        final Set<String> originalId = Sets.newHashSet(entity.getOriginalId());
        originalId.add(entity.getId());

@@ -235,19 +241,19 @@ public abstract class AbstractMdRecordToOafMapper {

    private OafEntity createEntity(final Document doc,
        final String type,
        final List<Instance> instances,
        final Instance instance,
        final KeyValue collectedFrom,
        final DataInfo info,
        final long lastUpdateTimestamp) {
        switch (type.toLowerCase()) {
            case "publication":
                final Publication p = new Publication();
                populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
                populateResultFields(p, doc, instance, collectedFrom, info, lastUpdateTimestamp);
                p.setJournal(prepareJournal(doc, info));
                return p;
            case "dataset":
                final Dataset d = new Dataset();
                populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
                populateResultFields(d, doc, instance, collectedFrom, info, lastUpdateTimestamp);
                d.setStoragedate(prepareDatasetStorageDate(doc, info));
                d.setDevice(prepareDatasetDevice(doc, info));
                d.setSize(prepareDatasetSize(doc, info));

@@ -258,7 +264,7 @@ public abstract class AbstractMdRecordToOafMapper {
                return d;
            case "software":
                final Software s = new Software();
                populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
                populateResultFields(s, doc, instance, collectedFrom, info, lastUpdateTimestamp);
                s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
                s.setLicense(prepareSoftwareLicenses(doc, info));
                s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));

@@ -268,7 +274,7 @@ public abstract class AbstractMdRecordToOafMapper {
            case "otherresearchproducts":
            default:
                final OtherResearchProduct o = new OtherResearchProduct();
                populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
                populateResultFields(o, doc, instance, collectedFrom, info, lastUpdateTimestamp);
                o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
                o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
                o.setTool(prepareOtherResearchProductTools(doc, info));

@@ -415,7 +421,7 @@ public abstract class AbstractMdRecordToOafMapper {
    private void populateResultFields(
        final Result r,
        final Document doc,
        final List<Instance> instances,
        final Instance instance,
        final KeyValue collectedFrom,
        final DataInfo info,
        final long lastUpdateTimestamp) {

@@ -449,8 +455,8 @@ public abstract class AbstractMdRecordToOafMapper {
        r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
        r.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
        r.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
        r.setInstance(instances);
        r.setBestaccessright(OafMapperUtils.createBestAccessRights(instances));
        r.setInstance(Arrays.asList(instance));
        r.setBestaccessright(OafMapperUtils.createBestAccessRights(Arrays.asList(instance)));
        r.setEoscifguidelines(prepareEOSCIfGuidelines(doc, info));
    }

@@ -509,7 +515,7 @@ public abstract class AbstractMdRecordToOafMapper {

    protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);

    protected abstract List<Instance> prepareInstances(
    protected abstract Instance prepareInstances(
        Document doc,
        DataInfo info,
        KeyValue collectedfrom,

@@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
        inputRdd
            .keyBy(oaf -> ModelSupport.idFn().apply(oaf))
            .groupByKey()
            .map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())),
            .map(t -> MergeUtils.mergeGroup(t._2.iterator())),
        // .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
        // .reduceByKey(MergeUtils::merge)
        // .map(Tuple2::_2),

@@ -135,7 +135,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
    }

    @Override
    protected List<Instance> prepareInstances(
    protected Instance prepareInstances(
        final Document doc,
        final DataInfo info,
        final KeyValue collectedfrom,

@@ -197,7 +197,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
            instance.getUrl().addAll(validUrl);
        }

        return Lists.newArrayList(instance);
        return instance;
    }

    /**

@@ -126,7 +126,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
    }

    @Override
    protected List<Instance> prepareInstances(
    protected Instance prepareInstances(
        final Document doc,
        final DataInfo info,
        final KeyValue collectedfrom,

@@ -210,7 +210,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
            instance.setUrl(new ArrayList<>());
            instance.getUrl().addAll(validUrl);
        }
        return Arrays.asList(instance);
        return instance;
    }

    protected String trimAndDecodeUrl(String url) {

@@ -51,6 +51,7 @@
    <arg>--orcidPath</arg><arg>${orcidPath}</arg>
    <arg>--targetPath</arg><arg>${targetPath}</arg>
    <arg>--graphPath</arg><arg>${graphPath}</arg>
    <arg>--workingDir</arg><arg>${workingDir}</arg>
    <arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="reset_outputpath"/>

@@ -162,6 +162,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>

@@ -197,6 +198,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=8000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>

@@ -232,6 +234,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=5000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>

@@ -267,6 +270,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=2000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>

@@ -302,6 +306,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=1000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>

@@ -337,6 +342,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=1000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>

@@ -372,6 +378,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=2000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>

@@ -407,6 +414,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=2000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/person</arg>

@@ -442,6 +450,7 @@
    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
    --conf spark.sql.autoBroadcastJoinThreshold=-1
    --conf spark.sql.shuffle.partitions=20000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>

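The workflow hunks above add `spark.sql.autoBroadcastJoinThreshold=-1` (disabling automatic broadcast joins) alongside per-entity `spark.sql.shuffle.partitions` sizing. For reference, a minimal sketch of the programmatic equivalent of those spark-opts, assuming a local SparkSession rather than the workflow's YARN deployment:

```java
import org.apache.spark.sql.SparkSession;

public class BroadcastJoinConfSketch {
    public static void main(String[] args) {
        // Programmatic equivalent of the workflow's spark-opts: disable
        // automatic broadcast joins (threshold -1) and size shuffle partitions.
        SparkSession spark = SparkSession
            .builder()
            .appName("broadcast-join-conf-sketch")
            .master("local[*]")
            .config("spark.sql.autoBroadcastJoinThreshold", "-1")
            .config("spark.sql.shuffle.partitions", "15000")
            .getOrCreate();

        System.out.println(spark.conf().get("spark.sql.autoBroadcastJoinThreshold"));
        spark.stop();
    }
}
```
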
@@ -133,7 +133,7 @@ object SparkCreateInputGraph {
    val ds: Dataset[T] = spark.read.load(sourcePath).as[T]

    ds.groupByKey(_.getId)
      .mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] }
      .mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] }
      // .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
      // .map(_)
      .write