From f2118d771aee924e47c77c6b603cd4086c90d7ef Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 22 Sep 2021 15:18:05 +0200 Subject: [PATCH 1/3] first steps in the implementation of the integration of opencitations --- .../dnetlib/dhp/common/collection/GetCSV.java | 1 - .../ExtractOpenCitationRefs.java | 63 +++++++++++++ .../opencitations/GetOpenCitationsRefs.java | 90 +++++++++++++++++++ .../opencitations/input_parameters.json | 20 +++++ .../oozie_app/config-default.xml | 58 ++++++++++++ .../opencitations/oozie_app/download.sh | 2 + .../opencitations/oozie_app/workflow.xml | 64 +++++++++++++ .../opencitations_parameters.json | 8 ++ .../eu/dnetlib/dhp/PropagationConstant.java | 11 ++- .../SparkOrcidToResultFromSemRelJob.java | 7 +- ...kResultToCommunityFromOrganizationJob.java | 4 +- ...parkResultToCommunityThroughSemRelJob.java | 4 +- 12 files changed, 322 insertions(+), 10 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java index 44e19142c..9696975cd 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java @@ -10,7 +10,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; -import com.opencsv.bean.CsvToBeanBuilder; public class GetCSV { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java new file mode 100644 index 000000000..58ec37e65 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java @@ -0,0 +1,63 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import java.io.BufferedOutputStream; +import java.net.URI; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mortbay.log.Log; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class ExtractOpenCitationRefs { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + ExtractOpenCitationRefs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/a/ccionmanager/opencitations/opencitations_parameters.json"))); + parser.parseArgument(args); + final String hdfsServerUri = parser.get("hdfsServerUri"); + final String workingPath = hdfsServerUri.concat(parser.get("workingPath")); + final String outputPath = parser.get("outputPath"); + final String opencitationFile = parser.get("opencitationFile"); + + Path hdfsreadpath = new Path(workingPath.concat("/").concat(opencitationFile)); + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", workingPath); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + FileSystem fs = FileSystem.get(URI.create(workingPath), conf); + FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath); + try (TarArchiveInputStream tais = new TarArchiveInputStream( + new GzipCompressorInputStream(crossrefFileStream))) { + TarArchiveEntry entry = null; + while ((entry = tais.getNextTarEntry()) != null) { + if (!entry.isDirectory()) { + try ( + FSDataOutputStream out = fs + .create(new Path(outputPath.concat(entry.getName()).concat(".gz"))); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { + + IOUtils.copy(tais, gzipOs); + + } + + } + } + } + Log.info("Crossref dump reading completed"); + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java new file mode 100644 index 000000000..ea3bdf9b3 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import java.io.*; +import java.io.Serializable; +import java.util.Objects; +import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class GetOpenCitationsRefs implements Serializable { + private static final Logger log = LoggerFactory.getLogger(GetOpenCitationsRefs.class); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + GetOpenCitationsRefs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json")))); + + parser.parseArgument(args); + + final String inputFile = parser.get("inputFile"); + log.info("inputFile {}", inputFile); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath {}", workingPath); + + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode {}", hdfsNameNode); + + + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + + new GetOpenCitationsRefs().doExtract(inputFile, workingPath, fileSystem); + } + + private void doExtract(String inputFile, String workingPath, FileSystem fileSystem) + throws IOException { + + final Path path = new Path(inputFile); + + FSDataInputStream oc_zip = fileSystem.open(path); + + int count = 1; + try (ZipInputStream zis = new ZipInputStream(oc_zip)) { + ZipEntry entry = null; + while ((entry = zis.getNextEntry()) != null) { + + if (!entry.isDirectory()) { + String fileName = entry.getName(); + fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; + count++; + try ( + FSDataOutputStream out = fileSystem + .create(new Path(workingPath + "/COCI/" + fileName + ".gz")); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { + + IOUtils.copy(zis, gzipOs); + + } + } + + } + + } + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json new file mode 100644 index 000000000..4910ad11d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "if", + "paramLongName": "inputFile", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "hnn", + "paramLongName": "hdfsNameNode", + "paramDescription": "the hdfs name node", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml new file mode 100644 index 000000000..a1755f329 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml @@ -0,0 +1,58 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + oozie.launcher.mapreduce.user.classpath.first + true + + + sparkExecutorNumber + 4 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 6G + + + sparkExecutorCores + 1 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh new file mode 100644 index 000000000..54f66287c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh @@ -0,0 +1,2 @@ +#!/bin/bash +wget -i $1 \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml new file mode 100644 index 000000000..90d5e9eee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -0,0 +1,64 @@ + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + ${jobTracker} + ${nameNode} + + + mapred.job.queue.name + ${queueName} + + + download.sh + ${url} + ${crossrefDumpPath} + ${crossrefdumpfilename} + ${crossrefdumptoken} + HADOOP_USER_NAME=${wf:user()} + download.sh + + + + + + + + eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs + --hdfsNameNode${nameNode} + --inputFile${inputFile} + --workingPath${workingDir}/OpenCitations + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json new file mode 100644 index 000000000..258d6816e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json @@ -0,0 +1,8 @@ +[ + {"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true}, + {"paramName":"f", "paramLongName":"opencitationFile", "paramDescription": "the name of the file", "paramRequired": true}, + {"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false}, + {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the name of the activities orcid file", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 0d7c74475..23e97a97a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -69,7 +69,7 @@ public class PropagationConstant { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); return nc; } @@ -84,7 +84,8 @@ public class PropagationConstant { return di; } - public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) { + public static Qualifier getQualifier(String inference_class_id, String inference_class_name, + String qualifierSchema) { Qualifier pa = new Qualifier(); pa.setClassid(inference_class_id); pa.setClassname(inference_class_name); @@ -108,7 +109,11 @@ public class PropagationConstant { r.setRelClass(rel_class); r.setRelType(rel_type); r.setSubRelType(subrel_type); - r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS)); + r + .setDataInfo( + getDataInfo( + inference_provenance, inference_class_id, inference_class_name, + ModelConstants.DNET_PROVENANCE_ACTIONS)); return r; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index 68949b900..a38b4da2e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -173,14 +173,17 @@ public class SparkOrcidToResultFromSemRelJob { if (toaddpid) { StructuredProperty p = new StructuredProperty(); p.setValue(autoritative_author.getOrcid()); - p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES)); + p + .setQualifier( + getQualifier( + ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES)); p .setDataInfo( getDataInfo( PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); Optional> authorPid = Optional.ofNullable(author.getPid()); if (authorPid.isPresent()) { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 1289ff644..50df08f8c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -22,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Result; import scala.Tuple2; @@ -130,7 +130,7 @@ public class SparkResultToCommunityFromOrganizationJob { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); + ModelConstants.DNET_PROVENANCE_ACTIONS))); propagatedContexts.add(newContext); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index 7f76ead94..f31a26230 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -126,7 +126,7 @@ public class SparkResultToCommunityThroughSemRelJob { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); + ModelConstants.DNET_PROVENANCE_ACTIONS))); return newContext; } return null; From 5ec69889db0f07d247ce423da1c5efe09961e9b4 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 27 Sep 2021 16:02:06 +0200 Subject: [PATCH 2/3] OpenCitations: creation of AS from OC --- .../dnetlib/dhp/common/collection/GetCSV.java | 1 + .../CreateActionSetSparkJob.java | 178 +++++++++++ .../opencitations/CreateRelationsJson.java | 176 ++++++++++ .../ExtractOpenCitationRefs.java | 63 ---- .../opencitations/GetOpenCitationsRefs.java | 13 +- .../opencitations/OpenCitationModel.java | 5 + .../opencitations/as_parameters.json | 25 ++ .../opencitations/oozie_app/download.sh | 2 +- .../opencitations/oozie_app/workflow.xml | 45 ++- .../CreateOpenCitationsASTest.java | 301 ++++++++++++++++++ .../opencitations/inputFiles/input1 | 8 + .../opencitations/inputFiles/input2 | 8 + .../opencitations/inputFiles/input3 | 9 + pom.xml | 2 +- 14 files changed, 757 insertions(+), 79 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3 diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java index 9696975cd..44e19142c 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java @@ -10,6 +10,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; +import com.opencsv.bean.CsvToBeanBuilder; public class GetCSV { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java new file mode 100644 index 000000000..9486a74ce --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -0,0 +1,178 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import scala.Tuple2; + +public class CreateActionSetSparkJob implements Serializable { + public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; + public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; + private static final String ID_PREFIX = "50|doi_________::"; + private static final String TRUST = "0.91"; + + private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + CreateActionSetSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath.toString()); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + final boolean shouldDuplicateRels = Boolean.valueOf(parser.get("shouldDuplicateRels")); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + extractContent(spark, inputPath, outputPath, shouldDuplicateRels); + }); + + } + + private static void extractContent(SparkSession spark, String inputPath, String outputPath, + boolean shouldDuplicateRels) { + spark + .sqlContext() + .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) + .flatMap( + (FlatMapFunction) value -> createRelation(value, shouldDuplicateRels).iterator(), + Encoders.bean(Relation.class)) + .filter((FilterFunction) value -> value != null) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + + } + + private static List createRelation(String value, boolean duplicate) { + String[] line = value.split(","); + if (!line[1].startsWith("10.")) { + return new ArrayList<>(); + } + List relationList = new ArrayList<>(); + + String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); + final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + + relationList + .addAll( + getRelations( + citing, + cited)); + + if (duplicate && line[1].endsWith(".refs")) { + citing = ID_PREFIX + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); + relationList.addAll(getRelations(citing, cited)); + } + + return relationList; + } + + private static Collection getRelations(String citing, String cited) { + + return Arrays + .asList( + getRelation(citing, cited, ModelConstants.CITES), + getRelation(cited, citing, ModelConstants.IS_CITED_BY)); + } + + public static Relation getRelation( + String source, + String target, + String relclass) { + Relation r = new Relation(); + r.setCollectedfrom(getCollectedFrom()); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relclass); + r.setRelType(ModelConstants.RESULT_RESULT); + r.setSubRelType(ModelConstants.CITATION); + r + .setDataInfo( + getDataInfo()); + return r; + } + + public static List getCollectedFrom() { + KeyValue kv = new KeyValue(); + kv.setKey(ModelConstants.OPENOCITATIONS_ID); + kv.setValue(ModelConstants.OPENOCITATIONS_NAME); + + return Arrays.asList(kv); + } + + public static DataInfo getDataInfo() { + DataInfo di = new DataInfo(); + di.setInferred(false); + di.setDeletedbyinference(false); + di.setTrust(TRUST); + + di + .setProvenanceaction( + getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS)); + return di; + } + + public static Qualifier getQualifier(String class_id, String class_name, + String qualifierSchema) { + Qualifier pa = new Qualifier(); + pa.setClassid(class_id); + pa.setClassname(class_name); + pa.setSchemeid(qualifierSchema); + pa.setSchemename(qualifierSchema); + return pa; + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java new file mode 100644 index 000000000..4996a3089 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java @@ -0,0 +1,176 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import scala.Tuple2; + +public class CreateRelationsJson implements Serializable { + public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; + public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; + private static final String ID_PREFIX = "50|doi_________::"; + private static final String TRUST = "0.91"; + + private static final Logger log = LoggerFactory.getLogger(CreateRelationsJson.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + CreateRelationsJson.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath.toString()); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + extractContent(spark, inputPath, outputPath); + }); + + } + + private static void extractContent(SparkSession spark, String inputPath, String outputPath) { + spark + .sqlContext() + .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) + .flatMap( + (FlatMapFunction) value -> createRelation(value).iterator(), + Encoders.bean(Relation.class)) + .filter((FilterFunction) value -> value != null) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + + } + + private static List createRelation(String value) { + String[] line = value.split(","); + if (!line[1].startsWith("10.")) { + return new ArrayList<>(); + } + List relationList = new ArrayList<>(); + + String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); + final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + + relationList + .addAll( + getRelations( + citing, + cited)); + + if (line[1].endsWith(".refs")) { + citing = ID_PREFIX + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); + relationList.addAll(getRelations(citing, cited)); + } + + return relationList; + } + + private static Collection getRelations(String citing, String cited) { + + return Arrays + .asList( + getRelation(citing, cited, ModelConstants.CITES), + getRelation(cited, citing, ModelConstants.IS_CITED_BY)); + } + + public static Relation getRelation( + String source, + String target, + String relclass) { + Relation r = new Relation(); + r.setCollectedfrom(getCollectedFrom()); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relclass); + r.setRelType(ModelConstants.RESULT_RESULT); + r.setSubRelType(ModelConstants.CITATION); + r + .setDataInfo( + getDataInfo()); + return r; + } + + public static List getCollectedFrom() { + KeyValue kv = new KeyValue(); + kv.setKey(ModelConstants.OPENOCITATIONS_ID); + kv.setValue(ModelConstants.OPENOCITATIONS_NAME); + + return Arrays.asList(kv); + } + + public static DataInfo getDataInfo() { + DataInfo di = new DataInfo(); + di.setInferred(false); + di.setDeletedbyinference(false); + di.setTrust(TRUST); + + di + .setProvenanceaction( + getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS)); + return di; + } + + public static Qualifier getQualifier(String class_id, String class_name, + String qualifierSchema) { + Qualifier pa = new Qualifier(); + pa.setClassid(class_id); + pa.setClassname(class_name); + pa.setSchemeid(qualifierSchema); + pa.setSchemename(qualifierSchema); + return pa; + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java deleted file mode 100644 index 58ec37e65..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java +++ /dev/null @@ -1,63 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.opencitations; - -import java.io.BufferedOutputStream; -import java.net.URI; -import java.util.zip.GZIPOutputStream; - -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.mortbay.log.Log; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -public class ExtractOpenCitationRefs { - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - ExtractOpenCitationRefs.class - .getResourceAsStream( - "/eu/dnetlib/dhp/a/ccionmanager/opencitations/opencitations_parameters.json"))); - parser.parseArgument(args); - final String hdfsServerUri = parser.get("hdfsServerUri"); - final String workingPath = hdfsServerUri.concat(parser.get("workingPath")); - final String outputPath = parser.get("outputPath"); - final String opencitationFile = parser.get("opencitationFile"); - - Path hdfsreadpath = new Path(workingPath.concat("/").concat(opencitationFile)); - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", workingPath); - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - FileSystem fs = FileSystem.get(URI.create(workingPath), conf); - FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath); - try (TarArchiveInputStream tais = new TarArchiveInputStream( - new GzipCompressorInputStream(crossrefFileStream))) { - TarArchiveEntry entry = null; - while ((entry = tais.getNextTarEntry()) != null) { - if (!entry.isDirectory()) { - try ( - FSDataOutputStream out = fs - .create(new Path(outputPath.concat(entry.getName()).concat(".gz"))); - GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { - - IOUtils.copy(tais, gzipOs); - - } - - } - } - } - Log.info("Crossref dump reading completed"); - - } -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index ea3bdf9b3..3530c9980 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -36,8 +36,8 @@ public class GetOpenCitationsRefs implements Serializable { parser.parseArgument(args); - final String inputFile = parser.get("inputFile"); - log.info("inputFile {}", inputFile); + final String[] inputFile = parser.get("inputFile").split(";"); + log.info("inputFile {}", inputFile.toString()); final String workingPath = parser.get("workingPath"); log.info("workingPath {}", workingPath); @@ -45,14 +45,17 @@ public class GetOpenCitationsRefs implements Serializable { final String hdfsNameNode = parser.get("hdfsNameNode"); log.info("hdfsNameNode {}", hdfsNameNode); - - Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); FileSystem fileSystem = FileSystem.get(conf); - new GetOpenCitationsRefs().doExtract(inputFile, workingPath, fileSystem); + GetOpenCitationsRefs ocr = new GetOpenCitationsRefs(); + + for (String file : inputFile) { + ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem); + } + } private void doExtract(String inputFile, String workingPath, FileSystem fileSystem) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java new file mode 100644 index 000000000..2da96084e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java @@ -0,0 +1,5 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +public class OpenCitationModel { +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json new file mode 100644 index 000000000..308e02026 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json @@ -0,0 +1,25 @@ +[ + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, { + "paramName": "sdr", + "paramLongName": "shouldDuplicateRels", + "paramDescription": "the hdfs name node", + "paramRequired": false +} +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh index 54f66287c..7a34f3c4e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh @@ -1,2 +1,2 @@ #!/bin/bash -wget -i $1 \ No newline at end of file +for file in $(echo $1 | tr ";" "\n"); do curl -L $(echo $file | cut -d '@' -f 1 ) | hdfs dfs -put - $2/$(echo $file | cut -d '@' -f 2) ; done; \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index 90d5e9eee..d052791a3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -20,9 +20,16 @@ + + + + ${wf:conf('resumeFrom') eq 'DownloadDump'} + ${wf:conf('resumeFrom') eq 'ExtractContent'} + + + - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -37,15 +44,13 @@ download.sh - ${url} - ${crossrefDumpPath} - ${crossrefdumpfilename} - ${crossrefdumptoken} + ${filelist} + ${workingPath}/Original HADOOP_USER_NAME=${wf:user()} download.sh - + @@ -53,12 +58,34 @@ eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs --hdfsNameNode${nameNode} --inputFile${inputFile} - --workingPath${workingDir}/OpenCitations + --workingPath${workingPath} - + - + + + yarn + cluster + Produces the AS for OC + eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${workingPath}/COCI + --outputPath${outputPath} + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java new file mode 100644 index 000000000..f3ceaa1ec --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java @@ -0,0 +1,301 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; + +public class CreateOpenCitationsASTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(CreateOpenCitationsASTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(CreateOpenCitationsASTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(CreateOpenCitationsASTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(CreateOpenCitationsASTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testNumberofRelations() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + assertEquals(60, tmp.count()); + + tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + + } + + @Test + void testRelationsCollectedFrom() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + tmp.foreach(r -> { + assertEquals(ModelConstants.OPENOCITATIONS_NAME, r.getCollectedfrom().get(0).getValue()); + assertEquals(ModelConstants.OPENOCITATIONS_ID, r.getCollectedfrom().get(0).getKey()); + }); + + } + + @Test + void testRelationsDataInfo() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + tmp.foreach(r -> { + assertEquals(false, r.getDataInfo().getInferred()); + assertEquals(false, r.getDataInfo().getDeletedbyinference()); + assertEquals("0.91", r.getDataInfo().getTrust()); + assertEquals( + CreateActionSetSparkJob.OPENCITATIONS_CLASSID, r.getDataInfo().getProvenanceaction().getClassid()); + assertEquals( + CreateActionSetSparkJob.OPENCITATIONS_CLASSNAME, r.getDataInfo().getProvenanceaction().getClassname()); + assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, r.getDataInfo().getProvenanceaction().getSchemeid()); + assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, r.getDataInfo().getProvenanceaction().getSchemename()); + }); + + } + + @Test + void testRelationsSemantics() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + tmp.foreach(r -> { + assertEquals("citation", r.getSubRelType()); + assertEquals("resultResult", r.getRelType()); + }); + assertEquals(30, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); + assertEquals(30, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); + + } + + @Test + void testRelationsSourceTargetPrefix() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + tmp.foreach(r -> { + assertEquals("50|doi_________::", r.getSource().substring(0, 17)); + assertEquals("50|doi_________::", r.getTarget().substring(0, 17)); + }); + + } + + @Test + void testRelationsSourceTargetCouple() throws Exception { + final String doi1 = "50|doi_________::" + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-015-3684-x")); + final String doi2 = "50|doi_________::" + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/j.1551-2916.2008.02408.x")); + final String doi3 = "50|doi_________::" + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-014-2114-9")); + final String doi4 = "50|doi_________::" + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/j.ceramint.2013.09.069")); + final String doi5 = "50|doi_________::" + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-009-9913-4")); + final String doi6 = "50|doi_________::" + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/0038-1098(72)90370-5")); + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + JavaRDD check = tmp.filter(r -> r.getSource().equals(doi1) || r.getTarget().equals(doi1)); + + assertEquals(10, check.count()); + + check.foreach(r -> { + if (r.getSource().equals(doi2) || r.getSource().equals(doi3) || r.getSource().equals(doi4) || + r.getSource().equals(doi5) || r.getSource().equals(doi6)) { + assertEquals(ModelConstants.IS_CITED_BY, r.getRelClass()); + assertEquals(doi1, r.getTarget()); + } + }); + + assertEquals(5, check.filter(r -> r.getSource().equals(doi1)).count()); + check.filter(r -> r.getSource().equals(doi1)).foreach(r -> assertEquals(ModelConstants.CITES, r.getRelClass())); + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1 new file mode 100644 index 000000000..d93d6fd99 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1 @@ -0,0 +1,8 @@ +oci,citing,cited,creation,timespan,journal_sc,author_sc +02001000007362801000805046300010563030608046333-0200101010136193701050501630209010637020000083700020400083733,10.1007/s10854-015-3684-x,10.1111/j.1551-2916.2008.02408.x,2015-09-01,P7Y2M,no,no +02001000007362801000805046300010563030608046333-02001000007362801000805046300010463020101046309,10.1007/s10854-015-3684-x,10.1007/s10854-014-2114-9,2015-09-01,P1Y2M4D,yes,no +02001000007362801000805046300010563030608046333-020010001063619371214271022182329370200010337000937000609,10.1007/s10854-015-3684-x,10.1016/j.ceramint.2013.09.069,2015-09-01,P1Y6M,no,no +02001000007362801000805046300010563030608046333-02001000007362801000805046300000963090901036304,10.1007/s10854-015-3684-x,10.1007/s10854-009-9913-4,2015-09-01,P6Y3M10D,yes,no +02001000007362801000805046300010563030608046333-02001000106360000030863010009085807025909000307006305,10.1007/s10854-015-3684-x,10.1016/0038-1098(72)90370-5,2015-09-01,P43Y8M,no,no +02001000007362801000805046300010563030608056309-02001000106361937281010370200010437000937000308,10.1007/s10854-015-3685-9,10.1016/j.saa.2014.09.038,2015-09-03,P0Y7M,no,no +02001000007362801000805046300010563030608056309-0200100010636193722102912171027370200010537000437000106,10.1007/s10854-015-3685-9,10.1016/j.matchar.2015.04.016,2015-09-03,P0Y2M,no,no \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2 new file mode 100644 index 000000000..14ee8b354 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2 @@ -0,0 +1,8 @@ +oci,citing,cited,creation,timespan,journal_sc,author_sc +02001000308362804010509076300010963000003086301-0200100020936020001003227000009010004,10.1038/s41597-019-0038-1,10.1029/2010wr009104,2019-04-15,P8Y1M,no,no +02001000308362804010509076300010963000003086301-0200100010636280103060463080105025800015900000006006303,10.1038/s41597-019-0038-1,10.1016/s1364-8152(01)00060-3,2019-04-15,P17Y3M,no,no +02001000308362804010509076300010963000003086301-02001000007362800000407076300010063000401066333,10.1038/s41597-019-0038-1,10.1007/s00477-010-0416-x,2019-04-15,P8Y9M6D,no,no +02001000308362804010509076300010963000003086301-02001000007362800000700046300010363000905016308,10.1038/s41597-019-0038-1,10.1007/s00704-013-0951-8,2019-04-15,P5Y9M23D,no,no +02001000308362804010509076300010963000003086301-02001000002361924123705070707,10.1038/s41597-019-0038-1,10.1002/joc.5777,2019-04-15,P0Y8M1D,no,no +02001000308362804010509076300010963000003086301-02005010904361714282863020263040504076302000108,10.1038/s41597-019-0038-1,10.5194/hess-22-4547-2018,2019-04-15,P0Y7M18D,no,no +02001000308362804010509076300010963000003086301-02001000002361924123703050404,10.1038/s41597-019-0038-1,10.1002/joc.3544,2019-04-15,P6Y9M6D,no,no \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3 new file mode 100644 index 000000000..0611929d5 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3 @@ -0,0 +1,9 @@ +oci,citing,cited,creation,timespan,journal_sc,author_sc +0200100000236090708010101090307000202023727141528-020050302063600040000010307,10.1002/9781119370222.refs,10.5326/0400137,2020-06-22,P16Y3M,no,no +0200100000236090708010101090307000202023727141528-0200101010136193701050302630905003337020000073700000301093733,10.1002/9781119370222.refs,10.1111/j.1532-950x.2007.00319.x,2020-06-22,P12Y8M,no,no +0200100000236090708010101090307000202023727141528-0200101010136312830370102030509,10.1002/9781119370222.refs,10.1111/vsu.12359,2020-06-22,P4Y10M29D,no,no +0200100000236090708010101090307000202023727141528-020050302063600030900020904,10.1002/9781119370222.refs,10.5326/0390294,2020-06-22,P17Y1M,no,no +0200100000236090708010101090307000202023727141528-020050302063600040200030701,10.1002/9781119370222.refs,10.5326/0420371,2020-06-22,P13Y9M,no,no +0200100000236090708010101090307000202023727141528-0200101010136193701050302630905003337020001033701020000003733,10.1002/9781119370222.refs,10.1111/j.1532-950x.2013.12000.x,2020-06-22,P7Y2M,no,no +0200100000236090708010101090307000202023727141528-020010008003600000408000106093702000006370306070200,10.1002/9781119370222.refs,10.1080/00480169.2006.36720,2020-06-22,P13Y6M,no,no +0200100000236090708010101090307000202023727141528-0200101010136193701070501630008010337020000063700000003033733,10.1002/9781119370222.refs,10.1111/j.1751-0813.2006.00033.x,2020-06-22,P13Y8M,no,no \ No newline at end of file diff --git a/pom.xml b/pom.xml index 61b0ad873..bd322daae 100644 --- a/pom.xml +++ b/pom.xml @@ -753,7 +753,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.7.18] + [2.7.19-SNAPSHOT] [4.0.3] [6.0.5] [3.1.6] From 1cc09adfaa118a54bf1977e56853044154027268 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 18 Oct 2021 14:11:27 +0200 Subject: [PATCH 3/3] Opencitations: chenaged the test class to mirror the creation or not of duplicate dois for .refs oc original plus added optional parameter to duplicate the relation --- .../CreateActionSetSparkJob.java | 5 +- .../opencitations/CreateRelationsJson.java | 176 ------------------ .../opencitations/OpenCitationModel.java | 5 - .../CreateOpenCitationsASTest.java | 40 +++- 4 files changed, 41 insertions(+), 185 deletions(-) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index 9486a74ce..eeb86a8ff 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -65,7 +65,10 @@ public class CreateActionSetSparkJob implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); - final boolean shouldDuplicateRels = Boolean.valueOf(parser.get("shouldDuplicateRels")); + final boolean shouldDuplicateRels = + Optional.ofNullable(parser.get("shouldDuplicateRels")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); SparkConf conf = new SparkConf(); runWithSparkSession( diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java deleted file mode 100644 index 4996a3089..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateRelationsJson.java +++ /dev/null @@ -1,176 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.opencitations; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.IOException; -import java.io.Serializable; -import java.util.*; - -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; -import scala.Tuple2; - -public class CreateRelationsJson implements Serializable { - public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; - public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; - private static final String ID_PREFIX = "50|doi_________::"; - private static final String TRUST = "0.91"; - - private static final Logger log = LoggerFactory.getLogger(CreateRelationsJson.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(final String[] args) throws IOException, ParseException { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - Objects - .requireNonNull( - CreateRelationsJson.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json")))); - - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}", inputPath.toString()); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}", outputPath); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - extractContent(spark, inputPath, outputPath); - }); - - } - - private static void extractContent(SparkSession spark, String inputPath, String outputPath) { - spark - .sqlContext() - .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) - .flatMap( - (FlatMapFunction) value -> createRelation(value).iterator(), - Encoders.bean(Relation.class)) - .filter((FilterFunction) value -> value != null) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - - } - - private static List createRelation(String value) { - String[] line = value.split(","); - if (!line[1].startsWith("10.")) { - return new ArrayList<>(); - } - List relationList = new ArrayList<>(); - - String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); - final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); - - relationList - .addAll( - getRelations( - citing, - cited)); - - if (line[1].endsWith(".refs")) { - citing = ID_PREFIX + IdentifierFactory - .md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); - relationList.addAll(getRelations(citing, cited)); - } - - return relationList; - } - - private static Collection getRelations(String citing, String cited) { - - return Arrays - .asList( - getRelation(citing, cited, ModelConstants.CITES), - getRelation(cited, citing, ModelConstants.IS_CITED_BY)); - } - - public static Relation getRelation( - String source, - String target, - String relclass) { - Relation r = new Relation(); - r.setCollectedfrom(getCollectedFrom()); - r.setSource(source); - r.setTarget(target); - r.setRelClass(relclass); - r.setRelType(ModelConstants.RESULT_RESULT); - r.setSubRelType(ModelConstants.CITATION); - r - .setDataInfo( - getDataInfo()); - return r; - } - - public static List getCollectedFrom() { - KeyValue kv = new KeyValue(); - kv.setKey(ModelConstants.OPENOCITATIONS_ID); - kv.setValue(ModelConstants.OPENOCITATIONS_NAME); - - return Arrays.asList(kv); - } - - public static DataInfo getDataInfo() { - DataInfo di = new DataInfo(); - di.setInferred(false); - di.setDeletedbyinference(false); - di.setTrust(TRUST); - - di - .setProvenanceaction( - getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS)); - return di; - } - - public static Qualifier getQualifier(String class_id, String class_name, - String qualifierSchema) { - Qualifier pa = new Qualifier(); - pa.setClassid(class_id); - pa.setClassname(class_name); - pa.setSchemeid(qualifierSchema); - pa.setSchemename(qualifierSchema); - return pa; - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java deleted file mode 100644 index 2da96084e..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/OpenCitationModel.java +++ /dev/null @@ -1,5 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.opencitations; - -public class OpenCitationModel { -} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java index f3ceaa1ec..7567f855b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java @@ -84,6 +84,8 @@ public class CreateOpenCitationsASTest { new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-shouldDuplicateRels", + Boolean.TRUE.toString(), "-inputPath", inputPath, "-outputPath", @@ -99,7 +101,39 @@ public class CreateOpenCitationsASTest { assertEquals(60, tmp.count()); - tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + + } + + @Test + void testNumberofRelations2() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + CreateActionSetSparkJob + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + assertEquals(44, tmp.count()); + + // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); } @@ -206,8 +240,8 @@ public class CreateOpenCitationsASTest { assertEquals("citation", r.getSubRelType()); assertEquals("resultResult", r.getRelType()); }); - assertEquals(30, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); - assertEquals(30, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); + assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); + assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); }