diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java index e4d458780..6b5bed5b8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager.ror; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION; -import static eu.dnetlib.dhp.schema.common.ModelConstants.ORG_ORG_RELTYPE; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues; @@ -39,7 +38,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType; -import eu.dnetlib.dhp.actionmanager.ror.model.Relationship; import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -51,7 +49,6 @@ import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; @@ -168,38 +165,10 @@ public class GenerateRorActionSetJob { final List> res = new ArrayList<>(); res.add(new AtomicAction<>(Organization.class, o)); - for (final Relationship rorRel : r.getRelationships()) { - if (rorRel.getType().equalsIgnoreCase("parent")) { - final String orgId1 = calculateOpenaireId(r.getId()); - final String orgId2 = calculateOpenaireId(rorRel.getId()); - res - .add( - new AtomicAction<>(Relation.class, - calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF))); - res - .add( - new AtomicAction<>(Relation.class, - calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF))); - } - } - return res; } - private static Relation calculateHierarchyRel(final String source, final String target, final String relClass) { - final Relation rel = new Relation(); - rel.setSource(source); - rel.setTarget(target); - rel.setRelType(ORG_ORG_RELTYPE); - rel.setSubRelType(ModelConstants.RELATIONSHIP); - rel.setRelClass(relClass); - rel.setCollectedfrom(ROR_COLLECTED_FROM); - rel.setDataInfo(ROR_DATA_INFO); - rel.setLastupdatetimestamp(System.currentTimeMillis()); - return rel; - } - private static String calculateOpenaireId(final String rorId) { return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId)); } diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 3315fc41d..72b93749b 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -59,6 +59,54 @@ object SparkGenerateDoiBoost { val workingDirPath = parser.get("workingPath") val openaireOrganizationPath = parser.get("openaireOrganizationPath") + val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { + override def zero: Publication = null + + override def reduce(b: Publication, a: (String, Publication)): Publication = { + + if (b == null) { + if (a != null && a._2 != null) { + a._2.setId(a._1) + return a._2 + } + } else { + if (a != null && a._2 != null) { + b.mergeOAFDataInfo(a._2) + b.mergeFrom(a._2) + b.setId(a._1) + val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) + b.setAuthor(authors) + return b + } + } + new Publication + } + + override def merge(b1: Publication, b2: Publication): Publication = { + if (b1 == null) { + if (b2 != null) + return b2 + } else { + if (b2 != null) { + b1.mergeOAFDataInfo(b2) + b1.mergeFrom(b2) + val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + b1.setAuthor(authors) + if (b2.getId != null && b2.getId.nonEmpty) + b1.setId(b2.getId) + return b1 + } + } + new Publication + } + + override def finish(reduction: Publication): Publication = reduction + + override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication] + + override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication] + } + implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] @@ -284,4 +332,4 @@ object SparkGenerateDoiBoost { .save(s"$workingDirPath/doiBoostOrganization") } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 1a0afb981..ad1e8721a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.ArrayList; +import java.util.Objects; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -102,6 +103,7 @@ public class SparkBulkTagJob { ResultTagger resultTagger = new ResultTagger(); readPath(spark, inputPath, resultClazz) .map(patchResult(), Encoders.bean(resultClazz)) + .filter(Objects::nonNull) .map( (MapFunction) value -> resultTagger .enrichContextCriteria(