diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
index abb9dc148f..eca433e9e7 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java
@@ -5,13 +5,71 @@ import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.util.Optional;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 
 public class MakeTarArchive implements Serializable {
 
+	private static final Logger log = LoggerFactory.getLogger(MakeTarArchive.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				MakeTarArchive.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/common/input_maketar_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		final String outputPath = parser.get("hdfsPath");
+		log.info("hdfsPath: {}", outputPath);
+
+		final String hdfsNameNode = parser.get("nameNode");
+		log.info("nameNode: {}", hdfsNameNode);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("input path : {}", inputPath);
+
+		final int gBperSplit = Optional
+			.ofNullable(parser.get("splitSize"))
+			.map(Integer::valueOf)
+			.orElse(10);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit);
+
+	}
+
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit)
+		throws IOException {
+
+		RemoteIterator<LocatedFileStatus> dirIterator = fileSystem.listLocatedStatus(new Path(inputPath));
+
+		while (dirIterator.hasNext()) {
+			LocatedFileStatus fileStatus = dirIterator.next();
+
+			Path p = fileStatus.getPath();
+			String pathString = p.toString();
+			String entity = pathString.substring(pathString.lastIndexOf("/") + 1);
+
+			MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit);
+		}
+	}
+
 	private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
 		Path hdfsWritePath = new Path(outputPath);
 		if (fileSystem.exists(hdfsWritePath)) {
@@ -21,7 +79,7 @@ public class MakeTarArchive implements Serializable {
 		return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream());
 	}
 
-	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
+	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dirName)
 		throws IOException {
 
 		Path hdfsWritePath = new Path(outputPath);
@@ -37,7 +95,7 @@ public class MakeTarArchive implements Serializable {
 				new Path(inputPath), true);
 
 		while (iterator.hasNext()) {
-			writeCurrentFile(fileSystem, dir_name, iterator, ar, 0);
+			writeCurrentFile(fileSystem, dirName, iterator, ar, 0);
 		}
 
 	}
@@ -59,32 +117,30 @@ public class MakeTarArchive implements Serializable {
 					new Path(inputPath), true);
 			boolean next = fileStatusListIterator.hasNext();
 			while (next) {
-				TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar");
+				try (TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar")) {
 
-				long current_size = 0;
-				while (next && current_size < bytesPerSplit) {
-					current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size);
-					next = fileStatusListIterator.hasNext();
+					long currentSize = 0;
+					while (next && currentSize < bytesPerSplit) {
+						currentSize = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, currentSize);
+						next = fileStatusListIterator.hasNext();
+					}
+
+					partNum += 1;
+				}
-
-				partNum += 1;
-				ar.close();
 			}
-
 		}
-
 	}
 
-	private static long writeCurrentFile(FileSystem fileSystem, String dir_name,
+	private static long writeCurrentFile(FileSystem fileSystem, String dirName,
 		RemoteIterator<LocatedFileStatus> fileStatusListIterator,
-		TarArchiveOutputStream ar, long current_size) throws IOException {
+		TarArchiveOutputStream ar, long currentSize) throws IOException {
 		LocatedFileStatus fileStatus = fileStatusListIterator.next();
 
 		Path p = fileStatus.getPath();
-		String p_string = p.toString();
-		if (!p_string.endsWith("_SUCCESS")) {
-			String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+		String pString = p.toString();
+		if (!pString.endsWith("_SUCCESS")) {
+			String name = pString.substring(pString.lastIndexOf("/") + 1);
 			if (name.startsWith("part-") & name.length() > 10) {
 				String tmp = name.substring(0, 10);
 				if (name.contains(".")) {
@@ -92,9 +148,9 @@ public class MakeTarArchive implements Serializable {
 				}
 				name = tmp;
 			}
-			TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
+			TarArchiveEntry entry = new TarArchiveEntry(dirName + "/" + name);
 			entry.setSize(fileStatus.getLen());
-			current_size += fileStatus.getLen();
+			currentSize += fileStatus.getLen();
 			ar.putArchiveEntry(entry);
 
 			InputStream is = fileSystem.open(fileStatus.getPath());
@@ -110,7 +166,7 @@ public class MakeTarArchive implements Serializable {
 			ar.closeArchiveEntry();
 
 		}
-		return current_size;
+		return currentSize;
 	}
 
 }
diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json
new file mode 100644
index 0000000000..a153188651
--- /dev/null
+++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/common/input_maketar_parameters.json
@@ -0,0 +1,30 @@
+[
+
+  {
+    "paramName":"s",
+    "paramLongName":"sourcePath",
+    "paramDescription": "the path of the sequencial file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "hdp",
+    "paramLongName": "hdfsPath",
+    "paramDescription": "the path used to store the output archive",
+    "paramRequired": true
+  },
+  {
+    "paramName":"nn",
+    "paramLongName":"nameNode",
+    "paramDescription": "the name node",
+    "paramRequired": true
+  },
+  {
+    "paramName":"ss",
+    "paramLongName":"splitSize",
+    "paramDescription": "the maximum size of the archive",
+    "paramRequired": false
+  }
+]
+
+
+
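Note on the change above: MakeTarArchive now has a CLI entry point. `main` reads `input_maketar_parameters.json`, defaults `splitSize` to 10 (GB per part) via `Optional.orElse(10)`, and `makeTArArchive` walks the top-level entity directories and produces one set of archives per directory. The heart of `tarMaxSize` is a rolling split: entries are appended to the current `_N.tar` part until the cumulative entry size passes the split threshold, then a new part is started. Below is a minimal local-filesystem sketch of that rolling logic using commons-compress; `TarSplitSketch`, `files`, `baseName` and `bytesPerSplit` are illustrative names and are not part of the patch.

```scala
import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream}

import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
import org.apache.commons.compress.utils.IOUtils

object TarSplitSketch {

  // Writes each file as a "dirName/<fileName>" entry, rolling over to a new
  // "<baseName>_<part>.tar" once the bytes written so far exceed bytesPerSplit.
  def tarWithMaxSize(files: Seq[File], baseName: String, dirName: String, bytesPerSplit: Long): Unit = {
    val it = files.iterator
    var partNum = 0
    while (it.hasNext) {
      partNum += 1
      val ar = new TarArchiveOutputStream(new FileOutputStream(s"${baseName}_$partNum.tar"))
      try {
        var currentSize = 0L
        while (it.hasNext && currentSize < bytesPerSplit) {
          val f = it.next()
          val entry = new TarArchiveEntry(s"$dirName/${f.getName}")
          entry.setSize(f.length())          // size must be declared before the entry body
          currentSize += f.length()
          ar.putArchiveEntry(entry)
          val is = new BufferedInputStream(new FileInputStream(f))
          try IOUtils.copy(is, ar)
          finally is.close()
          ar.closeArchiveEntry()
        }
      } finally ar.close()
    }
  }
}
```

On HDFS the same loop runs over `fileSystem.listFiles(new Path(inputPath), true)` and skips `_SUCCESS` markers, as shown in the `writeCurrentFile` hunks above; a single file larger than the threshold still ends up whole in its own part, since the size check happens before each file is added.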
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json
index 336181e0a7..c56e130c05 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/create_scholix_params.json
@@ -2,5 +2,6 @@
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
   {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the relation resolved Path", "paramRequired": true},
   {"paramName":"s", "paramLongName":"summaryPath", "paramDescription": "the summary Path", "paramRequired": true},
-  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target base path of the scholix", "paramRequired": true}
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target base path of the scholix", "paramRequired": true},
+  {"paramName":"dc", "paramLongName":"dumpCitations", "paramDescription": "should dump citation relations", "paramRequired": false}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
index a37d85ad41..d47ebb0bec 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/dumpScholix/oozie_app/workflow.xml
@@ -16,7 +16,11 @@
             <name>maxNumberOfPid</name>
             <description>filter relation with at least #maxNumberOfPid</description>
         </property>
-
+        <property>
+            <name>dumpCitations</name>
+            <value>false</value>
+            <description>should dump citation relations</description>
+        </property>
 
@@ -98,6 +102,7 @@
             <arg>--summaryPath</arg><arg>${targetPath}/provision/summaries</arg>
             <arg>--targetPath</arg><arg>${targetPath}/provision/scholix</arg>
             <arg>--relationPath</arg><arg>${targetPath}/relation</arg>
+            <arg>--dumpCitations</arg><arg>${dumpCitations}</arg>
 
@@ -135,11 +140,21 @@
             <arg>--objectType</arg><arg>scholix</arg>
             <arg>--maxPidNumberFilter</arg><arg>maxNumberOfPid</arg>
+
+
+
+
+
+
+            <main-class>eu.dnetlib.dhp.common.MakeTarArchive</main-class>
+            <arg>--nameNode</arg><arg>${nameNode}</arg>
+            <arg>--hdfsPath</arg><arg>${targetPath}/tar</arg>
+            <arg>--sourcePath</arg><arg>${targetPath}/json</arg>
+
-
-
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
index 0073afff5e..556106180e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -1,7 +1,10 @@
 package eu.dnetlib.dhp.sx.graph
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.common.ModelConstants
 import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
 import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
@@ -9,7 +12,8 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
-import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import scala.util.Try
 
 object SparkConvertRDDtoDataset {
 
@@ -36,11 +40,12 @@ object SparkConvertRDDtoDataset {
     val t = parser.get("targetPath")
     log.info(s"targetPath -> $t")
 
-    val filterRelation = parser.get("filterRelation")
-    log.info(s"filterRelation -> $filterRelation")
+    val subRelTypeFilter = parser.get("filterRelation")
+    log.info(s"filterRelation -> $subRelTypeFilter")
 
     val entityPath = s"$t/entities"
     val relPath = s"$t/relation"
+    val mapper = new ObjectMapper()
 
     implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
     implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
@@ -99,44 +104,66 @@ object SparkConvertRDDtoDataset {
 
     log.info("Converting Relation")
 
-    if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) {
+    val relClassFilter = List(
+      ModelConstants.MERGES,
+      ModelConstants.IS_MERGED_IN,
+      ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS,
+      ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS
+    )
 
-      val rddRelation = spark.sparkContext
-        .textFile(s"$sourcePath/relation")
-        .map(s => mapper.readValue(s, classOf[Relation]))
-        .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
-        .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
-        //filter OpenCitations relations
-        .filter(r =>
-          r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
-            "opencitations".equalsIgnoreCase(k.getValue)
-          )
-        )
-        .filter(r => r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation))
-      spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
-    } else {
-
-      val relationSemanticFilter = List(
-        "merges",
-        "ismergedin",
-        "HasAmongTopNSimilarDocuments",
-        "IsAmongTopNSimilarDocuments"
+    val rddRelation = spark.sparkContext
+      .textFile(s"$sourcePath/relation")
+      .map(s => mapper.readValue(s, classOf[Relation]))
+      .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference)
+      .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+      .filter(r => filterRelations(subRelTypeFilter, relClassFilter, r))
+      //filter OpenCitations relations
+      .filter(r =>
+        r.getDataInfo.getProvenanceaction != null &&
+          !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid)
       )
-      val rddRelation = spark.sparkContext
-        .textFile(s"$sourcePath/relation")
-        .map(s => mapper.readValue(s, classOf[Relation]))
-        .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
-        .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
-        //filter OpenCitations relations
-        .filter(r =>
-          r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
-            "opencitations".equalsIgnoreCase(k.getValue)
-          )
-        )
-        .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
-      spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
-    }
-
+    spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
   }
+
+  private def filterRelations(subRelTypeFilter: String, relClassFilter: List[String], r: Relation): Boolean = {
+    if (StringUtils.isNotBlank(subRelTypeFilter)) {
+      subRelTypeFilter.equalsIgnoreCase(r.getSubRelType)
+    } else {
+      !relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))
+    }
+  }
+
+  /*
+  //TODO: finalise implementation
+  private def processResult[T<: Result](
+    implicit ct: ClassTag[T],
+    log: Logger,
+    spark: SparkSession,
+    sourcePath: String,
+    entityPath: String,
+    clazz: Class[T]
+  ): Unit = {
+    val entityType = clazz.getSimpleName.toLowerCase
+
+    log.info(s"Converting $entityType")
+
+    val mapper = new ObjectMapper() with ScalaObjectMapper
+    mapper.registerModule(DefaultScalaModule)
+
+    val rdd = spark.sparkContext
+      .textFile(s"$sourcePath/$entityType")
+      .map(s => mapper.readValue(s, clazz))
+      .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference);
+
+    implicit val encoder: Encoder[T] = Encoders.kryo(clazz)
+    spark
+      .createDataset(rdd)
+      .as[T]
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$entityPath/$entityType")
+  }
+  */
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala
index af19b9698c..fd06e7deaf 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala
@@ -12,6 +12,8 @@ import org.apache.spark.sql.functions.count
 import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
 
+import scala.util.Try
+
 object SparkCreateScholix {
 
   def main(args: Array[String]): Unit = {
@@ -37,6 +39,8 @@ object SparkCreateScholix {
     log.info(s"summaryPath -> $summaryPath")
     val targetPath = parser.get("targetPath")
     log.info(s"targetPath -> $targetPath")
+    val dumpCitations = Try(parser.get("dumpCitations").toBoolean).getOrElse(false)
+    log.info(s"dumpCitations -> $dumpCitations")
 
     implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
     implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
@@ -138,7 +142,7 @@ object SparkCreateScholix {
     val relatedEntitiesDS: Dataset[RelatedEntities] = spark.read
       .load(s"$targetPath/related_entities")
       .as[RelatedEntities]
-      .filter(r => r.relatedPublication > 0 || r.relatedDataset > 0)
+      .filter(r => dumpCitations || r.relatedPublication > 0 || r.relatedDataset > 0)
 
     relatedEntitiesDS
       .joinWith(summaryDS, relatedEntitiesDS("id").equalTo(summaryDS("_1")), "inner")
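Note on the relation filter: `SparkConvertRDDtoDataset` now has two modes. When the workflow passes a `filterRelation` value, only relations with that subRelType are kept; otherwise the dedup/similarity relation classes listed in `relClassFilter` are dropped. OpenCitations relations are excluded by provenance action rather than by `collectedfrom`. The sketch below shows the combined keep-predicate in plain Scala, using simplified stand-ins for the OAF `Relation` model; the string values in the blocklist are written out here and are assumed to match the corresponding `ModelConstants`.

```scala
import org.apache.commons.lang3.StringUtils

// Simplified stand-ins for the OAF model, for illustration only.
case class Provenance(classid: String)
case class DataInfo(deletedbyinference: Boolean, provenanceaction: Provenance)
case class Rel(source: String, target: String, subRelType: String, relClass: String, dataInfo: DataInfo)

object RelationFilterSketch {

  // Assumed string values of the ModelConstants used in relClassFilter.
  val relClassFilter: List[String] =
    List("merges", "isMergedIn", "HasAmongTopNSimilarDocuments", "IsAmongTopNSimilarDocuments")

  // Overall keep-predicate applied to each parsed relation.
  def keep(subRelTypeFilter: String, r: Rel): Boolean =
    r.dataInfo != null && !r.dataInfo.deletedbyinference &&
      r.source.startsWith("50") && r.target.startsWith("50") &&              // result-to-result only
      filterRelations(subRelTypeFilter, relClassFilter, r) &&
      r.dataInfo.provenanceaction != null &&
      !"sysimport:crosswalk:opencitations".equals(r.dataInfo.provenanceaction.classid) // drop OpenCitations rels

  // Same shape as the private helper added in the patch.
  def filterRelations(subRelTypeFilter: String, relClassFilter: List[String], r: Rel): Boolean =
    if (StringUtils.isNotBlank(subRelTypeFilter))
      subRelTypeFilter.equalsIgnoreCase(r.subRelType)                         // keep only the requested subRelType
    else
      !relClassFilter.exists(k => k.equalsIgnoreCase(r.relClass))             // default: drop dedup/similarity classes
}
```

For example, with no `filterRelation` set, a plain citation link between two results is kept, while the same link is dropped if its relClass is `merges` or its provenance marks it as an OpenCitations crosswalk import.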
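Aside from the filter, the conversion pattern in `SparkConvertRDDtoDataset` is untouched: each entity type is read as text, deserialized with Jackson, and written out as a kryo-encoded Dataset. A self-contained sketch of that pattern, assuming a local-mode SparkSession, a toy `SimpleRel` case class in place of the OAF beans, and a `/tmp` output path:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

// Minimal stand-in for an OAF entity; the real job uses eu.dnetlib.dhp.schema.oaf types.
case class SimpleRel(source: String, target: String, relClass: String)

object RddToDatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-to-dataset-sketch").getOrCreate()

    // Kryo encoding, as in SparkConvertRDDtoDataset, stores arbitrary JVM objects
    // as a single binary column instead of requiring a bean/product schema.
    implicit val relEncoder: Encoder[SimpleRel] = Encoders.kryo(classOf[SimpleRel])

    val rdd = spark.sparkContext.parallelize(
      Seq(SimpleRel("50|a", "50|b", "Cites"), SimpleRel("50|b", "50|a", "IsCitedBy")))

    // Same write pattern as the relation branch: Dataset[T] saved with Overwrite.
    spark.createDataset(rdd).write.mode(SaveMode.Overwrite).save("/tmp/rdd_to_dataset_sketch")

    spark.stop()
  }
}
```

In the real job the RDD comes from `textFile` plus `mapper.readValue`, and the output lands under `$relPath` or `$entityPath/$entityType`.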
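Note on `dumpCitations`: the flag travels from the new workflow property through `create_scholix_params.json` into `SparkCreateScholix`, where it is parsed defensively so that a missing or malformed value falls back to `false`; when it is `true`, the related-entities filter short-circuits and keeps every summary instead of only those with related publications or datasets. A small sketch of that parsing behaviour, with an illustrative helper name:

```scala
import scala.util.Try

object OptionalFlagSketch {
  // Mirrors: val dumpCitations = Try(parser.get("dumpCitations").toBoolean).getOrElse(false)
  def optionalFlag(raw: String): Boolean = Try(raw.toBoolean).getOrElse(false)

  def main(args: Array[String]): Unit = {
    println(optionalFlag("true")) // true
    println(optionalFlag(null))   // false: the exception thrown by toBoolean is swallowed by Try
    println(optionalFlag("yes"))  // false: not a valid boolean literal
  }
}
```

With the flag set, `.filter(r => dumpCitations || r.relatedPublication > 0 || r.relatedDataset > 0)` evaluates to true for every record, so citation-only scholix entries are also dumped.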