From 1a6b3989685de1ca5899b703e87472686e3162e0 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 30 Jun 2021 17:27:55 +0200 Subject: [PATCH] implemented Creation of Raw Graph and Resolution --- .../eu/dnetlib/dhp/sx/graph/IdReplace.scala | 3 - .../dhp/sx/graph/ImportDataFromMongo.java | 153 ----------- .../dhp/sx/graph/SparkCreateInputGraph.scala | 79 ++++++ .../dhp/sx/graph/SparkExtractEntitiesJob.java | 126 --------- .../dhp/sx/graph/SparkResolveRelation.scala | 90 ++++++ .../sx/graph/SparkSXGeneratePidSimlarity.java | 75 ----- .../SparkScholexplorerCreateRawGraphJob.java | 256 ------------------ .../SparkScholexplorerGraphImporter.java | 72 ----- .../sx/graph/SparkSplitOafTODLIEntities.scala | 203 -------------- .../dhp/sx/graph/SparkXMLToOAFDataset.scala | 73 ----- .../dhp/sx/graph/ebi/SparkEBILinksToOaf.scala | 4 +- .../dhp/sx/graph/extract_entities_params.json | 5 + .../sx/graph/resolve_relations_params.json | 6 + .../dhp/sx/graph/step1/oozie_app/workflow.xml | 139 +++------- 14 files changed, 227 insertions(+), 1057 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala deleted file mode 100644 index 8d375600c..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala +++ /dev/null @@ -1,3 +0,0 @@ -package eu.dnetlib.dhp.sx.graph - -case class IdReplace(newId:String, oldId:String) {} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java deleted file mode 100644 index bc40afbfd..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java +++ /dev/null @@ -1,153 +0,0 @@ - -package eu.dnetlib.dhp.sx.graph; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import 
java.util.function.Consumer; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.bson.Document; -import org.bson.conversions.Bson; - -import com.mongodb.DBObject; -import com.mongodb.MongoClient; -import com.mongodb.QueryBuilder; -import com.mongodb.client.FindIterable; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -/** - * This job is responsible to collect data from mongoDatabase and store in a sequence File on HDFS Mongo database - * contains information of each MDSTore in two collections: -metadata That contains info like: ID, format, layout, - * interpretation -metadataManager: that contains info : ID, mongoCollectionName from the metadata collection we filter - * the ids with Format, layout, and Interpretation from the metadataManager we get the current MONGO collection name - * which contains metadata XML see function getCurrentId - *

- * This Job is called multiple times, once for each (format, layout, interpretation) triple to import, and generates for each triple a - * sequence file of XML - */ -public class ImportDataFromMongo { - /** - * It requires as input some parameters described in the file - * eu/dnetlib/dhp/graph/sx/import_from_mongo_parameters.json - *

- * - the name node - the HDFS path where to store the file - the mongo host - the mongo port - the metadata format to - import - the metadata layout to import - the metadata interpretation to import - the mongo database name - *

- * This params are encoded into args - * - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - ImportDataFromMongo.class - .getResourceAsStream( - "/eu/dnetlib/dhp/sx/graph/argumentparser/import_from_mongo_parameters.json"))); - parser.parseArgument(args); - final int port = Integer.parseInt(parser.get("dbport")); - final String host = parser.get("dbhost"); - - final String format = parser.get("format"); - final String layout = parser.get("layout"); - final String interpretation = parser.get("interpretation"); - - final String dbName = parser.get("dbName"); - final MongoClient client = new MongoClient(host, port); - MongoDatabase database = client.getDatabase(dbName); - - MongoCollection metadata = database.getCollection("metadata"); - MongoCollection metadataManager = database.getCollection("metadataManager"); - final DBObject query = QueryBuilder - .start("format") - .is(format) - .and("layout") - .is(layout) - .and("interpretation") - .is(interpretation) - .get(); - final List ids = new ArrayList<>(); - metadata - .find((Bson) query) - .forEach((Consumer) document -> ids.add(document.getString("mdId"))); - List databaseId = ids - .stream() - .map(it -> getCurrentId(it, metadataManager)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - final String hdfsuri = parser.get("namenode"); - // ====== Init HDFS File System Object - Configuration conf = new Configuration(); - // Set FileSystem URI - conf.set("fs.defaultFS", hdfsuri); - // Because of Maven - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - - FileSystem.get(URI.create(hdfsuri), conf); - Path hdfswritepath = new Path(parser.get("targetPath")); - - final AtomicInteger counter = new AtomicInteger(0); - try (SequenceFile.Writer writer = SequenceFile - .createWriter( - conf, - SequenceFile.Writer.file(hdfswritepath), - SequenceFile.Writer.keyClass(IntWritable.class), - SequenceFile.Writer.valueClass(Text.class))) { - final IntWritable key = new IntWritable(counter.get()); - final Text value = new Text(); - databaseId - .forEach( - id -> { - System.out.println("Reading :" + id); - MongoCollection collection = database.getCollection(id); - collection - .find() - .forEach( - (Consumer) document -> { - key.set(counter.getAndIncrement()); - value.set(document.getString("body")); - - if (counter.get() % 10000 == 0) { - System.out.println("Added " + counter.get()); - } - try { - writer.append(key, value); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - }); - } - } - - /** - * Return the name of mongo collection giving an MdStore ID - * - * @param mdId The id of the MDStore - * @param metadataManager The collection metadataManager on mongo which contains this information - * @return - */ - private static String getCurrentId( - final String mdId, final MongoCollection metadataManager) { - FindIterable result = metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get()); - final Document item = result.first(); - return item == null ? 
null : item.getString("currentId"); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala new file mode 100644 index 000000000..5605ad001 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala @@ -0,0 +1,79 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + + + +object SparkCreateInputGraph { + + def main(args: Array[String]): Unit = { + + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/extract_entities_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val resultObject = List( + ("publication", classOf[Publication]), + ("dataset", classOf[OafDataset]), + ("software", classOf[Software]), + ("otherResearchProduct", classOf[OtherResearchProduct]) + + ) + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + + val sourcePath = parser.get("sourcePath") + log.info(s"sourcePath -> $sourcePath") + val targetPath = parser.get("targetPath") + log.info(s"targetPath -> $targetPath") + + + val oafDs:Dataset[Oaf] = spark.read.load(s"$sourcePath/*").as[Oaf] + + resultObject.foreach(r => extractEntities(oafDs,s"$targetPath/extracted/${r._1}", r._2, log)) + + extractEntities(oafDs,s"$targetPath/extracted/relation", classOf[Relation], log) + + resultObject.foreach { r => + log.info(s"Make ${r._1} unique") + makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2) + } + } + + + def extractEntities[T <: Oaf ](oafDs:Dataset[Oaf], targetPath:String, clazz:Class[T], log:Logger) :Unit = { + + implicit val resEncoder: Encoder[T] = Encoders.kryo(clazz) + log.info(s"Extract ${clazz.getSimpleName}") + oafDs.filter(o => o.isInstanceOf[T]).map(p => p.asInstanceOf[T]).write.mode(SaveMode.Overwrite).save(targetPath) + } + + + def makeDatasetUnique[T <: Result ](sourcePath:String, targetPath:String, spark:SparkSession, clazz:Class[T]) :Unit = { + import spark.implicits._ + + implicit val resEncoder: Encoder[T] = Encoders.kryo(clazz) + + val ds:Dataset[T] = spark.read.load(sourcePath).as[T] + + ds.groupByKey(_.getId).reduceGroups{(x,y) => + x.mergeFrom(y) + x + }.map(_._2).write.mode(SaveMode.Overwrite).save(targetPath) + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java deleted file mode 100644 index 4f015a9ad..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java +++ /dev/null @@ -1,126 +0,0 @@ - -package eu.dnetlib.dhp.sx.graph; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import 
org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; - -import com.jayway.jsonpath.JsonPath; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import net.minidev.json.JSONArray; - -/** - * This Job extracts a typology of entity and stores it in a new RDD This job is called different times, for each file - * generated by the Job {@link ImportDataFromMongo} and store the new RDD in a path that should be under a folder: - * extractedEntities/entity/version1 - *

- * at the end of this process we will have : extractedEntities/dataset/version1 extractedEntities/dataset/version2 - * extractedEntities/dataset/... extractedEntities/publication/version1 extractedEntities/publication/version2 - * extractedEntities/publication/... extractedEntities/unknown/version1 extractedEntities/unknown/version2 - * extractedEntities/unknown/... extractedEntities/relation/version1 extractedEntities/relation/version2 - * extractedEntities/relation/... - */ -public class SparkExtractEntitiesJob { - static final String IDJSONPATH = "$.id"; - static final String SOURCEJSONPATH = "$.source"; - static final String TARGETJSONPATH = "$.target"; - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkExtractEntitiesJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkExtractEntitiesJob.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String targetPath = parser.get("targetPath"); - final String tdir = parser.get("targetDir"); - final JavaRDD inputRDD = sc.textFile(inputPath); - - List entities = Arrays - .stream(parser.get("entities").split(",")) - .map(String::trim) - .collect(Collectors.toList()); - if (entities.stream().anyMatch("dataset"::equalsIgnoreCase)) { - // Extract Dataset - inputRDD - .filter(SparkExtractEntitiesJob::isDataset) - .saveAsTextFile(targetPath + "/dataset/" + tdir, GzipCodec.class); - } - if (entities.stream().anyMatch("unknown"::equalsIgnoreCase)) { - // Extract Unknown - inputRDD - .filter(SparkExtractEntitiesJob::isUnknown) - .saveAsTextFile(targetPath + "/unknown/" + tdir, GzipCodec.class); - } - - if (entities.stream().anyMatch("relation"::equalsIgnoreCase)) { - // Extract Relation - inputRDD - .filter(SparkExtractEntitiesJob::isRelation) - .saveAsTextFile(targetPath + "/relation/" + tdir, GzipCodec.class); - } - if (entities.stream().anyMatch("publication"::equalsIgnoreCase)) { - // Extract Relation - inputRDD - .filter(SparkExtractEntitiesJob::isPublication) - .saveAsTextFile(targetPath + "/publication/" + tdir, GzipCodec.class); - } - } - - public static boolean isDataset(final String json) { - final String id = getJPathString(IDJSONPATH, json); - if (StringUtils.isBlank(id)) - return false; - return id.startsWith("60|"); - } - - public static boolean isPublication(final String json) { - final String id = getJPathString(IDJSONPATH, json); - if (StringUtils.isBlank(id)) - return false; - return id.startsWith("50|"); - } - - public static boolean isUnknown(final String json) { - final String id = getJPathString(IDJSONPATH, json); - if (StringUtils.isBlank(id)) - return false; - return id.startsWith("70|"); - } - - public static boolean isRelation(final String json) { - final String source = getJPathString(SOURCEJSONPATH, json); - final String target = getJPathString(TARGETJSONPATH, json); - return StringUtils.isNotBlank(source) && StringUtils.isNotBlank(target); - } - - public static String getJPathString(final String jsonPath, final String json) { - try { - Object o = JsonPath.read(json, jsonPath); - if (o instanceof String) - return (String) o; - if (o instanceof JSONArray && ((JSONArray) o).size() > 0) - return 
(String) ((JSONArray) o).get(0); - return ""; - } catch (Exception e) { - return ""; - } - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala new file mode 100644 index 000000000..6ee575e2a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -0,0 +1,90 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Relation, Result} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql._ +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +object SparkResolveRelation { + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val relationPath = parser.get("relationPath") + log.info(s"sourcePath -> $relationPath") + val entityPath = parser.get("entityPath") + log.info(s"targetPath -> $entityPath") + val workingPath = parser.get("workingPath") + log.info(s"workingPath -> $workingPath") + + + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + import spark.implicits._ + val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] + + entities.flatMap(e => e.getPid.asScala + .map(p => + convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) + .filter(s => s!= null) + .map(s => (s,e.getId)) + ).groupByKey(_._1) + .reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .map(s =>s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedPid") + + val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)] + + val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + + relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{ + m => + val sourceResolved = m._2 + val currentRelation = m._1._2 + if (sourceResolved!=null && sourceResolved._2.nonEmpty) + currentRelation.setSource(sourceResolved._2) + currentRelation + }.write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedSource") + + + val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{ + m => + val targetResolved = m._2 + val currentRelation = m._1._2 + if (targetResolved!=null && targetResolved._2.nonEmpty) + currentRelation.setTarget(targetResolved._2) + currentRelation + }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedRelation") + } + + + + + def 
convertPidToDNETIdentifier(pid:String, pidType: String):String = { + if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty) + null + else + s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}" + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java deleted file mode 100644 index 7003b179d..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java +++ /dev/null @@ -1,75 +0,0 @@ - -package eu.dnetlib.dhp.sx.graph; - -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; - -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.DHPUtils; -import scala.Tuple2; - -/** - * In some case the identifier generated for the Entity in @{@link SparkExtractEntitiesJob} is different from the - * identifier * associated by the aggregator, this means that some relation points to missing identifier To avoid this - * problem we store in the model the Id and the OriginalObJIdentifier This jobs extract this pair and creates a Similar - * relation that will be used in SparkMergeEntities - */ -public class SparkSXGeneratePidSimlarity { - - static final String IDJSONPATH = "$.id"; - static final String OBJIDPATH = "$.originalObjIdentifier"; - - public static void generateDataFrame( - final SparkSession spark, - final JavaSparkContext sc, - final String inputPath, - final String targetPath) { - - final JavaPairRDD datasetSimRel = sc - .textFile(inputPath + "/dataset/*") - .mapToPair( - (PairFunction) k -> new Tuple2<>( - DHPUtils.getJPathString(IDJSONPATH, k), - DHPUtils.getJPathString(OBJIDPATH, k))) - .filter( - t -> !StringUtils - .substringAfter(t._1(), "|") - .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::"))) - .distinct(); - - final JavaPairRDD publicationSimRel = sc - .textFile(inputPath + "/publication/*") - .mapToPair( - (PairFunction) k -> new Tuple2<>( - DHPUtils.getJPathString(IDJSONPATH, k), - DHPUtils.getJPathString(OBJIDPATH, k))) - .filter( - t -> !StringUtils - .substringAfter(t._1(), "|") - .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::"))) - .distinct(); - - JavaRDD simRel = datasetSimRel - .union(publicationSimRel) - .map( - s -> { - final Relation r = new Relation(); - r.setSource(s._1()); - r.setTarget(s._2()); - r.setRelType("similar"); - return r; - }); - spark - .createDataset(simRel.rdd(), Encoders.bean(Relation.class)) - .distinct() - .write() - .mode(SaveMode.Overwrite) - .save(targetPath + "/pid_simRel"); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java deleted file mode 100644 index 05fb826db..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java +++ /dev/null @@ -1,256 +0,0 @@ - -package eu.dnetlib.dhp.sx.graph; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import 
org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.JsonPath; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; -import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; -import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; -import eu.dnetlib.dhp.utils.DHPUtils; -import net.minidev.json.JSONArray; -import scala.Tuple2; - -/** - * This job is responsible of the creation of RAW Graph It is applied to the different entities generated from - * {@link SparkExtractEntitiesJob} In case of dataset, publication and Unknown Entities we group all the entities of the - * same type by their identifier, and then in the reduce phase we merge all the entities. Merge means: -merge all the - * metadata -merge the collected From values - *

- * In case of relation we need to make a different work: -Phase 1: Map reduce jobs Map: Get all Relation and emit a key - * constructed by (source, relType, Target) and the relation itself Reduce: Merge all relations Looking at the javadoc - * of {@link SparkSXGeneratePidSimlarity} we take the dataset of pid relation and joining by source and target we - * replace the wrong identifier in the relation with the correct ones. At the end we replace the new Dataset of Relation - */ -public class SparkScholexplorerCreateRawGraphJob { - - static final String IDJSONPATH = "$.id"; - static final String SOURCEJSONPATH = "$.source"; - static final String TARGETJSONPATH = "$.target"; - static final String RELJSONPATH = "$.relType"; - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkScholexplorerCreateRawGraphJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/sx/graph/argumentparser/merge_entities_scholix_parameters.json"))); - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .config( - new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")) - .appName(SparkScholexplorerCreateRawGraphJob.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String targetPath = parser.get("targetPath"); - final String entity = parser.get("entity"); - FileSystem fs = FileSystem.get(sc.sc().hadoopConfiguration()); - List subFolder = Arrays - .stream(fs.listStatus(new Path(inputPath))) - .filter(FileStatus::isDirectory) - .map(FileStatus::getPath) - .collect(Collectors.toList()); - List> inputRdd = new ArrayList<>(); - subFolder.forEach(p -> inputRdd.add(sc.textFile(p.toUri().getRawPath()))); - JavaRDD union = sc.emptyRDD(); - for (JavaRDD item : inputRdd) { - union = union.union(item); - } - switch (entity) { - case "dataset": - union - .mapToPair( - (PairFunction) f -> { - final String id = getJPathString(IDJSONPATH, f); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return new Tuple2<>(id, mapper.readValue(f, DLIDataset.class)); - }) - .reduceByKey( - (a, b) -> { - a.mergeFrom(b); - return a; - }) - .map( - item -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(item._2()); - }) - .saveAsTextFile(targetPath, GzipCodec.class); - break; - case "publication": - union - .mapToPair( - (PairFunction) f -> { - final String id = getJPathString(IDJSONPATH, f); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return new Tuple2<>(id, mapper.readValue(f, DLIPublication.class)); - }) - .reduceByKey( - (a, b) -> { - a.mergeFrom(b); - return a; - }) - .map( - item -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(item._2()); - }) - .saveAsTextFile(targetPath, GzipCodec.class); - break; - case "unknown": - union - .mapToPair( - (PairFunction) f -> { - final String id = getJPathString(IDJSONPATH, f); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return new Tuple2<>(id, mapper.readValue(f, DLIUnknown.class)); - }) - .reduceByKey( - (a, b) -> { - a.mergeFrom(b); - return a; - }) - .map( - item -> { - 
ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(item._2()); - }) - .saveAsTextFile(targetPath, GzipCodec.class); - break; - case "relation": - SparkSXGeneratePidSimlarity - .generateDataFrame( - spark, sc, inputPath.replace("/relation", ""), targetPath.replace("/relation", "")); - RDD rdd = union - .mapToPair( - (PairFunction) f -> { - final String source = getJPathString(SOURCEJSONPATH, f); - final String target = getJPathString(TARGETJSONPATH, f); - final String reltype = getJPathString(RELJSONPATH, f); - ObjectMapper mapper = new ObjectMapper(); - mapper - .configure( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return new Tuple2<>( - DHPUtils - .md5( - String - .format( - "%s::%s::%s", - source.toLowerCase(), - reltype.toLowerCase(), - target.toLowerCase())), - mapper.readValue(f, Relation.class)); - }) - .reduceByKey( - (a, b) -> { - a.mergeFrom(b); - return a; - }) - .map(Tuple2::_2) - .rdd(); - - spark - .createDataset(rdd, Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .save(targetPath); - Dataset rel_ds = spark.read().load(targetPath).as(Encoders.bean(Relation.class)); - - System.out.println("LOADING PATH :" + targetPath.replace("/relation", "") + "/pid_simRel"); - Dataset sim_ds = spark - .read() - .load(targetPath.replace("/relation", "") + "/pid_simRel") - .as(Encoders.bean(Relation.class)); - - Dataset ids = sim_ds - .map( - (MapFunction) relation -> { - final String type = StringUtils.substringBefore(relation.getSource(), "|"); - relation - .setTarget( - String - .format( - "%s|%s", - type, StringUtils.substringAfter(relation.getTarget(), "::"))); - return relation; - }, - Encoders.bean(Relation.class)); - - final Dataset firstJoin = rel_ds - .joinWith(ids, ids.col("target").equalTo(rel_ds.col("source")), "left_outer") - .map( - (MapFunction, Relation>) s -> { - if (s._2() != null) { - s._1().setSource(s._2().getSource()); - } - return s._1(); - }, - Encoders.bean(Relation.class)); - - Dataset secondJoin = firstJoin - .joinWith(ids, ids.col("target").equalTo(firstJoin.col("target")), "left_outer") - .map( - (MapFunction, Relation>) s -> { - if (s._2() != null) { - s._1().setTarget(s._2().getSource()); - } - return s._1(); - }, - Encoders.bean(Relation.class)); - secondJoin.write().mode(SaveMode.Overwrite).save(targetPath + "_fixed"); - - FileSystem fileSystem = FileSystem.get(sc.hadoopConfiguration()); - - fileSystem.delete(new Path(targetPath), true); - fileSystem.rename(new Path(targetPath + "_fixed"), new Path(targetPath)); - } - } - - public static String getJPathString(final String jsonPath, final String json) { - try { - Object o = JsonPath.read(json, jsonPath); - if (o instanceof String) - return (String) o; - if (o instanceof JSONArray && ((JSONArray) o).size() > 0) - return (String) ((JSONArray) o).get(0); - return ""; - } catch (Exception e) { - return ""; - } - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java deleted file mode 100644 index 97f1251f0..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java +++ /dev/null @@ -1,72 +0,0 @@ - -package eu.dnetlib.dhp.sx.graph; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.GzipCodec; -import 
org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.SparkSession; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser; -import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser; -import eu.dnetlib.scholexplorer.relation.RelationMapper; -import scala.Tuple2; - -/** - * This Job read a sequential File containing XML stored in the aggregator and generates an RDD of heterogeneous - * entities like Dataset, Relation, Publication and Unknown - */ -public class SparkScholexplorerGraphImporter { - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkScholexplorerGraphImporter.class - .getResourceAsStream( - "/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json"))); - - parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(SparkScholexplorerGraphImporter.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - - RelationMapper relationMapper = RelationMapper.load(); - - sc - .sequenceFile(inputPath, IntWritable.class, Text.class) - .map(Tuple2::_2) - .map(Text::toString) - .repartition(500) - .flatMap( - (FlatMapFunction) record -> { - switch (parser.get("entity")) { - case "dataset": - final DatasetScholexplorerParser d = new DatasetScholexplorerParser(); - return d.parseObject(record, relationMapper).iterator(); - case "publication": - final PublicationScholexplorerParser p = new PublicationScholexplorerParser(); - return p.parseObject(record, relationMapper).iterator(); - default: - throw new IllegalArgumentException("wrong values of entities"); - } - }) - .map( - k -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(k); - }) - .saveAsTextFile(parser.get("targetPath"), GzipCodec.class); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala deleted file mode 100644 index 9bdf26acb..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala +++ /dev/null @@ -1,203 +0,0 @@ -package eu.dnetlib.dhp.sx.graph - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result} -import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown} -import eu.dnetlib.dhp.sx.graph.ebi.EBIAggregator -import org.apache.commons.io.IOUtils -import org.apache.commons.lang3.StringUtils -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.LoggerFactory -import org.apache.spark.sql.functions.col - - -object SparkSplitOafTODLIEntities { - - - def getKeyRelation(rel:Relation):String = { - s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}" - - - } - - - def extract_dataset(spark:SparkSession, workingPath:String) :Unit = { - - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset] - - val 
OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000) - - val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000) - - - OAFDataset - .filter(s => s != null && s.isInstanceOf[DLIDataset]) - .map(s =>s.asInstanceOf[DLIDataset]) - .union(ebi_dataset) - .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) - .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) - .map(p => p._2) - .repartition(2000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset") - - } - - def extract_publication(spark:SparkSession, workingPath:String) :Unit = { - - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication] - - val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] - - val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000) - - - OAFDataset - .filter(s => s != null && s.isInstanceOf[DLIPublication]) - .map(s =>s.asInstanceOf[DLIPublication]) - .union(ebi_publication) - .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) - .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) - .map(p => p._2) - .repartition(2000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication") - - } - - def extract_unknown(spark:SparkSession, workingPath:String) :Unit = { - - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown] - - val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] - - OAFDataset - .filter(s => s != null && s.isInstanceOf[DLIUnknown]) - .map(s =>s.asInstanceOf[DLIUnknown]) - .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder)) - .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getDLIUnknownAggregator().toColumn) - .map(p => p._2) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown") - - } - - - def extract_ids(o:Oaf) :(String, String) = { - - o match { - case p: DLIPublication => - val prefix = StringUtils.substringBefore(p.getId, "|") - val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::") - (p.getId, s"$prefix|$original") - case p: DLIDataset => - val prefix = StringUtils.substringBefore(p.getId, "|") - val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::") - (p.getId, s"$prefix|$original") - case _ =>null - } - } - - def extract_relations(spark:SparkSession, workingPath:String) :Unit = { - - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation] - import spark.implicits._ - - val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] - val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000) - - - OAFDataset - .filter(o => o.isInstanceOf[Result]) - .map(extract_ids)(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .filter(r => r != null) - .where("_1 != _2") - .select(col("_1").alias("newId"), col("_2").alias("oldId")) - .distinct() - .map(f => IdReplace(f.getString(0), f.getString(1))) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/id_replace") - - - OAFDataset - 
.filter(s => s != null && s.isInstanceOf[Relation]) - .map(s =>s.asInstanceOf[Relation]) - .union(ebi_relation) - .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder)) - .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getRelationAggregator().toColumn) - .map(p => p._2) - .repartition(4000) - .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation_unfixed") - - - val relations = spark.read.load(s"$workingPath/graph/relation_unfixed").as[Relation] - val ids = spark.read.load(s"$workingPath/graph/id_replace").as[IdReplace] - - relations - .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - .joinWith(ids, col("_1").equalTo(ids("oldId")), "left") - .map(i =>{ - val r = i._1._2 - if (i._2 != null) - { - val id = i._2.newId - r.setSource(id) - } - r - }).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/rel_f_source") - - val rel_source:Dataset[Relation] = spark.read.load(s"$workingPath/graph/rel_f_source").as[Relation] - - rel_source - .map(r => (r.getTarget, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - .joinWith(ids, col("_1").equalTo(ids("oldId")), "left") - .map(i =>{ - val r:Relation = i._1._2 - if (i._2 != null) - { - val id = i._2.newId - r.setTarget(id) - } - r - }).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation") - - - - } - - - def main(args: Array[String]): Unit = { - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json"))) - val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass) - parser.parseArgument(args) - - val workingPath: String = parser.get("workingPath") - val entity:String = parser.get("entity") - logger.info(s"Working dir path = $workingPath") - - val spark:SparkSession = SparkSession - .builder() - .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName) - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .master(parser.get("master")) - .getOrCreate() - - - entity match { - case "publication" => extract_publication(spark, workingPath) - case "dataset" => extract_dataset(spark,workingPath) - case "relation" => extract_relations(spark, workingPath) - case "unknown" => extract_unknown(spark, workingPath) - } - - - - - - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala deleted file mode 100644 index c63ad4370..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala +++ /dev/null @@ -1,73 +0,0 @@ -package eu.dnetlib.dhp.sx.graph - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation} -import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication} -import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser} -import eu.dnetlib.scholexplorer.relation.RelationMapper -import org.apache.commons.io.IOUtils -import org.apache.hadoop.io.{IntWritable, Text} -import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - - -/** - * This new version of the Job read a sequential File containing XML stored in the aggregator and 
generates a Dataset OAF of heterogeneous - * entities like Dataset, Relation, Publication and Unknown - */ - -object SparkXMLToOAFDataset { - - - def main(args: Array[String]): Unit = { - val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass) - val conf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json"))) - parser.parseArgument(args) - val spark = - SparkSession - .builder() - .config(conf) - .appName(SparkXMLToOAFDataset.getClass.getSimpleName) - .master(parser.get("master")).getOrCreate() - - val sc = spark.sparkContext - - implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset] - implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication] - implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation] - - val relationMapper = RelationMapper.load - - val inputPath: String = parser.get("sourcePath") - val entity: String = parser.get("entity") - val targetPath = parser.get("targetPath") - - logger.info(s"Input path is $inputPath") - logger.info(s"Entity path is $entity") - logger.info(s"Target Path is $targetPath") - - val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text]) - .map(s => s._2.toString) - .flatMap(s => { - entity match { - case "publication" => - val p = new PublicationScholexplorerParser - val l =p.parseObject(s, relationMapper) - if (l != null) l.asScala else List() - case "dataset" => - val d = new DatasetScholexplorerParser - val l =d.parseObject(s, relationMapper) - if (l != null) l.asScala else List() - } - }).filter(s => s!= null) - spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath) - - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala index b4314631a..4c304848b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala @@ -11,9 +11,9 @@ import org.slf4j.{Logger, LoggerFactory} object SparkEBILinksToOaf { def main(args: Array[String]): Unit = { - val log: Logger = LoggerFactory.getLogger(SparkEBILinksToOaf.getClass) + val log: Logger = LoggerFactory.getLogger(getClass) val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json"))) + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json"))) parser.parseArgument(args) val spark: SparkSession = SparkSession diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json new file mode 100644 index 000000000..8bfdde5b0 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + 
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json new file mode 100644 index 000000000..f211adb9a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true}, + {"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml index c94394b1e..4045e2dfb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml @@ -1,121 +1,72 @@ - + - reuseContent - false - should import content from the aggregator or reuse a previous version - - - workingPath + sourcePath the working dir base path - targetXMLPath + targetPath the graph Raw base path - - targetEntityPath - the graph Raw base path - - - format - the postgres URL to access to the database - - - layout - the user postgres - - - interpretation - the password postgres - - - dbhost - mongoDB url, example: mongodb://[username:password@]host[:port] - - - dbName - mongo database - - - entity - the entity type - - - - - - ${wf:conf('reuseContent') eq false} - ${wf:conf('reuseContent') eq true} - - - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.sx.graph.ImportDataFromMongo - -t${targetXMLPath} - -n${nameNode} - -h${dbhost} - -p27017 - -dn${dbName} - -f${format} - -l${layout} - -i${interpretation} - - - - - - - - - - - - - - + - ${jobTracker} - ${nameNode} - yarn-cluster + yarn cluster - Import ${entity} and related entities - eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset + Extract entities in raw graph + eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph dhp-graph-mapper-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} + --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=2000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -mt yarn - --sourcePath${targetXMLPath} - --targetPath${workingPath}/input/OAFDataset - --entity${entity} + --masteryarn + --sourcePath${sourcePath} + 
--targetPath${targetPath} + + + + + + + + + yarn + cluster + Resolve Relations in raw graph + eu.dnetlib.dhp.sx.graph.SparkResolveRelation + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.shuffle.partitions=3000 + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn + --relationPath${targetPath}/extracted/relation + --workingPath${targetPath}/resolved/ + --entityPath${targetPath}/dedup + \ No newline at end of file
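
The core of the new SparkResolveRelation step is the PID-resolution convention: entity PIDs are lowered to keys of the form unresolved::<pid>::<pidtype>, matched against relation endpoints, and rewritten to resolved graph identifiers in two passes (source first, then target), keeping only relations whose endpoints both resolve to results (the "50" prefix). The sketch below mirrors that logic with plain Scala collections instead of Spark Datasets so it can run standalone; SimpleRel and the in-memory resolvedPid map are illustrative stand-ins, not part of the patch, for the Kryo-encoded Relation dataset and the resolvedPid output written under the working path.

// Sketch only: same key convention and two-pass endpoint rewrite as SparkResolveRelation,
// expressed over plain Scala collections instead of Spark Datasets.
object ResolveRelationSketch {

  // Hypothetical stand-in for eu.dnetlib.dhp.schema.oaf.Relation
  case class SimpleRel(source: String, target: String, relClass: String)

  // Same convention as convertPidToDNETIdentifier in the patch
  def unresolvedKey(pid: String, pidType: String): Option[String] =
    if (pid == null || pid.isEmpty || pidType == null || pidType.isEmpty) None
    else Some(s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}")

  def resolve(rels: Seq[SimpleRel], resolvedPid: Map[String, String]): Seq[SimpleRel] =
    rels
      .map(r => r.copy(source = resolvedPid.getOrElse(r.source.toLowerCase, r.source))) // pass 1: resolve source
      .map(r => r.copy(target = resolvedPid.getOrElse(r.target.toLowerCase, r.target))) // pass 2: resolve target
      .filter(r => r.source.startsWith("50") && r.target.startsWith("50"))              // keep only resolved results

  def main(args: Array[String]): Unit = {
    // Keys as produced by unresolvedKey, values as resolved result identifiers (made-up examples)
    val resolvedPid = Map(
      unresolvedKey("10.1000/xyz", "doi").get -> "50|doi_________::abc",
      unresolvedKey("PMC123", "pmcid").get    -> "50|pmid________::def"
    )
    val rels = Seq(
      SimpleRel("unresolved::10.1000/xyz::doi", "unresolved::pmc123::pmcid", "isRelatedTo"),  // fully resolvable
      SimpleRel("unresolved::10.1000/other::doi", "50|doi_________::ghi", "isRelatedTo")      // source cannot be resolved
    )
    resolve(rels, resolvedPid).foreach(println) // only the first relation survives
  }
}

In the patch itself this is done with two left joins of the relation Dataset against the resolvedPid table written under the working path, with a preference for 50|doi / 50|pmid identifiers when several entities carry the same PID.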