diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index 6a86f30df..c53affbad 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -10,6 +10,7 @@ import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
@@ -56,6 +57,14 @@ public class DHPUtils {
return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId));
}
+
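+ /**
+ * Builds the identifier, in the form "unresolved::pid::pidtype", used to mark a relation endpoint
+ * that still has to be resolved against the graph entities. The pid value is normalised with
+ * CleaningFunctions.normalizePidValue and the pid type is lower-cased.
+ */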
+ public static String generateUnresolvedIdentifier(final String pid, final String pidType) {
+
+ final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid);
+
+ return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim());
+ }
+
public static String getJPathString(final String jsonPath, final String json) {
try {
Object o = JsonPath.read(json, jsonPath);
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
index cfdd98d30..e3729e5b7 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
@@ -325,8 +325,9 @@ object DataciteToOAFTransformation {
val grantId = m.matcher(awardUri).replaceAll("$2")
val targetId = s"$p${DHPUtils.md5(grantId)}"
List(
- generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo),
- generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
+ generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo)
+// The inverse relation is no longer generated here: CollectionUtils.fixRelations generates it later in the pipeline
+// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
)
}
else
@@ -580,11 +581,11 @@ object DataciteToOAFTransformation {
rel.setProperties(List(dateProps).asJava)
rel.setSource(id)
- rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}")
+ rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier, r.relatedIdentifierType))
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
- rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut)
+ rel.getCollectedfrom.asScala.map(c => c.getValue).toList
rel
- })(collection breakOut)
+ }).toList
}
def generateDataInfo(trust: String): DataInfo = {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
index 2cabc7879..65d00c4d1 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
@@ -1,9 +1,13 @@
package eu.dnetlib.dhp.actionmanager.datacite
+import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
-import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
+import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
+import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -21,7 +25,6 @@ object GenerateDataciteDatasetSpark {
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
- val targetPath = parser.get("targetPath")
val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
@@ -39,10 +42,22 @@ object GenerateDataciteDatasetSpark {
import spark.implicits._
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ val mapper = new ObjectMapper()
+ val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+
+ log.info("outputBasePath: {}", outputBasePath)
+ val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
+
spark.read.load(sourcePath).as[DataciteType]
.filter(d => d.isActive)
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
.filter(d => d != null)
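+ // expand each relation into the pair (direct, inverse) relation, see CollectionUtils.fixRelations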
+ .flatMap(i => fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
+
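+ // count the records written to the MDStore and save the total in the MDStore size file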
+ val totalItems = spark.read.load(targetPath).as[Oaf].count()
+ writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$totalItems", outputBasePath + MDSTORE_SIZE_PATH)
}
}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala
new file mode 100644
index 000000000..e212d7e2a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala
@@ -0,0 +1,43 @@
+package eu.dnetlib.dhp.collection
+
+import eu.dnetlib.dhp.schema.common.ModelSupport
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
+
+object CollectionUtils {
+
+ /**
+ * This method is applied in pipeline after the transformation phase and generates,
+ * for each relation, both the direct and the inverse relation; it is typically used as a flatMap function.
+ *
+ * @param i input OAF
+ * @return
+ * If the input OAF is an entity -> List(i)
+ * If the input OAF is a relation -> List(relation, inverseRelation)
+ *
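+ * e.g. the relation X -isProducedBy-> P is expanded into
+ * List(X -isProducedBy-> P, P -produces-> X)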
+ */
+
+ def fixRelations(i: Oaf): List[Oaf] = {
+ if (i.isInstanceOf[OafEntity])
+ return List(i)
+ else {
+ val r: Relation = i.asInstanceOf[Relation]
+ val currentRel = ModelSupport.findRelation(r.getRelClass)
+ if (currentRel != null) {
+
+ // Cleaning relation: align relType, subRelType and relClass with the model definition
+ r.setRelType(currentRel.getRelType)
+ r.setSubRelType(currentRel.getSubReltype)
+ r.setRelClass(currentRel.getRelClass)
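+ // build the inverse relation by swapping source and target and using the inverse relClass defined in the model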
+ val inverse = new Relation
+ inverse.setSource(r.getTarget)
+ inverse.setTarget(r.getSource)
+ inverse.setRelType(currentRel.getRelType)
+ inverse.setSubRelType(currentRel.getSubReltype)
+ inverse.setRelClass(currentRel.getInverseRelClass)
+ return List(r, inverse)
+ }
+ }
+ List()
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index 7a62437a3..8ae8285e3 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import BioDBToOAF.ScholixResolved
+import eu.dnetlib.dhp.collection.CollectionUtils
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -35,13 +36,13 @@ object SparkTransformBioDatabaseToOAF {
import spark.implicits._
database.toUpperCase() match {
case "UNIPROT" =>
- spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath)
+ spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "PDB" =>
- spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
+ spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "SCHOLIX" =>
- spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath)
+ spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "CROSSREF_LINKS" =>
- spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
+ spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
index b19bfc23a..8da617ca0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
@@ -5,6 +5,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import BioDBToOAF.EBILinkItem
+import eu.dnetlib.dhp.collection.CollectionUtils
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@@ -37,6 +38,7 @@ object SparkEBILinksToOaf {
ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
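+ // expand each relation with its inverse (CollectionUtils.fixRelations)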
+ .flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml
new file mode 100644
index 000000000..dd3c32c62
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml
@@ -0,0 +1,23 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml
new file mode 100644
index 000000000..41a2e2291
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml
@@ -0,0 +1,52 @@
+
+
+
+ mainPath
+ the working path of Datacite stores
+
+
+ isLookupUrl
+ The IS lookUp service endopoint
+
+
+ blocksize
+ 100
+ The request block size
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ yarn-cluster
+ cluster
+ ImportDatacite
+ eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --targetPath${mainPath}/datacite_update
+ --dataciteDumpPath${mainPath}/datacite_dump
+ --namenode${nameNode}
+ --masteryarn-cluster
+ --blocksize${blocksize}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json
index 67e7f37dc..04dc8b942 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json
@@ -7,8 +7,8 @@
},
{
- "paramName": "t",
- "paramLongName": "targetPath",
+ "paramName": "mo",
+ "paramLongName": "mdstoreOutputVersion",
"paramDescription": "the target mdstore path",
"paramRequired": true
},
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml
new file mode 100644
index 000000000..dd3c32c62
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml
@@ -0,0 +1,23 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml
new file mode 100644
index 000000000..aeb824a41
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml
@@ -0,0 +1,126 @@
+
+
+
+ mainPath
+ the working path of Datacite stores
+
+
+ isLookupUrl
+ The IS lookUp service endopoint
+
+
+ mdStoreOutputId
+ the identifier of the cleaned MDStore
+
+
+ mdStoreManagerURI
+ the path of the cleaned mdstore
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionNEW_VERSION
+ --mdStoreID${mdStoreOutputId}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ TransformJob
+ eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${mainPath}/datacite_dump
+ --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --isLookupUrl${isLookupUrl}
+ --exportLinkstrue
+ --masteryarn-cluster
+
+
+
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionCOMMIT
+ --namenode${nameNode}
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_UNLOCK
+ --mdStoreManagerURI${mdStoreManagerURI}
+ --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
+
+
+
+
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionROLLBACK
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala
new file mode 100644
index 000000000..e87f46b00
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala
@@ -0,0 +1,152 @@
+package eu.dnetlib.dhp.oa.graph.resolution
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.HdfsSupport
+import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
+import eu.dnetlib.dhp.utils.DHPUtils
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.{JField, JObject, JString}
+import org.json4s.jackson.JsonMethods.parse
+import org.slf4j.{Logger, LoggerFactory}
+
+object SparkResolveRelation {
+ def main(args: Array[String]): Unit = {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+
+ val graphBasePath = parser.get("graphBasePath")
+ log.info(s"graphBasePath -> $graphBasePath")
+ val workingPath = parser.get("workingPath")
+ log.info(s"workingPath -> $workingPath")
+
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+ import spark.implicits._
+
+
+ //CLEANING TEMPORARY FOLDER
+ HdfsSupport.remove(workingPath, spark.sparkContext.hadoopConfiguration)
+ val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+ fs.mkdirs(new Path(workingPath))
+
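+ // build the lookup table (entity id, unresolved pid identifier) scanning all the non-relation records of the graph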
+ extractPidResolvedTableFromJsonRDD(spark, graphBasePath, workingPath)
+
+ val mapper: ObjectMapper = new ObjectMapper()
+
+ val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)]
+
+ val relationDs: Dataset[(String, Relation)] = spark.read.text(s"$graphBasePath/relation").as[String]
+ .map(s => mapper.readValue(s, classOf[Relation])).as[Relation]
+ .map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+
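+ // resolve the source: when the relation source matches an unresolved pid identifier, replace it with the entity id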
+ relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map {
+ m =>
+ val sourceResolved = m._2
+ val currentRelation = m._1._2
+ if (sourceResolved != null && sourceResolved._1 != null && sourceResolved._1.nonEmpty)
+ currentRelation.setSource(sourceResolved._1)
+ currentRelation
+ }.write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/relationResolvedSource")
+
+
+ val relationSourceResolved: Dataset[(String, Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation]
+ .map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
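+ // resolve the target in the same way, then drop the relations that are still unresolved on either side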
+ relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map {
+ m =>
+ val targetResolved = m._2
+ val currentRelation = m._1._2
+ if (targetResolved != null && targetResolved._1 != null && targetResolved._1.nonEmpty)
+ currentRelation.setTarget(targetResolved._1)
+ currentRelation
+ }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/relation_resolved")
+
+
+ // To be conservative we keep the original relations in the working dir
+ // and save the resolved relations on the graphBasePath.
+ // In the future these two lines of code should be removed
+
+ fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
+
+ spark.read.load(s"$workingPath/relation_resolved").as[Relation]
+ .map(r => mapper.writeValueAsString(r))
+ .write
+ .option("compression", "gzip")
+ .mode(SaveMode.Overwrite)
+ .text(s"$graphBasePath/relation")
+ }
+
+
+ def extractPidsFromRecord(input: String): (String, List[(String, String)]) = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input)
+ val id: String = (json \ "id").extract[String]
+ val result: List[(String, String)] = for {
+ JObject(pids) <- json \\ "instance" \ "pid"
+ JField("value", JString(pidValue)) <- pids
+ JField("qualifier", JObject(qualifier)) <- pids
+ JField("classname", JString(pidType)) <- qualifier
+ } yield (pidValue, pidType)
+
+ val alternateIds: List[(String, String)] = for {
+ JObject(pids) <- json \\ "alternateIdentifier"
+ JField("value", JString(pidValue)) <- pids
+ JField("qualifier", JObject(qualifier)) <- pids
+ JField("classname", JString(pidType)) <- qualifier
+ } yield (pidValue, pidType)
+
+ (id, result ::: alternateIds)
+ }
+
+
+ private def isRelation(input: String): Boolean = {
+
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input)
+ val source = (json \ "source").extractOrElse[String](null)
+
+ source != null
+ }
+
+ private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
+ import spark.implicits._
+
+ val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*")
+ .filter(i => !isRelation(i))
+ .map(i => extractPidsFromRecord(i))
+ .filter(s => s != null && s._1 != null && s._2 != null && s._2.nonEmpty)
+ .flatMap { p =>
+ p._2.map(pid =>
+ (p._1, DHPUtils.generateUnresolvedIdentifier(pid._1, pid._2))
+ )
+ }.filter(r => r._1 != null || r._2 != null)
+
+ spark.createDataset(d)
+ .groupByKey(_._2)
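+ // when the same pid belongs to more than one entity, prefer the entities identified by doi or pmid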
+ .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
+ .map(s => s._2)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/relationResolvedPid")
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
deleted file mode 100644
index 1b13b81c7..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-package eu.dnetlib.dhp.sx.graph
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.json4s
-import org.json4s.DefaultFormats
-import org.json4s.JsonAST.{JField, JObject, JString}
-import org.json4s.jackson.JsonMethods.parse
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConverters._
-object SparkResolveRelation {
- def main(args: Array[String]): Unit = {
- val log: Logger = LoggerFactory.getLogger(getClass)
- val conf: SparkConf = new SparkConf()
- val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json")))
- parser.parseArgument(args)
- val spark: SparkSession =
- SparkSession
- .builder()
- .config(conf)
- .appName(getClass.getSimpleName)
- .master(parser.get("master")).getOrCreate()
-
-
- val relationPath = parser.get("relationPath")
- log.info(s"sourcePath -> $relationPath")
- val entityPath = parser.get("entityPath")
- log.info(s"entityPath -> $entityPath")
- val workingPath = parser.get("workingPath")
- log.info(s"workingPath -> $workingPath")
-
- implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
- import spark.implicits._
-
-
- extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)
-
- val mappper = new ObjectMapper()
-
- val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]
-
- val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
-
- relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{
- m =>
- val sourceResolved = m._2
- val currentRelation = m._1._2
- if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty)
- currentRelation.setSource(sourceResolved._1)
- currentRelation
- }.write
- .mode(SaveMode.Overwrite)
- .save(s"$workingPath/relationResolvedSource")
-
-
- val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
- relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
- m =>
- val targetResolved = m._2
- val currentRelation = m._1._2
- if (targetResolved!=null && targetResolved._1.nonEmpty)
- currentRelation.setTarget(targetResolved._1)
- currentRelation
- }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
- .write
- .mode(SaveMode.Overwrite)
- .save(s"$workingPath/relation_resolved")
-
- spark.read.load(s"$workingPath/relation_resolved").as[Relation]
- .map(r => mappper.writeValueAsString(r))
- .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
-
- }
-
-
- def extractPidsFromRecord(input:String):(String,List[(String,String)]) = {
- implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
- lazy val json: json4s.JValue = parse(input)
- val id:String = (json \ "id").extract[String]
- val result: List[(String,String)] = for {
- JObject(pids) <- json \ "pid"
- JField("value", JString(pidValue)) <- pids
- JField("qualifier", JObject(qualifier)) <- pids
- JField("classname", JString(pidType)) <- qualifier
- } yield (pidValue, pidType)
-
- val alternateIds: List[(String,String)] = for {
- JObject(pids) <- json \\ "alternateIdentifier"
- JField("value", JString(pidValue)) <- pids
- JField("qualifier", JObject(qualifier)) <- pids
- JField("classname", JString(pidType)) <- qualifier
- } yield (pidValue, pidType)
-
- (id,result:::alternateIds)
- }
-
- private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = {
- import spark.implicits._
-
- val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*")
- .map(i => extractPidsFromRecord(i))
- .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty)
- .flatMap{ p =>
- p._2.map(pid =>
- (p._1, convertPidToDNETIdentifier(pid._1, pid._2))
- )
- }.filter(r =>r._1 != null || r._2 != null)
-
- spark.createDataset(d)
- .groupByKey(_._2)
- .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
- .map(s => s._2)
- .write
- .mode(SaveMode.Overwrite)
- .save(s"$workingPath/relationResolvedPid")
- }
-
-
- /*
- This method should be used once we finally convert everythings in Kryo dataset
- instead of using rdd of json
- */
- private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = {
- import spark.implicits._
- implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
- val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
- entities.flatMap(e => e.getPid.asScala
- .map(p =>
- convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
- .filter(s => s != null)
- .map(s => (s, e.getId))
- ).groupByKey(_._1)
- .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
- .map(s => s._2)
- .write
- .mode(SaveMode.Overwrite)
- .save(s"$workingPath/relationResolvedPid")
- }
-
- def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
- if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)
- null
- else
- s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}"
- }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
similarity index 54%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
index 7683ff94c..e9e1a8ede 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
@@ -1,44 +1,23 @@
- entityPath
- the path of deduplicate Entities
+ graphBasePath
+ the path of the graph
-
- relationPath
- the path of relation unresolved
-
-
- targetPath
- the path of relation unresolved
-
-
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
-
-
-
-
-
-
-
yarn
cluster
Resolve Relations in raw graph
- eu.dnetlib.dhp.sx.graph.SparkResolveRelation
+ eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation
dhp-graph-mapper-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
@@ -51,9 +30,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--masteryarn
- --relationPath${relationPath}
- --workingPath${targetPath}
- --entityPath${entityPath}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingDir}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
similarity index 50%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
index f211adb9a..1fbe20648 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
@@ -1,6 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
- {"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+ {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala
index 5b7fbe1cf..bd7e4fd09 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala
@@ -1,10 +1,10 @@
package eu.dnetlib.dhp.sx.graph.scholix
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
+import eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import eu.dnetlib.dhp.schema.sx.scholix.Scholix
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
-import eu.dnetlib.dhp.sx.graph.SparkResolveRelation
import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest
import org.json4s
import org.json4s.DefaultFormats