Merge remote-tracking branch 'origin/beta' into crossref_mapping_improvement

This commit is contained in:
sandro.labruzzo 2024-11-20 09:52:51 +01:00
commit ac8995ab64
18 changed files with 213 additions and 63 deletions

1
.gitignore vendored
View File

@ -28,3 +28,4 @@ spark-warehouse
/**/.scalafmt.conf /**/.scalafmt.conf
/.java-version /.java-version
/dhp-shade-package/dependency-reduced-pom.xml /dhp-shade-package/dependency-reduced-pom.xml
/**/job.properties

View File

@ -2,8 +2,7 @@
package eu.dnetlib.dhp.oa.merge; package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.functions.when;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -135,7 +134,9 @@ public class GroupEntitiesSparkJob {
.applyCoarVocabularies(entity, vocs), .applyCoarVocabularies(entity, vocs),
OAFENTITY_KRYO_ENC) OAFENTITY_KRYO_ENC)
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING()) .groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeById, OAFENTITY_KRYO_ENC) .mapGroups(
(MapGroupsFunction<String, OafEntity, OafEntity>) (key, group) -> MergeUtils.mergeById(group, vocs),
OAFENTITY_KRYO_ENC)
.map( .map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>( (MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
t.getClass().getName(), t), t.getClass().getName(), t),

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.schema.oaf.utils; package eu.dnetlib.dhp.schema.oaf.utils;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OPENAIRE_META_RESOURCE_TYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.getProvenance;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -696,6 +695,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
} }
} }
// set ORCID_PENDING to all orcid values that are not coming from ORCID provenance
for (Author a : r.getAuthor()) { for (Author a : r.getAuthor()) {
if (Objects.isNull(a.getPid())) { if (Objects.isNull(a.getPid())) {
a.setPid(Lists.newArrayList()); a.setPid(Lists.newArrayList());
@ -752,6 +752,40 @@ public class GraphCleaningFunctions extends CleaningFunctions {
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
} }
// Identify clashing ORCIDS:that is same ORCID associated to multiple authors in this result
Map<String, Integer> clashing_orcid = new HashMap<>();
for (Author a : r.getAuthor()) {
a
.getPid()
.stream()
.filter(
p -> StringUtils
.contains(StringUtils.lowerCase(p.getQualifier().getClassid()), ORCID_PENDING))
.map(StructuredProperty::getValue)
.distinct()
.forEach(orcid -> clashing_orcid.compute(orcid, (k, v) -> (v == null) ? 1 : v + 1));
}
Set<String> clashing = clashing_orcid
.entrySet()
.stream()
.filter(ee -> ee.getValue() > 1)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
// filter out clashing orcids
for (Author a : r.getAuthor()) {
a
.setPid(
a
.getPid()
.stream()
.filter(p -> !clashing.contains(p.getValue()))
.collect(Collectors.toList()));
}
} }
if (value instanceof Publication) { if (value instanceof Publication) {

View File

@ -23,24 +23,30 @@ import org.apache.commons.lang3.tuple.Pair;
import com.github.sisyphsu.dateparser.DateParserUtils; import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.AccessRightComparator; import eu.dnetlib.dhp.schema.common.AccessRightComparator;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils { public class MergeUtils {
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) { public static <T extends Oaf> T mergeById(Iterator<T> oafEntityIterator, VocabularyGroup vocs) {
return mergeGroup(s, oafEntityIterator, true); return mergeGroup(oafEntityIterator, true, vocs);
} }
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) { public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, false); return mergeGroup(oafEntityIterator, false);
} }
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator, public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator, boolean checkDelegateAuthority) {
boolean checkDelegateAuthority) { return mergeGroup(oafEntityIterator, checkDelegateAuthority, null);
}
public static <T extends Oaf> T mergeGroup(Iterator<T> oafEntityIterator,
boolean checkDelegateAuthority, VocabularyGroup vocs) {
ArrayList<T> sortedEntities = new ArrayList<>(); ArrayList<T> sortedEntities = new ArrayList<>();
oafEntityIterator.forEachRemaining(sortedEntities::add); oafEntityIterator.forEachRemaining(sortedEntities::add);
@ -49,13 +55,55 @@ public class MergeUtils {
Iterator<T> it = sortedEntities.iterator(); Iterator<T> it = sortedEntities.iterator();
T merged = it.next(); T merged = it.next();
while (it.hasNext()) { if (!it.hasNext() && merged instanceof Result && vocs != null) {
merged = checkedMerge(merged, it.next(), checkDelegateAuthority); return enforceResultType(vocs, (Result) merged);
} else {
while (it.hasNext()) {
merged = checkedMerge(merged, it.next(), checkDelegateAuthority);
}
} }
return merged; return merged;
} }
private static <T extends Oaf> T enforceResultType(VocabularyGroup vocs, Result mergedResult) {
if (Optional.ofNullable(mergedResult.getInstance()).map(List::isEmpty).orElse(true)) {
return (T) mergedResult;
} else {
final Instance i = mergedResult.getInstance().get(0);
if (!vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
return (T) mergedResult;
} else {
final String expectedResultType = Optional
.ofNullable(
vocs
.lookupTermBySynonym(
ModelConstants.DNET_RESULT_TYPOLOGIES, i.getInstancetype().getClassid()))
.orElse(ModelConstants.ORP_DEFAULT_RESULTTYPE)
.getClassid();
// there is a clash among the result types
if (!expectedResultType.equals(mergedResult.getResulttype().getClassid())) {
Result result = (Result) Optional
.ofNullable(ModelSupport.oafTypes.get(expectedResultType))
.map(r -> {
try {
return r.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
})
.orElse(new OtherResearchProduct());
result.setId(mergedResult.getId());
return (T) mergeResultFields(result, mergedResult);
} else {
return (T) mergedResult;
}
}
}
}
public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) { public static <T extends Oaf> T checkedMerge(final T left, final T right, boolean checkDelegateAuthority) {
return (T) merge(left, right, checkDelegateAuthority); return (T) merge(left, right, checkDelegateAuthority);
} }
@ -106,7 +154,7 @@ public class MergeUtils {
return mergeSoftware((Software) left, (Software) right); return mergeSoftware((Software) left, (Software) right);
} }
return mergeResultFields((Result) left, (Result) right); return left;
} else if (sameClass(left, right, Datasource.class)) { } else if (sameClass(left, right, Datasource.class)) {
// TODO // TODO
final int trust = compareTrust(left, right); final int trust = compareTrust(left, right);

View File

@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable {
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref"); spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":crossref");
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations( JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed"); spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":pubmed");
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel(
spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc"); spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":openapc");
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelationsNewModel(
spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite"); spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":datacite");
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelationsNewModel(
spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff"); spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":rawaff");
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisherNewModel( JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisherNewModel(
spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl"); spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + ":webcrawl");
crossrefRelations crossrefRelations
.union(pubmedRelations) .union(pubmedRelations)

View File

@ -15,6 +15,7 @@ import java.util.stream.Collectors;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -193,8 +194,8 @@ public class ExtractPerson implements Serializable {
private static Relation getProjectRelation(String project, String orcid, String role) { private static Relation getProjectRelation(String project, String orcid, String role) {
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
String target = PROJECT_ID_PREFIX + project.substring(0, 14) String target = PROJECT_ID_PREFIX + StringUtils.substringBefore(project, "::") + "::"
+ IdentifierFactory.md5(project.substring(15)); + IdentifierFactory.md5(StringUtils.substringAfter(project, "::"));
List<KeyValue> properties = new ArrayList<>(); List<KeyValue> properties = new ArrayList<>();
Relation relation = OafMapperUtils Relation relation = OafMapperUtils
@ -345,7 +346,16 @@ public class ExtractPerson implements Serializable {
OafMapperUtils OafMapperUtils
.structuredProperty( .structuredProperty(
op.getOrcid(), ModelConstants.ORCID, ModelConstants.ORCID_CLASSNAME, op.getOrcid(), ModelConstants.ORCID, ModelConstants.ORCID_CLASSNAME,
ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, null)); ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES,
OafMapperUtils.dataInfo(false,
null,
false,
false,
OafMapperUtils.qualifier(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES),
"0.91")));
person.setDateofcollection(op.getLastModifiedDate()); person.setDateofcollection(op.getLastModifiedDate());
person.setOriginalId(Arrays.asList(op.getOrcid())); person.setOriginalId(Arrays.asList(op.getOrcid()));
person.setDataInfo(ORCIDDATAINFO); person.setDataInfo(ORCIDDATAINFO);

View File

@ -667,11 +667,12 @@ case object Crossref2Oaf {
val doi = input.getString(0) val doi = input.getString(0)
val rorId = input.getString(1) val rorId = input.getString(1)
val pubId = s"50|${PidType.doi.toString.padTo(12, "_")}::${DoiCleaningRule.clean(doi)}"
val pubId = IdentifierFactory.idFromPid("50", "doi", DoiCleaningRule.clean(doi), true)
val affId = GenerateRorActionSetJob.calculateOpenaireId(rorId) val affId = GenerateRorActionSetJob.calculateOpenaireId(rorId)
val r: Relation = new Relation val r: Relation = new Relation
DoiCleaningRule.clean(doi)
r.setSource(pubId) r.setSource(pubId)
r.setTarget(affId) r.setTarget(affId)
r.setRelType(ModelConstants.RESULT_ORGANIZATION) r.setRelType(ModelConstants.RESULT_ORGANIZATION)
@ -955,7 +956,26 @@ case object Crossref2Oaf {
case "10.13039/501100010790" => case "10.13039/501100010790" =>
generateSimpleRelationFromAward(funder, "erasmusplus_", a => a) generateSimpleRelationFromAward(funder, "erasmusplus_", a => a)
case _ => logger.debug("no match for " + funder.DOI.get) case _ => logger.debug("no match for " + funder.DOI.get)
//Add for Danish funders
//Independent Research Fund Denmark (IRFD)
case "10.13039/501100004836" =>
generateSimpleRelationFromAward(funder, "irfd________", a => a)
val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63")
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
//Carlsberg Foundation (CF)
case "10.13039/501100002808" =>
generateSimpleRelationFromAward(funder, "cf__________", a => a)
val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63")
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
//Novo Nordisk Foundation (NNF)
case "10.13039/501100009708" =>
generateSimpleRelationFromAward(funder, "nnf___________", a => a)
val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63")
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
case _ => logger.debug("no match for " + funder.DOI.get)
} }
} else { } else {

View File

@ -135,7 +135,7 @@ public class DedupRecordFactory {
return Collections.emptyIterator(); return Collections.emptyIterator();
} }
OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator()); OafEntity mergedEntity = MergeUtils.mergeGroup(cliques.iterator());
// dedup records do not have date of transformation attribute // dedup records do not have date of transformation attribute
mergedEntity.setDateoftransformation(null); mergedEntity.setDateoftransformation(null);
mergedEntity mergedEntity

View File

@ -69,6 +69,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
Dataset<Relation> mergeRels = spark Dataset<Relation> mergeRels = spark
.read() .read()
.schema(REL_BEAN_ENC.schema())
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC); .as(REL_BEAN_ENC);

View File

@ -46,8 +46,8 @@ class DatasetMergerTest implements Serializable {
} }
@Test @Test
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException { void datasetMergerTest() {
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator()); Dataset pub_merged = MergeUtils.mergeGroup(datasets.stream().map(Tuple2::_2).iterator());
// verify id // verify id
assertEquals(dedupId, pub_merged.getId()); assertEquals(dedupId, pub_merged.getId());

View File

@ -566,7 +566,26 @@ case object Crossref2Oaf {
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY) queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES) queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
case _ => logger.debug("no match for " + funder.DOI.get) case _ => logger.debug("no match for " + funder.DOI.get)
//Add for Danish funders
//Independent Research Fund Denmark (IRFD)
case "10.13039/501100004836" =>
generateSimpleRelationFromAward(funder, "irfd________", a => a)
val targetId = getProjectId("irfd________", "1e5e62235d094afd01cd56e65112fc63")
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
//Carlsberg Foundation (CF)
case "10.13039/501100002808" =>
generateSimpleRelationFromAward(funder, "cf__________", a => a)
val targetId = getProjectId("cf__________", "1e5e62235d094afd01cd56e65112fc63")
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
//Novo Nordisk Foundation (NNF)
case "10.13039/501100009708" =>
generateSimpleRelationFromAward(funder, "nnf___________", a => a)
val targetId = getProjectId("nnf_________", "1e5e62235d094afd01cd56e65112fc63")
queue += generateRelation(sourceId, targetId, ModelConstants.IS_PRODUCED_BY)
queue += generateRelation(targetId, sourceId, ModelConstants.PRODUCES)
case _ => logger.debug("no match for " + funder.DOI.get)
} }
} else { } else {

View File

@ -153,34 +153,40 @@ public abstract class AbstractMdRecordToOafMapper {
final DataInfo entityInfo = prepareDataInfo(doc, this.invisible); final DataInfo entityInfo = prepareDataInfo(doc, this.invisible);
final long lastUpdateTimestamp = new Date().getTime(); final long lastUpdateTimestamp = new Date().getTime();
final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); final Instance instance = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
final String type = getResultType(doc, instances); if (!Optional
.ofNullable(instance.getInstancetype())
.map(Qualifier::getClassid)
.filter(StringUtils::isNotBlank)
.isPresent()) {
return Lists.newArrayList();
}
return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp); final String type = getResultType(instance);
return createOafs(doc, type, instance, collectedFrom, entityInfo, lastUpdateTimestamp);
} catch (final DocumentException e) { } catch (final DocumentException e) {
log.error("Error with record:\n" + xml); log.error("Error with record:\n" + xml);
return Lists.newArrayList(); return Lists.newArrayList();
} }
} }
protected String getResultType(final Document doc, final List<Instance> instances) { protected String getResultType(final Instance instance) {
final String type = doc.valueOf("//dr:CobjCategory/@type"); if (this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
final String instanceType = instances
.stream()
.map(i -> i.getInstancetype().getClassid())
.findFirst()
.filter(s -> !UNKNOWN.equalsIgnoreCase(s))
.orElse("0000"); // Unknown
return Optional return Optional
.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType)) .ofNullable(instance.getInstancetype())
.map(Qualifier::getClassid) .map(Qualifier::getClassid)
.map(
instanceType -> Optional
.ofNullable(
this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
.map(Qualifier::getClassid)
.orElse("0000"))
.orElse("0000"); .orElse("0000");
} else {
throw new IllegalStateException("Missing vocabulary: " + ModelConstants.DNET_RESULT_TYPOLOGIES);
} }
return type;
} }
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) { private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
@ -197,12 +203,12 @@ public abstract class AbstractMdRecordToOafMapper {
protected List<Oaf> createOafs( protected List<Oaf> createOafs(
final Document doc, final Document doc,
final String type, final String type,
final List<Instance> instances, final Instance instance,
final KeyValue collectedFrom, final KeyValue collectedFrom,
final DataInfo info, final DataInfo info,
final long lastUpdateTimestamp) { final long lastUpdateTimestamp) {
final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); final OafEntity entity = createEntity(doc, type, instance, collectedFrom, info, lastUpdateTimestamp);
final Set<String> originalId = Sets.newHashSet(entity.getOriginalId()); final Set<String> originalId = Sets.newHashSet(entity.getOriginalId());
originalId.add(entity.getId()); originalId.add(entity.getId());
@ -235,19 +241,19 @@ public abstract class AbstractMdRecordToOafMapper {
private OafEntity createEntity(final Document doc, private OafEntity createEntity(final Document doc,
final String type, final String type,
final List<Instance> instances, final Instance instance,
final KeyValue collectedFrom, final KeyValue collectedFrom,
final DataInfo info, final DataInfo info,
final long lastUpdateTimestamp) { final long lastUpdateTimestamp) {
switch (type.toLowerCase()) { switch (type.toLowerCase()) {
case "publication": case "publication":
final Publication p = new Publication(); final Publication p = new Publication();
populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp); populateResultFields(p, doc, instance, collectedFrom, info, lastUpdateTimestamp);
p.setJournal(prepareJournal(doc, info)); p.setJournal(prepareJournal(doc, info));
return p; return p;
case "dataset": case "dataset":
final Dataset d = new Dataset(); final Dataset d = new Dataset();
populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp); populateResultFields(d, doc, instance, collectedFrom, info, lastUpdateTimestamp);
d.setStoragedate(prepareDatasetStorageDate(doc, info)); d.setStoragedate(prepareDatasetStorageDate(doc, info));
d.setDevice(prepareDatasetDevice(doc, info)); d.setDevice(prepareDatasetDevice(doc, info));
d.setSize(prepareDatasetSize(doc, info)); d.setSize(prepareDatasetSize(doc, info));
@ -258,7 +264,7 @@ public abstract class AbstractMdRecordToOafMapper {
return d; return d;
case "software": case "software":
final Software s = new Software(); final Software s = new Software();
populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp); populateResultFields(s, doc, instance, collectedFrom, info, lastUpdateTimestamp);
s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info)); s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
s.setLicense(prepareSoftwareLicenses(doc, info)); s.setLicense(prepareSoftwareLicenses(doc, info));
s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info)); s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
@ -268,7 +274,7 @@ public abstract class AbstractMdRecordToOafMapper {
case "otherresearchproducts": case "otherresearchproducts":
default: default:
final OtherResearchProduct o = new OtherResearchProduct(); final OtherResearchProduct o = new OtherResearchProduct();
populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp); populateResultFields(o, doc, instance, collectedFrom, info, lastUpdateTimestamp);
o.setContactperson(prepareOtherResearchProductContactPersons(doc, info)); o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info)); o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
o.setTool(prepareOtherResearchProductTools(doc, info)); o.setTool(prepareOtherResearchProductTools(doc, info));
@ -415,7 +421,7 @@ public abstract class AbstractMdRecordToOafMapper {
private void populateResultFields( private void populateResultFields(
final Result r, final Result r,
final Document doc, final Document doc,
final List<Instance> instances, final Instance instance,
final KeyValue collectedFrom, final KeyValue collectedFrom,
final DataInfo info, final DataInfo info,
final long lastUpdateTimestamp) { final long lastUpdateTimestamp) {
@ -449,8 +455,8 @@ public abstract class AbstractMdRecordToOafMapper {
r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
r.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); r.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
r.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); r.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
r.setInstance(instances); r.setInstance(Arrays.asList(instance));
r.setBestaccessright(OafMapperUtils.createBestAccessRights(instances)); r.setBestaccessright(OafMapperUtils.createBestAccessRights(Arrays.asList(instance)));
r.setEoscifguidelines(prepareEOSCIfGuidelines(doc, info)); r.setEoscifguidelines(prepareEOSCIfGuidelines(doc, info));
} }
@ -509,7 +515,7 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Qualifier prepareResourceType(Document doc, DataInfo info); protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);
protected abstract List<Instance> prepareInstances( protected abstract Instance prepareInstances(
Document doc, Document doc,
DataInfo info, DataInfo info,
KeyValue collectedfrom, KeyValue collectedfrom,

View File

@ -133,7 +133,7 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
inputRdd inputRdd
.keyBy(oaf -> ModelSupport.idFn().apply(oaf)) .keyBy(oaf -> ModelSupport.idFn().apply(oaf))
.groupByKey() .groupByKey()
.map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())), .map(t -> MergeUtils.mergeGroup(t._2.iterator())),
// .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) // .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
// .reduceByKey(MergeUtils::merge) // .reduceByKey(MergeUtils::merge)
// .map(Tuple2::_2), // .map(Tuple2::_2),

View File

@ -135,7 +135,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
} }
@Override @Override
protected List<Instance> prepareInstances( protected Instance prepareInstances(
final Document doc, final Document doc,
final DataInfo info, final DataInfo info,
final KeyValue collectedfrom, final KeyValue collectedfrom,
@ -197,7 +197,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
instance.getUrl().addAll(validUrl); instance.getUrl().addAll(validUrl);
} }
return Lists.newArrayList(instance); return instance;
} }
/** /**

View File

@ -126,7 +126,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
} }
@Override @Override
protected List<Instance> prepareInstances( protected Instance prepareInstances(
final Document doc, final Document doc,
final DataInfo info, final DataInfo info,
final KeyValue collectedfrom, final KeyValue collectedfrom,
@ -210,7 +210,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
instance.setUrl(new ArrayList<>()); instance.setUrl(new ArrayList<>());
instance.getUrl().addAll(validUrl); instance.getUrl().addAll(validUrl);
} }
return Arrays.asList(instance); return instance;
} }
protected String trimAndDecodeUrl(String url) { protected String trimAndDecodeUrl(String url) {

View File

@ -51,6 +51,7 @@
<arg>--orcidPath</arg><arg>${orcidPath}</arg> <arg>--orcidPath</arg><arg>${orcidPath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg> <arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--graphPath</arg><arg>${graphPath}</arg> <arg>--graphPath</arg><arg>${graphPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
</spark> </spark>
<ok to="reset_outputpath"/> <ok to="reset_outputpath"/>

View File

@ -162,6 +162,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg> <arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
@ -197,6 +198,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=8000 --conf spark.sql.shuffle.partitions=8000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg> <arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
@ -232,6 +234,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=5000 --conf spark.sql.shuffle.partitions=5000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg> <arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
@ -267,6 +270,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=2000 --conf spark.sql.shuffle.partitions=2000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg> <arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
@ -302,6 +306,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=1000 --conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg> <arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
@ -337,6 +342,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=1000 --conf spark.sql.shuffle.partitions=1000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg> <arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
@ -372,6 +378,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=2000 --conf spark.sql.shuffle.partitions=2000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg> <arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
@ -407,6 +414,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=2000 --conf spark.sql.shuffle.partitions=2000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/person</arg> <arg>--inputPath</arg><arg>${graphInputPath}/person</arg>
@ -442,6 +450,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.autoBroadcastJoinThreshold=-1
--conf spark.sql.shuffle.partitions=20000 --conf spark.sql.shuffle.partitions=20000
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg> <arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>

View File

@ -133,7 +133,7 @@ object SparkCreateInputGraph {
val ds: Dataset[T] = spark.read.load(sourcePath).as[T] val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
ds.groupByKey(_.getId) ds.groupByKey(_.getId)
.mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] } .mapGroups { (id, it) => MergeUtils.mergeGroup(it.asJava).asInstanceOf[T] }
// .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] } // .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
// .map(_) // .map(_)
.write .write