diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
new file mode 100644
index 000000000..41fcd2636
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
@@ -0,0 +1,89 @@
+package eu.dnetlib.dhp.sx.ebi
+import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.expressions.Aggregator
+
+
+
+object EBIAggregator {
+
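+  // Merges all OafDataset records grouped under the same key into a single record,
+  // applying mergeFrom and keeping the first non-null identifier encountered.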
+ def getDatasetAggregator(): Aggregator[(String, OafDataset), OafDataset, OafDataset] = new Aggregator[(String, OafDataset), OafDataset, OafDataset]{
+
+ override def zero: OafDataset = new OafDataset()
+
+ override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = {
+ b.mergeFrom(a._2)
+ if (b.getId == null)
+ b.setId(a._2.getId)
+ b
+ }
+
+
+ override def merge(wx: OafDataset, wy: OafDataset): OafDataset = {
+ wx.mergeFrom(wy)
+      if (wx.getId == null && wy.getId != null && wy.getId.nonEmpty)
+ wx.setId(wy.getId)
+ wx
+ }
+ override def finish(reduction: OafDataset): OafDataset = reduction
+
+ override def bufferEncoder: Encoder[OafDataset] =
+ Encoders.kryo(classOf[OafDataset])
+
+ override def outputEncoder: Encoder[OafDataset] =
+ Encoders.kryo(classOf[OafDataset])
+ }
+
+
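+  // Same merge strategy as the dataset aggregator, applied to Publication records.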
+ def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
+
+ override def zero: Publication = new Publication()
+
+ override def reduce(b: Publication, a: (String, Publication)): Publication = {
+ b.mergeFrom(a._2)
+ if (b.getId == null)
+ b.setId(a._2.getId)
+ b
+ }
+
+
+ override def merge(wx: Publication, wy: Publication): Publication = {
+ wx.mergeFrom(wy)
+      if (wx.getId == null && wy.getId != null && wy.getId.nonEmpty)
+ wx.setId(wy.getId)
+ wx
+ }
+ override def finish(reduction: Publication): Publication = reduction
+
+ override def bufferEncoder: Encoder[Publication] =
+ Encoders.kryo(classOf[Publication])
+
+ override def outputEncoder: Encoder[Publication] =
+ Encoders.kryo(classOf[Publication])
+ }
+
+
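+  // Relations grouped under the same key are duplicates: keep a single representative (the last non-null one seen).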
+ def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{
+
+ override def zero: Relation = new Relation()
+
+ override def reduce(b: Relation, a: (String, Relation)): Relation = {
+ a._2
+ }
+
+
+ override def merge(a: Relation, b: Relation): Relation = {
+      if (b != null) b else a
+ }
+ override def finish(reduction: Relation): Relation = reduction
+
+ override def bufferEncoder: Encoder[Relation] =
+ Encoders.kryo(classOf[Relation])
+
+ override def outputEncoder: Encoder[Relation] =
+ Encoders.kryo(classOf[Relation])
+ }
+
+
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
new file mode 100644
index 000000000..c7cc3ed04
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
@@ -0,0 +1,83 @@
+package eu.dnetlib.dhp.sx.ebi
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
+import eu.dnetlib.scholexplorer.relation.RelationMapper
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
+import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.JavaConverters._
+
+object SparkCreateEBIDataFrame {
+
+
+ def main(args: Array[String]): Unit = {
+ val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+ val sc = spark.sparkContext
+
+
+ val workingPath = parser.get("workingPath")
+ val relationMapper = RelationMapper.load
+
+ implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+ implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
+ implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+
+    logger.info("Extract Publications and relations from publication_xml")
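+    // each line of publication_xml is expected to be a JSON-encoded string holding the original XML record:
+    // decode the string first, then parse it with the Scholexplorer publication parser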
+ val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
+ {
+ new ObjectMapper().readValue(s, classOf[String])
+ }).flatMap(s => {
+ val d = new PublicationScholexplorerParser
+ d.parseObject(s, relationMapper).asScala.iterator})
+
+ val mapper = new ObjectMapper()
+ mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
+ spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
+
+    logger.info("Extract Datasets and relations from dataset_xml")
+    val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
+ {
+ new ObjectMapper().readValue(s, classOf[String])
+ }).flatMap(s => {
+ val d = new DatasetScholexplorerParser
+ d.parseObject(s, relationMapper).asScala.iterator})
+
+ spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
+ val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
+ val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
+ val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
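+    // deduplicate each entity by grouping on its identifier (source::relType::target for relations)
+    // and merging duplicates with the aggregators defined in EBIAggregator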
+ publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getPublicationAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
+
+ dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getDatasetAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
+
+ relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getRelationAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
index 0db2b2688..f0c789323 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
@@ -150,6 +150,19 @@ public abstract class AbstractScholexplorerParser {
return uk;
}
+
+    protected Qualifier generateQualifier(final String classId, final String className, final String schemeId, final String schemeName) {
+        final Qualifier q = new Qualifier();
+        q.setClassid(classId);
+        q.setClassname(className);
+        q.setSchemeid(schemeId);
+        q.setSchemename(schemeName);
+        return q;
+    }
+
protected void generateRelations(
RelationMapper relationMapper,
Result parsedObject,
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
index afba57bb8..c5c6b1493 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
@@ -64,7 +64,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
currentDate.setQualifier(dateQualifier);
parsedObject.setRelevantdate(Collections.singletonList(currentDate));
}
-
final String completionStatus = VtdUtilityParser
.getSingleValue(ap, vn, "//*[local-name()='completionStatus']");
final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
@@ -149,6 +148,42 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
inferPid(currentPid);
parsedObject.setPid(Collections.singletonList(currentPid));
+
+ String resolvedURL = null;
+
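+                // resolve a landing-page URL from the PID type; if found, it is attached to the record as an Instance below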
+                switch (currentPid.getQualifier().getClassname().toLowerCase()) {
+                    case "uniprot":
+                        resolvedURL = "https://www.uniprot.org/uniprot/" + currentPid.getValue();
+                        break;
+                    case "ena":
+                        if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7)
+                            resolvedURL = "https://www.ebi.ac.uk/ena/data/view/" + currentPid.getValue().substring(0, 8);
+                        break;
+                    case "chembl":
+                        resolvedURL = "https://www.ebi.ac.uk/chembl/compound_report_card/" + currentPid.getValue();
+                        break;
+                    case "ncbi-n":
+                    case "ncbi-p":
+                    case "genbank":
+                    case "pdb":
+                        resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
+                        break;
+                    case "url":
+                        resolvedURL = currentPid.getValue();
+                        break;
+                }
+
final String sourceId = generateId(
currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset");
parsedObject.setId(sourceId);
@@ -251,6 +286,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
t -> {
final StructuredProperty st = new StructuredProperty();
st.setValue(t);
+ st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title"));
return st;
})
.collect(Collectors.toList()));
@@ -282,6 +318,14 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
.collect(Collectors.toList()));
}
+
+ if(StringUtils.isNotBlank(resolvedURL)) {
+ Instance i = new Instance();
+ i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
+ i.setUrl(Collections.singletonList(resolvedURL));
+ parsedObject.setInstance(Collections.singletonList(i));
+ }
+
result.add(parsedObject);
return result;
} catch (Throwable e) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java
index bf59a6f0e..b66bfd161 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java
@@ -202,6 +202,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
t -> {
final StructuredProperty st = new StructuredProperty();
st.setValue(t);
+ st.setQualifier(generateQualifier( "main title","main title", "dnet:dataCite_title","dnet:dataCite_title"));
return st;
})
.collect(Collectors.toList()));
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json
new file mode 100644
index 000000000..366f1426e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json
@@ -0,0 +1,4 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequential file to read", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
new file mode 100644
index 000000000..cf617a84c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
@@ -0,0 +1,38 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/spark2ApplicationHistory</value>
+    </property>
+    <property>
+        <name>spark2ExtraListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
+    </property>
+    <property>
+        <name>spark2SqlQueryExecutionListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
new file mode 100644
index 000000000..b81669674
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
@@ -0,0 +1,51 @@
+<workflow-app name="Create EBI Dataset" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>workingPath</name>
+            <description>the Working Path</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+    </parameters>
+
+    <start to="CreateEBIDataSet"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="CreateEBIDataSet">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Create EBI DataSet</name>
+            <class>eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=1000
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala
new file mode 100644
index 000000000..979bf7e14
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBITODS.scala
@@ -0,0 +1,22 @@
+package eu.dnetlib.dhp.sx.ebi
+
+import org.junit.jupiter.api.Test
+
+class TestEBITODS {
+
+  @Test
+  def testEBI(): Unit = {
+    // placeholder: the EBI-to-OAF conversion is not exercised yet
+  }
+
+}