added generation of EBI Dataset

This commit is contained in:
Sandro La Bruzzo 2020-07-02 12:37:43 +02:00
parent dab783b173
commit 1d420eedb4
9 changed files with 346 additions and 1 deletion

View File

@ -0,0 +1,89 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
object EBIAggregator {
  def getDatasetAggregator(): Aggregator[(String, OafDataset), OafDataset, OafDataset] = new Aggregator[(String, OafDataset), OafDataset, OafDataset] {
    override def zero: OafDataset = new OafDataset()
    override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = {
      b.mergeFrom(a._2)
      if (b.getId == null)
        b.setId(a._2.getId)
      b
    }
    override def merge(wx: OafDataset, wy: OafDataset): OafDataset = {
      wx.mergeFrom(wy)
      if (wx.getId == null && wy.getId.nonEmpty)
        wx.setId(wy.getId)
      wx
    }
    override def finish(reduction: OafDataset): OafDataset = reduction
    override def bufferEncoder: Encoder[OafDataset] =
      Encoders.kryo(classOf[OafDataset])
    override def outputEncoder: Encoder[OafDataset] =
      Encoders.kryo(classOf[OafDataset])
  }

  def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication] {
    override def zero: Publication = new Publication()
    override def reduce(b: Publication, a: (String, Publication)): Publication = {
      b.mergeFrom(a._2)
      if (b.getId == null)
        b.setId(a._2.getId)
      b
    }
    override def merge(wx: Publication, wy: Publication): Publication = {
      wx.mergeFrom(wy)
      if (wx.getId == null && wy.getId.nonEmpty)
        wx.setId(wy.getId)
      wx
    }
    override def finish(reduction: Publication): Publication = reduction
    override def bufferEncoder: Encoder[Publication] =
      Encoders.kryo(classOf[Publication])
    override def outputEncoder: Encoder[Publication] =
      Encoders.kryo(classOf[Publication])
  }

  def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation] {
    override def zero: Relation = new Relation()
    override def reduce(b: Relation, a: (String, Relation)): Relation = {
      a._2
    }
    override def merge(a: Relation, b: Relation): Relation = {
      if (b != null) b else a
    }
    override def finish(reduction: Relation): Relation = reduction
    override def bufferEncoder: Encoder[Relation] =
      Encoders.kryo(classOf[Relation])
    override def outputEncoder: Encoder[Relation] =
      Encoders.kryo(classOf[Relation])
  }
}
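
These aggregators are consumed as typed columns by the Spark job below; in isolation the pattern looks roughly like the following sketch, e.g. in a spark-shell. The local session and the empty input are assumptions added only for illustration, they are not part of the commit.

import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset}
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical local session and empty keyed input, only to show the groupByKey/agg wiring.
val spark = SparkSession.builder().master("local[*]").appName("EBIAggregatorSketch").getOrCreate()
implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])

val keyed = spark.createDataset(Seq.empty[(String, OafDataset)])(
  Encoders.tuple(Encoders.STRING, datasetEncoder))

// Records sharing the same key (the OAF identifier) are merged into a single OafDataset.
val merged = keyed
  .groupByKey(_._1)(Encoders.STRING)
  .agg(EBIAggregator.getDatasetAggregator().toColumn)
  .map(_._2)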

View File

@ -0,0 +1,83 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkCreateEBIDataFrame {
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
    parser.parseArgument(args)
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
        .master(parser.get("master")).getOrCreate()
    val sc = spark.sparkContext
    val workingPath = parser.get("workingPath")
    val relationMapper = RelationMapper.load

    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
    implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
    implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])

    logger.info("Extract Publication and relation from publication_xml")
    val oafPubsRDD: RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
      {
        new ObjectMapper().readValue(s, classOf[String])
      }).flatMap(s => {
        val d = new PublicationScholexplorerParser
        d.parseObject(s, relationMapper).asScala.iterator})

    val mapper = new ObjectMapper()
    mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
    spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")

    logger.info("Extract Publication and relation from dataset_xml")
    val oafDatsRDD: RDD[Oaf] = sc.textFile(s"$workingPath/_dataset_xml").map(s =>
      {
        new ObjectMapper().readValue(s, classOf[String])
      }).flatMap(s => {
        val d = new DatasetScholexplorerParser
        d.parseObject(s, relationMapper).asScala.iterator})

    spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")

    val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
    val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
    val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])

    publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getPublicationAggregator().toColumn)
      .map(p => p._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")

    dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getDatasetAggregator().toColumn)
      .map(p => p._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")

    relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getRelationAggregator().toColumn)
      .map(p => p._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
  }
}
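
For a quick sanity check, the merged outputs written above can be read back with the same kryo encoders, e.g. from a spark-shell. A minimal sketch, assuming a local session; the working path below is a placeholder, not the production path.

import eu.dnetlib.dhp.schema.oaf.Publication
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("EBIOutputCheck").getOrCreate()
implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])

val workingPath = "/tmp/ebi" // placeholder path, for illustration only
val mergedPublications = spark.read.load(s"$workingPath/publication").as[Publication]
println(s"publications after merge: ${mergedPublications.count()}")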

View File

@ -150,6 +150,19 @@ public abstract class AbstractScholexplorerParser {
        return uk;
    }

    protected Qualifier generateQualifier(final String classId, final String className, final String schemeId, final String schemeName) {
        final Qualifier q = new Qualifier();
        q.setClassid(classId);
        q.setClassname(className);
        q.setSchemeid(schemeId);
        q.setSchemename(schemeName);
        return q;
    }

    protected void generateRelations(
        RelationMapper relationMapper,
        Result parsedObject,

View File

@ -64,7 +64,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
            currentDate.setQualifier(dateQualifier);
            parsedObject.setRelevantdate(Collections.singletonList(currentDate));
        }

        final String completionStatus = VtdUtilityParser
            .getSingleValue(ap, vn, "//*[local-name()='completionStatus']");
        final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
@ -149,6 +148,42 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
        inferPid(currentPid);
        parsedObject.setPid(Collections.singletonList(currentPid));

        String resolvedURL = null;

        switch (currentPid.getQualifier().getClassname().toLowerCase()) {
            case "uniprot":
                resolvedURL = "https://www.uniprot.org/uniprot/" + currentPid.getValue();
                break;
            case "ena":
                if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7)
                    resolvedURL = "https://www.ebi.ac.uk/ena/data/view/" + currentPid.getValue().substring(0, 8);
                break;
            case "chembl":
                resolvedURL = "https://www.ebi.ac.uk/chembl/compound_report_card/" + currentPid.getValue();
                break;
            case "ncbi-n":
                resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
                break;
            case "ncbi-p":
                resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
                break;
            case "genbank":
                resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
                break;
            case "pdb":
                resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
                break;
            case "url":
                resolvedURL = currentPid.getValue();
                break;
        }

        final String sourceId = generateId(
            currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset");
        parsedObject.setId(sourceId);
@ -251,6 +286,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
                        t -> {
                            final StructuredProperty st = new StructuredProperty();
                            st.setValue(t);
                            st.setQualifier(generateQualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"));
                            return st;
                        })
                    .collect(Collectors.toList()));
@ -282,6 +318,14 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
                    .collect(Collectors.toList()));
            }

            if (StringUtils.isNotBlank(resolvedURL)) {
                Instance i = new Instance();
                i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
                i.setUrl(Collections.singletonList(resolvedURL));
                parsedObject.setInstance(Collections.singletonList(i));
            }

            result.add(parsedObject);
            return result;
        } catch (Throwable e) {

View File

@ -202,6 +202,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
                        t -> {
                            final StructuredProperty st = new StructuredProperty();
                            st.setValue(t);
                            st.setQualifier(generateQualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"));
                            return st;
                        })
                    .collect(Collectors.toList()));

View File

@ -0,0 +1,4 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
]
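
These two parameters match what SparkCreateEBIDataFrame reads through ArgumentApplicationParser. A small sketch of the parsing, with placeholder argument values (not taken from the commit):

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame
import org.apache.commons.io.IOUtils

val parser = new ArgumentApplicationParser(IOUtils.toString(
  SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
parser.parseArgument(Array("--master", "local[*]", "--workingPath", "/tmp/ebi")) // placeholder values
println(parser.get("workingPath")) // prints /tmp/ebi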

View File

@ -0,0 +1,38 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>oozie.action.sharelib.for.spark</name>
        <value>spark2</value>
    </property>
    <property>
        <name>hive_metastore_uris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>
    <property>
        <name>spark2YarnHistoryServerAddress</name>
        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
    </property>
    <property>
        <name>spark2EventLogDir</name>
        <value>/user/spark/spark2ApplicationHistory</value>
    </property>
    <property>
        <name>spark2ExtraListeners</name>
        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
    </property>
    <property>
        <name>spark2SqlQueryExecutionListeners</name>
        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
    </property>
</configuration>

View File

@ -0,0 +1,51 @@
<workflow-app name="Create EBI Dataset" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>workingPath</name>
            <description>the Working Path</description>
        </property>
        <property>
            <name>sparkDriverMemory</name>
            <description>memory for driver process</description>
        </property>
        <property>
            <name>sparkExecutorMemory</name>
            <description>memory for individual executor</description>
        </property>
        <property>
            <name>sparkExecutorCores</name>
            <description>number of cores used by single executor</description>
        </property>
    </parameters>

    <start to="CreateEBIDataSet"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="CreateEBIDataSet">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn-cluster</master>
            <mode>cluster</mode>
            <name>Create EBI DataSet</name>
            <class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
            <jar>dhp-doiboost-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
                --executor-cores=${sparkExecutorCores}
                --driver-memory=${sparkDriverMemory}
                --conf spark.sql.shuffle.partitions=1000
                ${sparkExtraOPT}
            </spark-opts>
            <arg>--workingPath</arg><arg>${workingPath}</arg>
            <arg>--master</arg><arg>yarn-cluster</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>
</workflow-app>

View File

@ -0,0 +1,22 @@
package eu.dnetlib.dhp.sx.ebi
import org.junit.jupiter.api.Test
class TestEBITODS {

  @Test
  def testEBI(): Unit = {
  }
}
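
The test above is left empty in this commit. A minimal sketch of a check it could grow into, assuming the module's usual JUnit 5 setup; the identifiers below are made up for illustration:

package eu.dnetlib.dhp.sx.ebi

import eu.dnetlib.dhp.schema.oaf.Relation
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class TestEBIAggregatorSketch {

  @Test
  def testRelationAggregatorKeepsRelation(): Unit = {
    val agg = EBIAggregator.getRelationAggregator()
    val r = new Relation()
    r.setSource("source-id")   // made-up identifiers, for illustration only
    r.setTarget("target-id")
    r.setRelType("resultResult")
    // reduce folds the keyed tuple into the buffer; merge keeps the non-null side
    val reduced = agg.reduce(agg.zero, ("source-id::resultResult::target-id", r))
    val merged = agg.merge(agg.zero, reduced)
    assertEquals("source-id", agg.finish(merged).getSource)
  }
}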