Merge pull request 'Open Citation integration' (#401) from ocnew into beta

Reviewed-on: D-Net/dnet-hadoop#401
This commit is contained in:
Claudio Atzori 2024-03-25 16:10:40 +01:00
commit fa4b3e6d2b
14 changed files with 622 additions and 142 deletions

View File

@ -22,12 +22,14 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.*;
import eu.dnetlib.dhp.utils.DHPUtils;
@ -37,16 +39,12 @@ public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
// DOI-to-DOI citations
public static final String COCI = "COCI";
// PMID-to-PMID citations
public static final String POCI = "POCI";
private static final String DOI_PREFIX = "50|doi_________::";
private static final String PMID_PREFIX = "50|pmid________::";
private static final String ARXIV_PREFIX = "50|arXiv_______::";
private static final String PMCID_PREFIX = "50|pmcid_______::";
private static final String TRUST = "0.91";
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
@ -79,38 +77,30 @@ public class CreateActionSetSparkJob implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final boolean shouldDuplicateRels = Optional
.ofNullable(parser.get("shouldDuplicateRels"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels));
spark -> extractContent(spark, inputPath, outputPath));
}
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) {
private static void extractContent(SparkSession spark, String inputPath, String outputPath) {
getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI)
.union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
getTextTextJavaPairRDD(spark, inputPath)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);// , GzipCodec.class);
}
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath,
boolean shouldDuplicateRels, String prefix) {
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath) {
return spark
.read()
.textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*")
.textFile(inputPath)
.map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class))
.flatMap(
(FlatMapFunction<COCI, Relation>) value -> createRelation(
value, shouldDuplicateRels, prefix)
value)
.iterator(),
Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) Objects::nonNull)
@ -121,34 +111,68 @@ public class CreateActionSetSparkJob implements Serializable {
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
}
private static List<Relation> createRelation(COCI value, boolean duplicate, String p) {
private static List<Relation> createRelation(COCI value) throws JsonProcessingException {
List<Relation> relationList = new ArrayList<>();
String prefix;
String citing;
String cited;
switch (p) {
case COCI:
prefix = DOI_PREFIX;
citing = prefix
switch (value.getCiting_pid()) {
case "doi":
citing = DOI_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting()));
cited = prefix
break;
case "pmid":
citing = PMID_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
break;
case "arxiv":
citing = ARXIV_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCiting()));
break;
case "pmcid":
citing = PMCID_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCiting()));
break;
case "isbn":
case "issn":
return relationList;
default:
throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value));
}
switch (value.getCited_pid()) {
case "doi":
cited = DOI_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited()));
break;
case POCI:
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
cited = prefix
case "pmid":
cited = PMID_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited()));
break;
case "arxiv":
cited = ARXIV_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCited()));
break;
case "pmcid":
cited = PMCID_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCited()));
break;
case "isbn":
case "issn":
return relationList;
default:
throw new IllegalStateException("Invalid prefix: " + p);
throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value));
}
if (!citing.equals(cited)) {
@ -157,15 +181,6 @@ public class CreateActionSetSparkJob implements Serializable {
getRelation(
citing,
cited, ModelConstants.CITES));
if (duplicate && value.getCiting().endsWith(".refs")) {
citing = prefix + IdentifierFactory
.md5(
CleaningFunctions
.normalizePidValue(
"doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
relationList.add(getRelation(citing, cited, ModelConstants.CITES));
}
}
return relationList;

View File

@ -12,10 +12,7 @@ import java.util.zip.ZipInputStream;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,17 +34,17 @@ public class GetOpenCitationsRefs implements Serializable {
parser.parseArgument(args);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", Arrays.asList(inputFile));
// final String[] inputFile = parser.get("inputFile").split(";");
// log.info("inputFile {}", Arrays.asList(inputFile));
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}", inputPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
final String prefix = parser.get("prefix");
log.info("prefix {}", prefix);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
@ -56,41 +53,42 @@ public class GetOpenCitationsRefs implements Serializable {
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
for (String file : inputFile) {
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix);
}
ocr.doExtract(inputPath, outputPath, fileSystem);
}
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix)
private void doExtract(String inputPath, String outputPath, FileSystem fileSystem)
throws IOException {
final Path path = new Path(inputFile);
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles(
new Path(inputPath), true);
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
// do stuff with the file like ...
FSDataInputStream oc_zip = fileSystem.open(fileStatus.getPath());
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) {
FSDataInputStream oc_zip = fileSystem.open(path);
if (!entry.isDirectory()) {
String fileName = entry.getName();
// fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
fileName = fileName.substring(0, fileName.lastIndexOf("."));
// count++;
try (
FSDataOutputStream out = fileSystem
.create(new Path(outputPath + "/" + fileName + ".gz"));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
// int count = 1;
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory()) {
String fileName = entry.getName();
// fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
fileName = fileName.substring(0, fileName.lastIndexOf("."));
// count++;
try (
FSDataOutputStream out = fileSystem
.create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz"));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(zis, gzipOs);
IOUtils.copy(zis, gzipOs);
}
}
}
}
}
}

View File

@ -0,0 +1,171 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 29/02/24
*/
public class MapOCIdsInPids implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
private static final String DELIMITER = ",";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws IOException, ParseException {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
MapOCIdsInPids.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json"))));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String nameNode = parser.get("nameNode");
log.info("nameNode {}", nameNode);
unzipCorrespondenceFile(inputPath, nameNode);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> mapIdentifiers(spark, inputPath, outputPath));
}
private static void unzipCorrespondenceFile(String inputPath, String hdfsNameNode) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
final Path path = new Path(inputPath + "/correspondence/omid.zip");
FileSystem fileSystem = FileSystem.get(conf);
FSDataInputStream project_zip = fileSystem.open(path);
try (ZipInputStream zis = new ZipInputStream(project_zip)) {
ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory()) {
String fileName = entry.getName();
byte buffer[] = new byte[1024];
int count;
try (
FSDataOutputStream out = fileSystem
.create(new Path(inputPath + "/correspondence/omid.csv"))) {
while ((count = zis.read(buffer, 0, buffer.length)) != -1)
out.write(buffer, 0, count);
}
}
}
}
}
private static void mapIdentifiers(SparkSession spark, String inputPath, String outputPath) {
Dataset<COCI> coci = spark
.read()
.textFile(inputPath + "/JSON")
.map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class));
Dataset<Tuple2<String, String>> correspondenceData = spark
.read()
.format("csv")
.option("sep", DELIMITER)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(inputPath + "/correspondence/omid.csv")
.repartition(5000)
.flatMap((FlatMapFunction<Row, Tuple2<String, String>>) r -> {
String ocIdentifier = r.getAs("omid");
String[] correspondentIdentifiers = ((String) r.getAs("id")).split(" ");
return Arrays
.stream(correspondentIdentifiers)
.map(ci -> new Tuple2<String, String>(ocIdentifier, ci))
.collect(Collectors.toList())
.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
Dataset<COCI> mappedCitingDataset = coci
.joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1")))
.map((MapFunction<Tuple2<COCI, Tuple2<String, String>>, COCI>) t2 -> {
String correspondent = t2._2()._2();
t2._1().setCiting_pid(correspondent.substring(0, correspondent.indexOf(":")));
t2._1().setCiting(correspondent.substring(correspondent.indexOf(":") + 1));
return t2._1();
}, Encoders.bean(COCI.class));
mappedCitingDataset
.joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1")))
.map((MapFunction<Tuple2<COCI, Tuple2<String, String>>, COCI>) t2 -> {
String correspondent = t2._2()._2();
t2._1().setCited_pid(correspondent.substring(0, correspondent.indexOf(":")));
t2._1().setCited(correspondent.substring(correspondent.indexOf(":") + 1));
return t2._1();
}, Encoders.bean(COCI.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@ -12,11 +12,9 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
@ -42,19 +40,21 @@ public class ReadCOCI implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", Arrays.asList(inputFile));
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
final String workingPath = parser.get("inputPath");
log.info("workingPath {}", workingPath);
final String format = parser.get("format");
log.info("format {}", format);
SparkConf sconf = new SparkConf();
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
@ -66,20 +66,21 @@ public class ReadCOCI implements Serializable {
doRead(
spark,
workingPath,
inputFile,
fileSystem,
outputPath,
delimiter,
format);
delimiter);
});
}
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem,
String outputPath,
String delimiter, String format) {
for (String inputFile : inputFiles) {
String pString = workingPath + "/" + inputFile + ".gz";
String delimiter) throws IOException {
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles(
new Path(workingPath), true);
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
log.info("extracting file {}", fileStatus.getPath().toString());
Dataset<Row> cociData = spark
.read()
.format("csv")
@ -87,26 +88,26 @@ public class ReadCOCI implements Serializable {
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(pString)
.load(fileStatus.getPath().toString())
.repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI();
if (format.equals("COCI")) {
coci.setCiting(row.getString(1));
coci.setCited(row.getString(2));
} else {
coci.setCiting(String.valueOf(row.getInt(1)));
coci.setCited(String.valueOf(row.getInt(2)));
}
coci.setCiting(row.getString(1));
coci.setCited(row.getString(2));
coci.setOci(row.getString(0));
return coci;
}, Encoders.bean(COCI.class))
.filter((FilterFunction<COCI>) c -> c != null)
.write()
.mode(SaveMode.Overwrite)
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath + inputFile);
.json(outputPath);
fileSystem.rename(fileStatus.getPath(), new Path("/tmp/miriam/OC/DONE"));
}
}

View File

@ -9,8 +9,10 @@ public class COCI implements Serializable {
private String oci;
private String citing;
private String citing_pid;
private String cited;
private String cited_pid;
public String getOci() {
return oci;
@ -25,6 +27,8 @@ public class COCI implements Serializable {
}
public void setCiting(String citing) {
if (citing != null && citing.startsWith("omid:"))
citing = citing.substring(5);
this.citing = citing;
}
@ -33,7 +37,24 @@ public class COCI implements Serializable {
}
public void setCited(String cited) {
if (cited != null && cited.startsWith("omid:"))
cited = cited.substring(5);
this.cited = cited;
}
public String getCiting_pid() {
return citing_pid;
}
public void setCiting_pid(String citing_pid) {
this.citing_pid = citing_pid;
}
public String getCited_pid() {
return cited_pid;
}
public void setCited_pid(String cited_pid) {
this.cited_pid = cited_pid;
}
}

View File

@ -1,13 +1,13 @@
[
{
"paramName": "if",
"paramLongName": "inputFile",
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the working path",
"paramRequired": true
},
@ -16,11 +16,5 @@
"paramLongName": "hdfsNameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "prefix",
"paramDescription": "COCI or POCI",
"paramRequired": true
}
]

View File

@ -1,7 +1,7 @@
[
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
@ -24,15 +24,9 @@
"paramLongName": "outputPath",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "if",
"paramLongName": "inputFile",
"paramDescription": "the hdfs name node",
"paramRequired": true
}, {
"paramName": "f",
"paramLongName": "format",
}, {
"paramName": "nn",
"paramLongName": "hdfsNameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
}

View File

@ -27,7 +27,9 @@
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
<case to="remap">${wf:conf('resumeFrom') eq 'MapContent'}</case>
<case to="create_actionset">${wf:conf('resumeFrom') eq 'CreateAS'}</case>
<default to="deleteoutputpath"/> <!-- first action to be done when downloadDump is to be performed -->
</switch>
</decision>
@ -35,6 +37,15 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path='${inputPath}'/>
<mkdir path='${inputPath}'/>
</fs>
<ok to="download"/>
<error to="Kill"/>
</action>
<action name="download">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
@ -47,7 +58,28 @@
</configuration>
<exec>download.sh</exec>
<argument>${filelist}</argument>
<argument>${workingPath}/${prefix}/Original</argument>
<argument>${inputPath}/Original</argument>
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
<file>download.sh</file>
<capture-output/>
</shell>
<ok to="download_correspondence"/>
<error to="Kill"/>
</action>
<!-- downloads the correspondence from the omid and the pid (doi, pmid etc)-->
<action name="download_correspondence">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>download_corr.sh</exec>
<argument>${filecorrespondence}</argument>
<argument>${inputPath}/correspondence</argument>
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
<file>download.sh</file>
<capture-output/>
@ -60,9 +92,19 @@
<java>
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingPath}/${prefix}</arg>
<arg>--prefix</arg><arg>${prefix}</arg>
<arg>--inputPath</arg><arg>${inputPath}/Original</arg>
<arg>--outputPath</arg><arg>${inputPath}/Extracted</arg>
</java>
<ok to="read"/>
<error to="Kill"/>
</action>
<action name="extract_correspondence">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--inputPath</arg><arg>${inputPath}/correspondence</arg>
<arg>--outputPath</arg><arg>${inputPath}/correspondence_extracted</arg>
</java>
<ok to="read"/>
<error to="Kill"/>
@ -85,11 +127,35 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}/${prefix}/${prefix}</arg>
<arg>--outputPath</arg><arg>${workingPath}/${prefix}/${prefix}_JSON/</arg>
<arg>--inputPath</arg><arg>${inputPath}/Extracted</arg>
<arg>--outputPath</arg><arg>${inputPath}/JSON</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
<arg>--format</arg><arg>${prefix}</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
</spark>
<ok to="remap"/>
<error to="Kill"/>
</action>
<action name="remap">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the AS for OC</name>
<class>eu.dnetlib.dhp.actionmanager.opencitations.MapOCIdsInPids</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--outputPath</arg><arg>${outputPathExtraction}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
</spark>
<ok to="create_actionset"/>
<error to="Kill"/>
@ -112,7 +178,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}</arg>
<arg>--inputPath</arg><arg>${outputPathExtraction}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>

View File

@ -0,0 +1,25 @@
[
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "the working path",
"paramRequired": true
},
{
"paramName": "issm",
"paramLongName": "isSparkSessionManged",
"paramDescription": "the hdfs name node",
"paramRequired": false
},{
"paramName": "nn",
"paramLongName": "nameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
]

View File

@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas")
.getPath();
CreateActionSetSparkJob
@ -84,8 +84,6 @@ public class CreateOpenCitationsASTest {
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-shouldDuplicateRels",
Boolean.TRUE.toString(),
"-inputPath",
inputPath,
"-outputPath",
@ -99,9 +97,10 @@ public class CreateOpenCitationsASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
assertEquals(31, tmp.count());
Assertions.assertEquals(27, tmp.count());
tmp.foreach(r -> Assertions.assertEquals(1, r.getCollectedfrom().size()));
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
/**
* @author miriam.baglioni
* @Date 07/03/24
*/
public class RemapTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(RemapTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(RemapTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(RemapTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(RemapTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testRemap() throws Exception {
String inputPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap")
.getPath();
MapOCIdsInPids
.main(
new String[] {
"-isSparkSessionManged",
Boolean.FALSE.toString(),
"-inputPath",
inputPath,
"-outputPath",
workingDir.toString() + "/out/",
"-nameNode", "input1;input2;input3;input4;input5"
});
}
}

View File

@ -0,0 +1,31 @@
{"cited":"br/061201599020", "citing":"br/06203041400","oci":"oci:06701327944-06504326071"}
{"cited":"br/061201599020","citing":"br/06502272390","oci":"oci:06502272390-061301355525"}
{"cited":"br/061201599020", "citing":"br/06120941789","oci":"oci:0670804699-067055659"}
{"cited":"br/06210273177","citing":"br/06203041400","oci":"oci:061502003994-062201281456"}
{"cited":"br/06210273177", "citing":"br/06502272390","oci":"oci:06502272390-0660806688"}
{"cited":"br/06210273177", "citing":"br/06120941789","oci":"oci:06502307119-0620223645"}
{"cited":"br/0660613430","citing":"br/06203041400","oci":"oci:061502004011-061902692285"}
{"cited":"br/0660613430", "citing":"br/06502272390","oci":"oci:0660549063-0610398792"}
{"cited":"br/0660613430", "citing":"br/06120941789","oci":"oci:06420189324-06301543046"}
{"cited":"br/062602732073","citing":"br/06203041400","oci":"oci:06380130275-061502004367"}
{"cited":"br/062602732073","citing":"br/06502272390","oci":"oci:062403449086-062501448395"}
{"cited":"br/062602732073","citing":"br/06120941789","oci":"oci:06420189328-061202007182"}
{"cited":"br/061103703697","citing":"br/06203041400","oci":"oci:062603906965-061701362658"}
{"cited":"br/061103703697", "citing":"br/06502272390","oci":"oci:0670294309-06104327031"}
{"cited":"br/061103703697","citing":"br/06120941789","oci":"oci:061702060228-061301712529"}
{"cited":"br/06230199640", "citing":"br/0670517081","oci":"oci:06901104174-06503692526"}
{"cited":"br/061703513967","citing":"br/061702310822","oci":"oci:061702310822-061703513967"}
{"cited":"br/062104002953","citing":"br/061702311472","oci":"oci:061702311472-062104002953"}
{"cited":"br/061101204417","citing":"br/062102701590","oci":"oci:062102701590-061101204417"}
{"cited":"br/062403787088","citing":"br/061401499173","oci":"oci:061401499173-062403787088"}
{"cited":"br/061203576338","citing":"br/06110279619","oci":"oci:06110279619-061203576338"}
{"cited":"br/061601962207","citing":"br/061502004018","oci":"oci:061502004018-061601962207"}
{"cited":"br/06101014588", "citing":"br/061502004027","oci":"oci:061502004027-06101014588"}
{"cited":"br/06704040804", "citing":"br/06220799044","oci":"oci:06220799044-06704040804"}
{"cited":"br/061401105151","citing":"br/061502004037","oci":"oci:061502004037-061401105151"}
{"cited":"br/0640821079", "citing":"br/061702311537","oci":"oci:061702311537-0640821079"}
{"cited":"br/06604165310", "citing":"br/062501970289","oci":"oci:062501970289-06604165310"}
{"cited":"br/061501351689","citing":"br/061203895786","oci":"oci:061203895786-061501351689"}
{"cited":"br/06202223692", "citing":"br/06110298832","oci":"oci:06110298832-06202223692"}
{"cited":"br/06104310727", "citing":"br/0660439086","oci":"oci:0660439086-06104310727"}
{"cited":"br/06150216214", "citing":"br/06340150329","oci":"oci:06340150329-06150216214"}

View File

@ -0,0 +1,48 @@
omid,id
br/061201599020,doi:10.1142/s0219887817501687
br/06203041400,doi:10.1111/j.1523-5378.2005.00327.x pmid:16104945
br/06210273177,doi:10.1090/qam/20394
br/06502272390,pmid:32235596 doi:10.3390/nano10040644
br/0660613430,doi:10.1007/bf00470411
br/06120941789,doi:10.1098/rspa.2006.1747
br/062602732073,doi:10.1007/978-3-642-38844-6_25
br/06230199640,pmid:25088780 doi:10.1016/j.ymeth.2014.07.008
br/061103703697,pmid:2682767
br/0670517081,doi:10.1016/j.foodpol.2021.102189
br/06502310477,doi:10.1142/s0218127416500450
br/06520113284,doi:10.1109/cfasta57821.2023.10243367
br/062303652439,pmid:5962654 doi:10.1016/0020-708x(66)90001-9
br/06250691436,doi:10.1042/bst20150052 pmid:26009172
br/061201665577,doi:10.1097/00115550-200205000-00018
br/06503490336,pmid:34689254 doi:10.1007/s10072-021-05687-0
br/06220615942,pmid:25626134 doi:10.1016/j.jcis.2015.01.008
br/061103389243,doi:10.4324/9780203702819-10
br/062303011271,doi:10.1109/icassp.2011.5946250
br/061302926083,doi:10.4018/978-1-6684-3937-1.ch002
br/061402485360,doi:10.1109/iciict.2015.7396079
br/06410101083,doi:10.1016/j.autcon.2023.104828
br/062202243386,doi:10.1016/0001-8791(81)90022-1
br/06170421486,doi:10.1130/0016-7606(2003)115<0166:dsagmf>2.0.co;2
br/061201983865,doi:10.4324/9781315109008 isbn:9781315109008
br/061701697230,doi:10.1016/j.trd.2012.07.006
br/061201137111,doi:10.1109/access.2020.2971656
br/06120436283,pmid:2254430 doi:10.1128/jcm.28.11.2551-2554.1990
br/061903968916,doi:10.1111/j.1742-1241.1988.tb08627.x
br/06201583482,doi:10.1016/0016-5085(78)93139-6
br/06130338317,doi:10.2134/agronj1952.00021962004400080013x
br/062601538320,doi:10.1371/journal.pone.0270593 pmid:35789338
br/062401098626,pmid:22385804 doi:10.1016/j.talanta.2011.12.034
br/06190436492,doi:10.1039/c7dt01499f pmid:28644489
br/06202819247,doi:10.1007/978-3-319-45823-6_57
br/0648013560,doi:10.1080/14772000.2012.705356
br/0690214059,doi:10.2752/175630608x329217
br/06601640415,doi:10.1080/18128600508685647
br/061503394761,doi:10.1002/0471443395.img018
br/061702861849,pmid:31203682 doi:10.1080/10428194.2019.1627538
br/06450133713,doi:10.1093/acprof:oso/9780199670888.003.0008
br/0628074892,doi:10.1097/hnp.0000000000000597
br/061601032219,doi:10.1002/bdm.2102
br/06602079930,doi:10.1101/2020.08.25.267500
br/0604192147,doi:10.11501/3307395
br/061101933800,doi:10.1142/s0217732398002242
br/06504184118,pmid:10091417
1 omid id
2 br/061201599020 doi:10.1142/s0219887817501687
3 br/06203041400 doi:10.1111/j.1523-5378.2005.00327.x pmid:16104945
4 br/06210273177 doi:10.1090/qam/20394
5 br/06502272390 pmid:32235596 doi:10.3390/nano10040644
6 br/0660613430 doi:10.1007/bf00470411
7 br/06120941789 doi:10.1098/rspa.2006.1747
8 br/062602732073 doi:10.1007/978-3-642-38844-6_25
9 br/06230199640 pmid:25088780 doi:10.1016/j.ymeth.2014.07.008
10 br/061103703697 pmid:2682767
11 br/0670517081 doi:10.1016/j.foodpol.2021.102189
12 br/06502310477 doi:10.1142/s0218127416500450
13 br/06520113284 doi:10.1109/cfasta57821.2023.10243367
14 br/062303652439 pmid:5962654 doi:10.1016/0020-708x(66)90001-9
15 br/06250691436 doi:10.1042/bst20150052 pmid:26009172
16 br/061201665577 doi:10.1097/00115550-200205000-00018
17 br/06503490336 pmid:34689254 doi:10.1007/s10072-021-05687-0
18 br/06220615942 pmid:25626134 doi:10.1016/j.jcis.2015.01.008
19 br/061103389243 doi:10.4324/9780203702819-10
20 br/062303011271 doi:10.1109/icassp.2011.5946250
21 br/061302926083 doi:10.4018/978-1-6684-3937-1.ch002
22 br/061402485360 doi:10.1109/iciict.2015.7396079
23 br/06410101083 doi:10.1016/j.autcon.2023.104828
24 br/062202243386 doi:10.1016/0001-8791(81)90022-1
25 br/06170421486 doi:10.1130/0016-7606(2003)115<0166:dsagmf>2.0.co;2
26 br/061201983865 doi:10.4324/9781315109008 isbn:9781315109008
27 br/061701697230 doi:10.1016/j.trd.2012.07.006
28 br/061201137111 doi:10.1109/access.2020.2971656
29 br/06120436283 pmid:2254430 doi:10.1128/jcm.28.11.2551-2554.1990
30 br/061903968916 doi:10.1111/j.1742-1241.1988.tb08627.x
31 br/06201583482 doi:10.1016/0016-5085(78)93139-6
32 br/06130338317 doi:10.2134/agronj1952.00021962004400080013x
33 br/062601538320 doi:10.1371/journal.pone.0270593 pmid:35789338
34 br/062401098626 pmid:22385804 doi:10.1016/j.talanta.2011.12.034
35 br/06190436492 doi:10.1039/c7dt01499f pmid:28644489
36 br/06202819247 doi:10.1007/978-3-319-45823-6_57
37 br/0648013560 doi:10.1080/14772000.2012.705356
38 br/0690214059 doi:10.2752/175630608x329217
39 br/06601640415 doi:10.1080/18128600508685647
40 br/061503394761 doi:10.1002/0471443395.img018
41 br/061702861849 pmid:31203682 doi:10.1080/10428194.2019.1627538
42 br/06450133713 doi:10.1093/acprof:oso/9780199670888.003.0008
43 br/0628074892 doi:10.1097/hnp.0000000000000597
44 br/061601032219 doi:10.1002/bdm.2102
45 br/06602079930 doi:10.1101/2020.08.25.267500
46 br/0604192147 doi:10.11501/3307395
47 br/061101933800 doi:10.1142/s0217732398002242
48 br/06504184118 pmid:10091417

View File

@ -0,0 +1,27 @@
{"oci":"oci:06701327944-06504326071","citing":"16104945","citing_pid":"pmid","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
{"oci":"oci:06701327944-06504326071","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
{"oci":"oci:06502272390-061301355525","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
{"oci":"oci:06502272390-061301355525","citing":"32235596","citing_pid":"pmid","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
{"oci":"oci:0670804699-067055659","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
{"oci":"oci:061502003994-062201281456","citing":"16104945","citing_pid":"pmid","cited":"10.1090/qam/20394","cited_pid":"doi"}
{"oci":"oci:061502003994-062201281456","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"}
{"oci":"oci:06502272390-0660806688","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"}
{"oci":"oci:06502272390-0660806688","citing":"32235596","citing_pid":"pmid","cited":"10.1090/qam/20394","cited_pid":"doi"}
{"oci":"oci:06502307119-0620223645","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"}
{"oci":"oci:061502004011-061902692285","citing":"16104945","citing_pid":"pmid","cited":"10.1007/bf00470411","cited_pid":"doi"}
{"oci":"oci:061502004011-061902692285","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"}
{"oci":"oci:0660549063-0610398792","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"}
{"oci":"oci:0660549063-0610398792","citing":"32235596","citing_pid":"pmid","cited":"10.1007/bf00470411","cited_pid":"doi"}
{"oci":"oci:06420189324-06301543046","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"}
{"oci":"oci:06380130275-061502004367","citing":"16104945","citing_pid":"pmid","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
{"oci":"oci:06380130275-061502004367","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
{"oci":"oci:062403449086-062501448395","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
{"oci":"oci:062403449086-062501448395","citing":"32235596","citing_pid":"pmid","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
{"oci":"oci:06420189328-061202007182","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
{"oci":"oci:062603906965-061701362658","citing":"16104945","citing_pid":"pmid","cited":"2682767","cited_pid":"pmid"}
{"oci":"oci:062603906965-061701362658","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"}
{"oci":"oci:0670294309-06104327031","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"}
{"oci":"oci:0670294309-06104327031","citing":"32235596","citing_pid":"pmid","cited":"2682767","cited_pid":"pmid"}
{"oci":"oci:061702060228-061301712529","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"}
{"oci":"oci:06901104174-06503692526","citing":"10.1016/j.foodpol.2021.102189","citing_pid":"doi","cited":"10.1016/j.ymeth.2014.07.008","cited_pid":"doi"}
{"oci":"oci:06901104174-06503692526","citing":"10.1016/j.foodpol.2021.102189","citing_pid":"doi","cited":"25088780","cited_pid":"pmid"}