diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index dce9082fd..adb198fb3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -5,6 +5,8 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -12,10 +14,16 @@ import java.util.stream.Stream; import eu.dnetlib.dhp.actionmanager.Constants; import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -31,6 +39,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; /** * created the Atomic Action for each tipe of results @@ -40,6 +49,12 @@ public class PrepareAffiliationRelations implements Serializable { private static final String DOI = "doi"; private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String ID_PREFIX = "50|doi_________::"; + + private static final String TRUST = "0.91"; + + public static final String BIP_AFFILIATIONS_CLASSID = "sysimport:crosswalk:bipaffiliations"; + public static final String BIP_AFFILIATIONS_CLASSNAME = "Imported from BIP! Affiliations"; public static void main(String[] args) throws Exception { @@ -76,93 +91,90 @@ public class PrepareAffiliationRelations implements Serializable { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD bipDeserializeJavaRDD = sc + JavaRDD affiliationRelationsDeserializeRDD = sc .textFile(inputPath) .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); // for(AffiliationRelationDeserializer rel: bipDeserializeJavaRDD.collect()){ // System.out.println(rel); // } - JavaRDD affiliationRelations = - bipDeserializeJavaRDD.flatMap(entry -> - entry.getMatchings().stream().flatMap(matching -> - matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( - entry.getDoi(), - rorId, - matching.getConfidence() - ))).collect(Collectors.toList()).iterator()); - for(AffiliationRelationModel rel: affiliationRelations.collect()){ - System.out.println(rel); - } -// Dataset relations = spark -// .createDataset(bipDeserializeJavaRDD.flatMap(entry -> { -//// entry.keySet().stream().map(key -> { -// AffiliationRelationModel rel = new AffiliationRelationModel(entry.getDoi()) -// System.out.println(entry); -// return entry; -//// BipScore bs = new BipScore(); -//// bs.setId(key); -//// bs.setScoreList(entry.get(key)); -//// return bs; -// }).collect(Collectors.toList()).iterator()).rdd(), Encoßders.bean(AffiliationRelationModel.class)); + Dataset affiliationRelations = + spark.createDataset( + affiliationRelationsDeserializeRDD.flatMap(entry -> + entry.getMatchings().stream().flatMap(matching -> + matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( + entry.getDoi(), + rorId, + matching.getConfidence() + ))).collect(Collectors.toList()) + .iterator()) + .rdd(), + Encoders.bean(AffiliationRelationModel.class)); -// bipScores -// -// .map((MapFunction) bs -> { -// Result ret = new Result(); -// -// ret.setId(bs.getId()); -// -// ret.setMeasures(getMeasure(bs)); -// -// return ret; -// }, Encoders.bean(Result.class)) -// .toJavaRDD() -// .map(p -> new AtomicAction(Result.class, p)) -// .mapToPair( -// aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), -// new Text(OBJECT_MAPPER.writeValueAsString(aa)))) -// .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + affiliationRelations + .map((MapFunction) affRel -> { + + String paperId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); + final String affId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("ror", affRel.getRorId())); + + return getRelation(paperId, affId, ModelConstants.HAS_AUTHOR_INSTITUTION); + + }, Encoders.bean(Relation.class)) + .toJavaRDD() + .map(p -> new AtomicAction(Relation.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); -// } -// -// private static List getMeasure(BipScore value) { -// return value -// .getScoreList() -// .stream() -// .map(score -> { -// Measure m = new Measure(); -// m.setId(score.getId()); -// m -// .setUnit( -// score -// .getUnit() -// .stream() -// .map(unit -> { -// KeyValue kv = new KeyValue(); -// kv.setValue(unit.getValue()); -// kv.setKey(unit.getKey()); -// kv -// .setDataInfo( -// OafMapperUtils -// .dataInfo( -// false, -// UPDATE_DATA_INFO_TYPE, -// true, -// false, -// OafMapperUtils -// .qualifier( -// UPDATE_MEASURE_BIP_CLASS_ID, -// UPDATE_CLASS_NAME, -// ModelConstants.DNET_PROVENANCE_ACTIONS, -// ModelConstants.DNET_PROVENANCE_ACTIONS), -// "")); -// return kv; -// }) -// .collect(Collectors.toList())); -// return m; -// }) -// .collect(Collectors.toList()); } + + public static Relation getRelation(String source, String target, String relclass) { + Relation r = new Relation(); + + r.setCollectedfrom(getCollectedFrom()); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relclass); + r.setRelType(ModelConstants.RESULT_ORGANIZATION); + r.setSubRelType(ModelConstants.AFFILIATION); + r.setDataInfo(getDataInfo()); + return r; + } + + public static List getCollectedFrom() { + KeyValue kv = new KeyValue(); + kv.setKey(ModelConstants.DNET_PROVENANCE_ACTIONS); + kv.setValue(ModelConstants.DNET_PROVENANCE_ACTIONS); + + return Collections.singletonList(kv); + } + + public static DataInfo getDataInfo() { + DataInfo di = new DataInfo(); + di.setInferred(false); + di.setDeletedbyinference(false); + di.setTrust(TRUST); + di.setProvenanceaction( + getQualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS + )); + return di; + } + + public static Qualifier getQualifier(String class_id, String class_name, + String qualifierSchema) { + Qualifier pa = new Qualifier(); + pa.setClassid(class_id); + pa.setClassname(class_name); + pa.setSchemeid(qualifierSchema); + pa.setSchemename(qualifierSchema); + return pa; + } + }