forked from D-Net/dnet-hadoop
added hierarchy rel in ROR actionset
This commit is contained in:
parent
59f76b50d4
commit
d66e20e7ac
|
@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.ror;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
|
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
|
||||||
|
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORG_ORG_RELTYPE;
|
||||||
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
|
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
|
||||||
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
|
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
|
||||||
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
|
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
|
||||||
|
@ -29,8 +30,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
|
|
||||||
import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType;
|
import eu.dnetlib.dhp.actionmanager.ror.model.ExternalIdType;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.ror.model.Relationship;
|
||||||
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
|
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
@ -48,8 +48,10 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
@ -112,24 +114,21 @@ public class GenerateRorActionSetJob {
|
||||||
final String outputPath) throws IOException {
|
final String outputPath) throws IOException {
|
||||||
|
|
||||||
readInputPath(spark, inputPath)
|
readInputPath(spark, inputPath)
|
||||||
.map(
|
.map(GenerateRorActionSetJob::convertRorOrg)
|
||||||
(MapFunction<RorOrganization, Organization>) GenerateRorActionSetJob::convertRorOrg,
|
.flatMap(List::iterator)
|
||||||
Encoders.bean(Organization.class))
|
|
||||||
.toJavaRDD()
|
|
||||||
.map(o -> new AtomicAction<>(Organization.class, o))
|
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static Organization convertRorOrg(final RorOrganization r) {
|
protected static List<AtomicAction<? extends Oaf>> convertRorOrg(final RorOrganization r) {
|
||||||
|
|
||||||
final Date now = new Date();
|
final Date now = new Date();
|
||||||
|
|
||||||
final Organization o = new Organization();
|
final Organization o = new Organization();
|
||||||
|
|
||||||
o.setId(String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(r.getId())));
|
o.setId(calculateOpenaireId(r.getId()));
|
||||||
o.setOriginalId(Arrays.asList(String.format("%s::%s", ROR_NS_PREFIX, r.getId())));
|
o.setOriginalId(Arrays.asList(String.format("%s::%s", ROR_NS_PREFIX, r.getId())));
|
||||||
o.setCollectedfrom(ROR_COLLECTED_FROM);
|
o.setCollectedfrom(ROR_COLLECTED_FROM);
|
||||||
o.setPid(pids(r));
|
o.setPid(pids(r));
|
||||||
|
@ -166,7 +165,43 @@ public class GenerateRorActionSetJob {
|
||||||
o.setDataInfo(ROR_DATA_INFO);
|
o.setDataInfo(ROR_DATA_INFO);
|
||||||
o.setLastupdatetimestamp(now.getTime());
|
o.setLastupdatetimestamp(now.getTime());
|
||||||
|
|
||||||
return o;
|
final List<AtomicAction<? extends Oaf>> res = new ArrayList<>();
|
||||||
|
res.add(new AtomicAction<>(Organization.class, o));
|
||||||
|
|
||||||
|
for (final Relationship rorRel : r.getRelationships()) {
|
||||||
|
if (rorRel.getType().equalsIgnoreCase("parent")) {
|
||||||
|
final String orgId1 = calculateOpenaireId(r.getId());
|
||||||
|
final String orgId2 = calculateOpenaireId(rorRel.getId());
|
||||||
|
res
|
||||||
|
.add(
|
||||||
|
new AtomicAction<>(Relation.class,
|
||||||
|
calculateHierarchyRel(orgId1, orgId2, ModelConstants.IS_PARENT_OF)));
|
||||||
|
res
|
||||||
|
.add(
|
||||||
|
new AtomicAction<>(Relation.class,
|
||||||
|
calculateHierarchyRel(orgId2, orgId1, ModelConstants.IS_CHILD_OF)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Relation calculateHierarchyRel(final String source, final String target, final String relClass) {
|
||||||
|
final Relation rel = new Relation();
|
||||||
|
rel.setSource(source);
|
||||||
|
rel.setTarget(target);
|
||||||
|
rel.setRelType(ORG_ORG_RELTYPE);
|
||||||
|
rel.setSubRelType(ModelConstants.RELATIONSHIP);
|
||||||
|
rel.setRelClass(relClass);
|
||||||
|
rel.setCollectedfrom(ROR_COLLECTED_FROM);
|
||||||
|
rel.setDataInfo(ROR_DATA_INFO);
|
||||||
|
rel.setLastupdatetimestamp(System.currentTimeMillis());
|
||||||
|
return rel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String calculateOpenaireId(final String rorId) {
|
||||||
|
return String.format("20|%s::%s", ROR_NS_PREFIX, DHPUtils.md5(rorId));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<StructuredProperty> pids(final RorOrganization r) {
|
private static List<StructuredProperty> pids(final RorOrganization r) {
|
||||||
|
@ -202,14 +237,14 @@ public class GenerateRorActionSetJob {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Dataset<RorOrganization> readInputPath(
|
private static JavaRDD<RorOrganization> readInputPath(
|
||||||
final SparkSession spark,
|
final SparkSession spark,
|
||||||
final String path) throws IOException {
|
final String path) throws IOException {
|
||||||
|
|
||||||
try (final FileSystem fileSystem = FileSystem.get(new Configuration());
|
try (final FileSystem fileSystem = FileSystem.get(new Configuration());
|
||||||
final InputStream is = fileSystem.open(new Path(path))) {
|
final InputStream is = fileSystem.open(new Path(path))) {
|
||||||
final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class);
|
final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class);
|
||||||
return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class));
|
return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class)).toJavaRDD();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.ror;
|
package eu.dnetlib.dhp.actionmanager.ror;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -13,9 +16,12 @@ import org.junit.jupiter.api.Test;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
|
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
@Disabled
|
|
||||||
class GenerateRorActionSetJobTest {
|
class GenerateRorActionSetJobTest {
|
||||||
|
|
||||||
private static final ObjectMapper mapper = new ObjectMapper();
|
private static final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
@ -30,21 +36,40 @@ class GenerateRorActionSetJobTest {
|
||||||
void testConvertRorOrg() throws Exception {
|
void testConvertRorOrg() throws Exception {
|
||||||
final RorOrganization r = mapper
|
final RorOrganization r = mapper
|
||||||
.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class);
|
.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class);
|
||||||
final Organization org = GenerateRorActionSetJob.convertRorOrg(r);
|
final List<AtomicAction<? extends Oaf>> aas = GenerateRorActionSetJob.convertRorOrg(r);
|
||||||
|
|
||||||
|
Assertions.assertEquals(3, aas.size());
|
||||||
|
assertEquals(Organization.class, aas.get(0).getClazz());
|
||||||
|
assertEquals(Relation.class, aas.get(1).getClazz());
|
||||||
|
assertEquals(Relation.class, aas.get(2).getClazz());
|
||||||
|
|
||||||
|
final Organization o = (Organization) aas.get(0).getPayload();
|
||||||
|
final Relation r1 = (Relation) aas.get(1).getPayload();
|
||||||
|
final Relation r2 = (Relation) aas.get(2).getPayload();
|
||||||
|
|
||||||
|
assertEquals(o.getId(), r1.getSource());
|
||||||
|
assertEquals(r1.getSource(), r2.getTarget());
|
||||||
|
assertEquals(r2.getSource(), r1.getTarget());
|
||||||
|
assertEquals(ModelConstants.IS_PARENT_OF, r1.getRelClass());
|
||||||
|
assertEquals(ModelConstants.IS_CHILD_OF, r2.getRelClass());
|
||||||
|
|
||||||
|
System.out.println(mapper.writeValueAsString(o));
|
||||||
|
System.out.println(mapper.writeValueAsString(r1));
|
||||||
|
System.out.println(mapper.writeValueAsString(r2));
|
||||||
|
|
||||||
final String s = mapper.writeValueAsString(org);
|
|
||||||
Assertions.assertTrue(StringUtils.isNotBlank(s));
|
|
||||||
System.out.println(s);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@Disabled
|
||||||
void testConvertAllRorOrg() throws Exception {
|
void testConvertAllRorOrg() throws Exception {
|
||||||
final RorOrganization[] arr = mapper
|
final RorOrganization[] arr = mapper
|
||||||
.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class);
|
.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class);
|
||||||
|
|
||||||
for (final RorOrganization r : arr) {
|
for (final RorOrganization r : arr) {
|
||||||
Organization o = GenerateRorActionSetJob.convertRorOrg(r);
|
final List<AtomicAction<? extends Oaf>> aas = GenerateRorActionSetJob.convertRorOrg(r);
|
||||||
Assertions.assertNotNull(o);
|
Assertions.assertFalse(aas.isEmpty());
|
||||||
|
Assertions.assertNotNull(aas.get(0));
|
||||||
|
final Organization o = (Organization) aas.get(0).getPayload();
|
||||||
Assertions.assertTrue(StringUtils.isNotBlank(o.getId()));
|
Assertions.assertTrue(StringUtils.isNotBlank(o.getId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,24 +1,24 @@
|
||||||
{
|
{
|
||||||
"ip_addresses": [],
|
"ip_addresses": [],
|
||||||
"aliases": [],
|
"aliases": [],
|
||||||
"acronyms": [
|
"acronyms": [
|
||||||
"ANU"
|
"MSO"
|
||||||
],
|
],
|
||||||
"links": [
|
"links": [
|
||||||
"http://www.anu.edu.au/"
|
"https://rsaa.anu.edu.au/observatories/mount-stromlo-observatory"
|
||||||
],
|
],
|
||||||
"country": {
|
"country": {
|
||||||
"country_code": "AU",
|
"country_code": "AU",
|
||||||
"country_name": "Australia"
|
"country_name": "Australia"
|
||||||
},
|
},
|
||||||
"name": "Australian National University",
|
"name": "Mount Stromlo Observatory",
|
||||||
"wikipedia_url": "http://en.wikipedia.org/wiki/Australian_National_University",
|
"wikipedia_url": "https://en.wikipedia.org/wiki/Mount_Stromlo_Observatory",
|
||||||
"addresses": [
|
"addresses": [
|
||||||
{
|
{
|
||||||
"lat": -35.2778,
|
"lat": -35.320278,
|
||||||
"state_code": "AU-ACT",
|
"state_code": "AU-ACT",
|
||||||
"country_geonames_id": 2077456,
|
"country_geonames_id": 2077456,
|
||||||
"lng": 149.1205,
|
"lng": 149.006944,
|
||||||
"state": "Australian Capital Territory",
|
"state": "Australian Capital Territory",
|
||||||
"city": "Canberra",
|
"city": "Canberra",
|
||||||
"geonames_city": {
|
"geonames_city": {
|
||||||
|
@ -61,63 +61,34 @@
|
||||||
"types": [
|
"types": [
|
||||||
"Education"
|
"Education"
|
||||||
],
|
],
|
||||||
"established": 1946,
|
"established": 1924,
|
||||||
"relationships": [
|
"relationships": [
|
||||||
{
|
{
|
||||||
"type": "Related",
|
"type": "Parent",
|
||||||
"id": "https://ror.org/041c7s516",
|
"id": "https://ror.org/019wvm592",
|
||||||
"label": "Calvary Hospital"
|
"label": "Australian National University"
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "Related",
|
|
||||||
"id": "https://ror.org/04h7nbn38",
|
|
||||||
"label": "Canberra Hospital"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "Related",
|
|
||||||
"id": "https://ror.org/030jpqj15",
|
|
||||||
"label": "Goulburn Base Hospital"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"type": "Child",
|
|
||||||
"id": "https://ror.org/006a4jj40",
|
|
||||||
"label": "Mount Stromlo Observatory"
|
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"email_address": null,
|
"email_address": null,
|
||||||
"external_ids": {
|
"external_ids": {
|
||||||
"Wikidata": {
|
|
||||||
"all": [
|
|
||||||
"Q127990"
|
|
||||||
],
|
|
||||||
"preferred": null
|
|
||||||
},
|
|
||||||
"OrgRef": {
|
|
||||||
"all": [
|
|
||||||
"285106"
|
|
||||||
],
|
|
||||||
"preferred": null
|
|
||||||
},
|
|
||||||
"ISNI": {
|
"ISNI": {
|
||||||
"all": [
|
"all": [
|
||||||
"0000 0001 2180 7477"
|
"0000 0004 0459 2816"
|
||||||
],
|
],
|
||||||
"preferred": null
|
"preferred": null
|
||||||
},
|
},
|
||||||
"FundRef": {
|
"Wikidata": {
|
||||||
"all": [
|
"all": [
|
||||||
"501100000995",
|
"Q1310548"
|
||||||
"501100001151",
|
|
||||||
"100009020"
|
|
||||||
],
|
],
|
||||||
"preferred": "501100000995"
|
"preferred": null
|
||||||
},
|
},
|
||||||
"GRID": {
|
"GRID": {
|
||||||
"all": "grid.1001.0",
|
"all": "grid.440325.4",
|
||||||
"preferred": "grid.1001.0"
|
"preferred": "grid.440325.4"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"id": "https://ror.org/019wvm592",
|
"id": "https://ror.org/006a4jj40",
|
||||||
"labels": [],
|
"labels": [],
|
||||||
"status": "active"
|
"status": "active"
|
||||||
}
|
}
|
Loading…
Reference in New Issue