Open Citation integration #401
|
@ -22,12 +22,14 @@ import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.*;
|
import eu.dnetlib.dhp.schema.oaf.utils.*;
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
@ -37,16 +39,12 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
|
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
|
||||||
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
|
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
|
||||||
|
|
||||||
// DOI-to-DOI citations
|
|
||||||
public static final String COCI = "COCI";
|
|
||||||
|
|
||||||
// PMID-to-PMID citations
|
|
||||||
public static final String POCI = "POCI";
|
|
||||||
|
|
||||||
private static final String DOI_PREFIX = "50|doi_________::";
|
private static final String DOI_PREFIX = "50|doi_________::";
|
||||||
|
|
||||||
private static final String PMID_PREFIX = "50|pmid________::";
|
private static final String PMID_PREFIX = "50|pmid________::";
|
||||||
|
private static final String ARXIV_PREFIX = "50|arXiv_______::";
|
||||||
|
|
||||||
|
private static final String PMCID_PREFIX = "50|pmcid_______::";
|
||||||
private static final String TRUST = "0.91";
|
private static final String TRUST = "0.91";
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
|
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
|
||||||
|
@ -79,38 +77,30 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath {}", outputPath);
|
log.info("outputPath {}", outputPath);
|
||||||
|
|
||||||
final boolean shouldDuplicateRels = Optional
|
|
||||||
.ofNullable(parser.get("shouldDuplicateRels"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.FALSE);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels));
|
spark -> extractContent(spark, inputPath, outputPath));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
|
private static void extractContent(SparkSession spark, String inputPath, String outputPath) {
|
||||||
boolean shouldDuplicateRels) {
|
|
||||||
|
|
||||||
getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI)
|
getTextTextJavaPairRDD(spark, inputPath)
|
||||||
.union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI))
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);// , GzipCodec.class);
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath,
|
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath) {
|
||||||
boolean shouldDuplicateRels, String prefix) {
|
|
||||||
return spark
|
return spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*")
|
.textFile(inputPath)
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
|
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
|
||||||
Encoders.bean(COCI.class))
|
Encoders.bean(COCI.class))
|
||||||
.flatMap(
|
.flatMap(
|
||||||
(FlatMapFunction<COCI, Relation>) value -> createRelation(
|
(FlatMapFunction<COCI, Relation>) value -> createRelation(
|
||||||
value, shouldDuplicateRels, prefix)
|
value)
|
||||||
.iterator(),
|
.iterator(),
|
||||||
Encoders.bean(Relation.class))
|
Encoders.bean(Relation.class))
|
||||||
.filter((FilterFunction<Relation>) Objects::nonNull)
|
.filter((FilterFunction<Relation>) Objects::nonNull)
|
||||||
|
@ -121,34 +111,68 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> createRelation(COCI value, boolean duplicate, String p) {
|
private static List<Relation> createRelation(COCI value) throws JsonProcessingException {
|
||||||
|
|
||||||
List<Relation> relationList = new ArrayList<>();
|
List<Relation> relationList = new ArrayList<>();
|
||||||
String prefix;
|
|
||||||
String citing;
|
String citing;
|
||||||
String cited;
|
String cited;
|
||||||
|
|
||||||
switch (p) {
|
switch (value.getCiting_pid()) {
|
||||||
case COCI:
|
case "doi":
|
||||||
prefix = DOI_PREFIX;
|
citing = DOI_PREFIX
|
||||||
citing = prefix
|
|
||||||
+ IdentifierFactory
|
+ IdentifierFactory
|
||||||
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting()));
|
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCiting()));
|
||||||
cited = prefix
|
break;
|
||||||
|
case "pmid":
|
||||||
|
citing = PMID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
|
||||||
|
break;
|
||||||
|
case "arxiv":
|
||||||
|
citing = ARXIV_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCiting()));
|
||||||
|
break;
|
||||||
|
case "pmcid":
|
||||||
|
citing = PMCID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCiting()));
|
||||||
|
break;
|
||||||
|
case "isbn":
|
||||||
|
case "issn":
|
||||||
|
return relationList;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (value.getCited_pid()) {
|
||||||
|
case "doi":
|
||||||
|
cited = DOI_PREFIX
|
||||||
+ IdentifierFactory
|
+ IdentifierFactory
|
||||||
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited()));
|
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getCited()));
|
||||||
break;
|
break;
|
||||||
case POCI:
|
case "pmid":
|
||||||
prefix = PMID_PREFIX;
|
cited = PMID_PREFIX
|
||||||
citing = prefix
|
|
||||||
+ IdentifierFactory
|
|
||||||
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
|
|
||||||
cited = prefix
|
|
||||||
+ IdentifierFactory
|
+ IdentifierFactory
|
||||||
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited()));
|
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), value.getCited()));
|
||||||
break;
|
break;
|
||||||
|
case "arxiv":
|
||||||
|
cited = ARXIV_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.arXiv.toString(), value.getCited()));
|
||||||
|
break;
|
||||||
|
case "pmcid":
|
||||||
|
cited = PMCID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), value.getCited()));
|
||||||
|
break;
|
||||||
|
case "isbn":
|
||||||
|
case "issn":
|
||||||
|
return relationList;
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException("Invalid prefix: " + p);
|
throw new IllegalStateException("Invalid prefix: " + new ObjectMapper().writeValueAsString(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!citing.equals(cited)) {
|
if (!citing.equals(cited)) {
|
||||||
|
@ -157,15 +181,6 @@ public class CreateActionSetSparkJob implements Serializable {
|
||||||
getRelation(
|
getRelation(
|
||||||
citing,
|
citing,
|
||||||
cited, ModelConstants.CITES));
|
cited, ModelConstants.CITES));
|
||||||
|
|
||||||
if (duplicate && value.getCiting().endsWith(".refs")) {
|
|
||||||
citing = prefix + IdentifierFactory
|
|
||||||
.md5(
|
|
||||||
CleaningFunctions
|
|
||||||
.normalizePidValue(
|
|
||||||
"doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
|
|
||||||
relationList.add(getRelation(citing, cited, ModelConstants.CITES));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return relationList;
|
return relationList;
|
||||||
|
|
|
@ -12,10 +12,7 @@ import java.util.zip.ZipInputStream;
|
||||||
import org.apache.commons.cli.ParseException;
|
import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.*;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -37,17 +34,17 @@ public class GetOpenCitationsRefs implements Serializable {
|
||||||
|
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final String[] inputFile = parser.get("inputFile").split(";");
|
// final String[] inputFile = parser.get("inputFile").split(";");
|
||||||
log.info("inputFile {}", Arrays.asList(inputFile));
|
// log.info("inputFile {}", Arrays.asList(inputFile));
|
||||||
|
|
||||||
final String workingPath = parser.get("workingPath");
|
final String inputPath = parser.get("inputPath");
|
||||||
log.info("workingPath {}", workingPath);
|
log.info("inputPath {}", inputPath);
|
||||||
|
|
||||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||||
log.info("hdfsNameNode {}", hdfsNameNode);
|
log.info("hdfsNameNode {}", hdfsNameNode);
|
||||||
|
|
||||||
final String prefix = parser.get("prefix");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("prefix {}", prefix);
|
log.info("outputPath {}", outputPath);
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set("fs.defaultFS", hdfsNameNode);
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
@ -56,20 +53,20 @@ public class GetOpenCitationsRefs implements Serializable {
|
||||||
|
|
||||||
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
|
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
|
||||||
|
|
||||||
for (String file : inputFile) {
|
ocr.doExtract(inputPath, outputPath, fileSystem);
|
||||||
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix)
|
private void doExtract(String inputPath, String outputPath, FileSystem fileSystem)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
final Path path = new Path(inputFile);
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
|
.listFiles(
|
||||||
FSDataInputStream oc_zip = fileSystem.open(path);
|
new Path(inputPath), true);
|
||||||
|
while (fileStatusListIterator.hasNext()) {
|
||||||
// int count = 1;
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
// do stuff with the file like ...
|
||||||
|
FSDataInputStream oc_zip = fileSystem.open(fileStatus.getPath());
|
||||||
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
|
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
|
||||||
ZipEntry entry = null;
|
ZipEntry entry = null;
|
||||||
while ((entry = zis.getNextEntry()) != null) {
|
while ((entry = zis.getNextEntry()) != null) {
|
||||||
|
@ -81,7 +78,7 @@ public class GetOpenCitationsRefs implements Serializable {
|
||||||
// count++;
|
// count++;
|
||||||
try (
|
try (
|
||||||
FSDataOutputStream out = fileSystem
|
FSDataOutputStream out = fileSystem
|
||||||
.create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz"));
|
.create(new Path(outputPath + "/" + fileName + ".gz"));
|
||||||
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
|
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
|
||||||
|
|
||||||
IOUtils.copy(zis, gzipOs);
|
IOUtils.copy(zis, gzipOs);
|
||||||
|
@ -92,6 +89,7 @@ public class GetOpenCitationsRefs implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,171 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.opencitations;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.zip.ZipEntry;
|
||||||
|
import java.util.zip.ZipInputStream;
|
||||||
|
|
||||||
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.api.java.function.ForeachFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 29/02/24
|
||||||
|
*/
|
||||||
|
public class MapOCIdsInPids implements Serializable {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
|
||||||
|
private static final String DELIMITER = ",";
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws IOException, ParseException {
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
Objects
|
||||||
|
.requireNonNull(
|
||||||
|
MapOCIdsInPids.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json"))));
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String inputPath = parser.get("inputPath");
|
||||||
|
log.info("inputPath {}", inputPath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath {}", outputPath);
|
||||||
|
|
||||||
|
final String nameNode = parser.get("nameNode");
|
||||||
|
log.info("nameNode {}", nameNode);
|
||||||
|
|
||||||
|
unzipCorrespondenceFile(inputPath, nameNode);
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> mapIdentifiers(spark, inputPath, outputPath));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void unzipCorrespondenceFile(String inputPath, String hdfsNameNode) throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
|
final Path path = new Path(inputPath + "/correspondence/omid.zip");
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
|
FSDataInputStream project_zip = fileSystem.open(path);
|
||||||
|
|
||||||
|
try (ZipInputStream zis = new ZipInputStream(project_zip)) {
|
||||||
|
ZipEntry entry = null;
|
||||||
|
while ((entry = zis.getNextEntry()) != null) {
|
||||||
|
|
||||||
|
if (!entry.isDirectory()) {
|
||||||
|
String fileName = entry.getName();
|
||||||
|
byte buffer[] = new byte[1024];
|
||||||
|
int count;
|
||||||
|
|
||||||
|
try (
|
||||||
|
FSDataOutputStream out = fileSystem
|
||||||
|
.create(new Path(inputPath + "/correspondence/omid.csv"))) {
|
||||||
|
|
||||||
|
while ((count = zis.read(buffer, 0, buffer.length)) != -1)
|
||||||
|
out.write(buffer, 0, count);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void mapIdentifiers(SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
Dataset<COCI> coci = spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath + "/JSON")
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
|
||||||
|
Encoders.bean(COCI.class));
|
||||||
|
|
||||||
|
Dataset<Tuple2<String, String>> correspondenceData = spark
|
||||||
|
.read()
|
||||||
|
.format("csv")
|
||||||
|
.option("sep", DELIMITER)
|
||||||
|
.option("inferSchema", "true")
|
||||||
|
.option("header", "true")
|
||||||
|
.option("quotes", "\"")
|
||||||
|
.load(inputPath + "/correspondence/omid.csv")
|
||||||
|
.repartition(5000)
|
||||||
|
.flatMap((FlatMapFunction<Row, Tuple2<String, String>>) r -> {
|
||||||
|
String ocIdentifier = r.getAs("omid");
|
||||||
|
String[] correspondentIdentifiers = ((String) r.getAs("id")).split(" ");
|
||||||
|
return Arrays
|
||||||
|
.stream(correspondentIdentifiers)
|
||||||
|
.map(ci -> new Tuple2<String, String>(ocIdentifier, ci))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.iterator();
|
||||||
|
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
|
||||||
|
|
||||||
|
Dataset<COCI> mappedCitingDataset = coci
|
||||||
|
.joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1")))
|
||||||
|
.map((MapFunction<Tuple2<COCI, Tuple2<String, String>>, COCI>) t2 -> {
|
||||||
|
String correspondent = t2._2()._2();
|
||||||
|
t2._1().setCiting_pid(correspondent.substring(0, correspondent.indexOf(":")));
|
||||||
|
t2._1().setCiting(correspondent.substring(correspondent.indexOf(":") + 1));
|
||||||
|
return t2._1();
|
||||||
|
}, Encoders.bean(COCI.class));
|
||||||
|
|
||||||
|
mappedCitingDataset
|
||||||
|
.joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1")))
|
||||||
|
.map((MapFunction<Tuple2<COCI, Tuple2<String, String>>, COCI>) t2 -> {
|
||||||
|
String correspondent = t2._2()._2();
|
||||||
|
t2._1().setCited_pid(correspondent.substring(0, correspondent.indexOf(":")));
|
||||||
|
t2._1().setCited(correspondent.substring(correspondent.indexOf(":") + 1));
|
||||||
|
return t2._1();
|
||||||
|
}, Encoders.bean(COCI.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Append)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -12,11 +12,9 @@ import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.*;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -42,19 +40,21 @@ public class ReadCOCI implements Serializable {
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
final String[] inputFile = parser.get("inputFile").split(";");
|
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||||
log.info("inputFile {}", Arrays.asList(inputFile));
|
log.info("hdfsNameNode {}", hdfsNameNode);
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("inputPath");
|
||||||
log.info("workingPath {}", workingPath);
|
log.info("workingPath {}", workingPath);
|
||||||
|
|
||||||
final String format = parser.get("format");
|
|
||||||
log.info("format {}", format);
|
|
||||||
|
|
||||||
SparkConf sconf = new SparkConf();
|
SparkConf sconf = new SparkConf();
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
final String delimiter = Optional
|
final String delimiter = Optional
|
||||||
.ofNullable(parser.get("delimiter"))
|
.ofNullable(parser.get("delimiter"))
|
||||||
.orElse(DEFAULT_DELIMITER);
|
.orElse(DEFAULT_DELIMITER);
|
||||||
|
@ -66,20 +66,21 @@ public class ReadCOCI implements Serializable {
|
||||||
doRead(
|
doRead(
|
||||||
spark,
|
spark,
|
||||||
workingPath,
|
workingPath,
|
||||||
inputFile,
|
fileSystem,
|
||||||
outputPath,
|
outputPath,
|
||||||
delimiter,
|
delimiter);
|
||||||
format);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
|
private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem,
|
||||||
String outputPath,
|
String outputPath,
|
||||||
String delimiter, String format) {
|
String delimiter) throws IOException {
|
||||||
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
for (String inputFile : inputFiles) {
|
.listFiles(
|
||||||
String pString = workingPath + "/" + inputFile + ".gz";
|
new Path(workingPath), true);
|
||||||
|
while (fileStatusListIterator.hasNext()) {
|
||||||
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
log.info("extracting file {}", fileStatus.getPath().toString());
|
||||||
Dataset<Row> cociData = spark
|
Dataset<Row> cociData = spark
|
||||||
.read()
|
.read()
|
||||||
.format("csv")
|
.format("csv")
|
||||||
|
@ -87,26 +88,26 @@ public class ReadCOCI implements Serializable {
|
||||||
.option("inferSchema", "true")
|
.option("inferSchema", "true")
|
||||||
.option("header", "true")
|
.option("header", "true")
|
||||||
.option("quotes", "\"")
|
.option("quotes", "\"")
|
||||||
.load(pString)
|
.load(fileStatus.getPath().toString())
|
||||||
.repartition(100);
|
.repartition(100);
|
||||||
|
|
||||||
cociData.map((MapFunction<Row, COCI>) row -> {
|
cociData.map((MapFunction<Row, COCI>) row -> {
|
||||||
|
|
||||||
COCI coci = new COCI();
|
COCI coci = new COCI();
|
||||||
if (format.equals("COCI")) {
|
|
||||||
coci.setCiting(row.getString(1));
|
coci.setCiting(row.getString(1));
|
||||||
coci.setCited(row.getString(2));
|
coci.setCited(row.getString(2));
|
||||||
} else {
|
|
||||||
coci.setCiting(String.valueOf(row.getInt(1)));
|
|
||||||
coci.setCited(String.valueOf(row.getInt(2)));
|
|
||||||
}
|
|
||||||
coci.setOci(row.getString(0));
|
coci.setOci(row.getString(0));
|
||||||
|
|
||||||
return coci;
|
return coci;
|
||||||
}, Encoders.bean(COCI.class))
|
}, Encoders.bean(COCI.class))
|
||||||
|
.filter((FilterFunction<COCI>) c -> c != null)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Append)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath + inputFile);
|
.json(outputPath);
|
||||||
|
fileSystem.rename(fileStatus.getPath(), new Path("/tmp/miriam/OC/DONE"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,10 @@ public class COCI implements Serializable {
|
||||||
private String oci;
|
private String oci;
|
||||||
|
|
||||||
private String citing;
|
private String citing;
|
||||||
|
private String citing_pid;
|
||||||
|
|
||||||
private String cited;
|
private String cited;
|
||||||
|
private String cited_pid;
|
||||||
|
|
||||||
public String getOci() {
|
public String getOci() {
|
||||||
return oci;
|
return oci;
|
||||||
|
@ -25,6 +27,8 @@ public class COCI implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCiting(String citing) {
|
public void setCiting(String citing) {
|
||||||
|
if (citing != null && citing.startsWith("omid:"))
|
||||||
|
citing = citing.substring(5);
|
||||||
this.citing = citing;
|
this.citing = citing;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,7 +37,24 @@ public class COCI implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCited(String cited) {
|
public void setCited(String cited) {
|
||||||
|
if (cited != null && cited.startsWith("omid:"))
|
||||||
|
cited = cited.substring(5);
|
||||||
this.cited = cited;
|
this.cited = cited;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getCiting_pid() {
|
||||||
|
return citing_pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCiting_pid(String citing_pid) {
|
||||||
|
this.citing_pid = citing_pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCited_pid() {
|
||||||
|
return cited_pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCited_pid(String cited_pid) {
|
||||||
|
this.cited_pid = cited_pid;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"paramName": "if",
|
"paramName": "ip",
|
||||||
"paramLongName": "inputFile",
|
"paramLongName": "inputPath",
|
||||||
"paramDescription": "the zipped opencitations file",
|
"paramDescription": "the zipped opencitations file",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "wp",
|
"paramName": "op",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "outputPath",
|
||||||
"paramDescription": "the working path",
|
"paramDescription": "the working path",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
@ -16,11 +16,5 @@
|
||||||
"paramLongName": "hdfsNameNode",
|
"paramLongName": "hdfsNameNode",
|
||||||
"paramDescription": "the hdfs name node",
|
"paramDescription": "the hdfs name node",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "p",
|
|
||||||
"paramLongName": "prefix",
|
|
||||||
"paramDescription": "COCI or POCI",
|
|
||||||
"paramRequired": true
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"paramName": "wp",
|
"paramName": "ip",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "inputPath",
|
||||||
"paramDescription": "the zipped opencitations file",
|
"paramDescription": "the zipped opencitations file",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
@ -24,15 +24,9 @@
|
||||||
"paramLongName": "outputPath",
|
"paramLongName": "outputPath",
|
||||||
"paramDescription": "the hdfs name node",
|
"paramDescription": "the hdfs name node",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "if",
|
|
||||||
"paramLongName": "inputFile",
|
|
||||||
"paramDescription": "the hdfs name node",
|
|
||||||
"paramRequired": true
|
|
||||||
}, {
|
}, {
|
||||||
"paramName": "f",
|
"paramName": "nn",
|
||||||
"paramLongName": "format",
|
"paramLongName": "hdfsNameNode",
|
||||||
"paramDescription": "the hdfs name node",
|
"paramDescription": "the hdfs name node",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,9 @@
|
||||||
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
|
<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
|
||||||
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
|
<case to="extract">${wf:conf('resumeFrom') eq 'ExtractContent'}</case>
|
||||||
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
|
<case to="read">${wf:conf('resumeFrom') eq 'ReadContent'}</case>
|
||||||
<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
|
<case to="remap">${wf:conf('resumeFrom') eq 'MapContent'}</case>
|
||||||
|
<case to="create_actionset">${wf:conf('resumeFrom') eq 'CreateAS'}</case>
|
||||||
|
<default to="deleteoutputpath"/> <!-- first action to be done when downloadDump is to be performed -->
|
||||||
</switch>
|
</switch>
|
||||||
</decision>
|
</decision>
|
||||||
|
|
||||||
|
@ -35,6 +37,15 @@
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
<action name="deleteoutputpath">
|
||||||
|
<fs>
|
||||||
|
<delete path='${inputPath}'/>
|
||||||
|
<mkdir path='${inputPath}'/>
|
||||||
|
</fs>
|
||||||
|
<ok to="download"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
<action name="download">
|
<action name="download">
|
||||||
<shell xmlns="uri:oozie:shell-action:0.2">
|
<shell xmlns="uri:oozie:shell-action:0.2">
|
||||||
<job-tracker>${jobTracker}</job-tracker>
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
@ -47,7 +58,28 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
<exec>download.sh</exec>
|
<exec>download.sh</exec>
|
||||||
<argument>${filelist}</argument>
|
<argument>${filelist}</argument>
|
||||||
<argument>${workingPath}/${prefix}/Original</argument>
|
<argument>${inputPath}/Original</argument>
|
||||||
|
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
|
||||||
|
<file>download.sh</file>
|
||||||
|
<capture-output/>
|
||||||
|
</shell>
|
||||||
|
<ok to="download_correspondence"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
<!-- downloads the correspondence from the omid and the pid (doi, pmid etc)-->
|
||||||
|
<action name="download_correspondence">
|
||||||
|
<shell xmlns="uri:oozie:shell-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapred.job.queue.name</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
<exec>download_corr.sh</exec>
|
||||||
|
<argument>${filecorrespondence}</argument>
|
||||||
|
<argument>${inputPath}/correspondence</argument>
|
||||||
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
|
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
|
||||||
<file>download.sh</file>
|
<file>download.sh</file>
|
||||||
<capture-output/>
|
<capture-output/>
|
||||||
|
@ -60,9 +92,19 @@
|
||||||
<java>
|
<java>
|
||||||
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
|
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
|
||||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||||
<arg>--inputFile</arg><arg>${inputFile}</arg>
|
<arg>--inputPath</arg><arg>${inputPath}/Original</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}/${prefix}</arg>
|
<arg>--outputPath</arg><arg>${inputPath}/Extracted</arg>
|
||||||
<arg>--prefix</arg><arg>${prefix}</arg>
|
</java>
|
||||||
|
<ok to="read"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="extract_correspondence">
|
||||||
|
<java>
|
||||||
|
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
|
||||||
|
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--inputPath</arg><arg>${inputPath}/correspondence</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${inputPath}/correspondence_extracted</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="read"/>
|
<ok to="read"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -85,11 +127,35 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}/${prefix}/${prefix}</arg>
|
<arg>--inputPath</arg><arg>${inputPath}/Extracted</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingPath}/${prefix}/${prefix}_JSON/</arg>
|
<arg>--outputPath</arg><arg>${inputPath}/JSON</arg>
|
||||||
<arg>--delimiter</arg><arg>${delimiter}</arg>
|
<arg>--delimiter</arg><arg>${delimiter}</arg>
|
||||||
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
|
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||||
<arg>--format</arg><arg>${prefix}</arg>
|
</spark>
|
||||||
|
<ok to="remap"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="remap">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Produces the AS for OC</name>
|
||||||
|
<class>eu.dnetlib.dhp.actionmanager.opencitations.MapOCIdsInPids</class>
|
||||||
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${outputPathExtraction}</arg>
|
||||||
|
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="create_actionset"/>
|
<ok to="create_actionset"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -112,7 +178,7 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${workingPath}</arg>
|
<arg>--inputPath</arg><arg>${outputPathExtraction}</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "ip",
|
||||||
|
"paramLongName": "inputPath",
|
||||||
|
"paramDescription": "the zipped opencitations file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "op",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the working path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManged",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": false
|
||||||
|
},{
|
||||||
|
"paramName": "nn",
|
||||||
|
"paramLongName": "nameNode",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest {
|
||||||
|
|
||||||
String inputPath = getClass()
|
String inputPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI")
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap/jsonforas")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetSparkJob
|
CreateActionSetSparkJob
|
||||||
|
@ -84,8 +84,6 @@ public class CreateOpenCitationsASTest {
|
||||||
new String[] {
|
new String[] {
|
||||||
"-isSparkSessionManaged",
|
"-isSparkSessionManaged",
|
||||||
Boolean.FALSE.toString(),
|
Boolean.FALSE.toString(),
|
||||||
"-shouldDuplicateRels",
|
|
||||||
Boolean.TRUE.toString(),
|
|
||||||
"-inputPath",
|
"-inputPath",
|
||||||
inputPath,
|
inputPath,
|
||||||
"-outputPath",
|
"-outputPath",
|
||||||
|
@ -99,9 +97,10 @@ public class CreateOpenCitationsASTest {
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||||
.map(aa -> ((Relation) aa.getPayload()));
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
assertEquals(31, tmp.count());
|
Assertions.assertEquals(27, tmp.count());
|
||||||
|
tmp.foreach(r -> Assertions.assertEquals(1, r.getCollectedfrom().size()));
|
||||||
|
|
||||||
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.opencitations;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 07/03/24
|
||||||
|
*/
|
||||||
|
public class RemapTest {
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
private static final Logger log = LoggerFactory
|
||||||
|
.getLogger(RemapTest.class);
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(RemapTest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(RemapTest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(RemapTest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRemap() throws Exception {
|
||||||
|
String inputPath = getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/opencitations/COCI/inputremap")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
MapOCIdsInPids
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-inputPath",
|
||||||
|
inputPath,
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/out/",
|
||||||
|
"-nameNode", "input1;input2;input3;input4;input5"
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
{"cited":"br/061201599020", "citing":"br/06203041400","oci":"oci:06701327944-06504326071"}
|
||||||
|
{"cited":"br/061201599020","citing":"br/06502272390","oci":"oci:06502272390-061301355525"}
|
||||||
|
{"cited":"br/061201599020", "citing":"br/06120941789","oci":"oci:0670804699-067055659"}
|
||||||
|
{"cited":"br/06210273177","citing":"br/06203041400","oci":"oci:061502003994-062201281456"}
|
||||||
|
{"cited":"br/06210273177", "citing":"br/06502272390","oci":"oci:06502272390-0660806688"}
|
||||||
|
{"cited":"br/06210273177", "citing":"br/06120941789","oci":"oci:06502307119-0620223645"}
|
||||||
|
{"cited":"br/0660613430","citing":"br/06203041400","oci":"oci:061502004011-061902692285"}
|
||||||
|
{"cited":"br/0660613430", "citing":"br/06502272390","oci":"oci:0660549063-0610398792"}
|
||||||
|
{"cited":"br/0660613430", "citing":"br/06120941789","oci":"oci:06420189324-06301543046"}
|
||||||
|
{"cited":"br/062602732073","citing":"br/06203041400","oci":"oci:06380130275-061502004367"}
|
||||||
|
{"cited":"br/062602732073","citing":"br/06502272390","oci":"oci:062403449086-062501448395"}
|
||||||
|
{"cited":"br/062602732073","citing":"br/06120941789","oci":"oci:06420189328-061202007182"}
|
||||||
|
{"cited":"br/061103703697","citing":"br/06203041400","oci":"oci:062603906965-061701362658"}
|
||||||
|
{"cited":"br/061103703697", "citing":"br/06502272390","oci":"oci:0670294309-06104327031"}
|
||||||
|
{"cited":"br/061103703697","citing":"br/06120941789","oci":"oci:061702060228-061301712529"}
|
||||||
|
{"cited":"br/06230199640", "citing":"br/0670517081","oci":"oci:06901104174-06503692526"}
|
||||||
|
{"cited":"br/061703513967","citing":"br/061702310822","oci":"oci:061702310822-061703513967"}
|
||||||
|
{"cited":"br/062104002953","citing":"br/061702311472","oci":"oci:061702311472-062104002953"}
|
||||||
|
{"cited":"br/061101204417","citing":"br/062102701590","oci":"oci:062102701590-061101204417"}
|
||||||
|
{"cited":"br/062403787088","citing":"br/061401499173","oci":"oci:061401499173-062403787088"}
|
||||||
|
{"cited":"br/061203576338","citing":"br/06110279619","oci":"oci:06110279619-061203576338"}
|
||||||
|
{"cited":"br/061601962207","citing":"br/061502004018","oci":"oci:061502004018-061601962207"}
|
||||||
|
{"cited":"br/06101014588", "citing":"br/061502004027","oci":"oci:061502004027-06101014588"}
|
||||||
|
{"cited":"br/06704040804", "citing":"br/06220799044","oci":"oci:06220799044-06704040804"}
|
||||||
|
{"cited":"br/061401105151","citing":"br/061502004037","oci":"oci:061502004037-061401105151"}
|
||||||
|
{"cited":"br/0640821079", "citing":"br/061702311537","oci":"oci:061702311537-0640821079"}
|
||||||
|
{"cited":"br/06604165310", "citing":"br/062501970289","oci":"oci:062501970289-06604165310"}
|
||||||
|
{"cited":"br/061501351689","citing":"br/061203895786","oci":"oci:061203895786-061501351689"}
|
||||||
|
{"cited":"br/06202223692", "citing":"br/06110298832","oci":"oci:06110298832-06202223692"}
|
||||||
|
{"cited":"br/06104310727", "citing":"br/0660439086","oci":"oci:0660439086-06104310727"}
|
||||||
|
{"cited":"br/06150216214", "citing":"br/06340150329","oci":"oci:06340150329-06150216214"}
|
|
@ -0,0 +1,48 @@
|
||||||
|
omid,id
|
||||||
|
br/061201599020,doi:10.1142/s0219887817501687
|
||||||
|
br/06203041400,doi:10.1111/j.1523-5378.2005.00327.x pmid:16104945
|
||||||
|
br/06210273177,doi:10.1090/qam/20394
|
||||||
|
br/06502272390,pmid:32235596 doi:10.3390/nano10040644
|
||||||
|
br/0660613430,doi:10.1007/bf00470411
|
||||||
|
br/06120941789,doi:10.1098/rspa.2006.1747
|
||||||
|
br/062602732073,doi:10.1007/978-3-642-38844-6_25
|
||||||
|
br/06230199640,pmid:25088780 doi:10.1016/j.ymeth.2014.07.008
|
||||||
|
br/061103703697,pmid:2682767
|
||||||
|
br/0670517081,doi:10.1016/j.foodpol.2021.102189
|
||||||
|
br/06502310477,doi:10.1142/s0218127416500450
|
||||||
|
br/06520113284,doi:10.1109/cfasta57821.2023.10243367
|
||||||
|
br/062303652439,pmid:5962654 doi:10.1016/0020-708x(66)90001-9
|
||||||
|
br/06250691436,doi:10.1042/bst20150052 pmid:26009172
|
||||||
|
br/061201665577,doi:10.1097/00115550-200205000-00018
|
||||||
|
br/06503490336,pmid:34689254 doi:10.1007/s10072-021-05687-0
|
||||||
|
br/06220615942,pmid:25626134 doi:10.1016/j.jcis.2015.01.008
|
||||||
|
br/061103389243,doi:10.4324/9780203702819-10
|
||||||
|
br/062303011271,doi:10.1109/icassp.2011.5946250
|
||||||
|
br/061302926083,doi:10.4018/978-1-6684-3937-1.ch002
|
||||||
|
br/061402485360,doi:10.1109/iciict.2015.7396079
|
||||||
|
br/06410101083,doi:10.1016/j.autcon.2023.104828
|
||||||
|
br/062202243386,doi:10.1016/0001-8791(81)90022-1
|
||||||
|
br/06170421486,doi:10.1130/0016-7606(2003)115<0166:dsagmf>2.0.co;2
|
||||||
|
br/061201983865,doi:10.4324/9781315109008 isbn:9781315109008
|
||||||
|
br/061701697230,doi:10.1016/j.trd.2012.07.006
|
||||||
|
br/061201137111,doi:10.1109/access.2020.2971656
|
||||||
|
br/06120436283,pmid:2254430 doi:10.1128/jcm.28.11.2551-2554.1990
|
||||||
|
br/061903968916,doi:10.1111/j.1742-1241.1988.tb08627.x
|
||||||
|
br/06201583482,doi:10.1016/0016-5085(78)93139-6
|
||||||
|
br/06130338317,doi:10.2134/agronj1952.00021962004400080013x
|
||||||
|
br/062601538320,doi:10.1371/journal.pone.0270593 pmid:35789338
|
||||||
|
br/062401098626,pmid:22385804 doi:10.1016/j.talanta.2011.12.034
|
||||||
|
br/06190436492,doi:10.1039/c7dt01499f pmid:28644489
|
||||||
|
br/06202819247,doi:10.1007/978-3-319-45823-6_57
|
||||||
|
br/0648013560,doi:10.1080/14772000.2012.705356
|
||||||
|
br/0690214059,doi:10.2752/175630608x329217
|
||||||
|
br/06601640415,doi:10.1080/18128600508685647
|
||||||
|
br/061503394761,doi:10.1002/0471443395.img018
|
||||||
|
br/061702861849,pmid:31203682 doi:10.1080/10428194.2019.1627538
|
||||||
|
br/06450133713,doi:10.1093/acprof:oso/9780199670888.003.0008
|
||||||
|
br/0628074892,doi:10.1097/hnp.0000000000000597
|
||||||
|
br/061601032219,doi:10.1002/bdm.2102
|
||||||
|
br/06602079930,doi:10.1101/2020.08.25.267500
|
||||||
|
br/0604192147,doi:10.11501/3307395
|
||||||
|
br/061101933800,doi:10.1142/s0217732398002242
|
||||||
|
br/06504184118,pmid:10091417
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
{"oci":"oci:06701327944-06504326071","citing":"16104945","citing_pid":"pmid","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06701327944-06504326071","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06502272390-061301355525","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06502272390-061301355525","citing":"32235596","citing_pid":"pmid","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:0670804699-067055659","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1142/s0219887817501687","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:061502003994-062201281456","citing":"16104945","citing_pid":"pmid","cited":"10.1090/qam/20394","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:061502003994-062201281456","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06502272390-0660806688","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06502272390-0660806688","citing":"32235596","citing_pid":"pmid","cited":"10.1090/qam/20394","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06502307119-0620223645","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1090/qam/20394","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:061502004011-061902692285","citing":"16104945","citing_pid":"pmid","cited":"10.1007/bf00470411","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:061502004011-061902692285","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:0660549063-0610398792","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:0660549063-0610398792","citing":"32235596","citing_pid":"pmid","cited":"10.1007/bf00470411","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06420189324-06301543046","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1007/bf00470411","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06380130275-061502004367","citing":"16104945","citing_pid":"pmid","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06380130275-061502004367","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:062403449086-062501448395","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:062403449086-062501448395","citing":"32235596","citing_pid":"pmid","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06420189328-061202007182","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"10.1007/978-3-642-38844-6_25","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:062603906965-061701362658","citing":"16104945","citing_pid":"pmid","cited":"2682767","cited_pid":"pmid"}
|
||||||
|
{"oci":"oci:062603906965-061701362658","citing":"10.1111/j.1523-5378.2005.00327.x","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"}
|
||||||
|
{"oci":"oci:0670294309-06104327031","citing":"10.3390/nano10040644","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"}
|
||||||
|
{"oci":"oci:0670294309-06104327031","citing":"32235596","citing_pid":"pmid","cited":"2682767","cited_pid":"pmid"}
|
||||||
|
{"oci":"oci:061702060228-061301712529","citing":"10.1098/rspa.2006.1747","citing_pid":"doi","cited":"2682767","cited_pid":"pmid"}
|
||||||
|
{"oci":"oci:06901104174-06503692526","citing":"10.1016/j.foodpol.2021.102189","citing_pid":"doi","cited":"10.1016/j.ymeth.2014.07.008","cited_pid":"doi"}
|
||||||
|
{"oci":"oci:06901104174-06503692526","citing":"10.1016/j.foodpol.2021.102189","citing_pid":"doi","cited":"25088780","cited_pid":"pmid"}
|
Loading…
Reference in New Issue