diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index adb198fb3..20c649a74 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -1,19 +1,14 @@ package eu.dnetlib.dhp.actionmanager.bipaffiliations; -import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.stream.Collectors; -import java.util.stream.Stream; import eu.dnetlib.dhp.actionmanager.Constants; import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; @@ -25,7 +20,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,16 +27,14 @@ import org.apache.spark.sql.Dataset; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; /** - * created the Atomic Action for each tipe of results + * Creates action sets for Crossref affiliation relations inferred by BIP! */ public class PrepareAffiliationRelations implements Serializable { @@ -50,19 +42,17 @@ public class PrepareAffiliationRelations implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String ID_PREFIX = "50|doi_________::"; - - private static final String TRUST = "0.91"; - - public static final String BIP_AFFILIATIONS_CLASSID = "sysimport:crosswalk:bipaffiliations"; - public static final String BIP_AFFILIATIONS_CLASSNAME = "Imported from BIP! Affiliations"; + public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; + public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; + public static final String BIP_INFERENCE_PROVENANCE = "bip_affiliation"; public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - PrepareAffiliationRelations.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); + .toString( + PrepareAffiliationRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -91,14 +81,12 @@ public class PrepareAffiliationRelations implements Serializable { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + // load and parse affiliation relations from HDFS JavaRDD affiliationRelationsDeserializeRDD = sc .textFile(inputPath) .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); -// for(AffiliationRelationDeserializer rel: bipDeserializeJavaRDD.collect()){ -// System.out.println(rel); -// } - + // convert affiliation to an internal representation Dataset affiliationRelations = spark.createDataset( affiliationRelationsDeserializeRDD.flatMap(entry -> @@ -112,15 +100,40 @@ public class PrepareAffiliationRelations implements Serializable { .rdd(), Encoders.bean(AffiliationRelationModel.class)); + // prepare action sets for affiliation relations affiliationRelations .map((MapFunction) affRel -> { - String paperId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); - final String affId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("ror", affRel.getRorId())); + // DOI to OpenAIRE id + final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); - return getRelation(paperId, affId, ModelConstants.HAS_AUTHOR_INSTITUTION); + // ROR id to OpenAIRE id + final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); + + Qualifier qualifier = OafMapperUtils.qualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS); + + // format data info; setting `confidence` into relation's `trust` + DataInfo dataInfo = OafMapperUtils.dataInfo( + false, + BIP_INFERENCE_PROVENANCE, + true, + false, + qualifier, + Double.toString(affRel.getConfidence())); + + return OafMapperUtils.getRelation( + paperId, + affId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.HAS_AUTHOR_INSTITUTION, + null, + dataInfo, + null); }, Encoders.bean(Relation.class)) .toJavaRDD() @@ -131,50 +144,4 @@ public class PrepareAffiliationRelations implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } - - public static Relation getRelation(String source, String target, String relclass) { - Relation r = new Relation(); - - r.setCollectedfrom(getCollectedFrom()); - r.setSource(source); - r.setTarget(target); - r.setRelClass(relclass); - r.setRelType(ModelConstants.RESULT_ORGANIZATION); - r.setSubRelType(ModelConstants.AFFILIATION); - r.setDataInfo(getDataInfo()); - return r; - } - - public static List getCollectedFrom() { - KeyValue kv = new KeyValue(); - kv.setKey(ModelConstants.DNET_PROVENANCE_ACTIONS); - kv.setValue(ModelConstants.DNET_PROVENANCE_ACTIONS); - - return Collections.singletonList(kv); - } - - public static DataInfo getDataInfo() { - DataInfo di = new DataInfo(); - di.setInferred(false); - di.setDeletedbyinference(false); - di.setTrust(TRUST); - di.setProvenanceaction( - getQualifier( - BIP_AFFILIATIONS_CLASSID, - BIP_AFFILIATIONS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS - )); - return di; - } - - public static Qualifier getQualifier(String class_id, String class_name, - String qualifierSchema) { - Qualifier pa = new Qualifier(); - pa.setClassid(class_id); - pa.setClassname(class_name); - pa.setSchemeid(qualifierSchema); - pa.setSchemename(qualifierSchema); - return pa; - } - } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java index 1be2a96fd..5f3493d56 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java @@ -168,7 +168,7 @@ public class GenerateRorActionSetJob { } - private static String calculateOpenaireId(final String rorId) { + public static String calculateOpenaireId(final String rorId) { return String.format("20|%s::%s", Constants.ROR_NS_PREFIX, DHPUtils.md5(rorId)); }