From df393608224bce62f7d0f10aeaacc6c18f1671a2 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Wed, 25 Sep 2024 12:32:53 +0200 Subject: [PATCH] [AffRo] changed the creation of the action set against the new model of provision of the matchings --- .../PrepareAffiliationRelations.java | 19 +- .../ExtractAffRoInfoFromOpenOrgs.java | 217 ++++++++++++++++++ 2 files changed, 230 insertions(+), 6 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/openorgsforaffro/ExtractAffRoInfoFromOpenOrgs.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 70ca1576c..e2aca014d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -125,7 +125,7 @@ public class PrepareAffiliationRelations implements Serializable { List collectedfromPublisher = OafMapperUtils .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); - JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( + JavaPairRDD publisherRelations = prepareAffiliationRelations( spark, publisherlInputPath, collectedfromPublisher); crossrefRelations @@ -154,11 +154,10 @@ public class PrepareAffiliationRelations implements Serializable { private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, List collectedfrom) { - // load and parse affiliation relations from HDFS Dataset df = spark .read() - .schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING, `Confidence`:DOUBLE>>") + .schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`PID`:STRING, `Value`:STRING, `Confidence`:DOUBLE, `Status`:STRING>>") .json(inputPath) .where("DOI is not null"); @@ -169,9 +168,11 @@ public class PrepareAffiliationRelations implements 
Serializable { // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) + .where("matching.Status = 'active'") .select( new Column("DOI").as("doi"), - new Column("matching.RORid").as("rorid"), + new Column("matching.PID").as("pidtype"), + new Column("matching.Value").as("pidvalue"), new Column("matching.Confidence").as("confidence")); // prepare action sets for affiliation relations @@ -183,8 +184,14 @@ public class PrepareAffiliationRelations implements Serializable { final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); - // ROR id to OpenAIRE id - final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); + // Organization to OpenAIRE identifier + String affId = null; + if(row.getAs("pidtype").equals("ROR")) + //ROR id to OpenAIRE id + affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("pidvalue")); + else + //getting the OpenOrgs identifier for the organization + affId = row.getAs("pidvalue"); Qualifier qualifier = OafMapperUtils .qualifier( diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/openorgsforaffro/ExtractAffRoInfoFromOpenOrgs.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/openorgsforaffro/ExtractAffRoInfoFromOpenOrgs.java new file mode 100644 index 000000000..768ac8521 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/openorgsforaffro/ExtractAffRoInfoFromOpenOrgs.java @@ -0,0 +1,217 @@ +package eu.dnetlib.dhp.oa.graph.openorgsforaffro; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import 
eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class ExtractAffRoInfoFromOpenOrgs implements Serializable { + public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; + public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; + + private static final String DOI_PREFIX = "50|doi_________::"; + + private static final String PMID_PREFIX = "50|pmid________::"; + private static final String ARXIV_PREFIX = "50|arXiv_______::"; + + private static final String PMCID_PREFIX = "50|pmcid_______::"; + private static final String TRUST = "0.91"; + + private static final Logger log = LoggerFactory.getLogger(ExtractAffRoInfoFromOpenOrgs.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static 
void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + ExtractAffRoInfoFromOpenOrgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> extractContent(spark, inputPath, outputPath)); + + } + + private static void extractContent(SparkSession spark, String inputPath, String outputPath) { + + getTextTextJavaPairRDD(spark, inputPath) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } + + private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath) { + return spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), + Encoders.bean(COCI.class)) + .flatMap( + (FlatMapFunction) value -> createRelation( + value) + .iterator(), + Encoders.bean(Relation.class)) + .filter((FilterFunction) Objects::nonNull) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))); + } + + private static List createRelation(COCI value) throws JsonProcessingException { + + List relationList = new ArrayList<>(); + + String citing; + String cited; + + switch (value.getCiting_pid()) { + case "doi": + citing 
= DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + break; + case "pmid": + citing = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting())); + break; + case "arxiv": + citing = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCiting())); + break; + case "pmcid": + citing = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCiting())); + break; + case "isbn": + case "issn": + return relationList; + + default: + throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value)); + } + + switch (value.getCited_pid()) { + case "doi": + cited = DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited())); + break; + case "pmid": + cited = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited())); + break; + case "arxiv": + cited = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCited())); + break; + case "pmcid": + cited = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCited())); + break; + case "isbn": + case "issn": + return relationList; + default: + throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value)); + } + + if (!citing.equals(cited)) { + relationList + .add( + getRelation( + citing, + cited, ModelConstants.CITES)); + } + + return relationList; + } + + public static Relation getRelation( + String source, + String target, + String relClass) { + + return OafMapperUtils + .getRelation( + source, + target, + ModelConstants.RESULT_RESULT, + ModelConstants.CITATION, + relClass, + Arrays + .asList( + 
OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + TRUST), + null); + } +}