diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
index a6562789d..c9d2495ff 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java
@@ -25,18 +25,23 @@ public class PropagationConstant {
 	private PropagationConstant() {
 	}
 
-
+	public static final String DOI = "doi";
+	public static final String REF_DOI = ".refs";
 	public static final String UPDATE_DATA_INFO_TYPE = "update";
 	public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
 	public static final String UPDATE_SUBJECT_FOS_CLASS_NAME = "Update of results with FOS subjects";
 	public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
 	public static final String UPDATE_MEASURE_BIP_CLASS_NAME = "Update of results with BipFinder! measures";
-	public static final String FOS_CLASS_ID = "fos";
+	public static final String FOS_CLASS_ID = "FOS";
 	public static final String FOS_CLASS_NAME = "Subject from fos classification";
+	public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
+	public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
+	public static final String ID_PREFIX = "50|doi_________::";
+	public static final String OC_TRUST = "0.91";
-
+
 	public final static String NULL = "NULL";
 
 	public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional";
@@ -95,8 +100,15 @@ public class PropagationConstant {
 	public static DataInfo getDataInfo(
 		String inference_provenance, String inference_class_id, String inference_class_name, String qualifierSchema,
 		String trust) {
+		return getDataInfo(inference_provenance, inference_class_id, inference_class_name, qualifierSchema, trust, true);
+
+	}
+
+	public static DataInfo getDataInfo(
+		String inference_provenance, String inference_class_id, String inference_class_name, String qualifierSchema,
+		String trust, boolean inferred) {
 		DataInfo di = new DataInfo();
-		di.setInferred(true);
+		di.setInferred(inferred);
 		di.setDeletedbyinference(false);
 		di.setTrust(trust);
 		di.setInferenceprovenance(inference_provenance);
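Note: the second getDataInfo overload keeps the behaviour of the original method (the five-argument form still yields inferred = true) while letting callers flag records that are imported rather than inferred. A minimal usage sketch, with the constant values from this hunk shown in comments (this is exactly the call made by the OpenCitations job later in this patch):

    // Provenance for an OpenCitations relation: an update, not an inference product.
    DataInfo ocProvenance = getDataInfo(
        UPDATE_DATA_INFO_TYPE,                  // "update"
        OPENCITATIONS_CLASSID,                  // "sysimport:crosswalk:opencitations"
        OPENCITATIONS_CLASSNAME,                // "Imported from OpenCitations"
        ModelConstants.DNET_PROVENANCE_ACTIONS,
        OC_TRUST,                               // "0.91"
        false);                                 // inferred = false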
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java
index 7c4f5b859..10a00b3e4 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java
@@ -2,7 +2,6 @@ package eu.dnetlib.dhp.bypassactionset.bipfinder;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.bypassactionset.SparkUpdateBip;
 import eu.dnetlib.dhp.bypassactionset.model.BipDeserialize;
 import eu.dnetlib.dhp.bypassactionset.model.BipScore;
 import eu.dnetlib.dhp.schema.oaf.Result;
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateBip.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/SparkUpdateBip.java
similarity index 98%
rename from dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateBip.java
rename to dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/SparkUpdateBip.java
index 737aac0bc..d1489bfdf 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateBip.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/SparkUpdateBip.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.bypassactionset;
+package eu.dnetlib.dhp.bypassactionset.bipfinder;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.bypassactionset.model.BipScore;
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateFOS.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/SparkUpdateFOS.java
similarity index 94%
rename from dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateFOS.java
rename to dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/SparkUpdateFOS.java
index 79d3d6d14..3c4bd7737 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateFOS.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/SparkUpdateFOS.java
@@ -1,11 +1,9 @@
-package eu.dnetlib.dhp.bypassactionset;
+package eu.dnetlib.dhp.bypassactionset.fos;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.bypassactionset.model.FOSDataModel;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Measure;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import org.apache.commons.io.IOUtils;
@@ -28,8 +26,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 public class SparkUpdateFOS implements Serializable {
 
 	private static final Logger log = LoggerFactory.getLogger(SparkUpdateFOS.class);
-	private final static String NULL = "NULL";
-	private final static String DNET_RESULT_SUBJECT = "dnet:result_subject";
+
 	public static void main(String[] args) throws Exception {
@@ -108,7 +105,7 @@ public class SparkUpdateFOS implements Serializable {
 			return null;
 		StructuredProperty sp = new StructuredProperty();
 		sp.setValue(sbj);
-		sp.setQualifier(getQualifier(FOS_CLASS_ID, FOS_CLASS_NAME, DNET_RESULT_SUBJECT));
+		sp.setQualifier(getQualifier(FOS_CLASS_ID, FOS_CLASS_NAME, ModelConstants.DNET_SUBJECT_TYPOLOGIES));
 		sp.setDataInfo(getDataInfo(UPDATE_DATA_INFO_TYPE,
 			UPDATE_SUBJECT_FOS_CLASS_ID, UPDATE_SUBJECT_FOS_CLASS_NAME,
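Note: replacing the local DNET_RESULT_SUBJECT constant with ModelConstants.DNET_SUBJECT_TYPOLOGIES moves the subject qualifier from the hard-coded "dnet:result_subject" scheme to the shared "dnet:subject_classification_typologies" vocabulary. A sketch of the qualifier a FOS subject now carries, assuming getQualifier fills both schemeid and schemename with the passed scheme, as it does elsewhere in PropagationConstant:

    // classid   = FOS_CLASS_ID   -> "FOS"
    // classname = FOS_CLASS_NAME -> "Subject from fos classification"
    // schemeid / schemename = ModelConstants.DNET_SUBJECT_TYPOLOGIES
    Qualifier fosQualifier = getQualifier(FOS_CLASS_ID, FOS_CLASS_NAME, ModelConstants.DNET_SUBJECT_TYPOLOGIES);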
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/opencitations/GetOpenCitationsRefs.java
new file mode 100644
index 000000000..5d42cb4c5
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/opencitations/GetOpenCitationsRefs.java
@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.bypassactionset.opencitations;
+
+import java.io.*;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class GetOpenCitationsRefs implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(GetOpenCitationsRefs.class);
+
+	public static void main(final String[] args) throws IOException, ParseException {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							GetOpenCitationsRefs.class
+								.getResourceAsStream(
+									"/eu/dnetlib/dhp/bypassactionset/opencitations/input_parameters.json"))));
+
+		parser.parseArgument(args);
+
+		final String[] inputFile = parser.get("inputFile").split(";");
+		log.info("inputFile {}", String.join(";", inputFile));
+
+		final String workingPath = parser.get("workingPath");
+		log.info("workingPath {}", workingPath);
+
+		final String hdfsNameNode = parser.get("hdfsNameNode");
+		log.info("hdfsNameNode {}", hdfsNameNode);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
+
+		for (String file : inputFile) {
+			ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem);
+		}
+
+	}
+
+	private void doExtract(String inputFile, String workingPath, FileSystem fileSystem)
+		throws IOException {
+
+		final Path path = new Path(inputFile);
+
+		FSDataInputStream oc_zip = fileSystem.open(path);
+
+		int count = 1;
+		try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
+			ZipEntry entry = null;
+			while ((entry = zis.getNextEntry()) != null) {
+
+				if (!entry.isDirectory()) {
+					String fileName = entry.getName();
+					fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
+					count++;
+					try (
+						FSDataOutputStream out = fileSystem
+							.create(new Path(workingPath + "/COCI/" + fileName + ".gz"));
+						GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
+
+						IOUtils.copy(zis, gzipOs);
+
+					}
+				}
+
+			}
+
+		}
+
+	}
+
+}
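Note: doExtract keeps the part of each zip entry name before the first "T" and appends a running counter, then re-compresses the entry as gzip under <workingPath>/COCI/. An illustrative trace of that naming logic (the entry name below is made up; real COCI dump entries are assumed to carry a timestamp-like prefix):

    // entry name "2020-10-05T180000.csv" -> date prefix "2020-10-05", first entry
    String fileName = "2020-10-05T180000.csv";
    fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + 1; // "2020-10-05_1"
    // written gzip-compressed to: <workingPath>/COCI/2020-10-05_1.gz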
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/opencitations/SparkUpdateOCRels.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/opencitations/SparkUpdateOCRels.java
new file mode 100644
index 000000000..a8ab6b74b
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/opencitations/SparkUpdateOCRels.java
@@ -0,0 +1,158 @@
+package eu.dnetlib.dhp.bypassactionset.opencitations;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+
+public class SparkUpdateOCRels implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(SparkUpdateOCRels.class);
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	public static void main(final String[] args) throws IOException, ParseException {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					Objects
+						.requireNonNull(
+							SparkUpdateOCRels.class
+								.getResourceAsStream(
+									"/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json"))));
+
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("inputPath");
+		log.info("inputPath {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath {}", outputPath);
+
+		final boolean shouldDuplicateRels = Optional
+			.ofNullable(parser.get("shouldDuplicateRels"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.FALSE);
+
+		SparkConf conf = new SparkConf();
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> addOCRelations(spark, inputPath, outputPath, shouldDuplicateRels));
+	}
+
+	private static void addOCRelations(SparkSession spark, String inputPath, String outputPath,
+		boolean shouldDuplicateRels) {
+		spark
+			.sqlContext()
+			.createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING())
+			.flatMap(
+				(FlatMapFunction<String, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
+				Encoders.bean(Relation.class))
+			.filter((FilterFunction<Relation>) value -> value != null)
+			.write()
+			.mode(SaveMode.Append)
+			.option("compression", "gzip")
+			.json(outputPath);
+	}
+
+	private static List<Relation> createRelation(String value, boolean duplicate) {
+		String[] line = value.split(",");
+		if (!line[1].startsWith("10.")) {
+			return new ArrayList<>();
+		}
+		List<Relation> relationList = new ArrayList<>();
+
+		String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue(DOI, line[1]));
+		final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue(DOI, line[2]));
+
+		relationList
+			.addAll(
+				getRelations(
+					citing,
+					cited));
+
+		if (duplicate && line[1].endsWith(REF_DOI)) {
+			citing = ID_PREFIX + IdentifierFactory
+				.md5(CleaningFunctions.normalizePidValue(DOI, line[1].substring(0, line[1].indexOf(REF_DOI))));
+			relationList.addAll(getRelations(citing, cited));
+		}
+
+		return relationList;
+	}
+
+	private static Collection<Relation> getRelations(String citing, String cited) {
+
+		return Arrays
+			.asList(
+				getRelation(citing, cited, ModelConstants.CITES),
+				getRelation(cited, citing, ModelConstants.IS_CITED_BY));
+	}
+
+	public static Relation getRelation(
+		String source,
+		String target,
+		String relclass) {
+		Relation r = new Relation();
+		r.setCollectedfrom(getCollectedFrom());
+		r.setSource(source);
+		r.setTarget(target);
+		r.setRelClass(relclass);
+		r.setRelType(ModelConstants.RESULT_RESULT);
+		r.setSubRelType(ModelConstants.CITATION);
+		r
+			.setDataInfo(
+				getDataInfo(
+					UPDATE_DATA_INFO_TYPE, OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME,
+					ModelConstants.DNET_PROVENANCE_ACTIONS, OC_TRUST, false));
+		return r;
+	}
+
+	public static List<KeyValue> getCollectedFrom() {
+		KeyValue kv = new KeyValue();
+		kv.setKey(ModelConstants.OPENOCITATIONS_ID);
+		kv.setValue(ModelConstants.OPENOCITATIONS_NAME);
+
+		return Arrays.asList(kv);
+	}
+
+}
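Note: createRelation turns one COCI CSV line into a Cites / IsCitedBy pair keyed by hashed OpenAIRE identifiers. A rough trace, assuming the COCI column layout oci,citing,cited,... so that line[1] is the citing DOI and line[2] the cited one (the DOIs below are illustrative):

    // line[1] = citing DOI, line[2] = cited DOI; line[1] must start with "10."
    String value = "0612,10.1000/aaa,10.1000/bbb";
    List<Relation> rels = createRelation(value, false); // rels.size() == 2
    // rels.get(0): "50|doi_________::" + md5(normalized "10.1000/aaa") -Cites->     hashed cited id
    // rels.get(1): "50|doi_________::" + md5(normalized "10.1000/bbb") -IsCitedBy-> hashed citing id
    // both carry the OpenCitations DataInfo (update, sysimport:crosswalk:opencitations, trust 0.91, inferred = false)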