diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala
deleted file mode 100644
index 8d375600ce..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/IdReplace.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package eu.dnetlib.dhp.sx.graph
-
-case class IdReplace(newId:String, oldId:String) {}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java
deleted file mode 100644
index bc40afbfd6..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ImportDataFromMongo.java
+++ /dev/null
@@ -1,153 +0,0 @@
-
-package eu.dnetlib.dhp.sx.graph;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-import com.mongodb.DBObject;
-import com.mongodb.MongoClient;
-import com.mongodb.QueryBuilder;
-import com.mongodb.client.FindIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-
-/**
- * This job is responsible for collecting data from a Mongo database and storing it in a sequence file on HDFS. The Mongo
- * database describes each MDStore in two collections: -metadata, that contains info like: ID, format, layout,
- * interpretation -metadataManager, that contains info like: ID, mongoCollectionName. From the metadata collection we filter
- * the ids by format, layout and interpretation; from the metadataManager we get the current MONGO collection name
- * which contains the metadata XML (see the function getCurrentId)
- *
- * This Job will be called multiple times, once for each triple we want to import, and generates for each triple a
- * sequence file of XML
- */
-public class ImportDataFromMongo {
- /**
- * It requires as input some parameters described in the file
- * eu/dnetlib/dhp/sx/graph/argumentparser/import_from_mongo_parameters.json
- *
- * - the name node - the HDFS path where to store the file - the mongo host - the mongo port - the metadata format to
- * import - the metadata layout to import - the metadata interpretation to import - the mongo database name
- *
- * These params are encoded into args
- *
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- ImportDataFromMongo.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/graph/argumentparser/import_from_mongo_parameters.json")));
- parser.parseArgument(args);
- final int port = Integer.parseInt(parser.get("dbport"));
- final String host = parser.get("dbhost");
-
- final String format = parser.get("format");
- final String layout = parser.get("layout");
- final String interpretation = parser.get("interpretation");
-
- final String dbName = parser.get("dbName");
- final MongoClient client = new MongoClient(host, port);
- MongoDatabase database = client.getDatabase(dbName);
-
- MongoCollection<Document> metadata = database.getCollection("metadata");
- MongoCollection<Document> metadataManager = database.getCollection("metadataManager");
- final DBObject query = QueryBuilder
- .start("format")
- .is(format)
- .and("layout")
- .is(layout)
- .and("interpretation")
- .is(interpretation)
- .get();
- final List<String> ids = new ArrayList<>();
- metadata
- .find((Bson) query)
- .forEach((Consumer<Document>) document -> ids.add(document.getString("mdId")));
- List<String> databaseId = ids
- .stream()
- .map(it -> getCurrentId(it, metadataManager))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
-
- final String hdfsuri = parser.get("namenode");
- // ====== Init HDFS File System Object
- Configuration conf = new Configuration();
- // Set FileSystem URI
- conf.set("fs.defaultFS", hdfsuri);
- // Because of Maven
- conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
- conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
-
- FileSystem.get(URI.create(hdfsuri), conf);
- Path hdfswritepath = new Path(parser.get("targetPath"));
-
- final AtomicInteger counter = new AtomicInteger(0);
- try (SequenceFile.Writer writer = SequenceFile
- .createWriter(
- conf,
- SequenceFile.Writer.file(hdfswritepath),
- SequenceFile.Writer.keyClass(IntWritable.class),
- SequenceFile.Writer.valueClass(Text.class))) {
- final IntWritable key = new IntWritable(counter.get());
- final Text value = new Text();
- databaseId
- .forEach(
- id -> {
- System.out.println("Reading :" + id);
- MongoCollection<Document> collection = database.getCollection(id);
- collection
- .find()
- .forEach(
- (Consumer<Document>) document -> {
- key.set(counter.getAndIncrement());
- value.set(document.getString("body"));
-
- if (counter.get() % 10000 == 0) {
- System.out.println("Added " + counter.get());
- }
- try {
- writer.append(key, value);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- });
- }
- }
-
- /**
- * Returns the name of the mongo collection for a given MDStore ID
- *
- * @param mdId The id of the MDStore
- * @param metadataManager The collection metadataManager on mongo which contains this information
- * @return the name of the current mongo collection, or null if no MDStore with the given id is found
- */
- private static String getCurrentId(
- final String mdId, final MongoCollection<Document> metadataManager) {
- FindIterable<Document> result = metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get());
- final Document item = result.first();
- return item == null ? null : item.getString("currentId");
- }
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
new file mode 100644
index 0000000000..5605ad0016
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
@@ -0,0 +1,79 @@
+package eu.dnetlib.dhp.sx.graph
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+
+
+object SparkCreateInputGraph {
+
+ def main(args: Array[String]): Unit = {
+
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/extract_entities_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+
+ val resultObject = List(
+ ("publication", classOf[Publication]),
+ ("dataset", classOf[OafDataset]),
+ ("software", classOf[Software]),
+ ("otherResearchProduct", classOf[OtherResearchProduct])
+
+ )
+
+ implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"sourcePath -> $sourcePath")
+ val targetPath = parser.get("targetPath")
+ log.info(s"targetPath -> $targetPath")
+
+
+ val oafDs:Dataset[Oaf] = spark.read.load(s"$sourcePath/*").as[Oaf]
+
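+ // 1) split the mixed Oaf dataset into one dataset per result type, plus the relations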
+ resultObject.foreach(r => extractEntities(oafDs,s"$targetPath/extracted/${r._1}", r._2, log))
+
+ extractEntities(oafDs,s"$targetPath/extracted/relation", classOf[Relation], log)
+
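+ // 2) merge duplicated records (same id) within each extracted result type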
+ resultObject.foreach { r =>
+ log.info(s"Make ${r._1} unique")
+ makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2)
+ }
+ }
+
+
+ def extractEntities[T <: Oaf ](oafDs:Dataset[Oaf], targetPath:String, clazz:Class[T], log:Logger) :Unit = {
+
+ implicit val resEncoder: Encoder[T] = Encoders.kryo(clazz)
+ log.info(s"Extract ${clazz.getSimpleName}")
+ // filter on the runtime class: a bare isInstanceOf[T] check is erased by the compiler and would match every Oaf record
+ oafDs.filter(o => clazz.isInstance(o)).map(p => clazz.cast(p)).write.mode(SaveMode.Overwrite).save(targetPath)
+ }
+
+
+ def makeDatasetUnique[T <: Result ](sourcePath:String, targetPath:String, spark:SparkSession, clazz:Class[T]) :Unit = {
+ import spark.implicits._
+
+ implicit val resEncoder: Encoder[T] = Encoders.kryo(clazz)
+
+ val ds:Dataset[T] = spark.read.load(sourcePath).as[T]
+
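+ // group by entity id and merge all the records sharing the same id into a single entity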
+ ds.groupByKey(_.getId).reduceGroups{(x,y) =>
+ x.mergeFrom(y)
+ x
+ }.map(_._2).write.mode(SaveMode.Overwrite).save(targetPath)
+
+ }
+
+}
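The job is driven by the three parameters declared in extract_entities_params.json (master, sourcePath, targetPath). As a rough sketch of the intended invocation, with purely illustrative paths (not values taken from this change):

    // hypothetical local run of the new extraction job
    SparkCreateInputGraph.main(Array(
      "--master", "local[*]",
      "--sourcePath", "/tmp/scholix/oafDataset",   // directory holding the kryo-encoded Oaf records
      "--targetPath", "/tmp/scholix/rawGraph"      // the job writes extracted/<type> and dedup/<type> under this path
    ))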
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java
deleted file mode 100644
index 4f015a9ad8..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkExtractEntitiesJob.java
+++ /dev/null
@@ -1,126 +0,0 @@
-
-package eu.dnetlib.dhp.sx.graph;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-
-import com.jayway.jsonpath.JsonPath;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import net.minidev.json.JSONArray;
-
-/**
- * This Job extracts one typology of entity at a time and stores it in a new RDD. This job is called multiple times, once for each file
- * generated by the Job {@link ImportDataFromMongo}, and stores the new RDD in a path that should be under a folder:
- * extractedEntities/entity/version1
- *
- * At the end of this process we will have : extractedEntities/dataset/version1 extractedEntities/dataset/version2
- * extractedEntities/dataset/... extractedEntities/publication/version1 extractedEntities/publication/version2
- * extractedEntities/publication/... extractedEntities/unknown/version1 extractedEntities/unknown/version2
- * extractedEntities/unknown/... extractedEntities/relation/version1 extractedEntities/relation/version2
- * extractedEntities/relation/...
- */
-public class SparkExtractEntitiesJob {
- static final String IDJSONPATH = "$.id";
- static final String SOURCEJSONPATH = "$.source";
- static final String TARGETJSONPATH = "$.target";
-
- public static void main(String[] args) throws Exception {
-
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkExtractEntitiesJob.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkExtractEntitiesJob.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String inputPath = parser.get("sourcePath");
- final String targetPath = parser.get("targetPath");
- final String tdir = parser.get("targetDir");
- final JavaRDD<String> inputRDD = sc.textFile(inputPath);
-
- List<String> entities = Arrays
- .stream(parser.get("entities").split(","))
- .map(String::trim)
- .collect(Collectors.toList());
- if (entities.stream().anyMatch("dataset"::equalsIgnoreCase)) {
- // Extract Dataset
- inputRDD
- .filter(SparkExtractEntitiesJob::isDataset)
- .saveAsTextFile(targetPath + "/dataset/" + tdir, GzipCodec.class);
- }
- if (entities.stream().anyMatch("unknown"::equalsIgnoreCase)) {
- // Extract Unknown
- inputRDD
- .filter(SparkExtractEntitiesJob::isUnknown)
- .saveAsTextFile(targetPath + "/unknown/" + tdir, GzipCodec.class);
- }
-
- if (entities.stream().anyMatch("relation"::equalsIgnoreCase)) {
- // Extract Relation
- inputRDD
- .filter(SparkExtractEntitiesJob::isRelation)
- .saveAsTextFile(targetPath + "/relation/" + tdir, GzipCodec.class);
- }
- if (entities.stream().anyMatch("publication"::equalsIgnoreCase)) {
- // Extract Publication
- inputRDD
- .filter(SparkExtractEntitiesJob::isPublication)
- .saveAsTextFile(targetPath + "/publication/" + tdir, GzipCodec.class);
- }
- }
-
- public static boolean isDataset(final String json) {
- final String id = getJPathString(IDJSONPATH, json);
- if (StringUtils.isBlank(id))
- return false;
- return id.startsWith("60|");
- }
-
- public static boolean isPublication(final String json) {
- final String id = getJPathString(IDJSONPATH, json);
- if (StringUtils.isBlank(id))
- return false;
- return id.startsWith("50|");
- }
-
- public static boolean isUnknown(final String json) {
- final String id = getJPathString(IDJSONPATH, json);
- if (StringUtils.isBlank(id))
- return false;
- return id.startsWith("70|");
- }
-
- public static boolean isRelation(final String json) {
- final String source = getJPathString(SOURCEJSONPATH, json);
- final String target = getJPathString(TARGETJSONPATH, json);
- return StringUtils.isNotBlank(source) && StringUtils.isNotBlank(target);
- }
-
- public static String getJPathString(final String jsonPath, final String json) {
- try {
- Object o = JsonPath.read(json, jsonPath);
- if (o instanceof String)
- return (String) o;
- if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
- return (String) ((JSONArray) o).get(0);
- return "";
- } catch (Exception e) {
- return "";
- }
- }
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
new file mode 100644
index 0000000000..6ee575e2a8
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
@@ -0,0 +1,90 @@
+package eu.dnetlib.dhp.sx.graph
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+object SparkResolveRelation {
+ def main(args: Array[String]): Unit = {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+
+ val relationPath = parser.get("relationPath")
+ log.info(s"sourcePath -> $relationPath")
+ val entityPath = parser.get("entityPath")
+ log.info(s"targetPath -> $entityPath")
+ val workingPath = parser.get("workingPath")
+ log.info(s"workingPath -> $workingPath")
+
+
+ implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+ import spark.implicits._
+ val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
+
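+ // build a lookup (unresolved pid key -> entity id), preferring DOI/PMID based identifiers when one pid maps to several entities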
+ entities.flatMap(e => e.getPid.asScala
+ .map(p =>
+ convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
+ .filter(s => s!= null)
+ .map(s => (s,e.getId))
+ ).groupByKey(_._1)
+ .reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
+ .map(s =>s._2)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/resolvedPid")
+
+ val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)]
+
+ val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+
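+ // first pass: join the relations on their (lowercased) source with the resolved pid table and replace the source id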
+ relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{
+ m =>
+ val sourceResolved = m._2
+ val currentRelation = m._1._2
+ if (sourceResolved!=null && sourceResolved._2.nonEmpty)
+ currentRelation.setSource(sourceResolved._2)
+ currentRelation
+ }.write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/resolvedSource")
+
+
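+ // second pass: resolve the target side, then keep only relations whose source and target are resolved ("50|...") identifiers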
+ val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+ relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{
+ m =>
+ val targetResolved = m._2
+ val currentRelation = m._1._2
+ if (targetResolved!=null && targetResolved._2.nonEmpty)
+ currentRelation.setTarget(targetResolved._2)
+ currentRelation
+ }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/resolvedRelation")
+ }
+
+
+
+
+ def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
+ if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)
+ null
+ else
+ s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}"
+ }
+
+}
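For clarity, convertPidToDNETIdentifier lowercases both the pid and its type and prefixes them with "unresolved"; this is the key the joins above compare against the relation source/target. A small sketch with made-up values:

    SparkResolveRelation.convertPidToDNETIdentifier("10.5281/ZENODO.123", "DOI")
    // => "unresolved::10.5281/zenodo.123::doi"
    SparkResolveRelation.convertPidToDNETIdentifier(null, "doi")
    // => null, so the pid is filtered out before building the (pid, entityId) lookup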
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java
deleted file mode 100644
index 7003b179dd..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSXGeneratePidSimlarity.java
+++ /dev/null
@@ -1,75 +0,0 @@
-
-package eu.dnetlib.dhp.sx.graph;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.utils.DHPUtils;
-import scala.Tuple2;
-
-/**
- * In some cases the identifier generated for the Entity in {@link SparkExtractEntitiesJob} is different from the
- * identifier associated by the aggregator; this means that some relations point to a missing identifier. To avoid this
- * problem we store in the model both the Id and the OriginalObjIdentifier. This job extracts this pair and creates a Similar
- * relation that will be used in SparkMergeEntities
- */
-public class SparkSXGeneratePidSimlarity {
-
- static final String IDJSONPATH = "$.id";
- static final String OBJIDPATH = "$.originalObjIdentifier";
-
- public static void generateDataFrame(
- final SparkSession spark,
- final JavaSparkContext sc,
- final String inputPath,
- final String targetPath) {
-
- final JavaPairRDD<String, String> datasetSimRel = sc
- .textFile(inputPath + "/dataset/*")
- .mapToPair(
- (PairFunction<String, String, String>) k -> new Tuple2<>(
- DHPUtils.getJPathString(IDJSONPATH, k),
- DHPUtils.getJPathString(OBJIDPATH, k)))
- .filter(
- t -> !StringUtils
- .substringAfter(t._1(), "|")
- .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
- .distinct();
-
- final JavaPairRDD<String, String> publicationSimRel = sc
- .textFile(inputPath + "/publication/*")
- .mapToPair(
- (PairFunction<String, String, String>) k -> new Tuple2<>(
- DHPUtils.getJPathString(IDJSONPATH, k),
- DHPUtils.getJPathString(OBJIDPATH, k)))
- .filter(
- t -> !StringUtils
- .substringAfter(t._1(), "|")
- .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
- .distinct();
-
- JavaRDD<Relation> simRel = datasetSimRel
- .union(publicationSimRel)
- .map(
- s -> {
- final Relation r = new Relation();
- r.setSource(s._1());
- r.setTarget(s._2());
- r.setRelType("similar");
- return r;
- });
- spark
- .createDataset(simRel.rdd(), Encoders.bean(Relation.class))
- .distinct()
- .write()
- .mode(SaveMode.Overwrite)
- .save(targetPath + "/pid_simRel");
- }
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java
deleted file mode 100644
index 05fb826db0..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerCreateRawGraphJob.java
+++ /dev/null
@@ -1,256 +0,0 @@
-
-package eu.dnetlib.dhp.sx.graph;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.jayway.jsonpath.JsonPath;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
-import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
-import eu.dnetlib.dhp.utils.DHPUtils;
-import net.minidev.json.JSONArray;
-import scala.Tuple2;
-
-/**
- * This job is responsible for the creation of the RAW Graph. It is applied to the different entities generated by
- * {@link SparkExtractEntitiesJob}. In case of dataset, publication and unknown entities we group all the entities of the
- * same type by their identifier, and then in the reduce phase we merge all the entities. Merge means: -merge all the
- * metadata -merge the collectedFrom values
- *
- * In case of relations the work is different: -Phase 1: Map reduce jobs Map: get every Relation and emit a key
- * constructed from (source, relType, target) and the relation itself Reduce: merge all relations. Looking at the javadoc
- * of {@link SparkSXGeneratePidSimlarity}, we take the dataset of pid relations and, joining by source and target, we
- * replace the wrong identifiers in the relations with the correct ones. At the end we replace the old Dataset of Relations with the fixed one
- */
-public class SparkScholexplorerCreateRawGraphJob {
-
- static final String IDJSONPATH = "$.id";
- static final String SOURCEJSONPATH = "$.source";
- static final String TARGETJSONPATH = "$.target";
- static final String RELJSONPATH = "$.relType";
-
- public static void main(String[] args) throws Exception {
-
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkScholexplorerCreateRawGraphJob.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/graph/argumentparser/merge_entities_scholix_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .config(
- new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
- .appName(SparkScholexplorerCreateRawGraphJob.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String inputPath = parser.get("sourcePath");
- final String targetPath = parser.get("targetPath");
- final String entity = parser.get("entity");
- FileSystem fs = FileSystem.get(sc.sc().hadoopConfiguration());
- List<Path> subFolder = Arrays
- .stream(fs.listStatus(new Path(inputPath)))
- .filter(FileStatus::isDirectory)
- .map(FileStatus::getPath)
- .collect(Collectors.toList());
- List<JavaRDD<String>> inputRdd = new ArrayList<>();
- subFolder.forEach(p -> inputRdd.add(sc.textFile(p.toUri().getRawPath())));
- JavaRDD<String> union = sc.emptyRDD();
- for (JavaRDD<String> item : inputRdd) {
- union = union.union(item);
- }
- switch (entity) {
- case "dataset":
- union
- .mapToPair(
- (PairFunction<String, String, DLIDataset>) f -> {
- final String id = getJPathString(IDJSONPATH, f);
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return new Tuple2<>(id, mapper.readValue(f, DLIDataset.class));
- })
- .reduceByKey(
- (a, b) -> {
- a.mergeFrom(b);
- return a;
- })
- .map(
- item -> {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(item._2());
- })
- .saveAsTextFile(targetPath, GzipCodec.class);
- break;
- case "publication":
- union
- .mapToPair(
- (PairFunction<String, String, DLIPublication>) f -> {
- final String id = getJPathString(IDJSONPATH, f);
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return new Tuple2<>(id, mapper.readValue(f, DLIPublication.class));
- })
- .reduceByKey(
- (a, b) -> {
- a.mergeFrom(b);
- return a;
- })
- .map(
- item -> {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(item._2());
- })
- .saveAsTextFile(targetPath, GzipCodec.class);
- break;
- case "unknown":
- union
- .mapToPair(
- (PairFunction<String, String, DLIUnknown>) f -> {
- final String id = getJPathString(IDJSONPATH, f);
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return new Tuple2<>(id, mapper.readValue(f, DLIUnknown.class));
- })
- .reduceByKey(
- (a, b) -> {
- a.mergeFrom(b);
- return a;
- })
- .map(
- item -> {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(item._2());
- })
- .saveAsTextFile(targetPath, GzipCodec.class);
- break;
- case "relation":
- SparkSXGeneratePidSimlarity
- .generateDataFrame(
- spark, sc, inputPath.replace("/relation", ""), targetPath.replace("/relation", ""));
- RDD<Relation> rdd = union
- .mapToPair(
- (PairFunction<String, String, Relation>) f -> {
- final String source = getJPathString(SOURCEJSONPATH, f);
- final String target = getJPathString(TARGETJSONPATH, f);
- final String reltype = getJPathString(RELJSONPATH, f);
- ObjectMapper mapper = new ObjectMapper();
- mapper
- .configure(
- DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return new Tuple2<>(
- DHPUtils
- .md5(
- String
- .format(
- "%s::%s::%s",
- source.toLowerCase(),
- reltype.toLowerCase(),
- target.toLowerCase())),
- mapper.readValue(f, Relation.class));
- })
- .reduceByKey(
- (a, b) -> {
- a.mergeFrom(b);
- return a;
- })
- .map(Tuple2::_2)
- .rdd();
-
- spark
- .createDataset(rdd, Encoders.bean(Relation.class))
- .write()
- .mode(SaveMode.Overwrite)
- .save(targetPath);
- Dataset<Relation> rel_ds = spark.read().load(targetPath).as(Encoders.bean(Relation.class));
-
- System.out.println("LOADING PATH :" + targetPath.replace("/relation", "") + "/pid_simRel");
- Dataset<Relation> sim_ds = spark
- .read()
- .load(targetPath.replace("/relation", "") + "/pid_simRel")
- .as(Encoders.bean(Relation.class));
-
- Dataset<Relation> ids = sim_ds
- .map(
- (MapFunction<Relation, Relation>) relation -> {
- final String type = StringUtils.substringBefore(relation.getSource(), "|");
- relation
- .setTarget(
- String
- .format(
- "%s|%s",
- type, StringUtils.substringAfter(relation.getTarget(), "::")));
- return relation;
- },
- Encoders.bean(Relation.class));
-
- final Dataset<Relation> firstJoin = rel_ds
- .joinWith(ids, ids.col("target").equalTo(rel_ds.col("source")), "left_outer")
- .map(
- (MapFunction<Tuple2<Relation, Relation>, Relation>) s -> {
- if (s._2() != null) {
- s._1().setSource(s._2().getSource());
- }
- return s._1();
- },
- Encoders.bean(Relation.class));
-
- Dataset<Relation> secondJoin = firstJoin
- .joinWith(ids, ids.col("target").equalTo(firstJoin.col("target")), "left_outer")
- .map(
- (MapFunction<Tuple2<Relation, Relation>, Relation>) s -> {
- if (s._2() != null) {
- s._1().setTarget(s._2().getSource());
- }
- return s._1();
- },
- Encoders.bean(Relation.class));
- secondJoin.write().mode(SaveMode.Overwrite).save(targetPath + "_fixed");
-
- FileSystem fileSystem = FileSystem.get(sc.hadoopConfiguration());
-
- fileSystem.delete(new Path(targetPath), true);
- fileSystem.rename(new Path(targetPath + "_fixed"), new Path(targetPath));
- }
- }
-
- public static String getJPathString(final String jsonPath, final String json) {
- try {
- Object o = JsonPath.read(json, jsonPath);
- if (o instanceof String)
- return (String) o;
- if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
- return (String) ((JSONArray) o).get(0);
- return "";
- } catch (Exception e) {
- return "";
- }
- }
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java
deleted file mode 100644
index 97f1251f0a..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerGraphImporter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-
-package eu.dnetlib.dhp.sx.graph;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.sql.SparkSession;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
-import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser;
-import eu.dnetlib.scholexplorer.relation.RelationMapper;
-import scala.Tuple2;
-
-/**
- * This Job reads a sequence file containing XML stored in the aggregator and generates an RDD of heterogeneous
- * entities like Dataset, Relation, Publication and Unknown
- */
-public class SparkScholexplorerGraphImporter {
-
- public static void main(String[] args) throws Exception {
-
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkScholexplorerGraphImporter.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")));
-
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkScholexplorerGraphImporter.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String inputPath = parser.get("sourcePath");
-
- RelationMapper relationMapper = RelationMapper.load();
-
- sc
- .sequenceFile(inputPath, IntWritable.class, Text.class)
- .map(Tuple2::_2)
- .map(Text::toString)
- .repartition(500)
- .flatMap(
- (FlatMapFunction<String, Oaf>) record -> {
- switch (parser.get("entity")) {
- case "dataset":
- final DatasetScholexplorerParser d = new DatasetScholexplorerParser();
- return d.parseObject(record, relationMapper).iterator();
- case "publication":
- final PublicationScholexplorerParser p = new PublicationScholexplorerParser();
- return p.parseObject(record, relationMapper).iterator();
- default:
- throw new IllegalArgumentException("wrong values of entities");
- }
- })
- .map(
- k -> {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(k);
- })
- .saveAsTextFile(parser.get("targetPath"), GzipCodec.class);
- }
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
deleted file mode 100644
index 9bdf26acb6..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-package eu.dnetlib.dhp.sx.graph
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
-import eu.dnetlib.dhp.sx.graph.ebi.EBIAggregator
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
-import org.slf4j.LoggerFactory
-import org.apache.spark.sql.functions.col
-
-
-object SparkSplitOafTODLIEntities {
-
-
- def getKeyRelation(rel:Relation):String = {
- s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
-
-
- }
-
-
- def extract_dataset(spark:SparkSession, workingPath:String) :Unit = {
-
- implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
-
- val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000)
-
- val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000)
-
-
- OAFDataset
- .filter(s => s != null && s.isInstanceOf[DLIDataset])
- .map(s =>s.asInstanceOf[DLIDataset])
- .union(ebi_dataset)
- .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
- .groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
- .map(p => p._2)
- .repartition(2000)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
-
- }
-
- def extract_publication(spark:SparkSession, workingPath:String) :Unit = {
-
- implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
-
- val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
-
- val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000)
-
-
- OAFDataset
- .filter(s => s != null && s.isInstanceOf[DLIPublication])
- .map(s =>s.asInstanceOf[DLIPublication])
- .union(ebi_publication)
- .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
- .groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
- .map(p => p._2)
- .repartition(2000)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
-
- }
-
- def extract_unknown(spark:SparkSession, workingPath:String) :Unit = {
-
- implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
-
- val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
-
- OAFDataset
- .filter(s => s != null && s.isInstanceOf[DLIUnknown])
- .map(s =>s.asInstanceOf[DLIUnknown])
- .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder))
- .groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
- .map(p => p._2)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
-
- }
-
-
- def extract_ids(o:Oaf) :(String, String) = {
-
- o match {
- case p: DLIPublication =>
- val prefix = StringUtils.substringBefore(p.getId, "|")
- val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
- (p.getId, s"$prefix|$original")
- case p: DLIDataset =>
- val prefix = StringUtils.substringBefore(p.getId, "|")
- val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
- (p.getId, s"$prefix|$original")
- case _ =>null
- }
- }
-
- def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
-
- implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
- import spark.implicits._
-
- val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
- val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
-
-
- OAFDataset
- .filter(o => o.isInstanceOf[Result])
- .map(extract_ids)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
- .filter(r => r != null)
- .where("_1 != _2")
- .select(col("_1").alias("newId"), col("_2").alias("oldId"))
- .distinct()
- .map(f => IdReplace(f.getString(0), f.getString(1)))
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/id_replace")
-
-
- OAFDataset
- .filter(s => s != null && s.isInstanceOf[Relation])
- .map(s =>s.asInstanceOf[Relation])
- .union(ebi_relation)
- .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
- .groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getRelationAggregator().toColumn)
- .map(p => p._2)
- .repartition(4000)
- .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation_unfixed")
-
-
- val relations = spark.read.load(s"$workingPath/graph/relation_unfixed").as[Relation]
- val ids = spark.read.load(s"$workingPath/graph/id_replace").as[IdReplace]
-
- relations
- .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
- .joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
- .map(i =>{
- val r = i._1._2
- if (i._2 != null)
- {
- val id = i._2.newId
- r.setSource(id)
- }
- r
- }).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/rel_f_source")
-
- val rel_source:Dataset[Relation] = spark.read.load(s"$workingPath/graph/rel_f_source").as[Relation]
-
- rel_source
- .map(r => (r.getTarget, r))(Encoders.tuple(Encoders.STRING, relEncoder))
- .joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
- .map(i =>{
- val r:Relation = i._1._2
- if (i._2 != null)
- {
- val id = i._2.newId
- r.setTarget(id)
- }
- r
- }).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
-
-
-
- }
-
-
- def main(args: Array[String]): Unit = {
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
- val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
- parser.parseArgument(args)
-
- val workingPath: String = parser.get("workingPath")
- val entity:String = parser.get("entity")
- logger.info(s"Working dir path = $workingPath")
-
- val spark:SparkSession = SparkSession
- .builder()
- .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .master(parser.get("master"))
- .getOrCreate()
-
-
- entity match {
- case "publication" => extract_publication(spark, workingPath)
- case "dataset" => extract_dataset(spark,workingPath)
- case "relation" => extract_relations(spark, workingPath)
- case "unknown" => extract_unknown(spark, workingPath)
- }
-
-
-
-
-
- }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
deleted file mode 100644
index c63ad43704..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-package eu.dnetlib.dhp.sx.graph
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
-import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
-import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
-import eu.dnetlib.scholexplorer.relation.RelationMapper
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.io.{IntWritable, Text}
-import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-
-/**
- * This new version of the Job reads a sequence file containing XML stored in the aggregator and generates an OAF Dataset of heterogeneous
- * entities like Dataset, Relation, Publication and Unknown
- */
-
-object SparkXMLToOAFDataset {
-
-
- def main(args: Array[String]): Unit = {
- val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass)
- val conf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json")))
- parser.parseArgument(args)
- val spark =
- SparkSession
- .builder()
- .config(conf)
- .appName(SparkXMLToOAFDataset.getClass.getSimpleName)
- .master(parser.get("master")).getOrCreate()
-
- val sc = spark.sparkContext
-
- implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
- implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
- implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
-
- val relationMapper = RelationMapper.load
-
- val inputPath: String = parser.get("sourcePath")
- val entity: String = parser.get("entity")
- val targetPath = parser.get("targetPath")
-
- logger.info(s"Input path is $inputPath")
- logger.info(s"Entity path is $entity")
- logger.info(s"Target Path is $targetPath")
-
- val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text])
- .map(s => s._2.toString)
- .flatMap(s => {
- entity match {
- case "publication" =>
- val p = new PublicationScholexplorerParser
- val l =p.parseObject(s, relationMapper)
- if (l != null) l.asScala else List()
- case "dataset" =>
- val d = new DatasetScholexplorerParser
- val l =d.parseObject(s, relationMapper)
- if (l != null) l.asScala else List()
- }
- }).filter(s => s!= null)
- spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath)
-
- }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala
index b4314631a4..4c304848b9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/ebi/SparkEBILinksToOaf.scala
@@ -11,9 +11,9 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkEBILinksToOaf {
def main(args: Array[String]): Unit = {
- val log: Logger = LoggerFactory.getLogger(SparkEBILinksToOaf.getClass)
+ val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkEBILinksToOaf.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json")))
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/ebi/ebi_to_df_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json
new file mode 100644
index 0000000000..8bfdde5b0d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extract_entities_params.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json
new file mode 100644
index 0000000000..f211adb9a4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json
@@ -0,0 +1,6 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
index c94394b1ee..4045e2dfb6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
@@ -1,121 +1,72 @@
-<workflow-app name="Create Raw Graph Step 1: import Entities from aggregator to HDFS" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Create Raw Graph Step 1: extract Entities in raw graph" xmlns="uri:oozie:workflow:0.5">
     <parameters>
-        <property>
-            <name>reuseContent</name>
-            <value>false</value>
-            <description>should import content from the aggregator or reuse a previous version</description>
-        </property>
-        <property>
-            <name>workingPath</name>
+        <property>
+            <name>sourcePath</name>
             <description>the working dir base path</description>
         </property>
         <property>
-            <name>targetXMLPath</name>
+            <name>targetPath</name>
             <description>the graph Raw base path</description>
         </property>
-        <property>
-            <name>targetEntityPath</name>
-            <description>the graph Raw base path</description>
-        </property>
-        <property>
-            <name>format</name>
-            <description>the postgres URL to access to the database</description>
-        </property>
-        <property>
-            <name>layout</name>
-            <description>the user postgres</description>
-        </property>
-        <property>
-            <name>interpretation</name>
-            <description>the password postgres</description>
-        </property>
-        <property>
-            <name>dbhost</name>
-            <description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
-        </property>
-        <property>
-            <name>dbName</name>
-            <description>mongo database</description>
-        </property>
-        <property>
-            <name>entity</name>
-            <description>the entity type</description>
-        </property>
     </parameters>

-    <start to="ReuseContent"/>
-
-    <decision name="ReuseContent">
-        <switch>
-            <case to="ImportEntitiesFromMongo">${wf:conf('reuseContent') eq false}</case>
-            <case to="ExtractDLIEntities">${wf:conf('reuseContent') eq true}</case>
-            <default to="ImportEntitiesFromMongo"/>
-        </switch>
-    </decision>
-
+    <start to="ExtractEntities"/>

     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>

-    <action name="ImportEntitiesFromMongo">
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <main-class>eu.dnetlib.dhp.sx.graph.ImportDataFromMongo</main-class>
-            <arg>-t</arg><arg>${targetXMLPath}</arg>
-            <arg>-n</arg><arg>${nameNode}</arg>
-            <arg>-h</arg><arg>${dbhost}</arg>
-            <arg>-p</arg><arg>27017</arg>
-            <arg>-dn</arg><arg>${dbName}</arg>
-            <arg>-f</arg><arg>${format}</arg>
-            <arg>-l</arg><arg>${layout}</arg>
-            <arg>-i</arg><arg>${interpretation}</arg>
-        </java>
-        <ok to="ExtractDLIEntities"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="ExtractDLIEntities">
+    <action name="ExtractEntities">
         <spark xmlns="uri:oozie:spark-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
+            <master>yarn</master>
             <mode>cluster</mode>
-            <name>Import ${entity} and related entities</name>
-            <class>eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset</class>
+            <name>Extract entities in raw graph</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-memory ${sparkExecutorMemory}
+                --executor-memory=${sparkExecutorMemory}
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
-                ${sparkExtraOPT}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.shuffle.partitions=2000
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>-mt</arg><arg>yarn</arg>
-            <arg>--sourcePath</arg><arg>${targetXMLPath}</arg>
-            <arg>--targetPath</arg><arg>${workingPath}/input/OAFDataset</arg>
-            <arg>--entity</arg><arg>${entity}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--targetPath</arg><arg>${targetPath}</arg>
+        </spark>
+        <ok to="ResolveRelations"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ResolveRelations">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Resolve Relations in raw graph</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkResolveRelation</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.shuffle.partitions=3000
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
+            <arg>--relationPath</arg><arg>${targetPath}/extracted/relation</arg>
+            <arg>--workingPath</arg><arg>${targetPath}/resolved/</arg>
+            <arg>--entityPath</arg><arg>${targetPath}/dedup</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
     </action>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file