diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 20c649a74..45e712c7e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -4,6 +4,8 @@ package eu.dnetlib.dhp.actionmanager.bipaffiliations; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; +import java.util.List; import java.util.stream.Collectors; import eu.dnetlib.dhp.actionmanager.Constants; @@ -18,6 +20,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; @@ -38,7 +41,6 @@ import scala.Tuple2; */ public class PrepareAffiliationRelations implements Serializable { - private static final String DOI = "doi"; private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String ID_PREFIX = "50|doi_________::"; @@ -102,7 +104,7 @@ public class PrepareAffiliationRelations implements Serializable { // prepare action sets for affiliation relations affiliationRelations - .map((MapFunction) affRel -> { + .flatMap((FlatMapFunction) affRel -> { // DOI to OpenAIRE id final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); @@ -125,15 +127,8 @@ public class PrepareAffiliationRelations implements Serializable { qualifier, Double.toString(affRel.getConfidence())); - return OafMapperUtils.getRelation( - paperId, - affId, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - ModelConstants.HAS_AUTHOR_INSTITUTION, - null, - dataInfo, - null); + // return bi-directional relations + return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); }, Encoders.bean(Relation.class)) .toJavaRDD() @@ -144,4 +139,27 @@ public class PrepareAffiliationRelations implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } + + private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { + return Arrays.asList( + OafMapperUtils.getRelation( + paperId, + affId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.HAS_AUTHOR_INSTITUTION, + null, + dataInfo, + null), + OafMapperUtils.getRelation( + affId, + paperId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.IS_AUTHOR_INSTITUTION_OF, + null, + dataInfo, + null) + ); + } }