From fd34372c4077e1e93b00b1965c044d5000a91e69 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 6 Mar 2024 13:42:00 +0100 Subject: [PATCH] [OCNEW] first implementation --- .../CreateActionSetSparkJob.java | 68 +++++----- .../opencitations/GetOpenCitationsRefs.java | 64 +++++---- .../opencitations/MapOCIdsInPids.java | 123 ++++++++++++++++++ .../actionmanager/opencitations/ReadCOCI.java | 53 ++++---- .../opencitations/model/COCI.java | 17 +++ .../opencitations/input_parameters.json | 14 +- .../input_readcoci_parameters.json | 16 +-- .../opencitations/oozie_app/workflow.xml | 84 ++++++++++-- .../opencitations/remap_parameters.json | 20 +++ 9 files changed, 329 insertions(+), 130 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index b707fdcd3..47949a4bb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -28,6 +28,7 @@ import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.*; import eu.dnetlib.dhp.utils.DHPUtils; @@ -37,12 +38,6 @@ public class CreateActionSetSparkJob implements Serializable { public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; - // DOI-to-DOI citations - public static final String COCI = "COCI"; - - // PMID-to-PMID citations - public static final String POCI = "POCI"; - private static final String DOI_PREFIX = "50|doi_________::"; private static final String PMID_PREFIX = "50|pmid________::"; @@ -95,22 +90,21 @@ public class CreateActionSetSparkJob implements Serializable { private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { - getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI) - .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) + getTextTextJavaPairRDD(spark, inputPath) + // .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } - private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath, - boolean shouldDuplicateRels, String prefix) { + private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath) { return spark .read() - .textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*") + .textFile(inputPath) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), Encoders.bean(COCI.class)) .flatMap( (FlatMapFunction) value -> createRelation( - value, shouldDuplicateRels, prefix) + value) .iterator(), Encoders.bean(Relation.class)) .filter((FilterFunction) Objects::nonNull) @@ -121,34 +115,41 @@ public class CreateActionSetSparkJob implements Serializable { new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } - private static List createRelation(COCI value, boolean duplicate, String p) { + private static List createRelation(COCI value) { List relationList = new ArrayList<>(); - String prefix; + String citing; String cited; - switch (p) { - case COCI: - prefix = DOI_PREFIX; - citing = prefix + switch (value.getCiting_pid()) { + case "doi": + citing = DOI_PREFIX + IdentifierFactory .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); - cited = prefix - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited())); break; - case POCI: - prefix = PMID_PREFIX; - citing = prefix + case "pmid": + citing = PMID_PREFIX + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting())); - cited = prefix - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited())); + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); break; default: - throw new IllegalStateException("Invalid prefix: " + p); + throw new IllegalStateException("Invalid prefix: " + value.getCiting_pid()); + } + + switch (value.getCited_pid()) { + case "doi": + cited = DOI_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + break; + case "pmid": + cited = PMID_PREFIX + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting())); + break; + default: + throw new IllegalStateException("Invalid prefix: " + value.getCited_pid()); } if (!citing.equals(cited)) { @@ -157,15 +158,6 @@ public class CreateActionSetSparkJob implements Serializable { getRelation( citing, cited, ModelConstants.CITES)); - - if (duplicate && value.getCiting().endsWith(".refs")) { - citing = prefix + IdentifierFactory - .md5( - CleaningFunctions - .normalizePidValue( - "doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs")))); - relationList.add(getRelation(citing, cited, ModelConstants.CITES)); - } } return relationList; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index 600cf7df1..be653aed2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -12,10 +12,7 @@ import java.util.zip.ZipInputStream; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,17 +34,17 @@ public class GetOpenCitationsRefs implements Serializable { parser.parseArgument(args); - final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", Arrays.asList(inputFile)); +// final String[] inputFile = parser.get("inputFile").split(";"); +// log.info("inputFile {}", Arrays.asList(inputFile)); - final String workingPath = parser.get("workingPath"); - log.info("workingPath {}", workingPath); + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); final String hdfsNameNode = parser.get("hdfsNameNode"); log.info("hdfsNameNode {}", hdfsNameNode); - final String prefix = parser.get("prefix"); - log.info("prefix {}", prefix); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); @@ -56,41 +53,42 @@ public class GetOpenCitationsRefs implements Serializable { GetOpenCitationsRefs ocr = new GetOpenCitationsRefs(); - for (String file : inputFile) { - ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix); - } + ocr.doExtract(inputPath, outputPath, fileSystem); } - private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix) + private void doExtract(String inputPath, String outputPath, FileSystem fileSystem) throws IOException { - final Path path = new Path(inputFile); + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); + // do stuff with the file like ... + FSDataInputStream oc_zip = fileSystem.open(fileStatus.getPath()); + try (ZipInputStream zis = new ZipInputStream(oc_zip)) { + ZipEntry entry = null; + while ((entry = zis.getNextEntry()) != null) { - FSDataInputStream oc_zip = fileSystem.open(path); + if (!entry.isDirectory()) { + String fileName = entry.getName(); + // fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; + fileName = fileName.substring(0, fileName.lastIndexOf(".")); + // count++; + try ( + FSDataOutputStream out = fileSystem + .create(new Path(outputPath + "/" + fileName + ".gz")); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { - // int count = 1; - try (ZipInputStream zis = new ZipInputStream(oc_zip)) { - ZipEntry entry = null; - while ((entry = zis.getNextEntry()) != null) { - - if (!entry.isDirectory()) { - String fileName = entry.getName(); - // fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; - fileName = fileName.substring(0, fileName.lastIndexOf(".")); - // count++; - try ( - FSDataOutputStream out = fileSystem - .create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz")); - GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { - - IOUtils.copy(zis, gzipOs); + IOUtils.copy(zis, gzipOs); + } } + } } - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java new file mode 100644 index 000000000..c6c5bc0ba --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java @@ -0,0 +1,123 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import scala.Tuple2; + +/** + * @author miriam.baglioni + * @Date 29/02/24 + */ +public class MapOCIdsInPids implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + private static final String DELIMITER = ","; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + MapOCIdsInPids.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> mapIdentifiers(spark, inputPath, outputPath)); + + } + + private static void mapIdentifiers(SparkSession spark, String inputPath, String outputPath) { + Dataset coci = spark + .read() + .textFile(inputPath + "/JSON") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), + Encoders.bean(COCI.class)); + + Dataset> correspondenceData = spark + .read() + .format("csv") + .option("sep", DELIMITER) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(inputPath + "/correspondence/omid.zip") + .repartition(5000) + .flatMap((FlatMapFunction>) r -> { + String ocIdentifier = r.getAs("omid"); + String[] correspondentIdentifiers = ((String) r.getAs("id")).split(" "); + return Arrays + .stream(correspondentIdentifiers) + .map(ci -> new Tuple2(ocIdentifier, ci)) + .collect(Collectors.toList()) + .iterator(); + }, Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + + Dataset mappedCitingDataset = coci + .joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1")), "left") + .map((MapFunction>, COCI>) t2 -> { + String correspondent = t2._2()._2(); + t2._1().setCiting_pid(correspondent.substring(0, correspondent.indexOf(":"))); + t2._1().setCiting(correspondent.substring(correspondent.indexOf(":") + 1)); + return t2._1(); + }, Encoders.bean(COCI.class)); + + mappedCitingDataset + .joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1"))) + .map((MapFunction>, COCI>) t2 -> { + String correspondent = t2._2()._2(); + t2._1().setCited_pid(correspondent.substring(0, correspondent.indexOf(":"))); + t2._1().setCited(correspondent.substring(correspondent.indexOf(":") + 1)); + return t2._1(); + }, Encoders.bean(COCI.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index b9c24df3b..afbd43176 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -12,10 +12,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; @@ -42,19 +39,21 @@ public class ReadCOCI implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", Arrays.asList(inputFile)); + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode {}", hdfsNameNode); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); + final String workingPath = parser.get("inputPath"); log.info("workingPath {}", workingPath); - final String format = parser.get("format"); - log.info("format {}", format); - SparkConf sconf = new SparkConf(); + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); final String delimiter = Optional .ofNullable(parser.get("delimiter")) .orElse(DEFAULT_DELIMITER); @@ -66,19 +65,20 @@ public class ReadCOCI implements Serializable { doRead( spark, workingPath, - inputFile, + fileSystem, outputPath, - delimiter, - format); + delimiter); }); } - private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, + private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem, String outputPath, - String delimiter, String format) { - - for (String inputFile : inputFiles) { - String pString = workingPath + "/" + inputFile + ".gz"; + String delimiter) throws IOException { + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(workingPath), true); + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); Dataset cociData = spark .read() @@ -87,26 +87,23 @@ public class ReadCOCI implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(pString) + .load(fileStatus.getPath().toString()) .repartition(100); cociData.map((MapFunction) row -> { COCI coci = new COCI(); - if (format.equals("COCI")) { - coci.setCiting(row.getString(1)); - coci.setCited(row.getString(2)); - } else { - coci.setCiting(String.valueOf(row.getInt(1))); - coci.setCited(String.valueOf(row.getInt(2))); - } + + coci.setCiting(row.getString(1)); + coci.setCited(row.getString(2)); + coci.setOci(row.getString(0)); return coci; }, Encoders.bean(COCI.class)) .write() - .mode(SaveMode.Overwrite) + .mode(SaveMode.Append) .option("compression", "gzip") - .json(outputPath + inputFile); + .json(outputPath); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java index c1ef1abad..42fca5dd3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java @@ -9,8 +9,10 @@ public class COCI implements Serializable { private String oci; private String citing; + private String citing_pid; private String cited; + private String cited_pid; public String getOci() { return oci; @@ -36,4 +38,19 @@ public class COCI implements Serializable { this.cited = cited; } + public String getCiting_pid() { + return citing_pid; + } + + public void setCiting_pid(String citing_pid) { + this.citing_pid = citing_pid; + } + + public String getCited_pid() { + return cited_pid; + } + + public void setCited_pid(String cited_pid) { + this.cited_pid = cited_pid; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json index 96db7eeb7..f4b6e2d68 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json @@ -1,13 +1,13 @@ [ { - "paramName": "if", - "paramLongName": "inputFile", + "paramName": "ip", + "paramLongName": "inputPath", "paramDescription": "the zipped opencitations file", "paramRequired": true }, { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "op", + "paramLongName": "outputPath", "paramDescription": "the working path", "paramRequired": true }, @@ -16,11 +16,5 @@ "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true - }, - { - "paramName": "p", - "paramLongName": "prefix", - "paramDescription": "COCI or POCI", - "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json index fa840089d..a74ceb983 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -1,7 +1,7 @@ [ { - "paramName": "wp", - "paramLongName": "workingPath", + "paramName": "ip", + "paramLongName": "inputPath", "paramDescription": "the zipped opencitations file", "paramRequired": true }, @@ -24,15 +24,9 @@ "paramLongName": "outputPath", "paramDescription": "the hdfs name node", "paramRequired": true - }, - { - "paramName": "if", - "paramLongName": "inputFile", - "paramDescription": "the hdfs name node", - "paramRequired": true - }, { - "paramName": "f", - "paramLongName": "format", + }, { + "paramName": "nn", + "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index deb32459b..e28b021bc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -27,7 +27,8 @@ ${wf:conf('resumeFrom') eq 'DownloadDump'} ${wf:conf('resumeFrom') eq 'ExtractContent'} ${wf:conf('resumeFrom') eq 'ReadContent'} - + ${wf:conf('resumeFrom') eq 'CreateAS'} + @@ -35,6 +36,15 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + ${jobTracker} @@ -47,7 +57,28 @@ download.sh ${filelist} - ${workingPath}/${prefix}/Original + ${inputPath}/Original + HADOOP_USER_NAME=${wf:user()} + download.sh + + + + + + + + + ${jobTracker} + ${nameNode} + + + mapred.job.queue.name + ${queueName} + + + download_corr.sh + ${filecorrespondence} + ${inputPath}/correspondence HADOOP_USER_NAME=${wf:user()} download.sh @@ -60,9 +91,19 @@ eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs --hdfsNameNode${nameNode} - --inputFile${inputFile} - --workingPath${workingPath}/${prefix} - --prefix${prefix} + --inputPath${inputPath}/Original + --outputPath${inputPath}/Extracted + + + + + + + + eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs + --hdfsNameNode${nameNode} + --inputPath${inputPath}/correspondence + --outputPath${inputPath}/correspondence_extracted @@ -85,11 +126,34 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --workingPath${workingPath}/${prefix}/${prefix} - --outputPath${workingPath}/${prefix}/${prefix}_JSON/ + --inputPath${inputPath}/Extracted + --outputPath${inputPath}/JSON --delimiter${delimiter} - --inputFile${inputFileCoci} - --format${prefix} + --hdfsNameNode${nameNode} + + + + + + + + yarn + cluster + Produces the AS for OC + eu.dnetlib.dhp.actionmanager.opencitations.MapOCIdsInPids + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${inputPath} + --outputPath${outputPathExtraction} @@ -112,7 +176,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${workingPath} + --inputPath${outputPathExtraction} --outputPath${outputPath} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json new file mode 100644 index 000000000..95ab5f6d9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManged", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +]