From 46631a44217917f8a23b4c019d7f73641e255e67 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 24 Jun 2020 14:06:38 +0200 Subject: [PATCH 1/3] updated mapping scholexplorer to OAF --- .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 16 +++++++++++++++- .../export/SparkExportContentForOpenAire.scala | 11 +++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala index 637362acf0..967834d694 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala @@ -5,7 +5,7 @@ import java.time.format.DateTimeFormatter import eu.dnetlib.dhp.common.PacePerson import eu.dnetlib.dhp.schema.action.AtomicAction -import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty} +import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.utils.DHPUtils import org.apache.commons.lang3.StringUtils @@ -99,6 +99,20 @@ object DLIToOAF { ) + def fixInstance(r:Publication) :Publication = { + val collectedFrom = r.getCollectedfrom.asScala.head + r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom)) + r + } + + + def fixInstanceDataset(r:Dataset) :Dataset = { + val collectedFrom = r.getCollectedfrom.asScala.head + r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom)) + r + } + + def toActionSet(item: Oaf): (String, String) = { val mapper = new ObjectMapper() diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala index edf951df4d..fd8f2d136d 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.`export` import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import org.apache.commons.io.IOUtils import org.apache.hadoop.io.Text @@ -166,10 +166,13 @@ object SparkExportContentForOpenAire { }).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS") - val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet) - val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet) - val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet) + spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed") + spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed") + + val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet) + val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet) + val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet) fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) } From 1d420eedb4135c72ecef00383bff7df5fe5c9aec Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 2 Jul 2020 12:37:43 +0200 Subject: [PATCH 2/3] added generation of EBI Dataset --- .../eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala | 89 +++++++++++++++++++ .../dhp/sx/ebi/SparkCreateEBIDataFrame.scala | 83 +++++++++++++++++ .../parser/AbstractScholexplorerParser.java | 13 +++ .../parser/DatasetScholexplorerParser.java | 46 +++++++++- .../PublicationScholexplorerParser.java | 1 + .../dnetlib/dhp/sx/ebi/ebi_to_df_params.json | 4 + .../dhp/sx/ebi/oozie_app/config-default.xml | 38 ++++++++ .../dnetlib/dhp/sx/ebi/oozie_app/workflow.xml | 51 +++++++++++ .../eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala | 22 +++++ 9 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala new file mode 100644 index 0000000000..41fcd26362 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala @@ -0,0 +1,89 @@ +package eu.dnetlib.dhp.sx.ebi +import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import org.apache.spark.sql.{Encoder, Encoders} +import org.apache.spark.sql.expressions.Aggregator + + + +object EBIAggregator { + + def getDatasetAggregator(): Aggregator[(String, OafDataset), OafDataset, OafDataset] = new Aggregator[(String, OafDataset), OafDataset, OafDataset]{ + + override def zero: OafDataset = new OafDataset() + + override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + + override def merge(wx: OafDataset, wy: OafDataset): OafDataset = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: OafDataset): OafDataset = reduction + + override def bufferEncoder: Encoder[OafDataset] = + Encoders.kryo(classOf[OafDataset]) + + override def outputEncoder: Encoder[OafDataset] = + Encoders.kryo(classOf[OafDataset]) + } + + + def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{ + + override def zero: Publication = new Publication() + + override def reduce(b: Publication, a: (String, Publication)): Publication = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + + override def merge(wx: Publication, wy: Publication): Publication = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: Publication): Publication = reduction + + override def bufferEncoder: Encoder[Publication] = + Encoders.kryo(classOf[Publication]) + + override def outputEncoder: Encoder[Publication] = + Encoders.kryo(classOf[Publication]) + } + + + def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{ + + override def zero: Relation = new Relation() + + override def reduce(b: Relation, a: (String, Relation)): Relation = { + a._2 + } + + + override def merge(a: Relation, b: Relation): Relation = { + if(b!= null) b else a + } + override def finish(reduction: Relation): Relation = reduction + + override def bufferEncoder: Encoder[Relation] = + Encoders.kryo(classOf[Relation]) + + override def outputEncoder: Encoder[Relation] = + Encoders.kryo(classOf[Relation]) + } + + + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala new file mode 100644 index 0000000000..c7cc3ed048 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala @@ -0,0 +1,83 @@ +package eu.dnetlib.dhp.sx.ebi + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser} +import eu.dnetlib.scholexplorer.relation.RelationMapper +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} +import org.slf4j.{Logger, LoggerFactory} +import scala.collection.JavaConverters._ + +object SparkCreateEBIDataFrame { + + + def main(args: Array[String]): Unit = { + val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkCreateEBIDataFrame.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc = spark.sparkContext + + + val workingPath = parser.get("workingPath") + val relationMapper = RelationMapper.load + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) + implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) + implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + + logger.info("Extract Publication and relation from publication_xml") + val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s => + { + new ObjectMapper().readValue(s, classOf[String]) + }).flatMap(s => { + val d = new PublicationScholexplorerParser + d.parseObject(s, relationMapper).asScala.iterator}) + + val mapper = new ObjectMapper() + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) + spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf") + + logger.info("Extract Publication and relation from dataset_xml") + val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/_dataset_xml").map(s => + { + new ObjectMapper().readValue(s, classOf[String]) + }).flatMap(s => { + val d = new DatasetScholexplorerParser + d.parseObject(s, relationMapper).asScala.iterator}) + + spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf") + val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset]) + val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication]) + val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation]) + publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getPublicationAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication") + + dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDatasetAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset") + + relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getRelationAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java index 0db2b26884..f0c7893239 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java @@ -150,6 +150,19 @@ public abstract class AbstractScholexplorerParser { return uk; } + + protected Qualifier generateQualifier(final String classId, final String className, final String schemeId, final String schemeName) { + final Qualifier q = new Qualifier(); + q.setClassid(classId); + q.setClassid(className); + q.setSchemeid(schemeId); + q.setSchemename(schemeName); + return q; + + + + } + protected void generateRelations( RelationMapper relationMapper, Result parsedObject, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java index afba57bb8a..c5c6b1493f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java @@ -64,7 +64,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { currentDate.setQualifier(dateQualifier); parsedObject.setRelevantdate(Collections.singletonList(currentDate)); } - final String completionStatus = VtdUtilityParser .getSingleValue(ap, vn, "//*[local-name()='completionStatus']"); final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']"); @@ -149,6 +148,42 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { inferPid(currentPid); parsedObject.setPid(Collections.singletonList(currentPid)); + + String resolvedURL = null; + + switch (currentPid.getQualifier().getClassname().toLowerCase()) { + case "uniprot": + resolvedURL ="https://www.uniprot.org/uniprot/"+currentPid.getValue(); + break; + case "ena": + if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7) + resolvedURL ="https://www.ebi.ac.uk/ena/data/view/"+currentPid.getValue().substring(0,8); + break; + case "chembl": + resolvedURL ="https://www.ebi.ac.uk/chembl/compound_report_card/"+currentPid.getValue(); + break; + + case "ncbi-n": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "ncbi-p": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "genbank": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "pdb": + resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + break; + case "url": + resolvedURL =currentPid.getValue(); + break; + } + + + + + final String sourceId = generateId( currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset"); parsedObject.setId(sourceId); @@ -251,6 +286,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { t -> { final StructuredProperty st = new StructuredProperty(); st.setValue(t); + st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title")); return st; }) .collect(Collectors.toList())); @@ -282,6 +318,14 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { .collect(Collectors.toList())); } + + if(StringUtils.isNotBlank(resolvedURL)) { + Instance i = new Instance(); + i.setCollectedfrom(parsedObject.getCollectedfrom().get(0)); + i.setUrl(Collections.singletonList(resolvedURL)); + parsedObject.setInstance(Collections.singletonList(i)); + } + result.add(parsedObject); return result; } catch (Throwable e) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java index bf59a6f0ef..b66bfd161a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java @@ -202,6 +202,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser t -> { final StructuredProperty st = new StructuredProperty(); st.setValue(t); + st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title")); return st; }) .collect(Collectors.toList())); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json new file mode 100644 index 0000000000..366f1426e4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json @@ -0,0 +1,4 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml new file mode 100644 index 0000000000..cf617a84c4 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml @@ -0,0 +1,38 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml new file mode 100644 index 0000000000..b816696747 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml @@ -0,0 +1,51 @@ + + + + workingPath + the Working Path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn-cluster + cluster + Create EBI DataSet + eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=1000 + ${sparkExtraOPT} + + --workingPath${workingPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala new file mode 100644 index 0000000000..979bf7e14c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala @@ -0,0 +1,22 @@ +package eu.dnetlib.dhp.sx.ebi + +import org.junit.jupiter.api.Test + +class TestEBITODS { + + + @Test + def testEBI():Unit = { + + + + + + + + + } + + + +} From a7d39774812ab3ede6570e65ebe3cb902ca11667 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 10 Jul 2020 14:44:50 +0200 Subject: [PATCH 3/3] added generation of EBI Dataset --- dhp-schemas/pom.xml | 31 +++ .../dhp/schema/scholexplorer/OafUtils.scala | 90 +++++++++ dhp-workflows/dhp-graph-mapper/pom.xml | 38 ++++ .../dhp/sx/ebi/SparkAddLinkUpdates.scala | 138 +++++++++++++ .../sx/ebi/SparkCreateBaselineDataFrame.scala | 49 +++++ .../dhp/sx/ebi/SparkCreateEBIDataFrame.scala | 6 +- .../dnetlib/dhp/sx/ebi/model/PMArticle.java | 64 ++++++ .../eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java | 31 +++ .../dnetlib/dhp/sx/ebi/model/PMJournal.java | 53 +++++ .../dnetlib/dhp/sx/ebi/model/PMParser.scala | 92 +++++++++ .../parser/AbstractScholexplorerParser.java | 6 +- .../parser/DatasetScholexplorerParser.java | 30 ++- .../PublicationScholexplorerParser.java | 6 +- .../dhp/sx/ebi/oozie_app/config-default.xml | 46 ++++- .../dnetlib/dhp/sx/ebi/oozie_app/workflow.xml | 52 ++++- .../java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala | 20 ++ .../eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala | 22 -- .../resources/eu/dnetlib/dhp/sx/ebi/rel1.json | 55 +++++ .../eu/dnetlib/dhp/sx/ebi/rel_multiple.json | 191 ++++++++++++++++++ .../java/eu/dnetlib/dhp/export/DLIToOAF.scala | 43 +--- .../oa/provision/PrepareRelationsJobTest.java | 29 ++- 21 files changed, 985 insertions(+), 107 deletions(-) create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala delete mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml index 2e5652b430..b04d62dd29 100644 --- a/dhp-schemas/pom.xml +++ b/dhp-schemas/pom.xml @@ -14,6 +14,37 @@ This module contains common schema classes meant to be used across the dnet-hadoop submodules + + + + net.alchim31.maven + scala-maven-plugin + 4.0.1 + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.version} + + + + + + diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala new file mode 100644 index 0000000000..27eec77fa2 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala @@ -0,0 +1,90 @@ +package eu.dnetlib.dhp.schema.scholexplorer + +import eu.dnetlib.dhp.schema.oaf.{DataInfo, Field, KeyValue, Qualifier, StructuredProperty} + +object OafUtils { + + + + def generateKeyValue(key: String, value: String): KeyValue = { + val kv: KeyValue = new KeyValue() + kv.setKey(key) + kv.setValue(value) + kv.setDataInfo(generateDataInfo("0.9")) + kv + } + + + def generateDataInfo(trust: String = "0.9", invisibile: Boolean = false): DataInfo = { + val di = new DataInfo + di.setDeletedbyinference(false) + di.setInferred(false) + di.setInvisible(false) + di.setTrust(trust) + di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions")) + di + } + + def createQualifier(cls: String, sch: String): Qualifier = { + createQualifier(cls, cls, sch, sch) + } + + + def createQualifier(classId: String, className: String, schemeId: String, schemeName: String): Qualifier = { + val q: Qualifier = new Qualifier + q.setClassid(classId) + q.setClassname(className) + q.setSchemeid(schemeId) + q.setSchemename(schemeName) + q + } + + + def asField[T](value: T): Field[T] = { + val tmp = new Field[T] + tmp.setValue(value) + tmp + + + } + + def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId,className, schemeId, schemeName)) + sp.setValue(value) + sp + + } + + + + def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String, dataInfo: DataInfo): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId,className, schemeId, schemeName)) + sp.setValue(value) + sp.setDataInfo(dataInfo) + sp + + } + + def createSP(value: String, classId: String, schemeId: String): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId, schemeId)) + sp.setValue(value) + sp + + } + + + + def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId, schemeId)) + sp.setValue(value) + sp.setDataInfo(dataInfo) + sp + + } + + +} diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index 0439c2ba32..5ddcda3fa0 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -9,6 +9,37 @@ dhp-graph-mapper + + + + net.alchim31.maven + scala-maven-plugin + 4.0.1 + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.version} + + + + + + @@ -61,6 +92,13 @@ org.postgresql postgresql + + org.json4s + json4s-jackson_2.11 + 3.5.3 + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala new file mode 100644 index 0000000000..897bbd5407 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala @@ -0,0 +1,138 @@ +package eu.dnetlib.dhp.sx.ebi +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf} +import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo} +import eu.dnetlib.dhp.utils.DHPUtils +import eu.dnetlib.scholexplorer.relation.RelationMapper +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString} +import org.json4s.jackson.JsonMethods.parse + +import scala.collection.JavaConverters._ + +object SparkAddLinkUpdates { + + val relationMapper = RelationMapper.load + + +case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:String, turl:String, title:String, publisher:String) {} + + + def generatePubmedDLICollectedFrom(): KeyValue = { + OafUtils.generateKeyValue("dli_________::europe_pmc__", "Europe PMC") + } + + + def ebiLinksToOaf(input:(String, String)):List[Oaf] = { + val pmid :String = input._1 + val input_json :String = input._2 + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input_json) + + + val targets:List[EBILinks] = for { + JObject(link) <- json \\ "Category" \\ "Link" + JField("PublicationDate", JString(pubdate)) <- link + JField("RelationshipType", JObject(relationshipType)) <- link + JField("Name", JString(relname)) <- relationshipType + JField("Target", JObject(target)) <- link + JField("Identifier", JObject(identifier)) <- target + JField("ID", JString(tpid)) <- identifier + JField("IDScheme", JString(tpidtype)) <- identifier + JField("IDURL", JString(turl)) <- identifier + JField("Title", JString(title)) <- target + JField("Publisher", JObject(pub)) <- target + JField("Name", JString(publisher)) <- pub + } yield EBILinks(relname, pubdate, tpid, tpidtype, turl,title, publisher) + + + + val dnetPublicationId = s"50|${DHPUtils.md5(s"$pmid::pmid")}" + + targets.flatMap(l => { + val relation = new DLIRelation + val inverseRelation = new DLIRelation + val targetDnetId = s"50|${DHPUtils.md5(s"${l.tpid.toLowerCase.trim}::${l.tpidType.toLowerCase.trim}")}" + val relInfo = relationMapper.get(l.relation.toLowerCase) + val relationSemantic = relInfo.getOriginal + val inverseRelationSemantic = relInfo.getInverse + + relation.setSource(dnetPublicationId) + relation.setTarget(targetDnetId) + relation.setRelClass("datacite") + relation.setRelType(relationSemantic) + relation.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava) + + inverseRelation.setSource(targetDnetId) + inverseRelation.setTarget(dnetPublicationId) + inverseRelation.setRelClass("datacite") + inverseRelation.setRelType(inverseRelationSemantic) + inverseRelation.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava) + + + + val d = new DLIDataset + d.setId(targetDnetId) + d.setDataInfo(OafUtils.generateDataInfo()) + d.setPid(List(OafUtils.createSP(l.tpid.toLowerCase.trim, l.tpidType.toLowerCase.trim, "dnet:pid_types")).asJava) + d.setCompletionStatus("complete") + val pi = new ProvenaceInfo + pi.setId("dli_________::europe_pmc__") + pi.setName( "Europe PMC") + pi.setCompletionStatus("complete") + pi.setCollectionMode("collected") + d.setDlicollectedfrom(List(pi).asJava) + d.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava) + d.setPublisher(OafUtils.asField(l.publisher)) + d.setTitle(List(OafUtils.createSP(l.title, "main title", "dnet:dataCite_title")).asJava) + d.setDateofacceptance(OafUtils.asField(l.pubdate)) + val i = new Instance + i.setCollectedfrom(generatePubmedDLICollectedFrom()) + i.setDateofacceptance(d.getDateofacceptance) + i.setUrl(List(l.turl).asJava) + i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource")) + d.setInstance(List(i).asJava) + List(relation, inverseRelation, d) + }) + } + + + def main(args: Array[String]): Unit = { + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkCreateEBIDataFrame.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val workingPath = parser.get("workingPath") + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation]) + implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset]) + + val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING)) + + ds.flatMap(l =>ebiLinksToOaf(l)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_oaf") + + ds.filter(s => s.isInstanceOf) + + + + val oDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/baseline_links_updates_oaf").as[Oaf] + + oDataset.filter(p =>p.isInstanceOf[DLIRelation]).map(p => p.asInstanceOf[DLIRelation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation") + oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset") + + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala new file mode 100644 index 0000000000..77e03c9b3e --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.sx.ebi + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal, PMParser} + + +import scala.io.Source +import scala.xml.pull.XMLEventReader + +object SparkCreateBaselineDataFrame { + + + def main(args: Array[String]): Unit = { + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(SparkCreateEBIDataFrame.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc = spark.sparkContext + + val workingPath = parser.get("workingPath") + + implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) + implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) + implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) + val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline",2000) + + val ds:Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i =>{ + val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes())) + new PMParser(xml) + + } )) + + ds.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset") + + + + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala index c7cc3ed048..60857f0fcf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala @@ -51,7 +51,7 @@ object SparkCreateEBIDataFrame { spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf") logger.info("Extract Publication and relation from dataset_xml") - val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/_dataset_xml").map(s => + val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s => { new ObjectMapper().readValue(s, classOf[String]) }).flatMap(s => { @@ -79,5 +79,9 @@ object SparkCreateEBIDataFrame { .agg(EBIAggregator.getRelationAggregator().toColumn) .map(p => p._2) .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") + + + + relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING)) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java new file mode 100644 index 0000000000..75d4628e67 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java @@ -0,0 +1,64 @@ + +package eu.dnetlib.dhp.sx.ebi.model; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class PMArticle implements Serializable { + + private String pmid; + private String date; + private PMJournal journal; + private String title; + private String description; + private List authors = new ArrayList<>(); + + public String getPmid() { + return pmid; + } + + public void setPmid(String pmid) { + this.pmid = pmid; + } + + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } + + public PMJournal getJournal() { + return journal; + } + + public void setJournal(PMJournal journal) { + this.journal = journal; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public List getAuthors() { + return authors; + } + + public void setAuthors(List authors) { + this.authors = authors; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java new file mode 100644 index 0000000000..4a21985424 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java @@ -0,0 +1,31 @@ + +package eu.dnetlib.dhp.sx.ebi.model; + +import java.io.Serializable; + +public class PMAuthor implements Serializable { + + private String lastName; + private String foreName; + + public String getLastName() { + return lastName; + } + + public void setLastName(String lastName) { + this.lastName = lastName; + } + + public String getForeName() { + return foreName; + } + + public void setForeName(String foreName) { + this.foreName = foreName; + } + + public String getFullName() { + return String.format("%s, %s", this.foreName, this.lastName); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java new file mode 100644 index 0000000000..d4ff5a1580 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java @@ -0,0 +1,53 @@ + +package eu.dnetlib.dhp.sx.ebi.model; + +import java.io.Serializable; + +public class PMJournal implements Serializable { + + private String issn; + private String volume; + private String issue; + private String date; + private String title; + + public String getIssn() { + return issn; + } + + public void setIssn(String issn) { + this.issn = issn; + } + + public String getVolume() { + return volume; + } + + public void setVolume(String volume) { + this.volume = volume; + } + + public String getIssue() { + return issue; + } + + public void setIssue(String issue) { + this.issue = issue; + } + + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala new file mode 100644 index 0000000000..903eba1347 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala @@ -0,0 +1,92 @@ +package eu.dnetlib.dhp.sx.ebi.model +import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader} +class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] { + + var currentArticle:PMArticle = generateNextArticle() + + override def hasNext: Boolean = currentArticle!= null + + override def next(): PMArticle = { + val tmp = currentArticle + currentArticle = generateNextArticle() + tmp + } + + + def generateNextArticle():PMArticle = { + + var currentAuthor: PMAuthor = null + var currentJournal: PMJournal = null + var currNode: String = null + var currentYear = "0" + var currentMonth = "01" + var currentDay = "01" + + while (xml.hasNext) { + xml.next match { + case EvElemStart(_, label, _, _) => + currNode = label + label match { + case "PubmedArticle" => currentArticle = new PMArticle + case "Author" => currentAuthor = new PMAuthor + case "Journal" => currentJournal = new PMJournal + case _ => + } + case EvElemEnd(_, label) => + label match { + case "PubmedArticle" => return currentArticle + case "Author" => currentArticle.getAuthors.add(currentAuthor) + case "Journal" => currentArticle.setJournal(currentJournal) + case "DateCompleted" => currentArticle.setDate(s"$currentYear-$currentMonth-$currentDay") + case "PubDate" => currentJournal.setDate(s"$currentYear-$currentMonth-$currentDay") + case _ => + } + case EvText(text) => + if (currNode!= null && text.trim.nonEmpty) + currNode match { + case "ArticleTitle" => { + if (currentArticle.getTitle==null) + currentArticle.setTitle(text.trim) + else + currentArticle.setTitle(currentArticle.getTitle + text.trim) + } + case "AbstractText" => { + if (currentArticle.getDescription==null) + currentArticle.setDescription(text.trim) + else + currentArticle.setDescription(currentArticle.getDescription + text.trim) + } + case "PMID" => currentArticle.setPmid(text.trim) + case "ISSN" => currentJournal.setIssn(text.trim) + case "Year" => currentYear = text.trim + case "Month" => currentMonth = text.trim + case "Day" => currentDay = text.trim + case "Volume" => currentJournal.setVolume( text.trim) + case "Issue" => currentJournal.setIssue (text.trim) + case "LastName" => { + if (currentAuthor != null) + currentAuthor.setLastName(text.trim) + + } + case "ForeName" => if (currentAuthor != null) + currentAuthor.setForeName(text.trim) + case "Title" => + if (currentJournal.getTitle==null) + currentJournal.setTitle(text.trim) + else + currentJournal.setTitle(currentJournal.getTitle + text.trim) + case _ => + + } + case _ => + } + + } + null + } +} + + + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java index f0c7893239..75f28c1299 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java @@ -150,8 +150,8 @@ public abstract class AbstractScholexplorerParser { return uk; } - - protected Qualifier generateQualifier(final String classId, final String className, final String schemeId, final String schemeName) { + protected Qualifier generateQualifier(final String classId, final String className, final String schemeId, + final String schemeName) { final Qualifier q = new Qualifier(); q.setClassid(classId); q.setClassid(className); @@ -159,8 +159,6 @@ public abstract class AbstractScholexplorerParser { q.setSchemename(schemeName); return q; - - } protected void generateRelations( diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java index c5c6b1493f..60371fa536 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java @@ -148,42 +148,37 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { inferPid(currentPid); parsedObject.setPid(Collections.singletonList(currentPid)); - String resolvedURL = null; switch (currentPid.getQualifier().getClassname().toLowerCase()) { case "uniprot": - resolvedURL ="https://www.uniprot.org/uniprot/"+currentPid.getValue(); + resolvedURL = "https://www.uniprot.org/uniprot/" + currentPid.getValue(); break; case "ena": if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7) - resolvedURL ="https://www.ebi.ac.uk/ena/data/view/"+currentPid.getValue().substring(0,8); + resolvedURL = "https://www.ebi.ac.uk/ena/data/view/" + currentPid.getValue().substring(0, 8); break; case "chembl": - resolvedURL ="https://www.ebi.ac.uk/chembl/compound_report_card/"+currentPid.getValue(); + resolvedURL = "https://www.ebi.ac.uk/chembl/compound_report_card/" + currentPid.getValue(); break; case "ncbi-n": - resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue(); break; case "ncbi-p": - resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue(); break; case "genbank": - resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue(); break; case "pdb": - resolvedURL ="https://www.ncbi.nlm.nih.gov/nuccore/"+currentPid.getValue(); + resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue(); break; case "url": - resolvedURL =currentPid.getValue(); + resolvedURL = currentPid.getValue(); break; } - - - - final String sourceId = generateId( currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset"); parsedObject.setId(sourceId); @@ -286,7 +281,11 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { t -> { final StructuredProperty st = new StructuredProperty(); st.setValue(t); - st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title")); + st + .setQualifier( + generateQualifier( + "main title", "main title", "dnet:dataCite_title", + "dnet:dataCite_title")); return st; }) .collect(Collectors.toList())); @@ -318,8 +317,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { .collect(Collectors.toList())); } - - if(StringUtils.isNotBlank(resolvedURL)) { + if (StringUtils.isNotBlank(resolvedURL)) { Instance i = new Instance(); i.setCollectedfrom(parsedObject.getCollectedfrom().get(0)); i.setUrl(Collections.singletonList(resolvedURL)); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java index b66bfd161a..8d76004dcb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java @@ -202,7 +202,11 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser t -> { final StructuredProperty st = new StructuredProperty(); st.setValue(t); - st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title")); + st + .setQualifier( + generateQualifier( + "main title", "main title", "dnet:dataCite_title", + "dnet:dataCite_title")); return st; }) .collect(Collectors.toList())); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml index cf617a84c4..cac3cc2bb0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml @@ -1,4 +1,7 @@ + + + + + + + + jobTracker + yarn + + + nameNode + hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 + + + hive_metastore_uris + thrift://hadoop-edge3.garr-pa1.d4science.org:9083 + + + spark2YarnHistoryServerAddress + http://hadoop-rm2.garr-pa1.d4science.org:19888 + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + spark2EventLogDir /user/spark/spark2ApplicationHistory diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml index b816696747..a5035c56c6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml @@ -18,7 +18,7 @@ - + @@ -26,13 +26,59 @@ + + + yarn-cluster + cluster + Create Baselnie DataSet + + eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=1 + --driver-memory=${sparkDriverMemory} + --executor-cores=${sparkExecutorCores} + ${sparkExtraOPT} + + --workingPath${workingPath} + --masteryarn + + + + + + + + yarn-cluster + cluster + Create Baselnie DataSet + + eu.dnetlib.dhp.sx.ebi.SparkAddLinkUpdates + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=1 + --driver-memory=${sparkDriverMemory} + --executor-cores=${sparkExecutorCores} + ${sparkExtraOPT} + + --workingPath${workingPath} + --masteryarn + + + + + + yarn-cluster cluster Create EBI DataSet + eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame - dhp-doiboost-${projectVersion}.jar + dhp-graph-mapper-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} @@ -41,7 +87,7 @@ ${sparkExtraOPT} --workingPath${workingPath} - --masteryarn-cluster + --masteryarn diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala new file mode 100644 index 0000000000..fa390a21b1 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala @@ -0,0 +1,20 @@ +package eu.dnetlib.dhp.sx.ebi + +import org.junit.jupiter.api.Test + +class TestEBI { + + + + @Test + def testEBIData() = { + SparkAddLinkUpdates.main("-mt local[*] -w /home/sandro/Downloads".split(" ")) + + + + + + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala deleted file mode 100644 index 979bf7e14c..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala +++ /dev/null @@ -1,22 +0,0 @@ -package eu.dnetlib.dhp.sx.ebi - -import org.junit.jupiter.api.Test - -class TestEBITODS { - - - @Test - def testEBI():Unit = { - - - - - - - - - } - - - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json new file mode 100644 index 0000000000..038b84a499 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json @@ -0,0 +1,55 @@ +{ + "Category": [ + { + "Section": [ + { + "Linklist": { + "Link": [ + { + "LinkProvider": { + "Name": "Europe PMC" + }, + "Target": { + "Publisher": { + "Name": "Altmetric" + }, + "ImageURL": "https://api.altmetric.com/v1/donut/58578459_64.png", + "Identifier": { + "ID": "https://www.altmetric.com/details/58578459", + "IDScheme": "URL", + "IDURL": "https://www.altmetric.com/details/58578459" + }, + "Type": { + "Name": "dataset" + }, + "Title": "Optical clumped isotope thermometry of carbon dioxide" + }, + "Source": { + "Identifier": { + "ID": "30886173", + "IDScheme": "PMID" + }, + "Type": { + "Name": "literature" + } + }, + "PublicationDate": "06-04-2019", + "RelationshipType": { + "Name": "IsReferencedBy" + }, + "ObtainedBy": "ext_links" + } + ] + }, + "ObtainedBy": "ext_links", + "SectionLinkCount": 1, + "Tags": [ + "altmetrics" + ] + } + ], + "CategoryLinkCount": 1, + "Name": "Altmetric" + } + ] +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json new file mode 100644 index 0000000000..2ad55861e8 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json @@ -0,0 +1,191 @@ +{ + "version": "6.3", + "hitCount": 4, + "request": { + "id": "28818901", + "source": "MED" + }, + "dataLinkList": { + "Category": [ + { + "Name": "Nucleotide Sequences", + "CategoryLinkCount": 3, + "Section": [ + { + "ObtainedBy": "tm_accession", + "Tags": [ + "supporting_data" + ], + "SectionLinkCount": 1, + "Linklist": { + "Link": [ + { + "ObtainedBy": "tm_accession", + "PublicationDate": "27-02-2020", + "LinkProvider": { + "Name": "Europe PMC" + }, + "RelationshipType": { + "Name": "References" + }, + "Source": { + "Type": { + "Name": "literature" + }, + "Identifier": { + "ID": "28818901", + "IDScheme": "MED" + } + }, + "Target": { + "Type": { + "Name": "dataset" + }, + "Identifier": { + "ID": "AP008937", + "IDScheme": "ENA", + "IDURL": "http://identifiers.org/ena.embl/AP008937" + }, + "Title": "AP008937", + "Publisher": { + "Name": "Europe PMC" + } + }, + "Frequency": 1 + } + ] + } + }, + { + "ObtainedBy": "submission", + "Tags": [ + "related_data" + ], + "SectionLinkCount": 2, + "CollectionURL": "http://www.ebi.ac.uk/ena/data/search?query=28818901", + "Linklist": { + "Link": [ + { + "ObtainedBy": "submission", + "PublicationDate": "25-06-2018", + "LinkProvider": { + "Name": "Europe PMC" + }, + "RelationshipType": { + "Name": "IsReferencedBy" + }, + "Source": { + "Type": { + "Name": "literature" + }, + "Identifier": { + "ID": "28818901", + "IDScheme": "PMID" + } + }, + "Target": { + "Type": { + "Name": "dataset" + }, + "Identifier": { + "ID": "NIWV01000000", + "IDScheme": "ENA", + "IDURL": "http://www.ebi.ac.uk/ena/data/view/NIWV01000000" + }, + "Title": "Nucleotide sequences", + "Publisher": { + "Name": "ENA" + } + } + }, + { + "ObtainedBy": "submission", + "PublicationDate": "25-06-2018", + "LinkProvider": { + "Name": "Europe PMC" + }, + "RelationshipType": { + "Name": "IsReferencedBy" + }, + "Source": { + "Type": { + "Name": "literature" + }, + "Identifier": { + "ID": "28818901", + "IDScheme": "PMID" + } + }, + "Target": { + "Type": { + "Name": "dataset" + }, + "Identifier": { + "ID": "PRJNA390617", + "IDScheme": "ENA", + "IDURL": "http://www.ebi.ac.uk/ena/data/view/PRJNA390617" + }, + "Title": "Lactobacillus fermentum strain:BFE 6620", + "Publisher": { + "Name": "ENA" + } + } + } + ] + } + } + ] + }, + { + "Name": "BioStudies: supplemental material and supporting data", + "CategoryLinkCount": 1, + "Section": [ + { + "ObtainedBy": "ext_links", + "Tags": [ + "supporting_data" + ], + "SectionLinkCount": 1, + "Linklist": { + "Link": [ + { + "ObtainedBy": "ext_links", + "PublicationDate": "24-07-2018", + "LinkProvider": { + "Name": "Europe PMC" + }, + "RelationshipType": { + "Name": "IsReferencedBy" + }, + "Source": { + "Type": { + "Name": "literature" + }, + "Identifier": { + "ID": "28818901", + "IDScheme": "PMID" + } + }, + "Target": { + "Type": { + "Name": "dataset" + }, + "Identifier": { + "ID": "http://www.ebi.ac.uk/biostudies/studies/S-EPMC5604774?xr=true", + "IDScheme": "URL", + "IDURL": "http://www.ebi.ac.uk/biostudies/studies/S-EPMC5604774?xr=true" + }, + "Title": "Draft Genome Sequence of Lactobacillus fermentum BFE 6620, a Potential Starter Culture for African Vegetable Foods, Isolated from Fermented Cassava.", + "Publisher": { + "Name": "BioStudies: supplemental material and supporting data" + } + } + } + ] + } + } + ] + } + ] + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala index 967834d694..86b68fbd2a 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala @@ -5,11 +5,12 @@ import java.time.format.DateTimeFormatter import eu.dnetlib.dhp.common.PacePerson import eu.dnetlib.dhp.schema.action.AtomicAction -import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty} +import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty} import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.utils.DHPUtils import org.apache.commons.lang3.StringUtils import org.codehaus.jackson.map.ObjectMapper +import eu.dnetlib.dhp.schema.scholexplorer.OafUtils._ import scala.collection.JavaConverters._ @@ -426,46 +427,6 @@ object DLIToOAF { } - def generateKeyValue(key: String, value: String): KeyValue = { - val kv: KeyValue = new KeyValue() - kv.setKey(key) - kv.setValue(value) - kv.setDataInfo(generateDataInfo("0.9")) - kv - } - def generateDataInfo(trust: String = "0.9", invisibile: Boolean = false): DataInfo = { - val di = new DataInfo - di.setDeletedbyinference(false) - di.setInferred(false) - di.setInvisible(false) - di.setTrust(trust) - di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions")) - di - } - - def createQualifier(cls: String, sch: String): Qualifier = { - createQualifier(cls, cls, sch, sch) - } - - - def createQualifier(classId: String, className: String, schemeId: String, schemeName: String): Qualifier = { - val q: Qualifier = new Qualifier - q.setClassid(classId) - q.setClassname(className) - q.setSchemeid(schemeId) - q.setSchemename(schemeName) - q - } - - - def asField[T](value: T): Field[T] = { - val tmp = new Field[T] - tmp.setValue(value) - tmp - - - } - } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java index c16bbc6fba..528532eddd 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java @@ -1,9 +1,10 @@ package eu.dnetlib.dhp.oa.provision; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; -import eu.dnetlib.dhp.schema.oaf.Relation; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -19,9 +20,10 @@ import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; public class PrepareRelationsJobTest { @@ -74,14 +76,19 @@ public class PrepareRelationsJobTest { "-maxRelations", String.valueOf(maxRelations) }); - Dataset out = spark.read() - .parquet(testPath.toString()) - .as(Encoders.bean(Relation.class)) - .cache(); + Dataset out = spark + .read() + .parquet(testPath.toString()) + .as(Encoders.bean(Relation.class)) + .cache(); Assertions.assertEquals(10, out.count()); - Dataset freq = out.toDF().cube(SUBRELTYPE).count().filter((FilterFunction) value -> !value.isNullAt(0)); + Dataset freq = out + .toDF() + .cube(SUBRELTYPE) + .count() + .filter((FilterFunction) value -> !value.isNullAt(0)); long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count"); long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");