resolving conflicts

Miriam Baglioni 2022-06-06 11:54:33 +02:00
commit 2a77ebb431
3 changed files with 51 additions and 32 deletions

File: GenerateRorActionSetJob.java

@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager.ror;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORG_ORG_RELTYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
@@ -39,7 +38,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType;
import eu.dnetlib.dhp.actionmanager.ror.model.Relationship;
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
@@ -51,7 +49,6 @@ import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@@ -168,38 +165,10 @@ public class GenerateRorActionSetJob {
        final List<AtomicAction<? extends Oaf>> res = new ArrayList<>();
        res.add(new AtomicAction<>(Organization.class, o));
        for (final Relationship rorRel : r.getRelationships()) {
            if (rorRel.getType().equalsIgnoreCase("parent")) {
                final String orgId1 = calculateOpenaireId(r.getId());
                final String orgId2 = calculateOpenaireId(rorRel.getId());
                res.add(new AtomicAction<>(Relation.class,
                    calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF)));
                res.add(new AtomicAction<>(Relation.class,
                    calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF)));
            }
        }
        return res;
    }

    private static Relation calculateHierarchyRel(final String source, final String target, final String relClass) {
        final Relation rel = new Relation();
        rel.setSource(source);
        rel.setTarget(target);
        rel.setRelType(ORG_ORG_RELTYPE);
        rel.setSubRelType(ModelConstants.RELATIONSHIP);
        rel.setRelClass(relClass);
        rel.setCollectedfrom(ROR_COLLECTED_FROM);
        rel.setDataInfo(ROR_DATA_INFO);
        rel.setLastupdatetimestamp(System.currentTimeMillis());
        return rel;
    }

    private static String calculateOpenaireId(final String rorId) {
        return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId));
    }
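
For orientation, a minimal sketch of what the loop above produces for a single relationship of type "parent" (the ROR ids below are hypothetical; the helpers are the private methods shown in this hunk). Each match is materialised as a pair of inverse relations, so the organization hierarchy can be traversed in either direction, and calculateOpenaireId keeps both endpoints stable across runs by deriving them as 20|<ROR_NS_PREFIX>::md5(rorId).

// Sketch only: hypothetical ROR ids, helpers as defined above.
final String orgId1 = calculateOpenaireId("https://ror.org/02mhbdp94"); // hypothetical id of r
final String orgId2 = calculateOpenaireId("https://ror.org/03yrm5c26"); // hypothetical id of rorRel

// One "parent" relationship yields both directions of the hierarchy edge.
final Relation parentOf = calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF);
final Relation childOf = calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF);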

File: SparkGenerateDoiBoost.scala

@@ -59,6 +59,54 @@ object SparkGenerateDoiBoost {
val workingDirPath = parser.get("workingPath")
val openaireOrganizationPath = parser.get("openaireOrganizationPath")
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {

  override def zero: Publication = null

  override def reduce(b: Publication, a: (String, Publication)): Publication = {
    if (b == null) {
      if (a != null && a._2 != null) {
        a._2.setId(a._1)
        return a._2
      }
    } else {
      if (a != null && a._2 != null) {
        b.mergeOAFDataInfo(a._2)
        b.mergeFrom(a._2)
        b.setId(a._1)
        val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
        b.setAuthor(authors)
        return b
      }
    }
    new Publication
  }

  override def merge(b1: Publication, b2: Publication): Publication = {
    if (b1 == null) {
      if (b2 != null)
        return b2
    } else {
      if (b2 != null) {
        b1.mergeOAFDataInfo(b2)
        b1.mergeFrom(b2)
        val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
        b1.setAuthor(authors)
        if (b2.getId != null && b2.getId.nonEmpty)
          b1.setId(b2.getId)
        return b1
      }
    }
    new Publication
  }

  override def finish(reduction: Publication): Publication = reduction

  override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication]

  override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication]
}
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
@@ -284,4 +332,4 @@ object SparkGenerateDoiBoost {
.save(s"$workingDirPath/doiBoostOrganization")
}
}
}
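
The hunks above only define crossrefAggregator; its call site falls outside the shown context. As orientation, a minimal sketch, with hypothetical names, of how a typed Aggregator of this shape is normally applied: group the keyed dataset, fold each group through reduce/merge, and keep the merged value.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

// Sketch only: "keyed" holds hypothetical (targetId, publication) pairs.
static Dataset<Publication> mergeByKey(
        Dataset<Tuple2<String, Publication>> keyed,
        Aggregator<Tuple2<String, Publication>, Publication, Publication> aggregator) {
    return keyed
        // group every record that shares the same target identifier
        .groupByKey((MapFunction<Tuple2<String, Publication>, String>) Tuple2::_1, Encoders.STRING())
        // fold each group through the aggregator's zero/reduce/merge
        .agg(aggregator.toColumn())
        // drop the key, keep the merged Publication
        .map((MapFunction<Tuple2<String, Publication>, Publication>) Tuple2::_2,
            Encoders.kryo(Publication.class));
}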

File: SparkBulkTagJob.java

@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@@ -102,6 +103,7 @@ public class SparkBulkTagJob {
        ResultTagger resultTagger = new ResultTagger();
        readPath(spark, inputPath, resultClazz)
            .map(patchResult(), Encoders.bean(resultClazz))
            .filter(Objects::nonNull)
            .map(
                (MapFunction<R, R>) value -> resultTagger
                    .enrichContextCriteria(
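
The added filter(Objects::nonNull) matters because patchResult() can presumably map unusable records to null, and the subsequent enrichContextCriteria call would then fail with a NullPointerException. A minimal sketch of the guard pattern, with a hypothetical patch helper standing in for patchResult():

import java.util.Objects;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Sketch only: "patch" is a hypothetical stand-in for patchResult().
static <R extends Result> Dataset<R> patchAndClean(Dataset<R> input, Class<R> clazz) {
    return input
        .map((MapFunction<R, R>) r -> patch(r), Encoders.bean(clazz))
        // patching may yield null; drop those rows before anything
        // downstream dereferences them
        .filter((FilterFunction<R>) Objects::nonNull);
}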