diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java index 44e19142c..9696975cd 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/GetCSV.java @@ -10,7 +10,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; -import com.opencsv.bean.CsvToBeanBuilder; public class GetCSV { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java new file mode 100644 index 000000000..58ec37e65 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ExtractOpenCitationRefs.java @@ -0,0 +1,63 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import java.io.BufferedOutputStream; +import java.net.URI; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mortbay.log.Log; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class ExtractOpenCitationRefs { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + ExtractOpenCitationRefs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/a/ccionmanager/opencitations/opencitations_parameters.json"))); + parser.parseArgument(args); + final String hdfsServerUri = parser.get("hdfsServerUri"); + final String workingPath = hdfsServerUri.concat(parser.get("workingPath")); + final String outputPath = parser.get("outputPath"); + final String opencitationFile = parser.get("opencitationFile"); + + Path hdfsreadpath = new Path(workingPath.concat("/").concat(opencitationFile)); + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", workingPath); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + FileSystem fs = FileSystem.get(URI.create(workingPath), conf); + FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath); + try (TarArchiveInputStream tais = new TarArchiveInputStream( + new GzipCompressorInputStream(crossrefFileStream))) { + TarArchiveEntry entry = null; + while ((entry = tais.getNextTarEntry()) != null) { + if (!entry.isDirectory()) { + try ( + FSDataOutputStream out = fs + .create(new Path(outputPath.concat(entry.getName()).concat(".gz"))); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { + + IOUtils.copy(tais, gzipOs); + + } + + } + } + } + Log.info("Crossref dump reading completed"); + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java new file mode 100644 index 000000000..ea3bdf9b3 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import java.io.*; +import java.io.Serializable; +import java.util.Objects; +import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class GetOpenCitationsRefs implements Serializable { + private static final Logger log = LoggerFactory.getLogger(GetOpenCitationsRefs.class); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + GetOpenCitationsRefs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json")))); + + parser.parseArgument(args); + + final String inputFile = parser.get("inputFile"); + log.info("inputFile {}", inputFile); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath {}", workingPath); + + final String hdfsNameNode = parser.get("hdfsNameNode"); + log.info("hdfsNameNode {}", hdfsNameNode); + + + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + + new GetOpenCitationsRefs().doExtract(inputFile, workingPath, fileSystem); + } + + private void doExtract(String inputFile, String workingPath, FileSystem fileSystem) + throws IOException { + + final Path path = new Path(inputFile); + + FSDataInputStream oc_zip = fileSystem.open(path); + + int count = 1; + try (ZipInputStream zis = new ZipInputStream(oc_zip)) { + ZipEntry entry = null; + while ((entry = zis.getNextEntry()) != null) { + + if (!entry.isDirectory()) { + String fileName = entry.getName(); + fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; + count++; + try ( + FSDataOutputStream out = fileSystem + .create(new Path(workingPath + "/COCI/" + fileName + ".gz")); + GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { + + IOUtils.copy(zis, gzipOs); + + } + } + + } + + } + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json new file mode 100644 index 000000000..4910ad11d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "if", + "paramLongName": "inputFile", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "hnn", + "paramLongName": "hdfsNameNode", + "paramDescription": "the hdfs name node", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml new file mode 100644 index 000000000..a1755f329 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/config-default.xml @@ -0,0 +1,58 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + oozie.launcher.mapreduce.user.classpath.first + true + + + sparkExecutorNumber + 4 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 6G + + + sparkExecutorCores + 1 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh new file mode 100644 index 000000000..54f66287c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/download.sh @@ -0,0 +1,2 @@ +#!/bin/bash +wget -i $1 \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml new file mode 100644 index 000000000..90d5e9eee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -0,0 +1,64 @@ + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + ${jobTracker} + ${nameNode} + + + mapred.job.queue.name + ${queueName} + + + download.sh + ${url} + ${crossrefDumpPath} + ${crossrefdumpfilename} + ${crossrefdumptoken} + HADOOP_USER_NAME=${wf:user()} + download.sh + + + + + + + + eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs + --hdfsNameNode${nameNode} + --inputFile${inputFile} + --workingPath${workingDir}/OpenCitations + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json new file mode 100644 index 000000000..258d6816e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/opencitations_parameters.json @@ -0,0 +1,8 @@ +[ + {"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true}, + {"paramName":"f", "paramLongName":"opencitationFile", "paramDescription": "the name of the file", "paramRequired": true}, + {"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false}, + {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the name of the activities orcid file", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 0d7c74475..23e97a97a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -69,7 +69,7 @@ public class PropagationConstant { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); return nc; } @@ -84,7 +84,8 @@ public class PropagationConstant { return di; } - public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) { + public static Qualifier getQualifier(String inference_class_id, String inference_class_name, + String qualifierSchema) { Qualifier pa = new Qualifier(); pa.setClassid(inference_class_id); pa.setClassname(inference_class_name); @@ -108,7 +109,11 @@ public class PropagationConstant { r.setRelClass(rel_class); r.setRelType(rel_type); r.setSubRelType(subrel_type); - r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS)); + r + .setDataInfo( + getDataInfo( + inference_provenance, inference_class_id, inference_class_name, + ModelConstants.DNET_PROVENANCE_ACTIONS)); return r; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index 68949b900..a38b4da2e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -173,14 +173,17 @@ public class SparkOrcidToResultFromSemRelJob { if (toaddpid) { StructuredProperty p = new StructuredProperty(); p.setValue(autoritative_author.getOrcid()); - p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES)); + p + .setQualifier( + getQualifier( + ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES)); p .setDataInfo( getDataInfo( PROPAGATION_DATA_INFO_TYPE, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID, PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS)); + ModelConstants.DNET_PROVENANCE_ACTIONS)); Optional> authorPid = Optional.ofNullable(author.getPid()); if (authorPid.isPresent()) { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 1289ff644..50df08f8c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -22,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Result; import scala.Tuple2; @@ -130,7 +130,7 @@ public class SparkResultToCommunityFromOrganizationJob { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); + ModelConstants.DNET_PROVENANCE_ACTIONS))); propagatedContexts.add(newContext); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index 7f76ead94..f31a26230 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -126,7 +126,7 @@ public class SparkResultToCommunityThroughSemRelJob { PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS))); + ModelConstants.DNET_PROVENANCE_ACTIONS))); return newContext; } return null;