diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index e3a9833b3..2db756a94 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -32,18 +32,28 @@ import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; public class CreateActionSetSparkJob implements Serializable { public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; + + // DOI-to-DOI citations + public static final String COCI = "COCI"; + + // PMID-to-PMID citations + public static final String POCI = "POCI"; + private static final String DOI_PREFIX = "50|doi_________::"; private static final String PMID_PREFIX = "50|pmid________::"; + private static final String TRUST = "0.91"; private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(final String[] args) throws IOException, ParseException { @@ -67,7 +77,7 @@ public class CreateActionSetSparkJob implements Serializable { log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); - log.info("inputPath {}", inputPath.toString()); + log.info("inputPath {}", inputPath); final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); @@ -81,19 +91,16 @@ public class CreateActionSetSparkJob implements Serializable { runWithSparkSession( conf, isSparkSessionManaged, - spark -> { - extractContent(spark, inputPath, outputPath, shouldDuplicateRels); - }); + spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels)); } private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { - getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "COCI") - .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "POCI")) + getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI) + .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - } private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath, @@ -109,7 +116,7 @@ public class CreateActionSetSparkJob implements Serializable { value, shouldDuplicateRels, prefix) .iterator(), Encoders.bean(Relation.class)) - .filter((FilterFunction) value -> value != null) + .filter((FilterFunction) Objects::nonNull) .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) .mapToPair( @@ -123,20 +130,28 @@ public class CreateActionSetSparkJob implements Serializable { String prefix; String citing; String cited; - if (p.equals("COCI")) { - prefix = DOI_PREFIX; - citing = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); - cited = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); - - } else { - prefix = PMID_PREFIX; - citing = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCiting())); - cited = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCited())); + switch (p) { + case COCI: + prefix = DOI_PREFIX; + citing = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting())); + cited = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited())); + break; + case POCI: + prefix = PMID_PREFIX; + citing = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCiting())); + cited = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCited())); + break; + default: + throw new IllegalStateException("Invalid prefix: " + p); } if (!citing.equals(cited)) { @@ -162,7 +177,7 @@ public class CreateActionSetSparkJob implements Serializable { public static Relation getRelation( String source, String target, - String relclass) { + String relClass) { return OafMapperUtils .getRelation( @@ -170,7 +185,7 @@ public class CreateActionSetSparkJob implements Serializable { target, ModelConstants.RESULT_RESULT, ModelConstants.CITATION, - relclass, + relClass, Arrays .asList( OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)), @@ -183,6 +198,6 @@ public class CreateActionSetSparkJob implements Serializable { ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), TRUST), null); - } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index 60dc998ef..600cf7df1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.opencitations; import java.io.*; import java.io.Serializable; +import java.util.Arrays; import java.util.Objects; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; @@ -37,7 +38,7 @@ public class GetOpenCitationsRefs implements Serializable { parser.parseArgument(args); final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", inputFile.toString()); + log.info("inputFile {}", Arrays.asList(inputFile)); final String workingPath = parser.get("workingPath"); log.info("workingPath {}", workingPath); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index 3d384de9d..b9c24df3b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -42,7 +43,7 @@ public class ReadCOCI implements Serializable { log.info("outputPath: {}", outputPath); final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", inputFile.toString()); + log.info("inputFile {}", Arrays.asList(inputFile)); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); @@ -74,10 +75,10 @@ public class ReadCOCI implements Serializable { private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, String outputPath, - String delimiter, String format) throws IOException { + String delimiter, String format) { for (String inputFile : inputFiles) { - String p_string = workingPath + "/" + inputFile + ".gz"; + String pString = workingPath + "/" + inputFile + ".gz"; Dataset cociData = spark .read() @@ -86,7 +87,7 @@ public class ReadCOCI implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(p_string) + .load(pString) .repartition(100); cociData.map((MapFunction) row -> { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json index e25d1f4b8..5244a6fe4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json @@ -16,15 +16,11 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the hdfs name node", "paramRequired": false - }, { - "paramName": "sdr", - "paramLongName": "shouldDuplicateRels", - "paramDescription": "the hdfs name node", - "paramRequired": false -},{ - "paramName": "p", - "paramLongName": "prefix", - "paramDescription": "the hdfs name node", - "paramRequired": true -} + }, + { + "paramName": "sdr", + "paramLongName": "shouldDuplicateRels", + "paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy", + "paramRequired": false + } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index d87dfa2ba..deb32459b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -34,6 +34,7 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + ${jobTracker} @@ -54,6 +55,7 @@ + eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs @@ -112,7 +114,6 @@ --inputPath${workingPath} --outputPath${outputPath} - --prefix${prefix}