diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
index f8010bbd5..f35af0905 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
@@ -12,7 +12,7 @@ import org.json4s.jackson.JsonMethods.parse
import scala.collection.JavaConverters._
import scala.io.Source
-object ScholixUtils {
+object ScholixUtils extends Serializable {
val DNET_IDENTIFIER_SCHEMA: String = "DNET Identifier"
@@ -24,7 +24,7 @@ object ScholixUtils {
case class RelatedEntities(id: String, relatedDataset: Long, relatedPublication: Long) {}
val relations: Map[String, RelationVocabulary] = {
- val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString
+ val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/scholexplorer/relation/relations.json")).mkString
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
@@ -62,6 +62,11 @@ object ScholixUtils {
}
+ def generateScholixResourceFromResult(r:Result) :ScholixResource = {
+ generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r))
+ }
+
+
val statsAggregator: Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] with Serializable {
override def zero: RelatedEntities = null
@@ -184,6 +189,19 @@ object ScholixUtils {
}
+ def generateCompleteScholix(scholix: Scholix, target: ScholixResource): Scholix = {
+ val s = new Scholix
+ s.setPublicationDate(scholix.getPublicationDate)
+ s.setPublisher(scholix.getPublisher)
+ s.setLinkprovider(scholix.getLinkprovider)
+ s.setRelationship(scholix.getRelationship)
+ s.setSource(scholix.getSource)
+ s.setTarget(target)
+ s.setIdentifier(DHPUtils.md5(s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}"))
+ s
+ }
+
+
def generateScholixResourceFromSummary(summaryObject: ScholixSummary): ScholixResource = {
val r = new ScholixResource
r.setIdentifier(summaryObject.getLocalIdentifier)
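For context, a minimal sketch (not part of the patch, helper name is illustrative) of how the two new ScholixUtils methods compose: generateScholixResourceFromResult wraps resultToSummary plus generateScholixResourceFromSummary, while generateCompleteScholix copies an existing Scholix, attaches the resolved target and recomputes the identifier as the md5 of source::relationName::target.

    // Illustrative only (not part of the patch): how the two new helpers compose.
    import eu.dnetlib.dhp.schema.oaf.Result
    import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
    import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils

    def completeWithTarget(halfScholix: Scholix, targetResult: Result): Scholix = {
      // convert the OAF Result into a ScholixResource via its summary
      val target: ScholixResource = ScholixUtils.generateScholixResourceFromResult(targetResult)
      // copy the source side, attach the target and recompute the md5 identifier
      ScholixUtils.generateCompleteScholix(halfScholix, target)
    }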
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml
new file mode 100644
index 000000000..bdd48b0ab
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml
@@ -0,0 +1,19 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml
new file mode 100644
index 000000000..751b124cf
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml
@@ -0,0 +1,62 @@
+
+
+
+ sourcePath
+ the source path of scholix graph
+
+
+ datacitePath
+ the datacite native path
+
+
+ workingSupportPath
+ the working Support path
+
+
+ isLookupUrl
+ The IS lookUp service endpoint
+
+
+ updateDS
+ false
+ whether to regenerate all the support Datasets
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ New Update from Datacite to Scholix
+ eu.dnetlib.dhp.sx.graph.SparkRetrieveDataciteDelta
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}
+ --datacitePath${datacitePath}
+ --masteryarn
+ --workingSupportPath${workingSupportPath}
+ --isLookupUrl${isLookupUrl}
+ --updateDS${updateDS}
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json
new file mode 100644
index 000000000..78777ffff
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json
@@ -0,0 +1,41 @@
+[
+ {
+ "paramName": "s",
+ "paramLongName": "sourcePath",
+ "paramDescription": "the source mdstore path",
+ "paramRequired": true
+ },
+
+ {
+ "paramName": "d",
+ "paramLongName": "datacitePath",
+ "paramDescription": "the datacite native path",
+ "paramRequired": true
+ },
+
+ {
+ "paramName": "w",
+ "paramLongName": "workingSupportPath",
+ "paramDescription": "the working Support path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "the isLookup URL",
+ "paramRequired": true
+ },
+ {
+ "paramName": "m",
+ "paramLongName": "master",
+ "paramDescription": "the master name",
+ "paramRequired": true
+ },
+ {
+ "paramName": "u",
+ "paramLongName": "updateDS",
+ "paramDescription": "Need to regenerate all support Dataset",
+ "paramRequired": false
+ }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala
index 7f37829ee..45a6cfc89 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala
@@ -9,9 +9,10 @@ import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
import eu.dnetlib.dhp.utils.{DHPUtils, ISLookupClientFactory}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
-import org.slf4j.Logger
+import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import java.text.SimpleDateFormat
@@ -58,14 +59,15 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):Long = {
log.info("Retrieve last entities collected From")
- implicit val oafEncoder:Encoder[Result] = Encoders.kryo[Result]
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
import spark.implicits._
- val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Result]
+ val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Oaf].filter(o =>o.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
val date = entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0)
- ISO8601toEpochMillis(date)
+ ISO8601toEpochMillis(date) / 1000
}
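A note on the change above, with a condensed sketch below: the entities are now read with the generic Oaf Kryo encoder and narrowed to Result at runtime, and the ISO-8601 collection date is divided by 1000 on the assumption that the Datacite delta timestamps are epoch seconds rather than milliseconds. The function name and the toEpochMillis parameter are placeholders standing in for the class's own ISO8601toEpochMillis helper.

    // Sketch of the Oaf -> Result narrowing; the division by 1000 assumes the
    // Datacite delta timestamps are epoch seconds (not verified here).
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    import org.apache.spark.sql.functions.max
    import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}

    def lastCollectedFromSeconds(spark: SparkSession, entitiesPath: String,
                                 toEpochMillis: String => Long): Long = {
      implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
      implicit val resultEncoder: Encoder[Result] = Encoders.kryo[Result]
      import spark.implicits._
      val lastDate = spark.read.load(s"$entitiesPath/*").as[Oaf]
        .filter(_.isInstanceOf[Result])
        .map(_.asInstanceOf[Result])
        .filter(_.getDateofcollection != null)
        .map(_.getDateofcollection)
        .select(max("value")).first.getString(0)
      toEpochMillis(lastDate) / 1000
    }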
@@ -73,7 +75,7 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
* The update of Datacite relationships on Scholexplorer
* requires some utility data structures.
* One is the scholixResource DS, which stores all the nodes of the Scholix Graph
- * in format (dnetID, ScholixResource )
+ * in format ScholixResource
* @param summaryPath the path of the summary in Scholix
* @param workingPath the working path
* @param spark the spark session
@@ -84,11 +86,37 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
log.info("Convert All summary to ScholixResource")
spark.read.load(summaryPath).as[ScholixSummary]
- .map(ScholixUtils.generateScholixResourceFromSummary)
+ .map(ScholixUtils.generateScholixResourceFromSummary)(scholixResourceEncoder)
.filter(r => r.getIdentifier!= null && r.getIdentifier.size>0)
- .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder))
- .write.mode(SaveMode.Overwrite).save(scholixResourcePath(workingPath))
+ .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_native")
+ }
+ /**
+ * This method convert the new Datacite Resource into Scholix Resource
+ * Needed to fill the source and the type of Scholix Relationships
+ * @param workingPath the Working Path
+ * @param spark The spark Session
+ */
+ def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = {
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
+ implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
+ import spark.implicits._
+
+ spark.read.load(dataciteOAFPath(workingPath)).as[Oaf]
+ .filter(_.isInstanceOf[Result])
+ .map(_.asInstanceOf[Result])
+ .map(ScholixUtils.generateScholixResourceFromResult)
+ .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0)
+ .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_update")
+
+ val update = spark.read.load(s"${scholixResourcePath(workingPath)}_update").as[ScholixResource]
+ val native = spark.read.load(s"${scholixResourcePath(workingPath)}_native").as[ScholixResource]
+ val graph = update.union(native)
+ .groupByKey(_.getDnetIdentifier)
+ .reduceGroups((a,b) => if (a!= null && a.getDnetIdentifier!= null) a else b)
+ .map(_._2)
+ graph.write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_graph")
}
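The merge above deduplicates the union of the pre-existing ("native") resources and the freshly converted Datacite resources by dnetIdentifier, keeping one non-null record per key. A standalone sketch of that pattern (function name is illustrative):

    // Dedup-by-key sketch mirroring the _update/_native merge above (illustrative)
    import org.apache.spark.sql.{Dataset, Encoder, Encoders}
    import eu.dnetlib.dhp.schema.sx.scholix.ScholixResource

    def mergeByDnetId(update: Dataset[ScholixResource], native: Dataset[ScholixResource])
                     (implicit enc: Encoder[ScholixResource]): Dataset[ScholixResource] =
      update.union(native)
        .groupByKey(_.getDnetIdentifier)(Encoders.STRING)
        .reduceGroups((a, b) => if (a != null && a.getDnetIdentifier != null) a else b)
        .map(_._2)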
@@ -102,29 +130,20 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
* @param vocabularies Vocabularies needed for transformation
*/
- def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Unit = {
+ def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Long = {
import spark.implicits._
val ds = spark.read.load(datacitePath).as[DataciteType]
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
- ds.filter(_.timestamp>=timestamp)
- .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true))
- .flatMap(i => fixRelations(i)).filter(i => i != null)
- .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath))
-
+ val total = ds.filter(_.timestamp>=timestamp).count()
+ if (total >0) {
+ ds.filter(_.timestamp >= timestamp)
+ .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true))
+ .flatMap(i => fixRelations(i)).filter(i => i != null)
+ .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath))
+ }
+ total
}
-
- /**
- * This method convert an Instance of OAF Result into
- * Scholix Resource
- * @param r The input Result
- * @return The Scholix Resource
- */
- def resultToScholixResource(r:Result):ScholixResource = {
- ScholixUtils.generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r))
- }
-
-
/**
* After adding the new ScholixResources, we need to update the scholix Pid Map
* to intersect it with the new Datacite Relations
@@ -135,11 +154,11 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
def generatePidMap(workingPath:String, spark:SparkSession ) :Unit = {
implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
import spark.implicits._
- spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)]
+ spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource]
.flatMap(r=>
- r._2.getIdentifier.asScala
+ r.getIdentifier.asScala
.map(i =>DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema))
- .map((_, r._1))
+ .map(t =>(t, r.getDnetIdentifier))
)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.groupByKey(_._1)
.reduceGroups((a,b) => if (a!= null && a._2!= null) a else b)
@@ -147,27 +166,6 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
.write.mode(SaveMode.Overwrite).save(pidMapPath(workingPath))
}
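The pid map flattens every identifier of every resource into an unresolved key (via DHPUtils.generateUnresolvedIdentifier) paired with the resource's dnetIdentifier, then keeps a single mapping per key. A standalone sketch of that step (function name is illustrative, logic mirrors the patch):

    // Sketch: one (unresolved pid -> dnet id) pair per identifier, deduplicated by pid
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.{Dataset, Encoders}
    import eu.dnetlib.dhp.schema.sx.scholix.ScholixResource
    import eu.dnetlib.dhp.utils.DHPUtils

    def pidToDnetId(resources: Dataset[ScholixResource]): Dataset[(String, String)] =
      resources
        .flatMap(r => r.getIdentifier.asScala
          .map(i => DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema))
          .map(pid => (pid, r.getDnetIdentifier)))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
        .groupByKey(_._1)(Encoders.STRING)
        .reduceGroups((a, b) => if (a != null && a._2 != null) a else b)
        .map(_._2)(Encoders.tuple(Encoders.STRING, Encoders.STRING))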
- /**
- * This method convert the new Datacite Resource into Scholix Resource
- * Needed to fill the source and the type of Scholix Relationships
- * @param workingPath the Working Path
- * @param spark The spark Session
- */
- def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = {
- implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
- implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
- implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
-
- spark.read.load(dataciteOAFPath(workingPath)).as[Oaf]
- .filter(_.isInstanceOf[Result])
- .map(_.asInstanceOf[Result])
- .map(resultToScholixResource)
- .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0)
- .map(r=> (r.getIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder))
- .write.mode(SaveMode.Append).save(scholixResourcePath(workingPath))
- }
-
-
/**
* This method resolves the Datacite relations and filters the resolved
* relations
@@ -222,25 +220,26 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
- import spark.implicits._
+ implicit val intermediateEncoder :Encoder[(String,Scholix)] = Encoders.tuple(Encoders.STRING, scholixEncoder)
- val relationss:Dataset[(String, Relation)] = spark.read.load(s"$workingPath/ResolvedRelation").as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder))
- val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(scholixResourcePath(workingPath)).as[(String,ScholixResource)]
+ val relations:Dataset[(String, Relation)] = spark.read.load(resolvedRelationPath(workingPath)).as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder))
- relationss.joinWith(id_summary, relationss("_1").equalTo(id_summary("_1")),"inner")
- .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))(Encoders.tuple(Encoders.STRING, scholixEncoder))
- .write.mode(SaveMode.Overwrite)
- .save(s"$workingPath/scholix_one_verse")
+ val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource].map(r => (r.getDnetIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder))
- val source_scholix:Dataset[(String,Scholix)] = spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)]
+ id_summary.cache()
+
+ relations.joinWith(id_summary, relations("_1").equalTo(id_summary("_1")),"inner")
+ .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix_one_verse")
+
+ val source_scholix:Dataset[(String, Scholix)] =spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)]
source_scholix.joinWith(id_summary, source_scholix("_1").equalTo(id_summary("_1")),"inner")
.map(t => {
- val target = t._2._2
- val scholix = t._1._2
- scholix.setTarget(target)
- scholix
+ val target:ScholixResource =t._2._2
+ val scholix:Scholix = t._1._2
+ ScholixUtils.generateCompleteScholix(scholix,target)
})(scholixEncoder).write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix")
}
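The rewritten generateScholixUpdate performs two joins: relations keyed by source are joined with the resource graph to build a source-side Scholix keyed by its target, and a second join with the same resource dataset attaches the target and recomputes the identifier through generateCompleteScholix. A condensed, standalone sketch (function name is illustrative, logic mirrors the patch):

    // Two-step join sketch: (source join) builds the half Scholix, (target join) completes it
    import org.apache.spark.sql.{Dataset, Encoder, Encoders}
    import eu.dnetlib.dhp.schema.oaf.Relation
    import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
    import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils

    def buildScholix(relations: Dataset[Relation],
                     resources: Dataset[ScholixResource]): Dataset[Scholix] = {
      implicit val relEnc: Encoder[Relation] = Encoders.kryo[Relation]
      implicit val resEnc: Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
      implicit val schEnc: Encoder[Scholix] = Encoders.kryo[Scholix]

      val bySource = relations.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEnc))
      val byDnetId = resources.map(r => (r.getDnetIdentifier, r))(Encoders.tuple(Encoders.STRING, resEnc))

      // 1) attach the source resource, key the half-built Scholix by its target id
      val oneVerse = bySource.joinWith(byDnetId, bySource("_1") === byDnetId("_1"), "inner")
        .map(t => (t._1._2.getTarget, ScholixUtils.scholixFromSource(t._1._2, t._2._2)))(
          Encoders.tuple(Encoders.STRING, schEnc))

      // 2) attach the target resource and recompute the Scholix identifier
      oneVerse.joinWith(byDnetId, oneVerse("_1") === byDnetId("_1"), "inner")
        .map(t => ScholixUtils.generateCompleteScholix(t._1._2, t._2._2))(schEnc)
    }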
@@ -259,7 +258,7 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
val datacitePath = parser.get("datacitePath")
log.info(s"DatacitePath is '$datacitePath'")
- val workingPath = parser.get("workingPath")
+ val workingPath = parser.get("workingSupportPath")
log.info(s"workingPath is '$workingPath'")
val isLookupUrl: String = parser.get("isLookupUrl")
@@ -279,13 +278,28 @@ class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:L
log.info("Retrieve last entities collected From starting from scholix Graph")
lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities")
}
+ else {
+ val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+ fs.delete(new Path(s"${scholixResourcePath(workingPath)}_native"), true)
+ fs.rename(new Path(s"${scholixResourcePath(workingPath)}_graph"), new Path(s"${scholixResourcePath(workingPath)}_native"))
+ lastCollectionDate = retrieveLastCollectedFrom(spark, dataciteOAFPath(workingPath))
+ }
- getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies)
- addMissingScholixResource(workingPath,spark)
- generatePidMap(workingPath, spark)
- resolveUpdateRelation(workingPath,spark)
- generateScholixUpdate(workingPath, spark)
-
-
+ val numRecords = getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies)
+ if (numRecords>0) {
+ addMissingScholixResource(workingPath,spark)
+ generatePidMap(workingPath, spark)
+ resolveUpdateRelation(workingPath,spark)
+ generateScholixUpdate(workingPath, spark)
+ }
+ }
+}
+
+
+object SparkRetrieveDataciteDelta {
+ val log: Logger = LoggerFactory.getLogger(SparkRetrieveDataciteDelta.getClass)
+
+ def main(args: Array[String]): Unit = {
+ new SparkRetrieveDataciteDelta("/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json", args, log).initialize().run()
}
}
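The new companion object makes the job runnable directly; a hedged example of the argument list it expects (all values are placeholders, flag names come from retrieve_datacite_delta_params.json):

    // Placeholder values for illustration only; flags follow retrieve_datacite_delta_params.json
    SparkRetrieveDataciteDelta.main(Array(
      "--sourcePath",         "/scholix/graph",     // scholix graph root (entities under /entities)
      "--datacitePath",       "/datacite/native",   // Datacite native store
      "--workingSupportPath", "/scholix/support",   // support datasets working dir
      "--isLookupUrl",        "http://example.org/is/services/isLookUp?wsdl",
      "--master",             "yarn",
      "--updateDS",           "false"               // set true to regenerate all support datasets
    ))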
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
index 707462f24..23909fd9a 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
@@ -33,13 +33,14 @@ public class PrepareInfo implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class);
// associate orgs with all their parent
- private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " +
+ private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet "
+ +
"FROM relation " +
"WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() +
"' and datainfo.deletedbyinference = false " +
"GROUP BY target";
- //associates results with all the orgs they are affiliated to
+ // associates results with all the orgs they are affiliated to
private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " +
"FROM relation " +
"WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() +
@@ -88,7 +89,7 @@ public class PrepareInfo implements Serializable {
childParentPath,
leavesPath,
resultOrganizationPath,
- relationPath));
+ relationPath));
}
private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
@@ -113,13 +114,13 @@ public class PrepareInfo implements Serializable {
.json(resultOrganizationPath);
relation
- .filter(
- (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() &&
- r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
- .write()
- .mode(SaveMode.Overwrite)
- .option("compression","gzip")
- .json(relationPath);
+ .filter(
+ (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() &&
+ r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(relationPath);
Dataset children = spark
.sql(
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
index e26276f53..9ceebf222 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
@@ -2,7 +2,6 @@
package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*;
-
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
@@ -22,13 +21,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.KeyValueSet;
-
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
-
public class SparkResultToOrganizationFromSemRel implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
private static final int MAX_ITERATION = 5;
@@ -201,6 +198,4 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
.json(outputPath);
}
-
-
}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
index 02444cb15..1adbbe60e 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
@@ -75,8 +75,8 @@ public class StepActions implements Serializable {
ret.setValueSet(orgs);
return ret;
}, Encoders.bean(KeyValueSet.class))
- .write()
- .mode(SaveMode.Overwrite)
+ .write()
+ .mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
@@ -179,7 +179,6 @@ public class StepActions implements Serializable {
"GROUP BY resId")
.as(Encoders.bean(KeyValueSet.class));
-
// create new relations from result to organization for each result linked to a leaf
return resultParent
.flatMap(
@@ -200,7 +199,6 @@ public class StepActions implements Serializable {
.iterator(),
Encoders.bean(Relation.class));
-
}
}
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
index 21d99321b..2d2668db3 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
@@ -84,7 +84,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
@@ -229,7 +229,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -335,34 +335,35 @@ public class PrepareInfoJobTest {
}
@Test
- public void relationTest()throws Exception {
+ public void relationTest() throws Exception {
PrepareInfo
- .main(
- new String[] {
- "-isSparkSessionManaged", Boolean.FALSE.toString(),
- "-graphPath", getClass()
- .getResource(
- "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
- .getPath(),
- "-hive_metastore_uris", "",
- "-leavesPath", workingDir.toString() + "/currentIteration/",
- "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
- "-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
- });
+ });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD tmp = sc
- .textFile(workingDir.toString() + "/relation")
- .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+ .textFile(workingDir.toString() + "/relation")
+ .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
Assertions.assertEquals(7, verificationDs.count());
}
+
@Test
public void resultOrganizationTest1() throws Exception {
@@ -378,7 +379,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -512,7 +513,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -539,7 +540,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
index 4b15da623..890570a0b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
@@ -1,6 +1,7 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
- {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"su", "paramLongName":"scholixUpdatePath", "paramDescription": "the scholix updated Path", "paramRequired": false},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
+ {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
index 85c0d486d..e46e59cc0 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
@@ -90,68 +90,6 @@
--relationPath${targetPath}/relation
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- Serialize scholix to JSON
- eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=6000
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
- --masteryarn
- --sourcePath${targetPath}/provision/scholix/scholix
- --targetPath${targetPath}/index/scholix_json
- --objectTypescholix
-
-
-
-
-
-
-
-
- yarn
- cluster
- Serialize summary to JSON
- eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=6000
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
- --masteryarn
- --sourcePath${targetPath}/provision/summaries_filtered
- --targetPath${targetPath}/index/summaries_json
- --objectTypesummary
-
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json
deleted file mode 100644
index 98e8daa18..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json
+++ /dev/null
@@ -1,158 +0,0 @@
-{
- "cites":{
- "original":"Cites",
- "inverse":"IsCitedBy"
- },
- "compiles":{
- "original":"Compiles",
- "inverse":"IsCompiledBy"
- },
- "continues":{
- "original":"Continues",
- "inverse":"IsContinuedBy"
- },
- "derives":{
- "original":"IsSourceOf",
- "inverse":"IsDerivedFrom"
- },
- "describes":{
- "original":"Describes",
- "inverse":"IsDescribedBy"
- },
- "documents":{
- "original":"Documents",
- "inverse":"IsDocumentedBy"
- },
- "hasmetadata":{
- "original":"HasMetadata",
- "inverse":"IsMetadataOf"
- },
- "hasassociationwith":{
- "original":"HasAssociationWith",
- "inverse":"HasAssociationWith"
- },
- "haspart":{
- "original":"HasPart",
- "inverse":"IsPartOf"
- },
- "hasversion":{
- "original":"HasVersion",
- "inverse":"IsVersionOf"
- },
- "iscitedby":{
- "original":"IsCitedBy",
- "inverse":"Cites"
- },
- "iscompiledby":{
- "original":"IsCompiledBy",
- "inverse":"Compiles"
- },
- "iscontinuedby":{
- "original":"IsContinuedBy",
- "inverse":"Continues"
- },
- "isderivedfrom":{
- "original":"IsDerivedFrom",
- "inverse":"IsSourceOf"
- },
- "isdescribedby":{
- "original":"IsDescribedBy",
- "inverse":"Describes"
- },
- "isdocumentedby":{
- "original":"IsDocumentedBy",
- "inverse":"Documents"
- },
- "isidenticalto":{
- "original":"IsIdenticalTo",
- "inverse":"IsIdenticalTo"
- },
- "ismetadatafor":{
- "original":"IsMetadataFor",
- "inverse":"IsMetadataOf"
- },
- "ismetadataof":{
- "original":"IsMetadataOf",
- "inverse":"IsMetadataFor"
- },
- "isnewversionof":{
- "original":"IsNewVersionOf",
- "inverse":"IsPreviousVersionOf"
- },
- "isobsoletedby":{
- "original":"IsObsoletedBy",
- "inverse":"Obsoletes"
- },
- "isoriginalformof":{
- "original":"IsOriginalFormOf",
- "inverse":"IsVariantFormOf"
- },
- "ispartof":{
- "original":"IsPartOf",
- "inverse":"HasPart"
- },
- "ispreviousversionof":{
- "original":"IsPreviousVersionOf",
- "inverse":"IsNewVersionOf"
- },
- "isreferencedby":{
- "original":"IsReferencedBy",
- "inverse":"References"
- },
- "isrelatedto":{
- "original":"IsRelatedTo",
- "inverse":"IsRelatedTo"
- },
- "isrequiredby":{
- "original":"IsRequiredBy",
- "inverse":"Requires"
- },
- "isreviewedby":{
- "original":"IsReviewedBy",
- "inverse":"Reviews"
- },
- "issourceof":{
- "original":"IsSourceOf",
- "inverse":"IsDerivedFrom"
- },
- "issupplementedby":{
- "original":"IsSupplementedBy",
- "inverse":"IsSupplementTo"
- },
- "issupplementto":{
- "original":"IsSupplementTo",
- "inverse":"IsSupplementedBy"
- },
- "isvariantformof":{
- "original":"IsVariantFormOf",
- "inverse":"IsOriginalFormOf"
- },
- "isversionof":{
- "original":"IsVersionOf",
- "inverse":"HasVersion"
- },
- "obsoletes":{
- "original":"Obsoletes",
- "inverse":"IsObsoletedBy"
- },
- "references":{
- "original":"References",
- "inverse":"IsReferencedBy"
- },
- "requires":{
- "original":"Requires",
- "inverse":"IsRequiredBy"
- },
- "related":{
- "original":"IsRelatedTo",
- "inverse":"IsRelatedTo"
- },
- "reviews":{
- "original":"Reviews",
- "inverse":"IsReviewedBy"
- },
- "unknown":{
- "original":"Unknown",
- "inverse":"Unknown"
- }
-}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml
new file mode 100644
index 000000000..6fb2a1253
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml
@@ -0,0 +1,10 @@
+<configuration>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml
new file mode 100644
index 000000000..2844d7baa
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml
@@ -0,0 +1,83 @@
+
+
+
+ scholixUpdatePath
+ the working dir base path of the scholix updated
+
+
+ targetPath
+ the final graph path
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Serialize scholix to JSON
+ eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/provision/scholix/scholix
+ --targetPath${targetPath}/index/scholix_json
+ --scholixUpdatePath${scholixUpdatePath}
+ --objectTypescholix
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Serialize summary to JSON
+ eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/provision/summaries_filtered
+ --targetPath${targetPath}/index/summaries_json
+ --objectTypesummary
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
index cc1b97fd6..0c54de7c8 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
@@ -30,6 +30,9 @@ object SparkConvertObjectToJson {
log.info(s"targetPath -> $targetPath")
val objectType = parser.get("objectType")
log.info(s"objectType -> $objectType")
+ val scholixUpdatePath = parser.get("scholixUpdatePath")
+ log.info(s"scholixUpdatePath -> $scholixUpdatePath")
+
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
@@ -42,7 +45,8 @@ object SparkConvertObjectToJson {
case "scholix" =>
log.info("Serialize Scholix")
val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
- d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(6000).saveAsTextFile(targetPath, classOf[GzipCodec])
+ val u :Dataset[Scholix]= spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]
+ d.union(u).repartition(8000).map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.saveAsTextFile(targetPath, classOf[GzipCodec])
case "summary" =>
log.info("Serialize Summary")
val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
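The scholix branch now unions the base graph with the Datacite update before serializing each record as a JSON line; a standalone sketch of that idea (function name is illustrative):

    // Sketch: merge base scholix with the update, then serialize each record to a JSON line
    import com.fasterxml.jackson.databind.ObjectMapper
    import eu.dnetlib.dhp.schema.sx.scholix.Scholix
    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

    def writeScholixAsJson(spark: SparkSession, sourcePath: String,
                           updatePath: String, targetPath: String): Unit = {
      implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
      val mapper = new ObjectMapper() // Jackson, as in SparkConvertObjectToJson
      val base: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
      val update: Dataset[Scholix] = spark.read.load(s"$updatePath/scholix").as[Scholix]
      base.union(update)
        .repartition(8000)
        .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
        .rdd
        .saveAsTextFile(targetPath, classOf[GzipCodec])
    }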