first steps in the implementation of the integration of opencitations

This commit is contained in:
Miriam Baglioni 2021-09-22 15:18:05 +02:00
parent 954a16c213
commit f2118d771a
12 changed files with 322 additions and 10 deletions

View File

@ -10,7 +10,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.bean.CsvToBeanBuilder;
public class GetCSV {

View File

@ -0,0 +1,63 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.BufferedOutputStream;
import java.net.URI;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mortbay.log.Log;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class ExtractOpenCitationRefs {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ExtractOpenCitationRefs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/a/ccionmanager/opencitations/opencitations_parameters.json")));
parser.parseArgument(args);
final String hdfsServerUri = parser.get("hdfsServerUri");
final String workingPath = hdfsServerUri.concat(parser.get("workingPath"));
final String outputPath = parser.get("outputPath");
final String opencitationFile = parser.get("opencitationFile");
Path hdfsreadpath = new Path(workingPath.concat("/").concat(opencitationFile));
Configuration conf = new Configuration();
conf.set("fs.defaultFS", workingPath);
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem fs = FileSystem.get(URI.create(workingPath), conf);
FSDataInputStream crossrefFileStream = fs.open(hdfsreadpath);
try (TarArchiveInputStream tais = new TarArchiveInputStream(
new GzipCompressorInputStream(crossrefFileStream))) {
TarArchiveEntry entry = null;
while ((entry = tais.getNextTarEntry()) != null) {
if (!entry.isDirectory()) {
try (
FSDataOutputStream out = fs
.create(new Path(outputPath.concat(entry.getName()).concat(".gz")));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(tais, gzipOs);
}
}
}
}
Log.info("Crossref dump reading completed");
}
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.*;
import java.io.Serializable;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class GetOpenCitationsRefs implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetOpenCitationsRefs.class);
public static void main(final String[] args) throws IOException, ParseException {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
GetOpenCitationsRefs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json"))));
parser.parseArgument(args);
final String inputFile = parser.get("inputFile");
log.info("inputFile {}", inputFile);
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
new GetOpenCitationsRefs().doExtract(inputFile, workingPath, fileSystem);
}
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem)
throws IOException {
final Path path = new Path(inputFile);
FSDataInputStream oc_zip = fileSystem.open(path);
int count = 1;
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory()) {
String fileName = entry.getName();
fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
count++;
try (
FSDataOutputStream out = fileSystem
.create(new Path(workingPath + "/COCI/" + fileName + ".gz"));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(zis, gzipOs);
}
}
}
}
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "if",
"paramLongName": "inputFile",
"paramDescription": "the zipped opencitations file",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the working path",
"paramRequired": true
},
{
"paramName": "hnn",
"paramLongName": "hdfsNameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
]

View File

@ -0,0 +1,58 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
</configuration>

View File

@ -0,0 +1,64 @@
<workflow-app name="OpenCitations Integration" xmlns="uri:oozie:workflow:0.5">
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="download"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="download">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>download.sh</exec>
<argument>${url}</argument>
<argument>${crossrefDumpPath}</argument>
<argument>${crossrefdumpfilename}</argument>
<argument>${crossrefdumptoken}</argument>
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
<file>download.sh</file>
<capture-output/>
</shell>
<ok to="ImportCrossRef"/>
<error to="Kill"/>
</action>
<action name="extract">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingDir}/OpenCitations</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,8 @@
[
{"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true},
{"paramName":"f", "paramLongName":"opencitationFile", "paramDescription": "the name of the file", "paramRequired": true},
{"paramName":"issm", "paramLongName":"isSparkSessionManaged", "paramDescription": "the name of the activities orcid file", "paramRequired": false},
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the name of the activities orcid file", "paramRequired": true}
]

View File

@ -69,7 +69,7 @@ public class PropagationConstant {
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_COUNTRY_INSTREPO_CLASS_ID,
PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS));
ModelConstants.DNET_PROVENANCE_ACTIONS));
return nc;
}
@ -84,7 +84,8 @@ public class PropagationConstant {
return di;
}
public static Qualifier getQualifier(String inference_class_id, String inference_class_name, String qualifierSchema) {
public static Qualifier getQualifier(String inference_class_id, String inference_class_name,
String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
@ -108,7 +109,11 @@ public class PropagationConstant {
r.setRelClass(rel_class);
r.setRelType(rel_type);
r.setSubRelType(subrel_type);
r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name, ModelConstants.DNET_PROVENANCE_ACTIONS));
r
.setDataInfo(
getDataInfo(
inference_provenance, inference_class_id, inference_class_name,
ModelConstants.DNET_PROVENANCE_ACTIONS));
return r;
}

View File

@ -173,14 +173,17 @@ public class SparkOrcidToResultFromSemRelJob {
if (toaddpid) {
StructuredProperty p = new StructuredProperty();
p.setValue(autoritative_author.getOrcid());
p.setQualifier(getQualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
p
.setQualifier(
getQualifier(
ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, ModelConstants.DNET_PID_TYPES));
p
.setDataInfo(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID,
PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS));
ModelConstants.DNET_PROVENANCE_ACTIONS));
Optional<List<StructuredProperty>> authorPid = Optional.ofNullable(author.getPid());
if (authorPid.isPresent()) {

View File

@ -10,7 +10,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -22,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
@ -130,7 +130,7 @@ public class SparkResultToCommunityFromOrganizationJob {
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
ModelConstants.DNET_PROVENANCE_ACTIONS)));
propagatedContexts.add(newContext);
}
}

View File

@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -20,6 +19,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@ -126,7 +126,7 @@ public class SparkResultToCommunityThroughSemRelJob {
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
ModelConstants.DNET_PROVENANCE_ACTIONS)));
return newContext;
}
return null;