From 1d420eedb4135c72ecef00383bff7df5fe5c9aec Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 2 Jul 2020 12:37:43 +0200 Subject: [PATCH] added generation of EBI Dataset --- .../eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala | 89 +++++++++++++++++++ .../dhp/sx/ebi/SparkCreateEBIDataFrame.scala | 83 +++++++++++++++++ .../parser/AbstractScholexplorerParser.java | 13 +++ .../parser/DatasetScholexplorerParser.java | 46 +++++++++- .../PublicationScholexplorerParser.java | 1 + .../dnetlib/dhp/sx/ebi/ebi_to_df_params.json | 4 + .../dhp/sx/ebi/oozie_app/config-default.xml | 38 ++++++++ .../dnetlib/dhp/sx/ebi/oozie_app/workflow.xml | 51 +++++++++++ .../eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala | 22 +++++ 9 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala new file mode 100644 index 000000000..41fcd2636 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala @@ -0,0 +1,89 @@ +package eu.dnetlib.dhp.sx.ebi +import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import org.apache.spark.sql.{Encoder, Encoders} +import org.apache.spark.sql.expressions.Aggregator + + + +object EBIAggregator { + + def getDatasetAggregator(): Aggregator[(String, OafDataset), OafDataset, OafDataset] = new Aggregator[(String, OafDataset), OafDataset, OafDataset]{ + + override def zero: OafDataset = new OafDataset() + + override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + + override def merge(wx: OafDataset, wy: OafDataset): OafDataset = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: OafDataset): OafDataset = reduction + + override def bufferEncoder: Encoder[OafDataset] = + Encoders.kryo(classOf[OafDataset]) + + override def outputEncoder: Encoder[OafDataset] = + Encoders.kryo(classOf[OafDataset]) + } + + + def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{ + + override def zero: Publication = new Publication() + + override def reduce(b: Publication, a: (String, Publication)): Publication = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + + override def merge(wx: Publication, wy: Publication): Publication = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: Publication): Publication = reduction + + override def bufferEncoder: Encoder[Publication] = + Encoders.kryo(classOf[Publication]) + + override def outputEncoder: Encoder[Publication] = + Encoders.kryo(classOf[Publication]) + } + + + def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{ + + override def zero: Relation = new Relation() + + override def reduce(b: Relation, a: (String, Relation)): Relation = { + a._2 + } + + + override def merge(a: Relation, b: Relation): Relation = { + if(b!= null) b else a + } + override def finish(reduction: Relation): Relation = reduction + + override def bufferEncoder: Encoder[Relation] = + Encoders.kryo(classOf[Relation]) + + override def outputEncoder: Encoder[Relation] = + Encoders.kryo(classOf[Relation]) + } + + + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala new file mode 100644 index 000000000..c7cc3ed04 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala @@ -0,0 +1,83 @@ +package eu.dnetlib.dhp.sx.ebi + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser} +import eu.dnetlib.scholexplorer.relation.RelationMapper +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} +import org.slf4j.{Logger, LoggerFactory} +import scala.collection.JavaConverters._ + +object SparkCreateEBIDataFrame { + + + def main(args: Array[String]): Unit = { + val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkCreateEBIDataFrame.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc = spark.sparkContext + + + val workingPath = parser.get("workingPath") + val relationMapper = RelationMapper.load + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + + logger.info("Extract Publication and relation from publication_xml") + val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s => + { + new ObjectMapper().readValue(s, classOf[String]) + }).flatMap(s => { + val d = new PublicationScholexplorerParser + d.parseObject(s, relationMapper).asScala.iterator}) + + val mapper = new ObjectMapper() + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) + spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf") + + logger.info("Extract Publication and relation from dataset_xml") + val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/_dataset_xml").map(s => + { + new ObjectMapper().readValue(s, classOf[String]) + }).flatMap(s => { + val d = new DatasetScholexplorerParser + d.parseObject(s, relationMapper).asScala.iterator}) + + spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf") + val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset]) + val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication]) + val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation]) + publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getPublicationAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication") + + dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDatasetAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset") + + relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getRelationAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java index 0db2b2688..f0c789323 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java @@ -150,6 +150,19 @@ public abstract class AbstractScholexplorerParser { return uk; } + + protected Qualifier generateQualifier(final String classId, final String className, final String schemeId, final String schemeName) { + final Qualifier q = new Qualifier(); + q.setClassid(classId); + q.setClassid(className); + q.setSchemeid(schemeId); + q.setSchemename(schemeName); + return q; + + + + } + protected void generateRelations( RelationMapper relationMapper, Result parsedObject, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java index afba57bb8..c5c6b1493 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java @@ -64,7 +64,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { currentDate.setQualifier(dateQualifier); parsedObject.setRelevantdate(Collections.singletonList(currentDate)); } - final String completionStatus = VtdUtilityParser .getSingleValue(ap, vn, "//*[local-name()='completionStatus']"); final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']"); @@ -149,6 +148,42 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { inferPid(currentPid); parsedObject.setPid(Collections.singletonList(currentPid)); + + String resolvedURL = null; + + switch (currentPid.getQualifier().getClassname().toLowerCase()) { + case "uniprot": + resolvedURL ="https://www.uniprot.org/uniprot/"+currentPid.getValue(); + break; + case "ena": + if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7) + resolvedURL ="https://www.ebi.ac.uk/ena/data/view/"+currentPid.getValue().substring(0,8); + break; + case "chembl": + resolvedURL ="https://www.ebi.ac.uk/chembl/compound_report_card/"+currentPid.getValue(); + break; + + case "ncbi-n": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "ncbi-p": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "genbank": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "pdb": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "url": + resolvedURL =currentPid.getValue(); + break; + } + + + + + final String sourceId = generateId( currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset"); parsedObject.setId(sourceId); @@ -251,6 +286,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { t -> { final StructuredProperty st = new StructuredProperty(); st.setValue(t); + st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title")); return st; }) .collect(Collectors.toList())); @@ -282,6 +318,14 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { .collect(Collectors.toList())); } + + if(StringUtils.isNotBlank(resolvedURL)) { + Instance i = new Instance(); + i.setCollectedfrom(parsedObject.getCollectedfrom().get(0)); + i.setUrl(Collections.singletonList(resolvedURL)); + parsedObject.setInstance(Collections.singletonList(i)); + } + result.add(parsedObject); return result; } catch (Throwable e) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java index bf59a6f0e..b66bfd161 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java @@ -202,6 +202,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser t -> { final StructuredProperty st = new StructuredProperty(); st.setValue(t); + st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title")); return st; }) .collect(Collectors.toList())); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json new file mode 100644 index 000000000..366f1426e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json @@ -0,0 +1,4 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml new file mode 100644 index 000000000..cf617a84c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml @@ -0,0 +1,38 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml new file mode 100644 index 000000000..b81669674 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml @@ -0,0 +1,51 @@ + + + + workingPath + the Working Path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + Create EBI DataSet + eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=1000 + ${sparkExtraOPT} + + --workingPath${workingPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala new file mode 100644 index 000000000..979bf7e14 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala @@ -0,0 +1,22 @@ +package eu.dnetlib.dhp.sx.ebi + +import org.junit.jupiter.api.Test + +class TestEBITODS { + + + @Test + def testEBI():Unit = { + + + + + + + + + } + + + +}