diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java index 869e1cb68..e4d458780 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.ror; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION; +import static eu.dnetlib.dhp.schema.common.ModelConstants.ORG_ORG_RELTYPE; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues; @@ -29,8 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; @@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob; import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType; +import eu.dnetlib.dhp.actionmanager.ror.model.Relationship; import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -48,8 +48,10 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; @@ -112,24 +114,21 @@ public class GenerateRorActionSetJob { final String outputPath) throws IOException { readInputPath(spark, inputPath) - .map( - (MapFunction) GenerateRorActionSetJob::convertRorOrg, - Encoders.bean(Organization.class)) - .toJavaRDD() - .map(o -> new AtomicAction<>(Organization.class, o)) + .map(GenerateRorActionSetJob::convertRorOrg) + .flatMap(List::iterator) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } - protected static Organization convertRorOrg(final RorOrganization r) { + protected static List> convertRorOrg(final RorOrganization r) { final Date now = new Date(); final Organization o = new Organization(); - o.setId(String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(r.getId()))); + o.setId(calculateOpenaireId(r.getId())); o.setOriginalId(Arrays.asList(String.format("%s::%s", ROR_NS_PREFIX, r.getId()))); o.setCollectedfrom(ROR_COLLECTED_FROM); o.setPid(pids(r)); @@ -166,7 +165,43 @@ public class GenerateRorActionSetJob { o.setDataInfo(ROR_DATA_INFO); o.setLastupdatetimestamp(now.getTime()); - return o; + final List> res = new ArrayList<>(); + res.add(new AtomicAction<>(Organization.class, o)); + + for (final Relationship rorRel : r.getRelationships()) { + if (rorRel.getType().equalsIgnoreCase("parent")) { + final String orgId1 = calculateOpenaireId(r.getId()); + final String orgId2 = calculateOpenaireId(rorRel.getId()); + res + .add( + new AtomicAction<>(Relation.class, + calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF))); + res + .add( + new AtomicAction<>(Relation.class, + calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF))); + } + } + + return res; + + } + + private static Relation calculateHierarchyRel(final String source, final String target, final String relClass) { + final Relation rel = new Relation(); + rel.setSource(source); + rel.setTarget(target); + rel.setRelType(ORG_ORG_RELTYPE); + rel.setSubRelType(ModelConstants.RELATIONSHIP); + rel.setRelClass(relClass); + rel.setCollectedfrom(ROR_COLLECTED_FROM); + rel.setDataInfo(ROR_DATA_INFO); + rel.setLastupdatetimestamp(System.currentTimeMillis()); + return rel; + } + + private static String calculateOpenaireId(final String rorId) { + return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId)); } private static List pids(final RorOrganization r) { @@ -202,14 +237,14 @@ public class GenerateRorActionSetJob { .collect(Collectors.toList()); } - private static Dataset readInputPath( + private static JavaRDD readInputPath( final SparkSession spark, final String path) throws IOException { try (final FileSystem fileSystem = FileSystem.get(new Configuration()); final InputStream is = fileSystem.open(new Path(path))) { final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class); - return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class)); + return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class)).toJavaRDD(); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java index aa11f4ab5..d50c1d5f3 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJobTest.java @@ -1,7 +1,10 @@ package eu.dnetlib.dhp.actionmanager.ror; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.io.FileInputStream; +import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -13,9 +16,12 @@ import org.junit.jupiter.api.Test; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; -@Disabled class GenerateRorActionSetJobTest { private static final ObjectMapper mapper = new ObjectMapper(); @@ -30,21 +36,40 @@ class GenerateRorActionSetJobTest { void testConvertRorOrg() throws Exception { final RorOrganization r = mapper .readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class); - final Organization org = GenerateRorActionSetJob.convertRorOrg(r); + final List> aas = GenerateRorActionSetJob.convertRorOrg(r); + + Assertions.assertEquals(3, aas.size()); + assertEquals(Organization.class, aas.get(0).getClazz()); + assertEquals(Relation.class, aas.get(1).getClazz()); + assertEquals(Relation.class, aas.get(2).getClazz()); + + final Organization o = (Organization) aas.get(0).getPayload(); + final Relation r1 = (Relation) aas.get(1).getPayload(); + final Relation r2 = (Relation) aas.get(2).getPayload(); + + assertEquals(o.getId(), r1.getSource()); + assertEquals(r1.getSource(), r2.getTarget()); + assertEquals(r2.getSource(), r1.getTarget()); + assertEquals(ModelConstants.IS_PARENT_OF, r1.getRelClass()); + assertEquals(ModelConstants.IS_CHILD_OF, r2.getRelClass()); + + System.out.println(mapper.writeValueAsString(o)); + System.out.println(mapper.writeValueAsString(r1)); + System.out.println(mapper.writeValueAsString(r2)); - final String s = mapper.writeValueAsString(org); - Assertions.assertTrue(StringUtils.isNotBlank(s)); - System.out.println(s); } @Test + @Disabled void testConvertAllRorOrg() throws Exception { final RorOrganization[] arr = mapper .readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class); for (final RorOrganization r : arr) { - Organization o = GenerateRorActionSetJob.convertRorOrg(r); - Assertions.assertNotNull(o); + final List> aas = GenerateRorActionSetJob.convertRorOrg(r); + Assertions.assertFalse(aas.isEmpty()); + Assertions.assertNotNull(aas.get(0)); + final Organization o = (Organization) aas.get(0).getPayload(); Assertions.assertTrue(StringUtils.isNotBlank(o.getId())); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/ror/ror_org.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/ror/ror_org.json index d2b4fa64b..2bd79d06d 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/ror/ror_org.json +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/ror/ror_org.json @@ -1,123 +1,94 @@ -{ - "ip_addresses": [], - "aliases": [], - "acronyms": [ - "ANU" - ], - "links": [ - "http://www.anu.edu.au/" - ], - "country": { - "country_code": "AU", - "country_name": "Australia" - }, - "name": "Australian National University", - "wikipedia_url": "http://en.wikipedia.org/wiki/Australian_National_University", - "addresses": [ - { - "lat": -35.2778, - "state_code": "AU-ACT", - "country_geonames_id": 2077456, - "lng": 149.1205, - "state": "Australian Capital Territory", - "city": "Canberra", - "geonames_city": { - "nuts_level2": { - "name": null, - "code": null - }, - "geonames_admin2": { - "ascii_name": null, - "id": null, - "name": null, - "code": null - }, - "geonames_admin1": { - "ascii_name": "ACT", - "id": 2177478, - "name": "ACT", - "code": "AU.01" - }, - "city": "Canberra", - "id": 2172517, - "nuts_level1": { - "name": null, - "code": null - }, - "nuts_level3": { - "name": null, - "code": null - }, - "license": { - "attribution": "Data from geonames.org under a CC-BY 3.0 license", - "license": "http://creativecommons.org/licenses/by/3.0/" - } - }, - "postcode": null, - "primary": false, - "line": null - } - ], - "types": [ - "Education" - ], - "established": 1946, - "relationships": [ - { - "type": "Related", - "id": "https://ror.org/041c7s516", - "label": "Calvary Hospital" - }, - { - "type": "Related", - "id": "https://ror.org/04h7nbn38", - "label": "Canberra Hospital" - }, - { - "type": "Related", - "id": "https://ror.org/030jpqj15", - "label": "Goulburn Base Hospital" - }, - { - "type": "Child", - "id": "https://ror.org/006a4jj40", - "label": "Mount Stromlo Observatory" - } - ], - "email_address": null, - "external_ids": { - "Wikidata": { - "all": [ - "Q127990" - ], - "preferred": null - }, - "OrgRef": { - "all": [ - "285106" - ], - "preferred": null - }, - "ISNI": { - "all": [ - "0000 0001 2180 7477" - ], - "preferred": null - }, - "FundRef": { - "all": [ - "501100000995", - "501100001151", - "100009020" - ], - "preferred": "501100000995" - }, - "GRID": { - "all": "grid.1001.0", - "preferred": "grid.1001.0" - } - }, - "id": "https://ror.org/019wvm592", - "labels": [], - "status": "active" + { + "ip_addresses": [], + "aliases": [], + "acronyms": [ + "MSO" + ], + "links": [ + "https://rsaa.anu.edu.au/observatories/mount-stromlo-observatory" + ], + "country": { + "country_code": "AU", + "country_name": "Australia" + }, + "name": "Mount Stromlo Observatory", + "wikipedia_url": "https://en.wikipedia.org/wiki/Mount_Stromlo_Observatory", + "addresses": [ + { + "lat": -35.320278, + "state_code": "AU-ACT", + "country_geonames_id": 2077456, + "lng": 149.006944, + "state": "Australian Capital Territory", + "city": "Canberra", + "geonames_city": { + "nuts_level2": { + "name": null, + "code": null + }, + "geonames_admin2": { + "ascii_name": null, + "id": null, + "name": null, + "code": null + }, + "geonames_admin1": { + "ascii_name": "ACT", + "id": 2177478, + "name": "ACT", + "code": "AU.01" + }, + "city": "Canberra", + "id": 2172517, + "nuts_level1": { + "name": null, + "code": null + }, + "nuts_level3": { + "name": null, + "code": null + }, + "license": { + "attribution": "Data from geonames.org under a CC-BY 3.0 license", + "license": "http://creativecommons.org/licenses/by/3.0/" + } + }, + "postcode": null, + "primary": false, + "line": null + } + ], + "types": [ + "Education" + ], + "established": 1924, + "relationships": [ + { + "type": "Parent", + "id": "https://ror.org/019wvm592", + "label": "Australian National University" + } + ], + "email_address": null, + "external_ids": { + "ISNI": { + "all": [ + "0000 0004 0459 2816" + ], + "preferred": null + }, + "Wikidata": { + "all": [ + "Q1310548" + ], + "preferred": null + }, + "GRID": { + "all": "grid.440325.4", + "preferred": "grid.440325.4" + } + }, + "id": "https://ror.org/006a4jj40", + "labels": [], + "status": "active" } \ No newline at end of file