diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
similarity index 83%
rename from dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
rename to dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
index bf81a26d46..f35af09056 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
@@ -12,7 +12,7 @@ import org.json4s.jackson.JsonMethods.parse
import scala.collection.JavaConverters._
import scala.io.Source
-object ScholixUtils {
+object ScholixUtils extends Serializable {
val DNET_IDENTIFIER_SCHEMA: String = "DNET Identifier"
@@ -24,7 +24,7 @@ object ScholixUtils {
case class RelatedEntities(id: String, relatedDataset: Long, relatedPublication: Long) {}
val relations: Map[String, RelationVocabulary] = {
- val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString
+ val input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/scholexplorer/relation/relations.json")).mkString
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
@@ -53,8 +53,6 @@ object ScholixUtils {
else {
summary.getDate.get(0)
}
-
-
}
def inverseRelationShip(rel: ScholixRelationship): ScholixRelationship = {
@@ -64,11 +62,15 @@ object ScholixUtils {
}
+ def generateScholixResourceFromResult(r:Result) :ScholixResource = {
+ generateScholixResourceFromSummary(ScholixUtils.resultToSummary(r))
+ }
+
+
val statsAggregator: Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String, String, Long), RelatedEntities, RelatedEntities] with Serializable {
override def zero: RelatedEntities = null
override def reduce(b: RelatedEntities, a: (String, String, Long)): RelatedEntities = {
- val id = a._1
val relatedDataset = if ("dataset".equalsIgnoreCase(a._2)) a._3 else 0
val relatedPublication = if ("publication".equalsIgnoreCase(a._2)) a._3 else 0
@@ -142,6 +144,14 @@ object ScholixUtils {
}
+ def extractCollectedFrom(summary: ScholixResource): List[ScholixEntityId] = {
+ if (summary.getCollectedFrom != null && !summary.getCollectedFrom.isEmpty) {
+ val l: List[ScholixEntityId] = summary.getCollectedFrom.asScala.map {
+ d => new ScholixEntityId(d.getProvider.getName, d.getProvider.getIdentifiers)
+ }(collection.breakOut)
+ l
+ } else List()
+ }
def extractCollectedFrom(summary: ScholixSummary): List[ScholixEntityId] = {
if (summary.getDatasources != null && !summary.getDatasources.isEmpty) {
@@ -160,7 +170,7 @@ object ScholixUtils {
c =>
new ScholixEntityId(c.getValue, List(new ScholixIdentifier(c.getKey, DNET_IDENTIFIER_SCHEMA, null)).asJava)
- }(collection breakOut)
+ }.toList
l
} else List()
}
@@ -179,6 +189,19 @@ object ScholixUtils {
}
+ def generateCompleteScholix(scholix: Scholix, target: ScholixResource): Scholix = {
+ val s = new Scholix
+ s.setPublicationDate(scholix.getPublicationDate)
+ s.setPublisher(scholix.getPublisher)
+ s.setLinkprovider(scholix.getLinkprovider)
+ s.setRelationship(scholix.getRelationship)
+ s.setSource(scholix.getSource)
+ s.setTarget(target)
+ s.setIdentifier(DHPUtils.md5(s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}"))
+ s
+ }
+
+
def generateScholixResourceFromSummary(summaryObject: ScholixSummary): ScholixResource = {
val r = new ScholixResource
r.setIdentifier(summaryObject.getLocalIdentifier)
@@ -222,6 +245,38 @@ object ScholixUtils {
}
+
+ def scholixFromSource(relation: Relation, source: ScholixResource):Scholix = {
+ if (relation == null || source == null)
+ return null
+ val s = new Scholix
+ var l: List[ScholixEntityId] = extractCollectedFrom(relation)
+ if (l.isEmpty)
+ l = extractCollectedFrom(source)
+ if (l.isEmpty)
+ return null
+ s.setLinkprovider(l.asJava)
+ var d = extractRelationDate(relation)
+ if (d == null)
+ d = source.getPublicationDate
+
+ s.setPublicationDate(d)
+
+
+ if (source.getPublisher != null && !source.getPublisher.isEmpty) {
+ s.setPublisher(source.getPublisher)
+ }
+
+ val semanticRelation = relations.getOrElse(relation.getRelClass.toLowerCase, null)
+ if (semanticRelation == null)
+ return null
+ s.setRelationship(new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse))
+ s.setSource(source)
+
+ s
+ }
+
+
def scholixFromSource(relation: Relation, source: ScholixSummary): Scholix = {
if (relation == null || source == null)
@@ -303,7 +358,7 @@ object ScholixUtils {
s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)
if (r.getTitle != null && r.getTitle.asScala.nonEmpty) {
- val titles: List[String] = r.getTitle.asScala.map(t => t.getValue)(collection breakOut)
+ val titles: List[String] = r.getTitle.asScala.map(t => t.getValue).toList
if (titles.nonEmpty)
s.setTitle(titles.asJava)
else
@@ -311,12 +366,12 @@ object ScholixUtils {
}
if (r.getAuthor != null && !r.getAuthor.isEmpty) {
- val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname)(collection breakOut)
- if (authors nonEmpty)
+ val authors: List[String] = r.getAuthor.asScala.map(a => a.getFullname).toList
+ if (authors.nonEmpty)
s.setAuthor(authors.asJava)
}
if (r.getInstance() != null) {
- val dt: List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue)(collection.breakOut)
+ val dt: List[String] = r.getInstance().asScala.filter(i => i.getDateofacceptance != null).map(i => i.getDateofacceptance.getValue).toList
if (dt.nonEmpty)
s.setDate(dt.distinct.asJava)
}
@@ -327,7 +382,7 @@ object ScholixUtils {
}
if (r.getSubject != null && !r.getSubject.isEmpty) {
- val subjects: List[SchemeValue] = r.getSubject.asScala.map(s => new SchemeValue(s.getQualifier.getClassname, s.getValue))(collection breakOut)
+ val subjects: List[SchemeValue] = r.getSubject.asScala.map(s => new SchemeValue(s.getQualifier.getClassname, s.getValue)).toList
if (subjects.nonEmpty)
s.setSubject(subjects.asJava)
}
@@ -336,7 +391,7 @@ object ScholixUtils {
s.setPublisher(List(r.getPublisher.getValue).asJava)
if (r.getCollectedfrom != null && !r.getCollectedfrom.isEmpty) {
- val cf: List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete"))(collection breakOut)
+ val cf: List[CollectedFromType] = r.getCollectedfrom.asScala.map(c => new CollectedFromType(c.getValue, c.getKey, "complete")).toList
if (cf.nonEmpty)
s.setDatasources(cf.distinct.asJava)
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml
new file mode 100644
index 0000000000..bdd48b0ab2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/config-default.xml
@@ -0,0 +1,19 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml
new file mode 100644
index 0000000000..751b124cfd
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/datacite/oozie_app/workflow.xml
@@ -0,0 +1,62 @@
+
+
+
+ sourcePath
+ the source path of scholix graph
+
+
+ datacitePath
+ the datacite native path
+
+
+ workingSupportPath
+ the working Support path
+
+
+ isLookupUrl
+ The IS lookUp service endopoint
+
+
+ updateDS
+ false
+ The transformation Rule to apply
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ New Update from Datacite to Scholix
+ eu.dnetlib.dhp.sx.graph.SparkRetrieveDataciteDelta
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}
+ --datacitePath${datacitePath}
+ --masteryarn
+ --workingSupportPath${workingSupportPath}
+ --isLookupUrl${isLookupUrl}
+ --updateDS${updateDS}
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json
new file mode 100644
index 0000000000..78777ffff2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json
@@ -0,0 +1,41 @@
+[
+ {
+ "paramName": "s",
+ "paramLongName": "sourcePath",
+ "paramDescription": "the source mdstore path",
+ "paramRequired": true
+ },
+
+ {
+ "paramName": "d",
+ "paramLongName": "datacitePath",
+ "paramDescription": "the datacite native path",
+ "paramRequired": true
+ },
+
+ {
+ "paramName": "w",
+ "paramLongName": "workingSupportPath",
+ "paramDescription": "the working Support path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "the isLookup URL",
+ "paramRequired": true
+ },
+ {
+ "paramName": "m",
+ "paramLongName": "master",
+ "paramDescription": "the master name",
+ "paramRequired": true
+ },
+ {
+ "paramName": "u",
+ "paramLongName": "updateDS",
+ "paramDescription": "Need to regenerate all support Dataset",
+ "paramRequired": false
+ }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala
new file mode 100644
index 0000000000..45a6cfc89f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkRetrieveDataciteDelta.scala
@@ -0,0 +1,305 @@
+package eu.dnetlib.dhp.sx.graph
+
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
+import eu.dnetlib.dhp.datacite.{DataciteToOAFTransformation, DataciteType}
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
+import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
+import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
+import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
+import eu.dnetlib.dhp.utils.{DHPUtils, ISLookupClientFactory}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.sql.functions.max
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import java.text.SimpleDateFormat
+
+class SparkRetrieveDataciteDelta (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) {
+
+ val ISO_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ssZ"
+ val simpleFormatter = new SimpleDateFormat(ISO_DATE_PATTERN)
+
+ val SCHOLIX_RESOURCE_PATH_NAME = "scholixResource"
+ val DATACITE_OAF_PATH_NAME = "dataciteOAFUpdate"
+ val PID_MAP_PATH_NAME = "pidMap"
+ val RESOLVED_REL_PATH_NAME ="resolvedRelation"
+ val SCHOLIX_PATH_NAME = "scholix"
+
+
+ def scholixResourcePath(workingPath:String) = s"$workingPath/$SCHOLIX_RESOURCE_PATH_NAME"
+ def dataciteOAFPath(workingPath:String) = s"$workingPath/$DATACITE_OAF_PATH_NAME"
+ def pidMapPath(workingPath:String) = s"$workingPath/$PID_MAP_PATH_NAME"
+ def resolvedRelationPath(workingPath:String) = s"$workingPath/$RESOLVED_REL_PATH_NAME"
+ def scholixPath(workingPath:String) = s"$workingPath/$SCHOLIX_PATH_NAME"
+
+
+ /**
+ * Utility to parse Date in ISO8601 to epochMillis
+ * @param inputDate The String represents an input date in ISO8601
+ * @return The relative epochMillis of parsed date
+ */
+ def ISO8601toEpochMillis(inputDate:String):Long = {
+ simpleFormatter.parse(inputDate).getTime
+ }
+
+
+ /**
+ * This method tries to retrieve the last collection date from all datacite
+ * records in HDFS.
+ * This method should be called before indexing scholexplorer to retrieve
+ * the delta of Datacite record to download, since from the generation of
+ * raw graph to the generation of Scholexplorer sometimes it takes 20 days
+ * @param spark
+ * @param entitiesPath
+ * @return the last collection date from the current scholexplorer Graph of the datacite records
+ */
+ def retrieveLastCollectedFrom(spark:SparkSession, entitiesPath:String):Long = {
+ log.info("Retrieve last entities collected From")
+
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
+ import spark.implicits._
+
+ val entitiesDS = spark.read.load(s"$entitiesPath/*").as[Oaf].filter(o =>o.isInstanceOf[Result]).map(r => r.asInstanceOf[Result])
+
+ val date = entitiesDS.filter(r => r.getDateofcollection!= null).map(_.getDateofcollection).select(max("value")).first.getString(0)
+
+ ISO8601toEpochMillis(date) / 1000
+ }
+
+
+ /**
+ * The method of update Datacite relationships on Scholexplorer
+ * needs some utilities data structures
+ * One is the scholixResource DS that stores all the nodes in the Scholix Graph
+ * in format ScholixResource
+ * @param summaryPath the path of the summary in Scholix
+ * @param workingPath the working path
+ * @param spark the spark session
+ */
+ def generateScholixResource(summaryPath:String, workingPath: String, spark:SparkSession) :Unit = {
+ implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
+ implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
+
+ log.info("Convert All summary to ScholixResource")
+ spark.read.load(summaryPath).as[ScholixSummary]
+ .map(ScholixUtils.generateScholixResourceFromSummary)(scholixResourceEncoder)
+ .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0)
+ .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_native")
+ }
+
+ /**
+ * This method convert the new Datacite Resource into Scholix Resource
+ * Needed to fill the source and the type of Scholix Relationships
+ * @param workingPath the Working Path
+ * @param spark The spark Session
+ */
+ def addMissingScholixResource(workingPath:String, spark:SparkSession ) :Unit = {
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
+ implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
+ import spark.implicits._
+
+ spark.read.load(dataciteOAFPath(workingPath)).as[Oaf]
+ .filter(_.isInstanceOf[Result])
+ .map(_.asInstanceOf[Result])
+ .map(ScholixUtils.generateScholixResourceFromResult)
+ .filter(r => r.getIdentifier!= null && r.getIdentifier.size>0)
+ .write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_update")
+
+ val update = spark.read.load(s"${scholixResourcePath(workingPath)}_update").as[ScholixResource]
+ val native = spark.read.load(s"${scholixResourcePath(workingPath)}_native").as[ScholixResource]
+ val graph = update.union(native)
+ .groupByKey(_.getDnetIdentifier)
+ .reduceGroups((a,b) => if (a!= null && a.getDnetIdentifier!= null) a else b)
+ .map(_._2)
+ graph.write.mode(SaveMode.Overwrite).save(s"${scholixResourcePath(workingPath)}_graph")
+ }
+
+
+ /**
+ * This method get and Transform only datacite records with
+ * timestamp greater than timestamp
+ * @param datacitePath the datacite input Path
+ * @param timestamp the timestamp
+ * @param workingPath the working path where save the generated Dataset
+ * @param spark SparkSession
+ * @param vocabularies Vocabularies needed for transformation
+ */
+
+ def getDataciteUpdate(datacitePath:String, timestamp:Long, workingPath:String, spark:SparkSession,vocabularies: VocabularyGroup): Long = {
+ import spark.implicits._
+ val ds = spark.read.load(datacitePath).as[DataciteType]
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ val total = ds.filter(_.timestamp>=timestamp).count()
+ if (total >0) {
+ ds.filter(_.timestamp >= timestamp)
+ .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks = true))
+ .flatMap(i => fixRelations(i)).filter(i => i != null)
+ .write.mode(SaveMode.Overwrite).save(dataciteOAFPath(workingPath))
+ }
+ total
+ }
+
+ /**
+ * After added the new ScholixResource, we need to update the scholix Pid Map
+ * to intersected with the new Datacite Relations
+
+ * @param workingPath The working Path starting from save the new Map
+ * @param spark the spark session
+ */
+ def generatePidMap(workingPath:String, spark:SparkSession ) :Unit = {
+ implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
+ import spark.implicits._
+ spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource]
+ .flatMap(r=>
+ r.getIdentifier.asScala
+ .map(i =>DHPUtils.generateUnresolvedIdentifier(i.getIdentifier, i.getSchema))
+ .map(t =>(t, r.getDnetIdentifier))
+ )(Encoders.tuple(Encoders.STRING, Encoders.STRING))
+ .groupByKey(_._1)
+ .reduceGroups((a,b) => if (a!= null && a._2!= null) a else b)
+ .map(_._2)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
+ .write.mode(SaveMode.Overwrite).save(pidMapPath(workingPath))
+ }
+
+ /**
+ * This method resolve the datacite relation and filter the resolved
+ * relation
+ * @param workingPath the working path
+ * @param spark the spark session
+ */
+
+ def resolveUpdateRelation(workingPath:String, spark:SparkSession) :Unit = {
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
+ import spark.implicits._
+
+ val pidMap = spark.read.load(pidMapPath(workingPath)).as[(String,String)]
+
+ val unresolvedRelations:Dataset[(String,Relation)] = spark.read.load(dataciteOAFPath(workingPath)).as[Oaf]
+ .filter(_.isInstanceOf[Relation])
+ .map(_.asInstanceOf[Relation])
+ .map { r =>
+ if (r.getSource.startsWith("unresolved"))
+ (r.getSource, r)
+ else
+ (r.getTarget, r)
+ }(Encoders.tuple(Encoders.STRING, relationEncoder))
+
+ unresolvedRelations
+ .joinWith(pidMap, unresolvedRelations("_1").equalTo(pidMap("_1")))
+ .map(t => {
+ val r =t._1._2
+ val resolvedIdentifier = t._2._2
+ if (r.getSource.startsWith("unresolved"))
+ r.setSource(resolvedIdentifier)
+ else
+ r.setTarget(resolvedIdentifier)
+ r
+ })(relationEncoder)
+ .filter(r => !(r.getSource.startsWith("unresolved") || r.getTarget.startsWith("unresolved") ))
+ .write.mode(SaveMode.Overwrite)
+ .save(resolvedRelationPath(workingPath))
+ }
+
+
+
+ /**
+ * This method generate scholix starting from resolved relation
+ *
+ *
+ * @param workingPath
+ * @param spark
+ */
+ def generateScholixUpdate(workingPath:String, spark:SparkSession) :Unit = {
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
+ implicit val scholixEncoder:Encoder[Scholix] = Encoders.kryo[Scholix]
+ implicit val scholixResourceEncoder:Encoder[ScholixResource] = Encoders.kryo[ScholixResource]
+ implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
+ implicit val intermediateEncoder :Encoder[(String,Scholix)] = Encoders.tuple(Encoders.STRING, scholixEncoder)
+
+
+ val relations:Dataset[(String, Relation)] = spark.read.load(resolvedRelationPath(workingPath)).as[Relation].map(r =>(r.getSource,r))(Encoders.tuple(Encoders.STRING, relationEncoder))
+
+ val id_summary:Dataset[(String,ScholixResource)] = spark.read.load(s"${scholixResourcePath(workingPath)}_graph").as[ScholixResource].map(r => (r.getDnetIdentifier,r))(Encoders.tuple(Encoders.STRING, scholixResourceEncoder))
+
+ id_summary.cache()
+
+ relations.joinWith(id_summary, relations("_1").equalTo(id_summary("_1")),"inner")
+ .map(t => (t._1._2.getTarget,ScholixUtils.scholixFromSource(t._1._2, t._2._2)))
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix_one_verse")
+
+ val source_scholix:Dataset[(String, Scholix)] =spark.read.load(s"$workingPath/scholix_one_verse").as[(String,Scholix)]
+
+ source_scholix.joinWith(id_summary, source_scholix("_1").equalTo(id_summary("_1")),"inner")
+ .map(t => {
+ val target:ScholixResource =t._2._2
+ val scholix:Scholix = t._1._2
+ ScholixUtils.generateCompleteScholix(scholix,target)
+ })(scholixEncoder).write.mode(SaveMode.Overwrite).save(s"$workingPath/scholix")
+ }
+
+
+
+
+
+ /**
+ * Here all the spark applications runs this method
+ * where the whole logic of the spark node is defined
+ */
+ override def run(): Unit = {
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"SourcePath is '$sourcePath'")
+
+ val datacitePath = parser.get("datacitePath")
+ log.info(s"DatacitePath is '$datacitePath'")
+
+ val workingPath = parser.get("workingSupportPath")
+ log.info(s"workingPath is '$workingPath'")
+
+ val isLookupUrl: String = parser.get("isLookupUrl")
+ log.info("isLookupUrl: {}", isLookupUrl)
+
+ val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
+ val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
+ require(vocabularies != null)
+
+
+ val updateDS:Boolean = "true".equalsIgnoreCase(parser.get("updateDS"))
+ log.info(s"updateDS is '$updateDS'")
+
+ var lastCollectionDate = 0L
+ if (updateDS) {
+ generateScholixResource(s"$sourcePath/provision/summaries", workingPath, spark)
+ log.info("Retrieve last entities collected From starting from scholix Graph")
+ lastCollectionDate = retrieveLastCollectedFrom(spark, s"$sourcePath/entities")
+ }
+ else {
+ val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+ fs.delete(new Path(s"${scholixResourcePath(workingPath)}_native"), true)
+ fs.rename(new Path(s"${scholixResourcePath(workingPath)}_graph"), new Path(s"${scholixResourcePath(workingPath)}_native"))
+ lastCollectionDate = retrieveLastCollectedFrom(spark, dataciteOAFPath(workingPath))
+ }
+
+ val numRecords = getDataciteUpdate(datacitePath, lastCollectionDate, workingPath, spark, vocabularies)
+ if (numRecords>0) {
+ addMissingScholixResource(workingPath,spark)
+ generatePidMap(workingPath, spark)
+ resolveUpdateRelation(workingPath,spark)
+ generateScholixUpdate(workingPath, spark)
+ }
+ }
+}
+
+
+object SparkRetrieveDataciteDelta {
+ val log: Logger = LoggerFactory.getLogger(SparkRetrieveDataciteDelta.getClass)
+
+ def main(args: Array[String]): Unit = {
+ new SparkRetrieveDataciteDelta("/eu/dnetlib/dhp/sx/graph/retrieve_datacite_delta_params.json", args, log).initialize().run()
+ }
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
index 707462f24e..23909fd9a7 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
@@ -33,13 +33,14 @@ public class PrepareInfo implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class);
// associate orgs with all their parent
- private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet " +
+ private static final String ORGANIZATION_ORGANIZATION_QUERY = "SELECT target key, collect_set(source) as valueSet "
+ +
"FROM relation " +
"WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() +
"' and datainfo.deletedbyinference = false " +
"GROUP BY target";
- //associates results with all the orgs they are affiliated to
+ // associates results with all the orgs they are affiliated to
private static final String RESULT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " +
"FROM relation " +
"WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() +
@@ -88,7 +89,7 @@ public class PrepareInfo implements Serializable {
childParentPath,
leavesPath,
resultOrganizationPath,
- relationPath));
+ relationPath));
}
private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath,
@@ -113,13 +114,13 @@ public class PrepareInfo implements Serializable {
.json(resultOrganizationPath);
relation
- .filter(
- (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() &&
- r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
- .write()
- .mode(SaveMode.Overwrite)
- .option("compression","gzip")
- .json(relationPath);
+ .filter(
+ (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() &&
+ r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(relationPath);
Dataset children = spark
.sql(
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
index e26276f53f..9ceebf222b 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
@@ -2,7 +2,6 @@
package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*;
-
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
@@ -22,13 +21,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.KeyValueSet;
-
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
-
public class SparkResultToOrganizationFromSemRel implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
private static final int MAX_ITERATION = 5;
@@ -201,6 +198,4 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
.json(outputPath);
}
-
-
}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
index 02444cb152..1adbbe60ea 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java
@@ -75,8 +75,8 @@ public class StepActions implements Serializable {
ret.setValueSet(orgs);
return ret;
}, Encoders.bean(KeyValueSet.class))
- .write()
- .mode(SaveMode.Overwrite)
+ .write()
+ .mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
@@ -179,7 +179,6 @@ public class StepActions implements Serializable {
"GROUP BY resId")
.as(Encoders.bean(KeyValueSet.class));
-
// create new relations from result to organization for each result linked to a leaf
return resultParent
.flatMap(
@@ -200,7 +199,6 @@ public class StepActions implements Serializable {
.iterator(),
Encoders.bean(Relation.class));
-
}
}
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
index 21d99321bf..2d2668db34 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
@@ -84,7 +84,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
@@ -229,7 +229,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -335,34 +335,35 @@ public class PrepareInfoJobTest {
}
@Test
- public void relationTest()throws Exception {
+ public void relationTest() throws Exception {
PrepareInfo
- .main(
- new String[] {
- "-isSparkSessionManaged", Boolean.FALSE.toString(),
- "-graphPath", getClass()
- .getResource(
- "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
- .getPath(),
- "-hive_metastore_uris", "",
- "-leavesPath", workingDir.toString() + "/currentIteration/",
- "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
- "-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-graphPath", getClass()
+ .getResource(
+ "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest")
+ .getPath(),
+ "-hive_metastore_uris", "",
+ "-leavesPath", workingDir.toString() + "/currentIteration/",
+ "-resultOrgPath", workingDir.toString() + "/resultOrganization/",
+ "-childParentPath", workingDir.toString() + "/childParentOrg/",
+ "-relationPath", workingDir.toString() + "/relation"
- });
+ });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD tmp = sc
- .textFile(workingDir.toString() + "/relation")
- .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+ .textFile(workingDir.toString() + "/relation")
+ .map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
Assertions.assertEquals(7, verificationDs.count());
}
+
@Test
public void resultOrganizationTest1() throws Exception {
@@ -378,7 +379,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -512,7 +513,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -539,7 +540,7 @@ public class PrepareInfoJobTest {
"-leavesPath", workingDir.toString() + "/currentIteration/",
"-resultOrgPath", workingDir.toString() + "/resultOrganization/",
"-childParentPath", workingDir.toString() + "/childParentOrg/",
- "-relationPath", workingDir.toString() + "/relation"
+ "-relationPath", workingDir.toString() + "/relation"
});
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
index 4b15da6232..890570a0bd 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_object_json_params.json
@@ -1,6 +1,7 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
- {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"su", "paramLongName":"scholixUpdatePath", "paramDescription": "the scholix updated Path", "paramRequired": false},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true},
+ {"paramName":"o", "paramLongName":"objectType", "paramDescription": "should be scholix or Summary", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
index 17996c82c0..e46e59cc00 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
@@ -79,7 +79,7 @@
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=20000
+ --conf spark.sql.shuffle.partitions=30000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
@@ -90,68 +90,6 @@
--relationPath${targetPath}/relation
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- Serialize scholix to JSON
- eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=6000
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
- --masteryarn
- --sourcePath${targetPath}/provision/scholix/scholix
- --targetPath${targetPath}/index/scholix_json
- --objectTypescholix
-
-
-
-
-
-
-
-
- yarn
- cluster
- Serialize summary to JSON
- eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=6000
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
- --masteryarn
- --sourcePath${targetPath}/provision/summaries_filtered
- --targetPath${targetPath}/index/summaries_json
- --objectTypesummary
-
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json
deleted file mode 100644
index 98e8daa18c..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/relations.json
+++ /dev/null
@@ -1,158 +0,0 @@
-{
- "cites":{
- "original":"Cites",
- "inverse":"IsCitedBy"
- },
- "compiles":{
- "original":"Compiles",
- "inverse":"IsCompiledBy"
- },
- "continues":{
- "original":"Continues",
- "inverse":"IsContinuedBy"
- },
- "derives":{
- "original":"IsSourceOf",
- "inverse":"IsDerivedFrom"
- },
- "describes":{
- "original":"Describes",
- "inverse":"IsDescribedBy"
- },
- "documents":{
- "original":"Documents",
- "inverse":"IsDocumentedBy"
- },
- "hasmetadata":{
- "original":"HasMetadata",
- "inverse":"IsMetadataOf"
- },
- "hasassociationwith":{
- "original":"HasAssociationWith",
- "inverse":"HasAssociationWith"
- },
- "haspart":{
- "original":"HasPart",
- "inverse":"IsPartOf"
- },
- "hasversion":{
- "original":"HasVersion",
- "inverse":"IsVersionOf"
- },
- "iscitedby":{
- "original":"IsCitedBy",
- "inverse":"Cites"
- },
- "iscompiledby":{
- "original":"IsCompiledBy",
- "inverse":"Compiles"
- },
- "iscontinuedby":{
- "original":"IsContinuedBy",
- "inverse":"Continues"
- },
- "isderivedfrom":{
- "original":"IsDerivedFrom",
- "inverse":"IsSourceOf"
- },
- "isdescribedby":{
- "original":"IsDescribedBy",
- "inverse":"Describes"
- },
- "isdocumentedby":{
- "original":"IsDocumentedBy",
- "inverse":"Documents"
- },
- "isidenticalto":{
- "original":"IsIdenticalTo",
- "inverse":"IsIdenticalTo"
- },
- "ismetadatafor":{
- "original":"IsMetadataFor",
- "inverse":"IsMetadataOf"
- },
- "ismetadataof":{
- "original":"IsMetadataOf",
- "inverse":"IsMetadataFor"
- },
- "isnewversionof":{
- "original":"IsNewVersionOf",
- "inverse":"IsPreviousVersionOf"
- },
- "isobsoletedby":{
- "original":"IsObsoletedBy",
- "inverse":"Obsoletes"
- },
- "isoriginalformof":{
- "original":"IsOriginalFormOf",
- "inverse":"IsVariantFormOf"
- },
- "ispartof":{
- "original":"IsPartOf",
- "inverse":"HasPart"
- },
- "ispreviousversionof":{
- "original":"IsPreviousVersionOf",
- "inverse":"IsNewVersionOf"
- },
- "isreferencedby":{
- "original":"IsReferencedBy",
- "inverse":"References"
- },
- "isrelatedto":{
- "original":"IsRelatedTo",
- "inverse":"IsRelatedTo"
- },
- "isrequiredby":{
- "original":"IsRequiredBy",
- "inverse":"Requires"
- },
- "isreviewedby":{
- "original":"IsReviewedBy",
- "inverse":"Reviews"
- },
- "issourceof":{
- "original":"IsSourceOf",
- "inverse":"IsDerivedFrom"
- },
- "issupplementedby":{
- "original":"IsSupplementedBy",
- "inverse":"IsSupplementTo"
- },
- "issupplementto":{
- "original":"IsSupplementTo",
- "inverse":"IsSupplementedBy"
- },
- "isvariantformof":{
- "original":"IsVariantFormOf",
- "inverse":"IsOriginalFormOf"
- },
- "isversionof":{
- "original":"IsVersionOf",
- "inverse":"HasVersion"
- },
- "obsoletes":{
- "original":"Obsoletes",
- "inverse":"IsObsoletedBy"
- },
- "references":{
- "original":"References",
- "inverse":"IsReferencedBy"
- },
- "requires":{
- "original":"Requires",
- "inverse":"IsRequiredBy"
- },
- "related":{
- "original":"IsRelatedTo",
- "inverse":"IsRelatedTo"
- },
- "reviews":{
- "original":"Reviews",
- "inverse":"IsReviewedBy"
- },
- "unknown":{
- "original":"Unknown",
- "inverse":"Unknown"
- }
-}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml
new file mode 100644
index 0000000000..6fb2a1253c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/config-default.xml
@@ -0,0 +1,10 @@
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml
new file mode 100644
index 0000000000..2844d7baa8
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/serializeGraph/oozie_app/workflow.xml
@@ -0,0 +1,83 @@
+
+
+
+ scholixUpdatePath
+ the working dir base path of the scholix updated
+
+
+ targetPath
+ the final graph path
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Serialize scholix to JSON
+ eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/provision/scholix/scholix
+ --targetPath${targetPath}/index/scholix_json
+ --scholixUpdatePath${scholixUpdatePath}
+ --objectTypescholix
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Serialize summary to JSON
+ eu.dnetlib.dhp.sx.graph.SparkConvertObjectToJson
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=6000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/provision/summaries_filtered
+ --targetPath${targetPath}/index/summaries_json
+ --objectTypesummary
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
index cc1b97fd6e..0c54de7c8f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertObjectToJson.scala
@@ -30,6 +30,9 @@ object SparkConvertObjectToJson {
log.info(s"targetPath -> $targetPath")
val objectType = parser.get("objectType")
log.info(s"objectType -> $objectType")
+ val scholixUpdatePath = parser.get("scholixUpdatePath")
+ log.info(s"scholixUpdatePath -> $scholixUpdatePath")
+
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
@@ -42,7 +45,8 @@ object SparkConvertObjectToJson {
case "scholix" =>
log.info("Serialize Scholix")
val d: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
- d.map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.repartition(6000).saveAsTextFile(targetPath, classOf[GzipCodec])
+ val u :Dataset[Scholix]= spark.read.load(s"$scholixUpdatePath/scholix").as[Scholix]
+ d.union(u).repartition(8000).map(s => mapper.writeValueAsString(s))(Encoders.STRING).rdd.saveAsTextFile(targetPath, classOf[GzipCodec])
case "summary" =>
log.info("Serialize Summary")
val d: Dataset[ScholixSummary] = spark.read.load(sourcePath).as[ScholixSummary]
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
index 23f039c706..2115df1fd2 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -2,12 +2,11 @@ package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Software,Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
-
object SparkConvertRDDtoDataset {
def main(args: Array[String]): Unit = {
@@ -32,39 +31,40 @@ object SparkConvertRDDtoDataset {
val entityPath = s"$t/entities"
val relPath = s"$t/relation"
val mapper = new ObjectMapper()
- implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
- implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
- implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
- implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct])
- implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software])
+ implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
+ implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
+ implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+ implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct])
+ implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software])
log.info("Converting dataset")
- val rddDataset = spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset]))
+ val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false)
spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset")
log.info("Converting publication")
- val rddPublication = spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication]))
+ val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false)
spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication")
log.info("Converting software")
- val rddSoftware = spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software]))
+ val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false)
spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software")
log.info("Converting otherresearchproduct")
- val rddOtherResearchProduct = spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct]))
+ val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct])).filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false)
spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct")
log.info("Converting Relation")
- val relationSemanticFilter = List("cites", "iscitedby", "merges", "ismergedin")
+ val relationSemanticFilter = List("cites", "iscitedby","merges", "ismergedin", "HasAmongTopNSimilarDocuments","IsAmongTopNSimilarDocuments" )
- val rddRelation = spark.sparkContext.textFile(s"$sourcePath/relation")
+ val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation")
.map(s => mapper.readValue(s, classOf[Relation]))
- .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+ .filter(r=> r.getDataInfo!= null && r.getDataInfo.getDeletedbyinference == false)
+ .filter(r=> r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
.filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala
new file mode 100644
index 0000000000..c277b0aa13
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/RetrieveDataciteDeltaTest.scala
@@ -0,0 +1,26 @@
+package eu.dnetlib.dhp.sx.graph
+import org.junit.jupiter.api.Test
+
+import java.text.SimpleDateFormat
+
+
+
+class RetrieveDataciteDeltaTest {
+
+ @Test
+ def testParsingDate(): Unit = {
+
+
+ val inputDate = "2021-12-02T11:17:36+0000"
+
+ val t = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").parse(inputDate).getTime
+
+
+ println(t)
+
+
+
+ }
+
+
+}