Merge branch 'beta' of code-repo.d4science.org:D-Net/dnet-hadoop into beta

This commit is contained in:
Sandro La Bruzzo 2022-02-18 11:11:32 +01:00
commit 891781ee3f
3 changed files with 173 additions and 142 deletions

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.ror;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION; import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORG_ORG_RELTYPE;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
@ -29,8 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType; import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType;
import eu.dnetlib.dhp.actionmanager.ror.model.Relationship;
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization; import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -48,8 +48,10 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2; import scala.Tuple2;
@ -112,24 +114,21 @@ public class GenerateRorActionSetJob {
final String outputPath) throws IOException { final String outputPath) throws IOException {
readInputPath(spark, inputPath) readInputPath(spark, inputPath)
.map( .map(GenerateRorActionSetJob::convertRorOrg)
(MapFunction<RorOrganization, Organization>) GenerateRorActionSetJob::convertRorOrg, .flatMap(List::iterator)
Encoders.bean(Organization.class))
.toJavaRDD()
.map(o -> new AtomicAction<>(Organization.class, o))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
protected static Organization convertRorOrg(final RorOrganization r) { protected static List<AtomicAction<? extends Oaf>> convertRorOrg(final RorOrganization r) {
final Date now = new Date(); final Date now = new Date();
final Organization o = new Organization(); final Organization o = new Organization();
o.setId(String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(r.getId()))); o.setId(calculateOpenaireId(r.getId()));
o.setOriginalId(Arrays.asList(String.format("%s::%s", ROR_NS_PREFIX, r.getId()))); o.setOriginalId(Arrays.asList(String.format("%s::%s", ROR_NS_PREFIX, r.getId())));
o.setCollectedfrom(ROR_COLLECTED_FROM); o.setCollectedfrom(ROR_COLLECTED_FROM);
o.setPid(pids(r)); o.setPid(pids(r));
@ -166,7 +165,43 @@ public class GenerateRorActionSetJob {
o.setDataInfo(ROR_DATA_INFO); o.setDataInfo(ROR_DATA_INFO);
o.setLastupdatetimestamp(now.getTime()); o.setLastupdatetimestamp(now.getTime());
return o; final List<AtomicAction<? extends Oaf>> res = new ArrayList<>();
res.add(new AtomicAction<>(Organization.class, o));
for (final Relationship rorRel : r.getRelationships()) {
if (rorRel.getType().equalsIgnoreCase("parent")) {
final String orgId1 = calculateOpenaireId(r.getId());
final String orgId2 = calculateOpenaireId(rorRel.getId());
res
.add(
new AtomicAction<>(Relation.class,
calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF)));
res
.add(
new AtomicAction<>(Relation.class,
calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF)));
}
}
return res;
}
private static Relation calculateHierarchyRel(final String source, final String target, final String relClass) {
final Relation rel = new Relation();
rel.setSource(source);
rel.setTarget(target);
rel.setRelType(ORG_ORG_RELTYPE);
rel.setSubRelType(ModelConstants.RELATIONSHIP);
rel.setRelClass(relClass);
rel.setCollectedfrom(ROR_COLLECTED_FROM);
rel.setDataInfo(ROR_DATA_INFO);
rel.setLastupdatetimestamp(System.currentTimeMillis());
return rel;
}
private static String calculateOpenaireId(final String rorId) {
return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId));
} }
private static List<StructuredProperty> pids(final RorOrganization r) { private static List<StructuredProperty> pids(final RorOrganization r) {
@ -202,14 +237,14 @@ public class GenerateRorActionSetJob {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static Dataset<RorOrganization> readInputPath( private static JavaRDD<RorOrganization> readInputPath(
final SparkSession spark, final SparkSession spark,
final String path) throws IOException { final String path) throws IOException {
try (final FileSystem fileSystem = FileSystem.get(new Configuration()); try (final FileSystem fileSystem = FileSystem.get(new Configuration());
final InputStream is = fileSystem.open(new Path(path))) { final InputStream is = fileSystem.open(new Path(path))) {
final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class); final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class);
return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class)); return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class)).toJavaRDD();
} }
} }

View File

@ -1,7 +1,10 @@
package eu.dnetlib.dhp.actionmanager.ror; package eu.dnetlib.dhp.actionmanager.ror;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.util.List;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -13,9 +16,12 @@ import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization; import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
@Disabled
class GenerateRorActionSetJobTest { class GenerateRorActionSetJobTest {
private static final ObjectMapper mapper = new ObjectMapper(); private static final ObjectMapper mapper = new ObjectMapper();
@ -30,21 +36,40 @@ class GenerateRorActionSetJobTest {
void testConvertRorOrg() throws Exception { void testConvertRorOrg() throws Exception {
final RorOrganization r = mapper final RorOrganization r = mapper
.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class); .readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class);
final Organization org = GenerateRorActionSetJob.convertRorOrg(r); final List<AtomicAction<? extends Oaf>> aas = GenerateRorActionSetJob.convertRorOrg(r);
Assertions.assertEquals(3, aas.size());
assertEquals(Organization.class, aas.get(0).getClazz());
assertEquals(Relation.class, aas.get(1).getClazz());
assertEquals(Relation.class, aas.get(2).getClazz());
final Organization o = (Organization) aas.get(0).getPayload();
final Relation r1 = (Relation) aas.get(1).getPayload();
final Relation r2 = (Relation) aas.get(2).getPayload();
assertEquals(o.getId(), r1.getSource());
assertEquals(r1.getSource(), r2.getTarget());
assertEquals(r2.getSource(), r1.getTarget());
assertEquals(ModelConstants.IS_PARENT_OF, r1.getRelClass());
assertEquals(ModelConstants.IS_CHILD_OF, r2.getRelClass());
System.out.println(mapper.writeValueAsString(o));
System.out.println(mapper.writeValueAsString(r1));
System.out.println(mapper.writeValueAsString(r2));
final String s = mapper.writeValueAsString(org);
Assertions.assertTrue(StringUtils.isNotBlank(s));
System.out.println(s);
} }
@Test @Test
@Disabled
void testConvertAllRorOrg() throws Exception { void testConvertAllRorOrg() throws Exception {
final RorOrganization[] arr = mapper final RorOrganization[] arr = mapper
.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class); .readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class);
for (final RorOrganization r : arr) { for (final RorOrganization r : arr) {
Organization o = GenerateRorActionSetJob.convertRorOrg(r); final List<AtomicAction<? extends Oaf>> aas = GenerateRorActionSetJob.convertRorOrg(r);
Assertions.assertNotNull(o); Assertions.assertFalse(aas.isEmpty());
Assertions.assertNotNull(aas.get(0));
final Organization o = (Organization) aas.get(0).getPayload();
Assertions.assertTrue(StringUtils.isNotBlank(o.getId())); Assertions.assertTrue(StringUtils.isNotBlank(o.getId()));
} }
} }

View File

@ -1,24 +1,24 @@
{ {
"ip_addresses": [], "ip_addresses": [],
"aliases": [], "aliases": [],
"acronyms": [ "acronyms": [
"ANU" "MSO"
], ],
"links": [ "links": [
"http://www.anu.edu.au/" "https://rsaa.anu.edu.au/observatories/mount-stromlo-observatory"
], ],
"country": { "country": {
"country_code": "AU", "country_code": "AU",
"country_name": "Australia" "country_name": "Australia"
}, },
"name": "Australian National University", "name": "Mount Stromlo Observatory",
"wikipedia_url": "http://en.wikipedia.org/wiki/Australian_National_University", "wikipedia_url": "https://en.wikipedia.org/wiki/Mount_Stromlo_Observatory",
"addresses": [ "addresses": [
{ {
"lat": -35.2778, "lat": -35.320278,
"state_code": "AU-ACT", "state_code": "AU-ACT",
"country_geonames_id": 2077456, "country_geonames_id": 2077456,
"lng": 149.1205, "lng": 149.006944,
"state": "Australian Capital Territory", "state": "Australian Capital Territory",
"city": "Canberra", "city": "Canberra",
"geonames_city": { "geonames_city": {
@ -61,63 +61,34 @@
"types": [ "types": [
"Education" "Education"
], ],
"established": 1946, "established": 1924,
"relationships": [ "relationships": [
{ {
"type": "Related", "type": "Parent",
"id": "https://ror.org/041c7s516", "id": "https://ror.org/019wvm592",
"label": "Calvary Hospital" "label": "Australian National University"
},
{
"type": "Related",
"id": "https://ror.org/04h7nbn38",
"label": "Canberra Hospital"
},
{
"type": "Related",
"id": "https://ror.org/030jpqj15",
"label": "Goulburn Base Hospital"
},
{
"type": "Child",
"id": "https://ror.org/006a4jj40",
"label": "Mount Stromlo Observatory"
} }
], ],
"email_address": null, "email_address": null,
"external_ids": { "external_ids": {
"Wikidata": {
"all": [
"Q127990"
],
"preferred": null
},
"OrgRef": {
"all": [
"285106"
],
"preferred": null
},
"ISNI": { "ISNI": {
"all": [ "all": [
"0000 0001 2180 7477" "0000 0004 0459 2816"
], ],
"preferred": null "preferred": null
}, },
"FundRef": { "Wikidata": {
"all": [ "all": [
"501100000995", "Q1310548"
"501100001151",
"100009020"
], ],
"preferred": "501100000995" "preferred": null
}, },
"GRID": { "GRID": {
"all": "grid.1001.0", "all": "grid.440325.4",
"preferred": "grid.1001.0" "preferred": "grid.440325.4"
} }
}, },
"id": "https://ror.org/019wvm592", "id": "https://ror.org/006a4jj40",
"labels": [], "labels": [],
"status": "active" "status": "active"
} }