1
0
Fork 0
This commit is contained in:
Miriam Baglioni 2022-06-06 11:52:29 +02:00
commit 31d4557e8d
6 changed files with 49 additions and 45 deletions

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager.ror;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION; import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORG_ORG_RELTYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
@ -39,7 +38,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType; import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType;
import eu.dnetlib.dhp.actionmanager.ror.model.Relationship;
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization; import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -51,7 +49,6 @@ import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2; import scala.Tuple2;
@ -168,38 +165,10 @@ public class GenerateRorActionSetJob {
final List<AtomicAction<? extends Oaf>> res = new ArrayList<>(); final List<AtomicAction<? extends Oaf>> res = new ArrayList<>();
res.add(new AtomicAction<>(Organization.class, o)); res.add(new AtomicAction<>(Organization.class, o));
for (final Relationship rorRel : r.getRelationships()) {
if (rorRel.getType().equalsIgnoreCase("parent")) {
final String orgId1 = calculateOpenaireId(r.getId());
final String orgId2 = calculateOpenaireId(rorRel.getId());
res
.add(
new AtomicAction<>(Relation.class,
calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF)));
res
.add(
new AtomicAction<>(Relation.class,
calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF)));
}
}
return res; return res;
} }
private static Relation calculateHierarchyRel(final String source, final String target, final String relClass) {
final Relation rel = new Relation();
rel.setSource(source);
rel.setTarget(target);
rel.setRelType(ORG_ORG_RELTYPE);
rel.setSubRelType(ModelConstants.RELATIONSHIP);
rel.setRelClass(relClass);
rel.setCollectedfrom(ROR_COLLECTED_FROM);
rel.setDataInfo(ROR_DATA_INFO);
rel.setLastupdatetimestamp(System.currentTimeMillis());
return rel;
}
private static String calculateOpenaireId(final String rorId) { private static String calculateOpenaireId(final String rorId) {
return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId)); return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId));
} }

View File

@ -15,7 +15,7 @@
"official_name": "Aperta TÜBİTAK Open Archive" "official_name": "Aperta TÜBİTAK Open Archive"
}, },
"BL.CAM": { "BL.CAM": {
"openaire_id": "re3data_____::r3d100010620", "openaire_id": "opendoar____::109",
"datacite_name": "Apollo", "datacite_name": "Apollo",
"official_name": "Apollo" "official_name": "Apollo"
}, },
@ -196,7 +196,7 @@
}, },
"CSIC.DIGITAL": { "CSIC.DIGITAL": {
"openaire_id": "re3data_____::r3d100011076", "openaire_id": "re3data_____::r3d100011076",
"datacite_name": "DIGITAL.CSIC", "datacite_name": "Digital CSIC",
"official_name": "DIGITAL.CSIC" "official_name": "DIGITAL.CSIC"
}, },
"BL.DRI": { "BL.DRI": {
@ -644,6 +644,11 @@
"datacite_name": "PANGAEA", "datacite_name": "PANGAEA",
"official_name": "PANGAEA" "official_name": "PANGAEA"
}, },
"TIB.PANGAEA": {
"openaire_id": "re3data_____::r3d100010134",
"datacite_name": "PANGAEA",
"official_name": "PANGAEA"
},
"NASAPDS.NASAPDS": { "NASAPDS.NASAPDS": {
"openaire_id": "re3data_____::r3d100010121", "openaire_id": "re3data_____::r3d100010121",
"datacite_name": "PDS", "datacite_name": "PDS",
@ -896,7 +901,7 @@
}, },
"FIGSHARE.UCT": { "FIGSHARE.UCT": {
"openaire_id": "re3data_____::r3d100012633", "openaire_id": "re3data_____::r3d100012633",
"datacite_name": "ZivaHub", "datacite_name": "University of Cape Town (UCT)",
"official_name": "ZivaHub" "official_name": "ZivaHub"
}, },
"BL.UCLAN": { "BL.UCLAN": {
@ -1030,9 +1035,9 @@
"official_name": "ZBW Journal Data Archive" "official_name": "ZBW Journal Data Archive"
}, },
"CERN.ZENODO": { "CERN.ZENODO": {
"openaire_id": "re3data_____::r3d100010468", "openaire_id": "opendoar____::2659",
"datacite_name": "Zenodo", "datacite_name": "Zenodo",
"official_name": "Zenodo" "official_name": "ZENODO"
}, },
"ZBW.ZEW": { "ZBW.ZEW": {
"openaire_id": "re3data_____::r3d100010399", "openaire_id": "re3data_____::r3d100010399",

View File

@ -60,7 +60,7 @@ object SparkGenerateDoiBoost {
val openaireOrganizationPath = parser.get("openaireOrganizationPath") val openaireOrganizationPath = parser.get("openaireOrganizationPath")
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
override def zero: Publication = new Publication override def zero: Publication = null
override def reduce(b: Publication, a: (String, Publication)): Publication = { override def reduce(b: Publication, a: (String, Publication)): Publication = {
@ -177,8 +177,33 @@ object SparkGenerateDoiBoost {
.map(DoiBoostMappingUtil.fixPublication) .map(DoiBoostMappingUtil.fixPublication)
.map(p => (p.getId, p)) .map(p => (p.getId, p))
.groupByKey(_._1) .groupByKey(_._1)
.agg(crossrefAggregator.toColumn) .reduceGroups((left, right) => {
.map(p => p._2) //Check left is not null
if (left != null && left._1 != null) {
//If right is null then return left
if (right == null || right._2 == null)
left
else {
// Here Left and Right are not null
// So we have to merge
val b1 = left._2
val b2 = right._2
b1.mergeFrom(b2)
b1.mergeOAFDataInfo(b2)
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
b1.setAuthor(authors)
if (b2.getId != null && b2.getId.nonEmpty)
b1.setId(b2.getId)
//Return publication Merged
(b1.getId, b1)
}
} else {
// Left is Null so we return right
right
}
})
.filter(s => s != null && s._2 != null)
.map(s => s._2._2)
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.save(s"$workingDirPath/doiBoostPublicationFiltered") .save(s"$workingDirPath/doiBoostPublicationFiltered")

View File

@ -711,10 +711,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final Relation r = new Relation(); final Relation r = new Relation();
r.setRelType(ORG_ORG_RELTYPE); r.setRelType(ORG_ORG_RELTYPE);
r.setSubRelType(ModelConstants.RELATIONSHIP); r.setSubRelType(ModelConstants.RELATIONSHIP);
r r.setRelClass(rs.getString("type"));
.setRelClass(
rs.getString("type").equalsIgnoreCase("parent") ? ModelConstants.IS_PARENT_OF
: ModelConstants.IS_CHILD_OF);
r.setSource(orgId1); r.setSource(orgId1);
r.setTarget(orgId2); r.setTarget(orgId2);
r.setCollectedfrom(collectedFrom); r.setCollectedfrom(collectedFrom);

View File

@ -283,7 +283,15 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_dispatch" to="copy_relation"/> <join name="wait_dispatch" to="delete_target_relation"/>
<action name="delete_target_relation">
<fs>
<delete path="${nameNode}/${graphOutputPath}/relation"/>
</fs>
<ok to="copy_relation"/>
<error to="Kill"/>
</action>
<action name="copy_relation"> <action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">

View File

@ -10,4 +10,4 @@ SELECT
'OpenOrgs Database' AS collectedfromname, 'OpenOrgs Database' AS collectedfromname,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction
FROM relationships FROM relationships
WHERE reltype = 'Child' OR reltype = 'Parent' WHERE reltype = 'IsChildOf' OR reltype = 'IsParentOf'