From fd34372c4077e1e93b00b1965c044d5000a91e69 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 6 Mar 2024 13:42:00 +0100 Subject: [PATCH 1/4] [OCNEW] first implementation --- .../CreateActionSetSparkJob.java | 68 +++++----- .../opencitations/GetOpenCitationsRefs.java | 64 +++++---- .../opencitations/MapOCIdsInPids.java | 123 ++++++++++++++++++ .../actionmanager/opencitations/ReadCOCI.java | 53 ++++---- .../opencitations/model/COCI.java | 17 +++ .../opencitations/input_parameters.json | 14 +- .../input_readcoci_parameters.json | 16 +-- .../opencitations/oozie_app/workflow.xml | 84 ++++++++++-- .../opencitations/remap_parameters.json | 20 +++ 9 files changed, 329 insertions(+), 130 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index b707fdcd3..47949a4bb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -28,6 +28,7 @@ import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.*; import eu.dnetlib.dhp.utils.DHPUtils; @@ -37,12 +38,6 @@ public class CreateActionSetSparkJob implements Serializable { public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; - // DOI-to-DOI citations - public static final String COCI = "COCI"; - - // PMID-to-PMID citations - public static final String POCI = "POCI"; - private static final String DOI_PREFIX = "50|doi_________::"; private static final String PMID_PREFIX = "50|pmid________::"; @@ -95,22 +90,21 @@ public class CreateActionSetSparkJob implements Serializable { private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { - getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI) - .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) + getTextTextJavaPairRDD(spark, inputPath) + // .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } - private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath, - boolean shouldDuplicateRels, String prefix) { + private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath) { return spark .read() - .textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*") + .textFile(inputPath) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), Encoders.bean(COCI.class)) .flatMap( (FlatMapFunction) value -> createRelation( - value, shouldDuplicateRels, prefix) + value) .iterator(), Encoders.bean(Relation.class)) .filter((FilterFunction) Objects::nonNull) @@ -121,34 +115,41 @@ public class CreateActionSetSparkJob implements Serializable { new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } - private static List createRelation(COCI value, boolean duplicate, String p) { + private static List createRelation(COCI value) { List relationList = new ArrayList<>(); - String prefix; + String citing; String cited; - switch (p) { - case COCI: - prefix = DOI_PREFIX; - citing = prefix + switch (value.getCiting_pid()) { + case "doi": + citing = DOI_PREFIX + IdentifierFactory .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); - cited = prefix - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited())); break; - case POCI: - prefix = PMID_PREFIX; - citing = prefix + case "pmid": + citing = PMID_PREFIX + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting())); - cited = prefix - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited())); + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); break; default: - throw new IllegalStateException("Invalid prefix: " + p); + throw new IllegalStateException("Invalid prefix: " + value.getCiting_pid()); + } + + switch (value.getCited_pid()) { + case "doi": + cited = DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + break; + case "pmid": + cited = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + break; + default: + throw new IllegalStateException("Invalid prefix: " + value.getCited_pid()); } if (!citing.equals(cited)) { @@ -157,15 +158,6 @@ public class CreateActionSetSparkJob implements Serializable { getRelation( citing, cited, ModelConstants.CITES)); - - if (duplicate && value.getCiting().endsWith(".refs")) { - citing = prefix + IdentifierFactory - .md5( - CleaningFunctions - .normalizePidValue( - "doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs")))); - relationList.add(getRelation(citing, cited, ModelConstants.CITES)); - } } return relationList; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index 600cf7df1..be653aed2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -12,10 +12,7 @@ import java.util.zip.ZipInputStream; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,17 +34,17 @@ public class GetOpenCitationsRefs implements Serializable { parser.parseArgument(args); - final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", Arrays.asList(inputFile)); +// final String[] inputFile = parser.get("inputFile").split(";"); +// log.info("inputFile {}", Arrays.asList(inputFile)); - final String workingPath = parser.get("workingPath"); - log.info("workingPath {}", workingPath); + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); final String hdfsNameNode = parser.get("hdfsNameNode"); log.info("hdfsNameNode {}", hdfsNameNode); - final String prefix = parser.get("prefix"); - log.info("prefix {}", prefix); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); @@ -56,41 +53,42 @@ public class GetOpenCitationsRefs implements Serializable { GetOpenCitationsRefs ocr = new GetOpenCitationsRefs(); - for (String file : inputFile) { - ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix); - } + ocr.doExtract(inputPath, outputPath, fileSystem); } - private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix) + private void doExtract(String inputPath, String outputPath, FileSystem fileSystem) throws IOException { - final Path path = new Path(inputFile); + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); + // do stuff with the file like ... + FSDataInputStream oc_zip = fileSystem.open(fileStatus.getPath()); + try (ZipInputStream zis = new ZipInputStream(oc_zip)) { + ZipEntry entry = null; + while ((entry = zis.getNextEntry()) != null) { - FSDataInputStream oc_zip = fileSystem.open(path); + if (!entry.isDirectory()) { + String fileName = entry.getName(); + // fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; + fileName = fileName.substring(0, fileName.lastIndexOf(".")); + // count++; + try ( + FSDataOutputStream out = fileSystem + .create(new Path(outputPath + "/" + fileName + ".gz")); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { - // int count = 1; - try (ZipInputStream zis = new ZipInputStream(oc_zip)) { - ZipEntry entry = null; - while ((entry = zis.getNextEntry()) != null) { - - if (!entry.isDirectory()) { - String fileName = entry.getName(); - // fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; - fileName = fileName.substring(0, fileName.lastIndexOf(".")); - // count++; - try ( - FSDataOutputStream out = fileSystem - .create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz")); - GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { - - IOUtils.copy(zis, gzipOs); + IOUtils.copy(zis, gzipOs); + } } + } } - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java new file mode 100644 index 000000000..c6c5bc0ba --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java @@ -0,0 +1,123 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import scala.Tuple2; + +/** + * @author miriam.baglioni + * @Date 29/02/24 + */ +public class MapOCIdsInPids implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + private static final String DELIMITER = ","; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + MapOCIdsInPids.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> mapIdentifiers(spark, inputPath, outputPath)); + + } + + private static void mapIdentifiers(SparkSession spark, String inputPath, String outputPath) { + Dataset coci = spark + .read() + .textFile(inputPath + "/JSON") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), + Encoders.bean(COCI.class)); + + Dataset> correspondenceData = spark + .read() + .format("csv") + .option("sep", DELIMITER) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(inputPath + "/correspondence/omid.zip") + .repartition(5000) + .flatMap((FlatMapFunction>) r -> { + String ocIdentifier = r.getAs("omid"); + String[] correspondentIdentifiers = ((String) r.getAs("id")).split(" "); + return Arrays + .stream(correspondentIdentifiers) + .map(ci -> new Tuple2(ocIdentifier, ci)) + .collect(Collectors.toList()) + .iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + + Dataset mappedCitingDataset = coci + .joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1")), "left") + .map((MapFunction>, COCI>) t2 -> { + String correspondent = t2._2()._2(); + t2._1().setCiting_pid(correspondent.substring(0, correspondent.indexOf(":"))); + t2._1().setCiting(correspondent.substring(correspondent.indexOf(":") + 1)); + return t2._1(); + }, Encoders.bean(COCI.class)); + + mappedCitingDataset + .joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1"))) + .map((MapFunction>, COCI>) t2 -> { + String correspondent = t2._2()._2(); + t2._1().setCited_pid(correspondent.substring(0, correspondent.indexOf(":"))); + t2._1().setCited(correspondent.substring(correspondent.indexOf(":") + 1)); + return t2._1(); + }, Encoders.bean(COCI.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index b9c24df3b..afbd43176 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -12,10 +12,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; @@ -42,19 +39,21 @@ public class ReadCOCI implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", Arrays.asList(inputFile)); + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode {}", hdfsNameNode); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); + final String workingPath = parser.get("inputPath"); log.info("workingPath {}", workingPath); - final String format = parser.get("format"); - log.info("format {}", format); - SparkConf sconf = new SparkConf(); + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); final String delimiter = Optional .ofNullable(parser.get("delimiter")) .orElse(DEFAULT_DELIMITER); @@ -66,19 +65,20 @@ public class ReadCOCI implements Serializable { doRead( spark, workingPath, - inputFile, + fileSystem, outputPath, - delimiter, - format); + delimiter); }); } - private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, + private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem, String outputPath, - String delimiter, String format) { - - for (String inputFile : inputFiles) { - String pString = workingPath + "/" + inputFile + ".gz"; + String delimiter) throws IOException { + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(workingPath), true); + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); Dataset cociData = spark .read() @@ -87,26 +87,23 @@ public class ReadCOCI implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(pString) + .load(fileStatus.getPath().toString()) .repartition(100); cociData.map((MapFunction) row -> { COCI coci = new COCI(); - if (format.equals("COCI")) { - coci.setCiting(row.getString(1)); - coci.setCited(row.getString(2)); - } else { - coci.setCiting(String.valueOf(row.getInt(1))); - coci.setCited(String.valueOf(row.getInt(2))); - } + + coci.setCiting(row.getString(1)); + coci.setCited(row.getString(2)); + coci.setOci(row.getString(0)); return coci; }, Encoders.bean(COCI.class)) .write() - .mode(SaveMode.Overwrite) + .mode(SaveMode.Append) .option("compression", "gzip") - .json(outputPath + inputFile); + .json(outputPath); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java index c1ef1abad..42fca5dd3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java @@ -9,8 +9,10 @@ public class COCI implements Serializable { private String oci; private String citing; + private String citing_pid; private String cited; + private String cited_pid; public String getOci() { return oci; @@ -36,4 +38,19 @@ public class COCI implements Serializable { this.cited = cited; } + public String getCiting_pid() { + return citing_pid; + } + + public void setCiting_pid(String citing_pid) { + this.citing_pid = citing_pid; + } + + public String getCited_pid() { + return cited_pid; + } + + public void setCited_pid(String cited_pid) { + this.cited_pid = cited_pid; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json index 96db7eeb7..f4b6e2d68 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json @@ -1,13 +1,13 @@ [ { - "paramName": "if", - "paramLongName": "inputFile", + "paramName": "ip", + "paramLongName": "inputPath", "paramDescription": "the zipped opencitations file", "paramRequired": true }, { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "op", + "paramLongName": "outputPath", "paramDescription": "the working path", "paramRequired": true }, @@ -16,11 +16,5 @@ "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true - }, - { - "paramName": "p", - "paramLongName": "prefix", - "paramDescription": "COCI or POCI", - "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json index fa840089d..a74ceb983 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -1,7 +1,7 @@ [ { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "ip", + "paramLongName": "inputPath", "paramDescription": "the zipped opencitations file", "paramRequired": true }, @@ -24,15 +24,9 @@ "paramLongName": "outputPath", "paramDescription": "the hdfs name node", "paramRequired": true - }, - { - "paramName": "if", - "paramLongName": "inputFile", - "paramDescription": "the hdfs name node", - "paramRequired": true - }, { - "paramName": "f", - "paramLongName": "format", + }, { + "paramName": "nn", + "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index deb32459b..e28b021bc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -27,7 +27,8 @@ ${wf:conf('resumeFrom') eq 'DownloadDump'} ${wf:conf('resumeFrom') eq 'ExtractContent'} ${wf:conf('resumeFrom') eq 'ReadContent'} - + ${wf:conf('resumeFrom') eq 'CreateAS'} + @@ -35,6 +36,15 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + ${jobTracker} @@ -47,7 +57,28 @@ download.sh ${filelist} - ${workingPath}/${prefix}/Original + ${inputPath}/Original + HADOOP_USER_NAME=${wf:user()} + download.sh + + + + + + + + + ${jobTracker} + ${nameNode} + + + mapred.job.queue.name + ${queueName} + + + download_corr.sh + ${filecorrespondence} + ${inputPath}/correspondence HADOOP_USER_NAME=${wf:user()} download.sh @@ -60,9 +91,19 @@ eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs --hdfsNameNode${nameNode} - --inputFile${inputFile} - --workingPath${workingPath}/${prefix} - --prefix${prefix} + --inputPath${inputPath}/Original + --outputPath${inputPath}/Extracted + + + + + + + + eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs + --hdfsNameNode${nameNode} + --inputPath${inputPath}/correspondence + --outputPath${inputPath}/correspondence_extracted @@ -85,11 +126,34 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --workingPath${workingPath}/${prefix}/${prefix} - --outputPath${workingPath}/${prefix}/${prefix}_JSON/ + --inputPath${inputPath}/Extracted + --outputPath${inputPath}/JSON --delimiter${delimiter} - --inputFile${inputFileCoci} - --format${prefix} + --hdfsNameNode${nameNode} + + + + + + + + yarn + cluster + Produces the AS for OC + eu.dnetlib.dhp.actionmanager.opencitations.MapOCIdsInPids + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${inputPath} + --outputPath${outputPathExtraction} @@ -112,7 +176,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${workingPath} + --inputPath${outputPathExtraction} --outputPath${outputPath} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json new file mode 100644 index 000000000..95ab5f6d9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManged", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +] -- 2.17.1 From 48c052215c06d081131034b5fda24fe57821bb67 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 12 Mar 2024 23:12:32 +0100 Subject: [PATCH 2/4] [OC New] last fix --- .../bipfinder/SparkAtomicActionScoreJob.java | 2 +- .../CreateActionSetSparkJob.java | 55 ++++++++---- .../opencitations/MapOCIdsInPids.java | 61 ++++++++++++- .../actionmanager/opencitations/ReadCOCI.java | 6 +- .../opencitations/model/COCI.java | 4 + .../opencitations/oozie_app/workflow.xml | 2 + .../opencitations/remap_parameters.json | 7 +- .../CreateOpenCitationsASTest.java | 9 +- .../opencitations/RemapTest.java | 90 +++++++++++++++++++ .../COCI/inputremap/JSON/part-00000 | 31 +++++++ .../COCI/inputremap/correspondence/omid.csv | 48 ++++++++++ .../opencitations/COCI/inputremap/jsonforas | 27 ++++++ 12 files changed, 316 insertions(+), 26 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/RemapTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/JSON/part-00000 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/correspondence/omid.csv create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 040c89782..8adc88920 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable { return projectScores.map((MapFunction) bipProjectScores -> { Project project = new Project(); - project.setId(bipProjectScores.getProjectId()); + // project.setId(bipProjectScores.getProjectId()); project.setMeasures(bipProjectScores.toMeasures()); return project; }, Encoders.bean(Project.class)) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index 47949a4bb..950b2eb57 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; @@ -41,7 +42,9 @@ public class CreateActionSetSparkJob implements Serializable { private static final String DOI_PREFIX = "50|doi_________::"; private static final String PMID_PREFIX = "50|pmid________::"; + private static final String ARXIV_PREFIX = "50|arXiv_______::"; + private static final String PMCID_PREFIX = "50|pmcid_______::"; private static final String TRUST = "0.91"; private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); @@ -74,25 +77,18 @@ public class CreateActionSetSparkJob implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); - final boolean shouldDuplicateRels = Optional - .ofNullable(parser.get("shouldDuplicateRels")) - .map(Boolean::valueOf) - .orElse(Boolean.FALSE); - SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, - spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels)); + spark -> extractContent(spark, inputPath, outputPath)); } - private static void extractContent(SparkSession spark, String inputPath, String outputPath, - boolean shouldDuplicateRels) { + private static void extractContent(SparkSession spark, String inputPath, String outputPath) { getTextTextJavaPairRDD(spark, inputPath) - // .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);// , GzipCodec.class); } private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath) { @@ -115,7 +111,7 @@ public class CreateActionSetSparkJob implements Serializable { new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } - private static List createRelation(COCI value) { + private static List createRelation(COCI value) throws JsonProcessingException { List relationList = new ArrayList<>(); @@ -131,25 +127,52 @@ public class CreateActionSetSparkJob implements Serializable { case "pmid": citing = PMID_PREFIX + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting())); break; + case "arxiv": + citing = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCiting())); + break; + case "pmcid": + citing = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCiting())); + break; + case "isbn": + case "issn": + return relationList; + default: - throw new IllegalStateException("Invalid prefix: " + value.getCiting_pid()); + throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value)); } switch (value.getCited_pid()) { case "doi": cited = DOI_PREFIX + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited())); break; case "pmid": cited = PMID_PREFIX + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited())); break; + case "arxiv": + cited = ARXIV_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCited())); + break; + case "pmcid": + cited = PMCID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCited())); + break; + case "isbn": + case "issn": + return relationList; default: - throw new IllegalStateException("Invalid prefix: " + value.getCited_pid()); + throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value)); } if (!citing.equals(cited)) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java index c6c5bc0ba..d4fc33f01 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java @@ -9,11 +9,19 @@ import java.util.Arrays; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.slf4j.Logger; @@ -62,6 +70,10 @@ public class MapOCIdsInPids implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); + final String nameNode = parser.get("nameNode"); + log.info("nameNode {}", nameNode); + + unzipCorrespondenceFile(inputPath, nameNode); SparkConf conf = new SparkConf(); runWithSparkSession( conf, @@ -70,6 +82,41 @@ public class MapOCIdsInPids implements Serializable { } + private static void unzipCorrespondenceFile(String inputPath, String hdfsNameNode) throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + final Path path = new Path(inputPath + "/correspondence/omid.zip"); + FileSystem fileSystem = FileSystem.get(conf); + + FSDataInputStream project_zip = fileSystem.open(path); + + try (ZipInputStream zis = new ZipInputStream(project_zip)) { + ZipEntry entry = null; + while ((entry = zis.getNextEntry()) != null) { + + if (!entry.isDirectory()) { + String fileName = entry.getName(); + byte buffer[] = new byte[1024]; + int count; + + try ( + FSDataOutputStream out = fileSystem + .create(new Path(inputPath + "/correspondence/omid.csv"))) { + + while ((count = zis.read(buffer, 0, buffer.length)) != -1) + out.write(buffer, 0, count); + + } + + } + + } + + } + + } + private static void mapIdentifiers(SparkSession spark, String inputPath, String outputPath) { Dataset coci = spark .read() @@ -85,7 +132,7 @@ public class MapOCIdsInPids implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(inputPath + "/correspondence/omid.zip") + .load(inputPath + "/correspondence/omid.csv") .repartition(5000) .flatMap((FlatMapFunction>) r -> { String ocIdentifier = r.getAs("omid"); @@ -98,7 +145,7 @@ public class MapOCIdsInPids implements Serializable { }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); Dataset mappedCitingDataset = coci - .joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1")), "left") + .joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1"))) .map((MapFunction>, COCI>) t2 -> { String correspondent = t2._2()._2(); t2._1().setCiting_pid(correspondent.substring(0, correspondent.indexOf(":"))); @@ -118,6 +165,16 @@ public class MapOCIdsInPids implements Serializable { .mode(SaveMode.Append) .option("compression", "gzip") .json(outputPath); + + mappedCitingDataset + .joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1"))) + .map((MapFunction>, COCI>) t2 -> { + String correspondent = t2._2()._2(); + t2._1().setCited_pid(correspondent.substring(0, correspondent.indexOf(":"))); + t2._1().setCited(correspondent.substring(correspondent.indexOf(":") + 1)); + return t2._1(); + }, Encoders.bean(COCI.class)) + .foreach((ForeachFunction) c -> System.out.println(OBJECT_MAPPER.writeValueAsString(c))); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index afbd43176..479aea458 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -14,6 +14,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.slf4j.Logger; @@ -79,7 +80,7 @@ public class ReadCOCI implements Serializable { new Path(workingPath), true); while (fileStatusListIterator.hasNext()) { LocatedFileStatus fileStatus = fileStatusListIterator.next(); - + log.info("extracting file {}", fileStatus.getPath().toString()); Dataset cociData = spark .read() .format("csv") @@ -91,6 +92,7 @@ public class ReadCOCI implements Serializable { .repartition(100); cociData.map((MapFunction) row -> { + COCI coci = new COCI(); coci.setCiting(row.getString(1)); @@ -100,10 +102,12 @@ public class ReadCOCI implements Serializable { return coci; }, Encoders.bean(COCI.class)) + .filter((FilterFunction) c -> c != null) .write() .mode(SaveMode.Append) .option("compression", "gzip") .json(outputPath); + fileSystem.rename(fileStatus.getPath(), new Path("/tmp/miriam/OC/DONE")); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java index 42fca5dd3..22a3fb52d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java @@ -27,6 +27,8 @@ public class COCI implements Serializable { } public void setCiting(String citing) { + if (citing != null && citing.startsWith("omid:")) + citing = citing.substring(5); this.citing = citing; } @@ -35,6 +37,8 @@ public class COCI implements Serializable { } public void setCited(String cited) { + if (cited != null && cited.startsWith("omid:")) + cited = cited.substring(5); this.cited = cited; } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index e28b021bc..566cf7d02 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -27,6 +27,7 @@ ${wf:conf('resumeFrom') eq 'DownloadDump'} ${wf:conf('resumeFrom') eq 'ExtractContent'} ${wf:conf('resumeFrom') eq 'ReadContent'} + ${wf:conf('resumeFrom') eq 'MapContent'} ${wf:conf('resumeFrom') eq 'CreateAS'} @@ -154,6 +155,7 @@ --inputPath${inputPath} --outputPath${outputPathExtraction} + --nameNode${nameNode} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json index 95ab5f6d9..7ebaddfdf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json @@ -16,5 +16,10 @@ "paramLongName": "isSparkSessionManged", "paramDescription": "the hdfs name node", "paramRequired": false - } + },{ + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "the hdfs name node", + "paramRequired": true +} ] diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java index 523437950..ed80ed5c5 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java @@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas") .getPath(); CreateActionSetSparkJob @@ -84,8 +84,6 @@ public class CreateOpenCitationsASTest { new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-shouldDuplicateRels", - Boolean.TRUE.toString(), "-inputPath", inputPath, "-outputPath", @@ -99,9 +97,10 @@ public class CreateOpenCitationsASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - assertEquals(31, tmp.count()); + Assertions.assertEquals(27, tmp.count()); + tmp.foreach(r -> Assertions.assertEquals(1, r.getCollectedfrom().size())); - // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/RemapTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/RemapTest.java new file mode 100644 index 000000000..5fc732bcf --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/RemapTest.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; + +/** + * @author miriam.baglioni + * @Date 07/03/24 + */ +public class RemapTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(RemapTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(RemapTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(RemapTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(RemapTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testRemap() throws Exception { + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap") + .getPath(); + + MapOCIdsInPids + .main( + new String[] { + "-isSparkSessionManged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/out/", + "-nameNode", "input1;input2;input3;input4;input5" + }); + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/JSON/part-00000 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/JSON/part-00000 new file mode 100644 index 000000000..50f731b17 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/JSON/part-00000 @@ -0,0 +1,31 @@ +{"cited":"br/061201599020", "citing":"br/06203041400","oci":"oci:06701327944-06504326071"} +{"cited":"br/061201599020","citing":"br/06502272390","oci":"oci:06502272390-061301355525"} +{"cited":"br/061201599020", "citing":"br/06120941789","oci":"oci:0670804699-067055659"} +{"cited":"br/06210273177","citing":"br/06203041400","oci":"oci:061502003994-062201281456"} +{"cited":"br/06210273177", "citing":"br/06502272390","oci":"oci:06502272390-0660806688"} +{"cited":"br/06210273177", "citing":"br/06120941789","oci":"oci:06502307119-0620223645"} +{"cited":"br/0660613430","citing":"br/06203041400","oci":"oci:061502004011-061902692285"} +{"cited":"br/0660613430", "citing":"br/06502272390","oci":"oci:0660549063-0610398792"} +{"cited":"br/0660613430", "citing":"br/06120941789","oci":"oci:06420189324-06301543046"} +{"cited":"br/062602732073","citing":"br/06203041400","oci":"oci:06380130275-061502004367"} +{"cited":"br/062602732073","citing":"br/06502272390","oci":"oci:062403449086-062501448395"} +{"cited":"br/062602732073","citing":"br/06120941789","oci":"oci:06420189328-061202007182"} +{"cited":"br/061103703697","citing":"br/06203041400","oci":"oci:062603906965-061701362658"} +{"cited":"br/061103703697", "citing":"br/06502272390","oci":"oci:0670294309-06104327031"} +{"cited":"br/061103703697","citing":"br/06120941789","oci":"oci:061702060228-061301712529"} +{"cited":"br/06230199640", "citing":"br/0670517081","oci":"oci:06901104174-06503692526"} +{"cited":"br/061703513967","citing":"br/061702310822","oci":"oci:061702310822-061703513967"} +{"cited":"br/062104002953","citing":"br/061702311472","oci":"oci:061702311472-062104002953"} +{"cited":"br/061101204417","citing":"br/062102701590","oci":"oci:062102701590-061101204417"} +{"cited":"br/062403787088","citing":"br/061401499173","oci":"oci:061401499173-062403787088"} +{"cited":"br/061203576338","citing":"br/06110279619","oci":"oci:06110279619-061203576338"} +{"cited":"br/061601962207","citing":"br/061502004018","oci":"oci:061502004018-061601962207"} +{"cited":"br/06101014588", "citing":"br/061502004027","oci":"oci:061502004027-06101014588"} +{"cited":"br/06704040804", "citing":"br/06220799044","oci":"oci:06220799044-06704040804"} +{"cited":"br/061401105151","citing":"br/061502004037","oci":"oci:061502004037-061401105151"} +{"cited":"br/0640821079", "citing":"br/061702311537","oci":"oci:061702311537-0640821079"} +{"cited":"br/06604165310", "citing":"br/062501970289","oci":"oci:062501970289-06604165310"} +{"cited":"br/061501351689","citing":"br/061203895786","oci":"oci:061203895786-061501351689"} +{"cited":"br/06202223692", "citing":"br/06110298832","oci":"oci:06110298832-06202223692"} +{"cited":"br/06104310727", "citing":"br/0660439086","oci":"oci:0660439086-06104310727"} +{"cited":"br/06150216214", "citing":"br/06340150329","oci":"oci:06340150329-06150216214"} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/correspondence/omid.csv b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/correspondence/omid.csv new file mode 100644 index 000000000..0e34ba936 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/correspondence/omid.csv @@ -0,0 +1,48 @@ +omid,id +br/061201599020,doi:10.1142/s0219887817501687 +br/06203041400,doi:10.1111/j.1523-5378.2005.00327.x pmid:16104945 +br/06210273177,doi:10.1090/qam/20394 +br/06502272390,pmid:32235596 doi:10.3390/nano10040644 +br/0660613430,doi:10.1007/bf00470411 +br/06120941789,doi:10.1098/rspa.2006.1747 +br/062602732073,doi:10.1007/978-3-642-38844-6_25 +br/06230199640,pmid:25088780 doi:10.1016/j.ymeth.2014.07.008 +br/061103703697,pmid:2682767 +br/0670517081,doi:10.1016/j.foodpol.2021.102189 +br/06502310477,doi:10.1142/s0218127416500450 +br/06520113284,doi:10.1109/cfasta57821.2023.10243367 +br/062303652439,pmid:5962654 doi:10.1016/0020-708x(66)90001-9 +br/06250691436,doi:10.1042/bst20150052 pmid:26009172 +br/061201665577,doi:10.1097/00115550-200205000-00018 +br/06503490336,pmid:34689254 doi:10.1007/s10072-021-05687-0 +br/06220615942,pmid:25626134 doi:10.1016/j.jcis.2015.01.008 +br/061103389243,doi:10.4324/9780203702819-10 +br/062303011271,doi:10.1109/icassp.2011.5946250 +br/061302926083,doi:10.4018/978-1-6684-3937-1.ch002 +br/061402485360,doi:10.1109/iciict.2015.7396079 +br/06410101083,doi:10.1016/j.autcon.2023.104828 +br/062202243386,doi:10.1016/0001-8791(81)90022-1 +br/06170421486,doi:10.1130/0016-7606(2003)115<0166:dsagmf>2.0.co;2 +br/061201983865,doi:10.4324/9781315109008 isbn:9781315109008 +br/061701697230,doi:10.1016/j.trd.2012.07.006 +br/061201137111,doi:10.1109/access.2020.2971656 +br/06120436283,pmid:2254430 doi:10.1128/jcm.28.11.2551-2554.1990 +br/061903968916,doi:10.1111/j.1742-1241.1988.tb08627.x +br/06201583482,doi:10.1016/0016-5085(78)93139-6 +br/06130338317,doi:10.2134/agronj1952.00021962004400080013x +br/062601538320,doi:10.1371/journal.pone.0270593 pmid:35789338 +br/062401098626,pmid:22385804 doi:10.1016/j.talanta.2011.12.034 +br/06190436492,doi:10.1039/c7dt01499f pmid:28644489 +br/06202819247,doi:10.1007/978-3-319-45823-6_57 +br/0648013560,doi:10.1080/14772000.2012.705356 +br/0690214059,doi:10.2752/175630608x329217 +br/06601640415,doi:10.1080/18128600508685647 +br/061503394761,doi:10.1002/0471443395.img018 +br/061702861849,pmid:31203682 doi:10.1080/10428194.2019.1627538 +br/06450133713,doi:10.1093/acprof:oso/9780199670888.003.0008 +br/0628074892,doi:10.1097/hnp.0000000000000597 +br/061601032219,doi:10.1002/bdm.2102 +br/06602079930,doi:10.1101/2020.08.25.267500 +br/0604192147,doi:10.11501/3307395 +br/061101933800,doi:10.1142/s0217732398002242 +br/06504184118,pmid:10091417 \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas new file mode 100644 index 000000000..4b8853d6e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas @@ -0,0 +1,27 @@ +{"oci":"oci:06701327944-06504326071","citing":"16104945","citing_pid":"pmid","cited":"10.1142/s0219887817501687","cited_pid":"doi"} +{"oci":"oci:06701327944-06504326071","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"} +{"oci":"oci:06502272390-061301355525","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"} +{"oci":"oci:06502272390-061301355525","citing":"32235596","citing_pid":"pmid","cited":"10.1142/s0219887817501687","cited_pid":"doi"} +{"oci":"oci:0670804699-067055659","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"} +{"oci":"oci:061502003994-062201281456","citing":"16104945","citing_pid":"pmid","cited":"10.1090/qam/20394","cited_pid":"doi"} +{"oci":"oci:061502003994-062201281456","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"} +{"oci":"oci:06502272390-0660806688","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"} +{"oci":"oci:06502272390-0660806688","citing":"32235596","citing_pid":"pmid","cited":"10.1090/qam/20394","cited_pid":"doi"} +{"oci":"oci:06502307119-0620223645","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"} +{"oci":"oci:061502004011-061902692285","citing":"16104945","citing_pid":"pmid","cited":"10.1007/bf00470411","cited_pid":"doi"} +{"oci":"oci:061502004011-061902692285","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"} +{"oci":"oci:0660549063-0610398792","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"} +{"oci":"oci:0660549063-0610398792","citing":"32235596","citing_pid":"pmid","cited":"10.1007/bf00470411","cited_pid":"doi"} +{"oci":"oci:06420189324-06301543046","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"} +{"oci":"oci:06380130275-061502004367","citing":"16104945","citing_pid":"pmid","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"} +{"oci":"oci:06380130275-061502004367","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"} +{"oci":"oci:062403449086-062501448395","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"} +{"oci":"oci:062403449086-062501448395","citing":"32235596","citing_pid":"pmid","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"} +{"oci":"oci:06420189328-061202007182","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"} +{"oci":"oci:062603906965-061701362658","citing":"16104945","citing_pid":"pmid","cited":"2682767","cited_pid":"pmid"} +{"oci":"oci:062603906965-061701362658","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"} +{"oci":"oci:0670294309-06104327031","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"} +{"oci":"oci:0670294309-06104327031","citing":"32235596","citing_pid":"pmid","cited":"2682767","cited_pid":"pmid"} +{"oci":"oci:061702060228-061301712529","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"} +{"oci":"oci:06901104174-06503692526","citing":"10.1016/j.foodpol.2021.102189","citing_pid":"doi","cited":"10.1016/j.ymeth.2014.07.008","cited_pid":"doi"} +{"oci":"oci:06901104174-06503692526","citing":"10.1016/j.foodpol.2021.102189","citing_pid":"doi","cited":"25088780","cited_pid":"pmid"} \ No newline at end of file -- 2.17.1 From 5a32bb9578d437e669539d07c66eb84c0ddc58c5 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 13 Mar 2024 09:36:18 +0100 Subject: [PATCH 3/4] [OC New] last fix --- .../actionmanager/opencitations/MapOCIdsInPids.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java index d4fc33f01..f7de4ac08 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java @@ -166,15 +166,6 @@ public class MapOCIdsInPids implements Serializable { .option("compression", "gzip") .json(outputPath); - mappedCitingDataset - .joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1"))) - .map((MapFunction>, COCI>) t2 -> { - String correspondent = t2._2()._2(); - t2._1().setCited_pid(correspondent.substring(0, correspondent.indexOf(":"))); - t2._1().setCited(correspondent.substring(correspondent.indexOf(":") + 1)); - return t2._1(); - }, Encoders.bean(COCI.class)) - .foreach((ForeachFunction) c -> System.out.println(OBJECT_MAPPER.writeValueAsString(c))); - } + } } -- 2.17.1 From 6c3b692f60dac106080eea32052d11fc24badfda Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 25 Mar 2024 16:10:23 +0100 Subject: [PATCH 4/4] integrated minor change from beta branch --- .../dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 8adc88920..76c8ec7fa 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable { return projectScores.map((MapFunction) bipProjectScores -> { Project project = new Project(); - // project.setId(bipProjectScores.getProjectId()); + //project.setId(bipProjectScores.getProjectId()); project.setMeasures(bipProjectScores.toMeasures()); return project; }, Encoders.bean(Project.class)) -- 2.17.1