cleanup and refinements

This commit is contained in:
Claudio Atzori 2023-10-04 12:32:05 +02:00
parent 5919e488dd
commit ee8a39e7d2
5 changed files with 55 additions and 41 deletions

View File

@ -32,18 +32,28 @@ import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2; import scala.Tuple2;
public class CreateActionSetSparkJob implements Serializable { public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
// DOI-to-DOI citations
public static final String COCI = "COCI";
// PMID-to-PMID citations
public static final String POCI = "POCI";
private static final String DOI_PREFIX = "50|doi_________::"; private static final String DOI_PREFIX = "50|doi_________::";
private static final String PMID_PREFIX = "50|pmid________::"; private static final String PMID_PREFIX = "50|pmid________::";
private static final String TRUST = "0.91"; private static final String TRUST = "0.91";
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws IOException, ParseException { public static void main(final String[] args) throws IOException, ParseException {
@ -67,7 +77,7 @@ public class CreateActionSetSparkJob implements Serializable {
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath"); final String inputPath = parser.get("inputPath");
log.info("inputPath {}", inputPath.toString()); log.info("inputPath {}", inputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath); log.info("outputPath {}", outputPath);
@ -81,19 +91,16 @@ public class CreateActionSetSparkJob implements Serializable {
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels));
extractContent(spark, inputPath, outputPath, shouldDuplicateRels);
});
} }
private static void extractContent(SparkSession spark, String inputPath, String outputPath, private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) { boolean shouldDuplicateRels) {
getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "COCI") getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI)
.union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "POCI")) .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath, private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath,
@ -109,7 +116,7 @@ public class CreateActionSetSparkJob implements Serializable {
value, shouldDuplicateRels, prefix) value, shouldDuplicateRels, prefix)
.iterator(), .iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null) .filter((FilterFunction<Relation>) Objects::nonNull)
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)) .map(p -> new AtomicAction(p.getClass(), p))
.mapToPair( .mapToPair(
@ -123,20 +130,28 @@ public class CreateActionSetSparkJob implements Serializable {
String prefix; String prefix;
String citing; String citing;
String cited; String cited;
if (p.equals("COCI")) {
prefix = DOI_PREFIX;
citing = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
cited = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
} else {
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCiting()));
cited = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCited()));
switch (p) {
case COCI:
prefix = DOI_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting()));
cited = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited()));
break;
case POCI:
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
cited = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCited()));
break;
default:
throw new IllegalStateException("Invalid prefix: " + p);
} }
if (!citing.equals(cited)) { if (!citing.equals(cited)) {
@ -162,7 +177,7 @@ public class CreateActionSetSparkJob implements Serializable {
public static Relation getRelation( public static Relation getRelation(
String source, String source,
String target, String target,
String relclass) { String relClass) {
return OafMapperUtils return OafMapperUtils
.getRelation( .getRelation(
@ -170,7 +185,7 @@ public class CreateActionSetSparkJob implements Serializable {
target, target,
ModelConstants.RESULT_RESULT, ModelConstants.RESULT_RESULT,
ModelConstants.CITATION, ModelConstants.CITATION,
relclass, relClass,
Arrays Arrays
.asList( .asList(
OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)), OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)),
@ -183,6 +198,6 @@ public class CreateActionSetSparkJob implements Serializable {
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
TRUST), TRUST),
null); null);
} }
} }

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.*; import java.io.*;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -37,7 +38,7 @@ public class GetOpenCitationsRefs implements Serializable {
parser.parseArgument(args); parser.parseArgument(args);
final String[] inputFile = parser.get("inputFile").split(";"); final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString()); log.info("inputFile {}", Arrays.asList(inputFile));
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath); log.info("workingPath {}", workingPath);

View File

@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -42,7 +43,7 @@ public class ReadCOCI implements Serializable {
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final String[] inputFile = parser.get("inputFile").split(";"); final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString()); log.info("inputFile {}", Arrays.asList(inputFile));
Boolean isSparkSessionManaged = isSparkSessionManaged(parser); Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
@ -74,10 +75,10 @@ public class ReadCOCI implements Serializable {
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath, String outputPath,
String delimiter, String format) throws IOException { String delimiter, String format) {
for (String inputFile : inputFiles) { for (String inputFile : inputFiles) {
String p_string = workingPath + "/" + inputFile + ".gz"; String pString = workingPath + "/" + inputFile + ".gz";
Dataset<Row> cociData = spark Dataset<Row> cociData = spark
.read() .read()
@ -86,7 +87,7 @@ public class ReadCOCI implements Serializable {
.option("inferSchema", "true") .option("inferSchema", "true")
.option("header", "true") .option("header", "true")
.option("quotes", "\"") .option("quotes", "\"")
.load(p_string) .load(pString)
.repartition(100); .repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> { cociData.map((MapFunction<Row, COCI>) row -> {

View File

@ -16,15 +16,11 @@
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": false "paramRequired": false
}, { },
"paramName": "sdr", {
"paramLongName": "shouldDuplicateRels", "paramName": "sdr",
"paramDescription": "the hdfs name node", "paramLongName": "shouldDuplicateRels",
"paramRequired": false "paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy",
},{ "paramRequired": false
"paramName": "p", }
"paramLongName": "prefix",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
] ]

View File

@ -34,6 +34,7 @@
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="download"> <action name="download">
<shell xmlns="uri:oozie:shell-action:0.2"> <shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
@ -54,6 +55,7 @@
<ok to="extract"/> <ok to="extract"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="extract"> <action name="extract">
<java> <java>
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class> <main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
@ -112,7 +114,6 @@
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}</arg> <arg>--inputPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--prefix</arg><arg>${prefix}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>