1
0
Fork 0

Add bi-directional affiliation relations

This commit is contained in:
Serafeim Chatzopoulos 2023-07-06 18:29:15 +03:00
parent 12528ed2ef
commit bc7b00bcd1
1 changed files with 29 additions and 11 deletions

View File

@ -4,6 +4,8 @@ package eu.dnetlib.dhp.actionmanager.bipaffiliations;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.Constants; import eu.dnetlib.dhp.actionmanager.Constants;
@ -18,6 +20,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -38,7 +41,6 @@ import scala.Tuple2;
*/ */
public class PrepareAffiliationRelations implements Serializable { public class PrepareAffiliationRelations implements Serializable {
private static final String DOI = "doi";
private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ID_PREFIX = "50|doi_________::"; private static final String ID_PREFIX = "50|doi_________::";
@ -102,7 +104,7 @@ public class PrepareAffiliationRelations implements Serializable {
// prepare action sets for affiliation relations // prepare action sets for affiliation relations
affiliationRelations affiliationRelations
.map((MapFunction<AffiliationRelationModel, Relation>) affRel -> { .flatMap((FlatMapFunction<AffiliationRelationModel, Relation>) affRel -> {
// DOI to OpenAIRE id // DOI to OpenAIRE id
final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi()));
@ -125,15 +127,8 @@ public class PrepareAffiliationRelations implements Serializable {
qualifier, qualifier,
Double.toString(affRel.getConfidence())); Double.toString(affRel.getConfidence()));
return OafMapperUtils.getRelation( // return bi-directional relations
paperId, return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
affId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION,
null,
dataInfo,
null);
}, Encoders.bean(Relation.class)) }, Encoders.bean(Relation.class))
.toJavaRDD() .toJavaRDD()
@ -144,4 +139,27 @@ public class PrepareAffiliationRelations implements Serializable {
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) {
return Arrays.asList(
OafMapperUtils.getRelation(
paperId,
affId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION,
null,
dataInfo,
null),
OafMapperUtils.getRelation(
affId,
paperId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
null,
dataInfo,
null)
);
}
} }