From ae4e99a4715811e193ee7892f679125cd8dbbdaa Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 Oct 2021 17:12:08 +0200 Subject: [PATCH] Adapted workflow of resolution of PID to work into OpenAIRE data workflow - Added relations in both verse on all Scholexplorer datasources --- .../java/eu/dnetlib/dhp/utils/DHPUtils.java | 9 + .../DataciteToOAFTransformation.scala | 11 +- .../GenerateDataciteDatasetSpark.scala | 19 ++- .../dhp/collection/CollectionUtils.scala | 43 +++++ .../bio/SparkTransformBioDatabaseToOAF.scala | 9 +- .../dhp/sx/bio/ebi/SparkEBILinksToOaf.scala | 2 + .../collection/oozie_app/config-default.xml | 23 +++ .../collection/oozie_app/workflow.xml | 52 ++++++ .../datacite/generate_dataset_params.json | 4 +- .../oozie_app/config-default.xml | 23 +++ .../transformation/oozie_app/workflow.xml | 126 ++++++++++++++ .../resolution/SparkResolveRelation.scala | 152 +++++++++++++++++ .../dhp/sx/graph/SparkResolveRelation.scala | 154 ------------------ .../resolution}/oozie_app/config-default.xml | 0 .../graph/resolution}/oozie_app/workflow.xml | 34 +--- .../resolution}/resolve_relations_params.json | 3 +- .../sx/graph/scholix/ScholixGraphTest.scala | 2 +- 17 files changed, 468 insertions(+), 198 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/{sx/graph/resolverelation => oa/graph/resolution}/oozie_app/config-default.xml (100%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/{sx/graph/resolverelation => oa/graph/resolution}/oozie_app/workflow.xml (54%) rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/{sx/graph => oa/graph/resolution}/resolve_relations_params.json (50%) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 6a86f30df7..c53affbadf 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -10,6 +10,7 @@ import java.util.Properties; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64OutputStream; import org.apache.commons.codec.binary.Hex; @@ -56,6 +57,14 @@ public class DHPUtils { return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId)); } + + public static String generateUnresolvedIdentifier(final String pid, final String pidType) { + + final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid); + + return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim()); + } + public static String getJPathString(final String jsonPath, final String json) { try { Object o = JsonPath.read(json, jsonPath); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala index cfdd98d30c..e3729e5b71 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala @@ -325,8 +325,9 @@ object DataciteToOAFTransformation { val grantId = m.matcher(awardUri).replaceAll("$2") val targetId = s"$p${DHPUtils.md5(grantId)}" List( - generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo), - generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) + generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) +// REMOVED INVERSE RELATION since there is a specific method that should generate later +// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) ) } else @@ -580,11 +581,11 @@ object DataciteToOAFTransformation { rel.setProperties(List(dateProps).asJava) rel.setSource(id) - rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}") + rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType)) rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava) - rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut) + rel.getCollectedfrom.asScala.map(c => c.getValue).toList rel - })(collection breakOut) + }).toList } def generateDataInfo(trust: String): DataInfo = { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala index 2cabc78799..65d00c4d1d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala @@ -1,9 +1,13 @@ package eu.dnetlib.dhp.actionmanager.datacite +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup -import eu.dnetlib.dhp.schema.mdstore.MetadataRecord +import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations +import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord} import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -21,7 +25,6 @@ object GenerateDataciteDatasetSpark { parser.parseArgument(args) val master = parser.get("master") val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks")) val isLookupUrl: String = parser.get("isLookupUrl") log.info("isLookupUrl: {}", isLookupUrl) @@ -39,10 +42,22 @@ object GenerateDataciteDatasetSpark { import spark.implicits._ + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + val mapper = new ObjectMapper() + val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + + log.info("outputBasePath: {}", outputBasePath) + val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH" + spark.read.load(sourcePath).as[DataciteType] .filter(d => d.isActive) .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks)) .filter(d => d != null) + .flatMap(i=> fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) + + val total_items =spark.read.load(targetPath).as[Oaf].count() + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) } } \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala new file mode 100644 index 0000000000..e212d7e2ad --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala @@ -0,0 +1,43 @@ +package eu.dnetlib.dhp.collection + +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation} + +object CollectionUtils { + + /** + * This method in pipeline to the transformation phase, + * generates relations in both verse, typically it should be a phase of flatMap + * + * @param i input OAF + * @return + * If the input OAF is an entity -> List(i) + * If the input OAF is a relation -> List(relation, inverseRelation) + * + */ + + def fixRelations(i: Oaf): List[Oaf] = { + if (i.isInstanceOf[OafEntity]) + return List(i) + else { + val r: Relation = i.asInstanceOf[Relation] + val currentRel = ModelSupport.findRelation(r.getRelClass) + if (currentRel != null) { + + // Cleaning relation + r.setRelType(currentRel.getRelType) + r.setSubRelType(currentRel.getSubReltype) + r.setRelClass(currentRel.getRelClass) + val inverse = new Relation + inverse.setSource(r.getTarget) + inverse.setTarget(r.getSource) + inverse.setRelType(currentRel.getRelType) + inverse.setSubRelType(currentRel.getSubReltype) + inverse.setRelClass(currentRel.getInverseRelClass) + return List(r, inverse) + } + } + List() + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala index 7a62437a36..8ae8285e3f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.sx.bio import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.Oaf import BioDBToOAF.ScholixResolved +import eu.dnetlib.dhp.collection.CollectionUtils import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -35,13 +36,13 @@ object SparkTransformBioDatabaseToOAF { import spark.implicits._ database.toUpperCase() match { case "UNIPROT" => - spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "PDB" => - spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "SCHOLIX" => - spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath) + spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "CROSSREF_LINKS" => - spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index b19bfc23a5..8da617ca07 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -5,6 +5,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.sx.bio.BioDBToOAF import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem import BioDBToOAF.EBILinkItem +import eu.dnetlib.dhp.collection.CollectionUtils import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql._ @@ -37,6 +38,7 @@ object SparkEBILinksToOaf { ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links)) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)) + .flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml new file mode 100644 index 0000000000..dd3c32c620 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml new file mode 100644 index 0000000000..41a2e22916 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/collection/oozie_app/workflow.xml @@ -0,0 +1,52 @@ + + + + mainPath + the working path of Datacite stores + + + isLookupUrl + The IS lookUp service endopoint + + + blocksize + 100 + The request block size + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + ImportDatacite + eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --targetPath${mainPath}/datacite_update + --dataciteDumpPath${mainPath}/datacite_dump + --namenode${nameNode} + --masteryarn-cluster + --blocksize${blocksize} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json index 67e7f37dcb..04dc8b942c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json @@ -7,8 +7,8 @@ }, { - "paramName": "t", - "paramLongName": "targetPath", + "paramName": "mo", + "paramLongName": "mdstoreOutputVersion", "paramDescription": "the target mdstore path", "paramRequired": true }, diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml new file mode 100644 index 0000000000..dd3c32c620 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/config-default.xml @@ -0,0 +1,23 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + + oozie.launcher.mapreduce.user.classpath.first + true + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml new file mode 100644 index 0000000000..aeb824a41b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/transformation/oozie_app/workflow.xml @@ -0,0 +1,126 @@ + + + + mainPath + the working path of Datacite stores + + + isLookupUrl + The IS lookUp service endopoint + + + mdStoreOutputId + the identifier of the cleaned MDStore + + + mdStoreManagerURI + the path of the cleaned mdstore + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + yarn-cluster + cluster + TransformJob + eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${mainPath}/datacite_dump + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --isLookupUrl${isLookupUrl} + --exportLinkstrue + --masteryarn-cluster + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionREAD_UNLOCK + --mdStoreManagerURI${mdStoreManagerURI} + --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']} + + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala new file mode 100644 index 0000000000..e87f46b00a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala @@ -0,0 +1,152 @@ +package eu.dnetlib.dhp.oa.graph.resolution + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport +import eu.dnetlib.dhp.schema.oaf.{Relation, Result} +import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString} +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.{Logger, LoggerFactory} + +object SparkResolveRelation { + def main(args: Array[String]): Unit = { + val log: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val graphBasePath = parser.get("graphBasePath") + log.info(s"graphBasePath -> $graphBasePath") + val workingPath = parser.get("workingPath") + log.info(s"workingPath -> $workingPath") + + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + import spark.implicits._ + + + //CLEANING TEMPORARY FOLDER + HdfsSupport.remove(workingPath, spark.sparkContext.hadoopConfiguration) + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + fs.mkdirs(new Path(workingPath)) + + extractPidResolvedTableFromJsonRDD(spark, graphBasePath, workingPath) + + val mapper: ObjectMapper = new ObjectMapper() + + val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)] + + val relationDs: Dataset[(String, Relation)] = spark.read.text(s"$graphBasePath/relation").as[String] + .map(s => mapper.readValue(s, classOf[Relation])).as[Relation] + .map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + + relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map { + m => + val sourceResolved = m._2 + val currentRelation = m._1._2 + if (sourceResolved != null && sourceResolved._1 != null && sourceResolved._1.nonEmpty) + currentRelation.setSource(sourceResolved._1) + currentRelation + }.write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relationResolvedSource") + + + val relationSourceResolved: Dataset[(String, Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation] + .map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) + relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map { + m => + val targetResolved = m._2 + val currentRelation = m._1._2 + if (targetResolved != null && targetResolved._1.nonEmpty) + currentRelation.setTarget(targetResolved._1) + currentRelation + }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relation_resolved") + + + // TO BE conservative we keep the original relation in the working dir + // and save the relation resolved on the graphBasePath + //In future this two line of code should be removed + + fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation")) + + spark.read.load(s"$workingPath/relation_resolved").as[Relation] + .map(r => mapper.writeValueAsString(r)) + .write + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .text(s"$graphBasePath/relation") + } + + + def extractPidsFromRecord(input: String): (String, List[(String, String)]) = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val id: String = (json \ "id").extract[String] + val result: List[(String, String)] = for { + JObject(pids) <- json \\ "instance" \ "pid" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + + val alternateIds: List[(String, String)] = for { + JObject(pids) <- json \\ "alternateIdentifier" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + + (id, result ::: alternateIds) + } + + + private def isRelation(input: String): Boolean = { + + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val source = (json \ "source").extractOrElse[String](null) + + source != null + } + + private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = { + import spark.implicits._ + + val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*") + .filter(i => !isRelation(i)) + .map(i => extractPidsFromRecord(i)) + .filter(s => s != null && s._1 != null && s._2 != null && s._2.nonEmpty) + .flatMap { p => + p._2.map(pid => + (p._1, DHPUtils.generateUnresolvedIdentifier(pid._1, pid._2)) + ) + }.filter(r => r._1 != null || r._2 != null) + + spark.createDataset(d) + .groupByKey(_._2) + .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/relationResolvedPid") + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala deleted file mode 100644 index 1b13b81c77..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ /dev/null @@ -1,154 +0,0 @@ -package eu.dnetlib.dhp.sx.graph - -import com.fasterxml.jackson.databind.ObjectMapper -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Relation, Result} -import org.apache.commons.io.IOUtils -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql._ -import org.json4s -import org.json4s.DefaultFormats -import org.json4s.JsonAST.{JField, JObject, JString} -import org.json4s.jackson.JsonMethods.parse -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ -object SparkResolveRelation { - def main(args: Array[String]): Unit = { - val log: Logger = LoggerFactory.getLogger(getClass) - val conf: SparkConf = new SparkConf() - val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json"))) - parser.parseArgument(args) - val spark: SparkSession = - SparkSession - .builder() - .config(conf) - .appName(getClass.getSimpleName) - .master(parser.get("master")).getOrCreate() - - - val relationPath = parser.get("relationPath") - log.info(s"sourcePath -> $relationPath") - val entityPath = parser.get("entityPath") - log.info(s"entityPath -> $entityPath") - val workingPath = parser.get("workingPath") - log.info(s"workingPath -> $workingPath") - - implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) - import spark.implicits._ - - - extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) - - val mappper = new ObjectMapper() - - val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)] - - val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - - relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{ - m => - val sourceResolved = m._2 - val currentRelation = m._1._2 - if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty) - currentRelation.setSource(sourceResolved._1) - currentRelation - }.write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relationResolvedSource") - - - val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) - relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{ - m => - val targetResolved = m._2 - val currentRelation = m._1._2 - if (targetResolved!=null && targetResolved._1.nonEmpty) - currentRelation.setTarget(targetResolved._1) - currentRelation - }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relation_resolved") - - spark.read.load(s"$workingPath/relation_resolved").as[Relation] - .map(r => mappper.writeValueAsString(r)) - .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec]) - - } - - - def extractPidsFromRecord(input:String):(String,List[(String,String)]) = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) - val id:String = (json \ "id").extract[String] - val result: List[(String,String)] = for { - JObject(pids) <- json \ "pid" - JField("value", JString(pidValue)) <- pids - JField("qualifier", JObject(qualifier)) <- pids - JField("classname", JString(pidType)) <- qualifier - } yield (pidValue, pidType) - - val alternateIds: List[(String,String)] = for { - JObject(pids) <- json \\ "alternateIdentifier" - JField("value", JString(pidValue)) <- pids - JField("qualifier", JObject(qualifier)) <- pids - JField("classname", JString(pidType)) <- qualifier - } yield (pidValue, pidType) - - (id,result:::alternateIds) - } - - private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = { - import spark.implicits._ - - val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") - .map(i => extractPidsFromRecord(i)) - .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty) - .flatMap{ p => - p._2.map(pid => - (p._1, convertPidToDNETIdentifier(pid._1, pid._2)) - ) - }.filter(r =>r._1 != null || r._2 != null) - - spark.createDataset(d) - .groupByKey(_._2) - .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y) - .map(s => s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relationResolvedPid") - } - - - /* - This method should be used once we finally convert everythings in Kryo dataset - instead of using rdd of json - */ - private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = { - import spark.implicits._ - implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) - val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] - entities.flatMap(e => e.getPid.asScala - .map(p => - convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) - .filter(s => s != null) - .map(s => (s, e.getId)) - ).groupByKey(_._1) - .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) - .map(s => s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/relationResolvedPid") - } - - def convertPidToDNETIdentifier(pid:String, pidType: String):String = { - if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty) - null - else - s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}" - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml similarity index 54% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml index 7683ff94cd..e9e1a8edea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml @@ -1,44 +1,23 @@ - entityPath - the path of deduplicate Entities + graphBasePath + the path of the graph - - relationPath - the path of relation unresolved - - - targetPath - the path of relation unresolved - - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - yarn cluster Resolve Relations in raw graph - eu.dnetlib.dhp.sx.graph.SparkResolveRelation + eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation dhp-graph-mapper-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -51,9 +30,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn - --relationPath${relationPath} - --workingPath${targetPath} - --entityPath${entityPath} + --graphBasePath${graphBasePath} + --workingPath${workingDir} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json similarity index 50% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json index f211adb9a4..1fbe206481 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json @@ -1,6 +1,5 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true}, {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true}, - {"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true} + {"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala index 5b7fbe1cf5..bd7e4fd094 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.sx.graph.scholix import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature} +import eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import eu.dnetlib.dhp.schema.sx.scholix.Scholix import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary -import eu.dnetlib.dhp.sx.graph.SparkResolveRelation import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest import org.json4s import org.json4s.DefaultFormats