partial implementation of the ROR->actionset workflow

This commit is contained in:
Michele Artini 2021-04-28 16:00:24 +02:00
parent c537986b7c
commit b5cf505cc6
16 changed files with 971 additions and 0 deletions

View File

@ -0,0 +1,194 @@
package eu.dnetlib.dhp.actionmanager.ror;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
import eu.dnetlib.dhp.actionmanager.ror.model.RorOrganization;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
/**
 * Spark job that transforms a ROR data dump (one JSON organization record per line)
 * into an OpenAIRE ActionSet: each ROR record is mapped to an {@link Organization}
 * entity, wrapped in an {@link AtomicAction} and stored in a Hadoop sequence file.
 */
public class GenerateRorActionSetJob {

    private static final Logger log = LoggerFactory.getLogger(GenerateRorActionSetJob.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static final String COUNTRIES_VOC = "dnet:countries";
    private static final String PID_TYPES_VOC = "dnet:pid_types";

    /** Datasource the produced organizations are declared as collected from. */
    private static final List<KeyValue> ROR_COLLECTED_FROM = listKeyValues("10|openaire____::993a7ae7a863813cf95028b50708e222", "ROR");

    /** Provenance information attached to every produced entity (trust 0.92). */
    private static final DataInfo ROR_DATA_INFO = dataInfo(false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92");

    private static final Qualifier ROR_PID_TYPE = qualifier("ROR", "ROR", PID_TYPES_VOC, PID_TYPES_VOC);
    private static final Qualifier GRID_PID_TYPE = qualifier("GRID", "GRID", PID_TYPES_VOC, PID_TYPES_VOC);
    private static final Qualifier WIKIDATA_PID_TYPE = qualifier("Wikidata", "Wikidata", PID_TYPES_VOC, PID_TYPES_VOC);
    private static final Qualifier ORGREF_PID_TYPE = qualifier("OrgRef", "OrgRef", PID_TYPES_VOC, PID_TYPES_VOC);
    private static final Qualifier ISNI_PID_TYPE = qualifier("ISNI", "ISNI", PID_TYPES_VOC, PID_TYPES_VOC);
    private static final Qualifier FUNDREF_PID_TYPE = qualifier("FundRef", "FundRef", PID_TYPES_VOC, PID_TYPES_VOC);

    /**
     * Entry point. Expects --inputPath (the ROR json dump) and --outputPath (the
     * ActionSet destination), as declared in action_set_parameters.json.
     */
    public static void main(final String[] args) throws Exception {

        final String jsonConfiguration = IOUtils
            .toString(GenerateRorActionSetJob.class // was SparkAtomicActionJob.class: absolute resource path, own class is the right owner
                .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/ror/action_set_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("inputPath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        // BUG FIX: was log.info("outputPath {}: ", outputPath) — the placeholder was
        // malformed, so the value was appended after a stray "{}: " literal
        log.info("outputPath: {}", outputPath);

        final SparkConf conf = new SparkConf();

        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            processRorOrganizations(spark, inputPath, outputPath);
        });
    }

    private static void removeOutputDir(final SparkSession spark, final String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /** Reads the ROR records, converts them and saves them as a sequence file of AtomicActions. */
    private static void processRorOrganizations(final SparkSession spark,
        final String inputPath,
        final String outputPath) {
        readInputPath(spark, inputPath)
            .map(GenerateRorActionSetJob::convertRorOrg, Encoders.bean(Organization.class))
            .toJavaRDD()
            .map(o -> new AtomicAction<>(Organization.class, o))
            .mapToPair(aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
                new Text(OBJECT_MAPPER.writeValueAsString(aa))))
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
    }

    /** Maps a single ROR organization record to the OpenAIRE Organization model. */
    private static Organization convertRorOrg(final RorOrganization r) {

        final Date now = new Date();

        final Organization o = new Organization();
        o.setId(String.format("20|ror_________::%s", DHPUtils.md5(r.getId())));
        o.setOriginalId(Arrays.asList(r.getId()));
        o.setCollectedfrom(ROR_COLLECTED_FROM);
        o.setPid(pids(r));
        // NOTE(review): Date#toString is not ISO-8601; confirm the format expected downstream
        o.setDateofcollection(now.toString());
        o.setDateoftransformation(now.toString());
        o.setExtraInfo(new ArrayList<>()); // Values not present in the file
        o.setOaiprovenance(null); // Values not present in the file
        // first acronym if present, otherwise fall back to the full name
        o.setLegalshortname(field(r.getAcronyms().stream().findFirst().orElse(r.getName()), ROR_DATA_INFO));
        o.setLegalname(field(r.getName(), ROR_DATA_INFO));
        o.setAlternativeNames(alternativeNames(r));
        o.setWebsiteurl(field(r.getLinks().stream().findFirst().orElse(null), ROR_DATA_INFO));
        o.setLogourl(null);
        o.setEclegalbody(null);
        o.setEclegalperson(null);
        o.setEcnonprofit(null);
        o.setEcresearchorganization(null);
        o.setEchighereducation(null);
        o.setEcinternationalorganizationeurinterests(null);
        o.setEcinternationalorganization(null);
        o.setEcenterprise(null);
        o.setEcsmevalidated(null);
        o.setEcnutscode(null);
        if (r.getCountry() != null) {
            o.setCountry(qualifier(r.getCountry().getCountryCode(), r.getCountry().getCountryName(), COUNTRIES_VOC, COUNTRIES_VOC));
        } else {
            o.setCountry(null);
        }
        o.setDataInfo(ROR_DATA_INFO);
        o.setLastupdatetimestamp(now.getTime());

        return o;
    }

    /**
     * Collects the pids of a ROR organization: the ROR id itself plus any GRID,
     * FundRef, ISNI, OrgRef and Wikidata external ids.
     */
    private static List<StructuredProperty> pids(final RorOrganization r) {
        final List<StructuredProperty> pids = new ArrayList<>();
        pids.add(structuredProperty(r.getId(), ROR_PID_TYPE, ROR_DATA_INFO));

        // BUG FIX: the original dereferenced getExternalIds()/getGrid()/getFundRef()/...
        // unconditionally; the ExternalIds fields have no default value, so any record
        // missing one of the external id types caused a NullPointerException
        if (r.getExternalIds() != null) {
            if (r.getExternalIds().getGrid() != null && StringUtils.isNotBlank(r.getExternalIds().getGrid().getAll())) {
                pids.add(structuredProperty(r.getExternalIds().getGrid().getAll(), GRID_PID_TYPE, ROR_DATA_INFO));
            }
            if (r.getExternalIds().getFundRef() != null) {
                pids.addAll(pids(r.getExternalIds().getFundRef().getAll(), FUNDREF_PID_TYPE));
            }
            if (r.getExternalIds().getIsni() != null) {
                pids.addAll(pids(r.getExternalIds().getIsni().getAll(), ISNI_PID_TYPE));
            }
            if (r.getExternalIds().getOrgRef() != null) {
                pids.addAll(pids(r.getExternalIds().getOrgRef().getAll(), ORGREF_PID_TYPE));
            }
            if (r.getExternalIds().getWikidata() != null) {
                pids.addAll(pids(r.getExternalIds().getWikidata().getAll(), WIKIDATA_PID_TYPE));
            }
        }

        return pids;
    }

    /** Maps a list of raw pid values to StructuredProperties, skipping blanks and duplicates. */
    private static List<StructuredProperty> pids(final List<String> list, final Qualifier pidType) {
        if (list == null) { return new ArrayList<>(); }
        return list.stream()
            .filter(StringUtils::isNotBlank)
            .distinct()
            .map(s -> structuredProperty(s, pidType, ROR_DATA_INFO))
            .collect(Collectors.toList());
    }

    /** Aliases and acronyms, de-duplicated while preserving insertion order, blanks removed. */
    private static List<Field<String>> alternativeNames(final RorOrganization r) {
        final Set<String> names = new LinkedHashSet<>();
        names.addAll(r.getAliases());
        names.addAll(r.getAcronyms());
        return names.stream().filter(StringUtils::isNotBlank).map(s -> field(s, ROR_DATA_INFO)).collect(Collectors.toList());
    }

    /** Parses the input text file into a typed Dataset of RorOrganization beans. */
    private static Dataset<RorOrganization> readInputPath(
        final SparkSession spark,
        final String inputPath) {
        return spark
            .read()
            .textFile(inputPath)
            .map((MapFunction<String, RorOrganization>) value -> OBJECT_MAPPER.readValue(value, RorOrganization.class), Encoders.bean(RorOrganization.class));
    }
}

View File

@ -0,0 +1,100 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Address entry of a ROR organization record, optionally enriched with
 * Geonames data.
 */
public class Address implements Serializable {

    private static final long serialVersionUID = 2444635485253443195L;

    private Float lat;
    private Float lng;
    private String line;
    private String postcode;
    private String city;
    private String state;
    private String stateCode;
    private Integer countryGeonamesId;
    private GeonamesCity geonamesCity;
    // presumably flags the organization's main address — confirm against the ROR schema
    private Boolean primary;

    public Float getLat() {
        return lat;
    }

    public void setLat(final Float lat) {
        this.lat = lat;
    }

    public Float getLng() {
        return lng;
    }

    public void setLng(final Float lng) {
        this.lng = lng;
    }

    public String getLine() {
        return line;
    }

    public void setLine(final String line) {
        this.line = line;
    }

    public String getPostcode() {
        return postcode;
    }

    public void setPostcode(final String postcode) {
        this.postcode = postcode;
    }

    public String getCity() {
        return city;
    }

    public void setCity(final String city) {
        this.city = city;
    }

    public String getState() {
        return state;
    }

    public void setState(final String state) {
        this.state = state;
    }

    public String getStateCode() {
        return stateCode;
    }

    public void setStateCode(final String stateCode) {
        this.stateCode = stateCode;
    }

    public Integer getCountryGeonamesId() {
        return countryGeonamesId;
    }

    public void setCountryGeonamesId(final Integer countryGeonamesId) {
        this.countryGeonamesId = countryGeonamesId;
    }

    public GeonamesCity getGeonamesCity() {
        return geonamesCity;
    }

    public void setGeonamesCity(final GeonamesCity geonamesCity) {
        this.geonamesCity = geonamesCity;
    }

    public Boolean getPrimary() {
        return primary;
    }

    public void setPrimary(final Boolean primary) {
        this.primary = primary;
    }
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Country section of a ROR organization record: a country code plus its
 * display name.
 */
public class Country implements Serializable {

    private static final long serialVersionUID = 4357848706229493627L;

    private String countryCode;
    private String countryName;

    public String getCountryCode() {
        return countryCode;
    }

    public void setCountryCode(final String countryCode) {
        this.countryCode = countryCode;
    }

    public String getCountryName() {
        return countryName;
    }

    public void setCountryName(final String countryName) {
        this.countryName = countryName;
    }
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ExternalIdType implements Serializable {
private List<String> all = new ArrayList<>();
private String preferred;
private final static long serialVersionUID = 2616688352998387611L;
public List<String> getAll() {
return all;
}
public void setAll(final List<String> all) {
this.all = all;
}
public String getPreferred() {
return preferred;
}
public void setPreferred(final String preferred) {
this.preferred = preferred;
}
}

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Container for the external identifiers of a ROR organization. Any of the
 * entries may be null when the corresponding id type is absent from the record.
 */
public class ExternalIds implements Serializable {

    private static final long serialVersionUID = 686536347353680869L;

    private GridType grid;
    private ExternalIdType fundRef;
    private ExternalIdType isni;
    private ExternalIdType orgRef;
    private ExternalIdType wikidata;

    public GridType getGrid() {
        return grid;
    }

    public void setGrid(final GridType grid) {
        this.grid = grid;
    }

    public ExternalIdType getFundRef() {
        return fundRef;
    }

    public void setFundRef(final ExternalIdType fundRef) {
        this.fundRef = fundRef;
    }

    public ExternalIdType getIsni() {
        return isni;
    }

    public void setIsni(final ExternalIdType isni) {
        this.isni = isni;
    }

    public ExternalIdType getOrgRef() {
        return orgRef;
    }

    public void setOrgRef(final ExternalIdType orgRef) {
        this.orgRef = orgRef;
    }

    public ExternalIdType getWikidata() {
        return wikidata;
    }

    public void setWikidata(final ExternalIdType wikidata) {
        this.wikidata = wikidata;
    }
}

View File

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Geonames administrative area entry: numeric Geonames id, names and code.
 */
public class GeonamesAdmin implements Serializable {

    private static final long serialVersionUID = 7294958526269195673L;

    private Integer id;
    private String name;
    private String asciiName;
    private String code;

    public Integer getId() {
        return id;
    }

    public void setId(final Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(final String name) {
        this.name = name;
    }

    public String getAsciiName() {
        return asciiName;
    }

    public void setAsciiName(final String asciiName) {
        this.asciiName = asciiName;
    }

    public String getCode() {
        return code;
    }

    public void setCode(final String code) {
        this.code = code;
    }
}

View File

@ -0,0 +1,82 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Geonames city entry of a ROR address: city name and id, administrative
 * areas, NUTS levels and the license of the Geonames data.
 */
public class GeonamesCity implements Serializable {

    private static final long serialVersionUID = -8389480201526252955L;

    private Integer id;
    private String city;
    private GeonamesAdmin geonamesAdmin1;
    private GeonamesAdmin geonamesAdmin2;
    private NameAndCode nutsLevel1;
    private NameAndCode nutsLevel2;
    private NameAndCode nutsLevel3;
    private License license;

    public Integer getId() {
        return id;
    }

    public void setId(final Integer id) {
        this.id = id;
    }

    public String getCity() {
        return city;
    }

    public void setCity(final String city) {
        this.city = city;
    }

    public GeonamesAdmin getGeonamesAdmin1() {
        return geonamesAdmin1;
    }

    public void setGeonamesAdmin1(final GeonamesAdmin geonamesAdmin1) {
        this.geonamesAdmin1 = geonamesAdmin1;
    }

    public GeonamesAdmin getGeonamesAdmin2() {
        return geonamesAdmin2;
    }

    public void setGeonamesAdmin2(final GeonamesAdmin geonamesAdmin2) {
        this.geonamesAdmin2 = geonamesAdmin2;
    }

    public NameAndCode getNutsLevel1() {
        return nutsLevel1;
    }

    public void setNutsLevel1(final NameAndCode nutsLevel1) {
        this.nutsLevel1 = nutsLevel1;
    }

    public NameAndCode getNutsLevel2() {
        return nutsLevel2;
    }

    public void setNutsLevel2(final NameAndCode nutsLevel2) {
        this.nutsLevel2 = nutsLevel2;
    }

    public NameAndCode getNutsLevel3() {
        return nutsLevel3;
    }

    public void setNutsLevel3(final NameAndCode nutsLevel3) {
        this.nutsLevel3 = nutsLevel3;
    }

    public License getLicense() {
        return license;
    }

    public void setLicense(final License license) {
        this.license = license;
    }
}

View File

@ -0,0 +1,32 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * GRID external id entry of a ROR record. Unlike the other external id types,
 * the "all" attribute here is a single value rather than a list.
 */
public class GridType implements Serializable {

    private static final long serialVersionUID = -5605887658267581353L;

    private String all;
    private String preferred;

    public String getAll() {
        return all;
    }

    public void setAll(final String all) {
        this.all = all;
    }

    public String getPreferred() {
        return preferred;
    }

    public void setPreferred(final String preferred) {
        this.preferred = preferred;
    }
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Localized label of a ROR organization: a language code plus the label text.
 */
public class Label implements Serializable {

    private static final long serialVersionUID = -6576156103297850809L;

    // presumably an ISO 639 language code — confirm against the ROR schema
    private String iso639;
    private String label;

    public String getIso639() {
        return iso639;
    }

    public void setIso639(final String iso639) {
        this.iso639 = iso639;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(final String label) {
        this.label = label;
    }
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * License information attached to Geonames data in a ROR record:
 * attribution text plus the license identifier/URL.
 */
public class License implements Serializable {

    private static final long serialVersionUID = -194308261058176439L;

    private String attribution;
    private String license;

    public String getAttribution() {
        return attribution;
    }

    public void setAttribution(final String attribution) {
        this.attribution = attribution;
    }

    public String getLicense() {
        return license;
    }

    public void setLicense(final String license) {
        this.license = license;
    }
}

View File

@ -0,0 +1,28 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Simple name/code pair, used for the NUTS levels of a Geonames city.
 */
public class NameAndCode implements Serializable {

    private static final long serialVersionUID = 5459836979206140843L;

    private String name;
    private String code;

    public String getName() {
        return name;
    }

    public void setName(final String name) {
        this.name = name;
    }

    public String getCode() {
        return code;
    }

    public void setCode(final String code) {
        this.code = code;
    }
}

View File

@ -0,0 +1,37 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
/**
 * Relationship between two ROR organizations: the related organization's id
 * and label, plus the relationship type.
 */
public class Relationship implements Serializable {

    private static final long serialVersionUID = 7847399503395576960L;

    private String id;
    private String label;
    private String type;

    public String getId() {
        return id;
    }

    public void setId(final String id) {
        this.id = id;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(final String label) {
        this.label = label;
    }

    public String getType() {
        return type;
    }

    public void setType(final String type) {
        this.type = type;
    }
}

View File

@ -0,0 +1,156 @@
package eu.dnetlib.dhp.actionmanager.ror.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * Root bean of a ROR organization record as found in the ROR json dump.
 * All list-valued attributes default to empty lists; the nested beans
 * (country, externalIds) default to null.
 */
public class RorOrganization implements Serializable {

    private static final long serialVersionUID = -2658312087616043225L;

    private String id;
    private String name;
    private String status;
    private Integer established;
    private String emailAddress;
    private String wikipediaUrl;
    private Country country;
    private ExternalIds externalIds;
    private List<String> aliases = new ArrayList<>();
    private List<String> acronyms = new ArrayList<>();
    private List<String> links = new ArrayList<>();
    private List<String> ipAddresses = new ArrayList<>();
    private List<String> types = new ArrayList<>();
    private List<Address> addresses = new ArrayList<>();
    private List<Relationship> relationships = new ArrayList<>();
    private List<Label> labels = new ArrayList<>();

    public String getId() {
        return id;
    }

    public void setId(final String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(final String name) {
        this.name = name;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(final String status) {
        this.status = status;
    }

    public Integer getEstablished() {
        return established;
    }

    public void setEstablished(final Integer established) {
        this.established = established;
    }

    public String getEmailAddress() {
        return emailAddress;
    }

    public void setEmailAddress(final String emailAddress) {
        this.emailAddress = emailAddress;
    }

    public String getWikipediaUrl() {
        return wikipediaUrl;
    }

    public void setWikipediaUrl(final String wikipediaUrl) {
        this.wikipediaUrl = wikipediaUrl;
    }

    public Country getCountry() {
        return country;
    }

    public void setCountry(final Country country) {
        this.country = country;
    }

    public ExternalIds getExternalIds() {
        return externalIds;
    }

    public void setExternalIds(final ExternalIds externalIds) {
        this.externalIds = externalIds;
    }

    public List<String> getAliases() {
        return aliases;
    }

    public void setAliases(final List<String> aliases) {
        this.aliases = aliases;
    }

    public List<String> getAcronyms() {
        return acronyms;
    }

    public void setAcronyms(final List<String> acronyms) {
        this.acronyms = acronyms;
    }

    public List<String> getLinks() {
        return links;
    }

    public void setLinks(final List<String> links) {
        this.links = links;
    }

    public List<String> getIpAddresses() {
        return ipAddresses;
    }

    public void setIpAddresses(final List<String> ipAddresses) {
        this.ipAddresses = ipAddresses;
    }

    public List<String> getTypes() {
        return types;
    }

    public void setTypes(final List<String> types) {
        this.types = types;
    }

    public List<Address> getAddresses() {
        return addresses;
    }

    public void setAddresses(final List<Address> addresses) {
        this.addresses = addresses;
    }

    public List<Relationship> getRelationships() {
        return relationships;
    }

    public void setRelationships(final List<Relationship> relationships) {
        this.relationships = relationships;
    }

    public List<Label> getLabels() {
        return labels;
    }

    public void setLabels(final List<Label> labels) {
        this.labels = labels;
    }
}

View File

@ -0,0 +1,14 @@
[
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "the path of the input json",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -0,0 +1,58 @@
<!--
  Default properties for the ROR ActionSet workflow: cluster endpoints,
  Oozie sharelib selection, Spark lineage listeners and default resource sizing.
-->
<configuration>
<!-- Cluster endpoints -->
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<!-- Use the Oozie system sharelib with the spark2 action libraries -->
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<!-- Spark history server and Cloudera Navigator lineage listeners -->
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<!-- Default Spark resource sizing; overridable at submission time -->
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
</configuration>

View File

@ -0,0 +1,55 @@
<!--
  Oozie workflow producing the ROR ActionSet: cleans the output/working dirs,
  then runs GenerateRorActionSetJob to convert the ROR dump into AtomicActions.
-->
<workflow-app name="Update_ROR_action_set" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the path of the json</description>
</property>
<property>
<name>outputPath</name>
<description>path where to store the action set</description>
</property>
</parameters>
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<!-- Recreate the output and working directories from scratch -->
<action name="deleteoutputpath">
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
<delete path='${workingDir}'/>
<mkdir path='${workingDir}'/>
</fs>
<ok to="processRorFile"/>
<error to="Kill"/>
</action>
<!-- Spark job mapping ROR organizations to an ActionSet sequence file -->
<action name="processRorFile">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProcessRorFile</name>
<class>eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>