diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 2e5652b43..b04d62dd2 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -14,6 +14,37 @@
This module contains common schema classes meant to be used across the dnet-hadoop submodules
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>4.0.1</version>
+				<executions>
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>initialize</phase>
+						<goals>
+							<goal>add-source</goal>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<scalaVersion>${scala.version}</scalaVersion>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala
new file mode 100644
index 000000000..27eec77fa
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/OafUtils.scala
@@ -0,0 +1,90 @@
+package eu.dnetlib.dhp.schema.scholexplorer
+
+import eu.dnetlib.dhp.schema.oaf.{DataInfo, Field, KeyValue, Qualifier, StructuredProperty}
+
+object OafUtils {
+
+
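+ // Factory helpers for the most common OAF building blocks (KeyValue, DataInfo, Qualifier, Field,
+ // StructuredProperty), shared here so they no longer need to be re-declared in the scholexplorer modules.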
+
+ def generateKeyValue(key: String, value: String): KeyValue = {
+ val kv: KeyValue = new KeyValue()
+ kv.setKey(key)
+ kv.setValue(value)
+ kv.setDataInfo(generateDataInfo("0.9"))
+ kv
+ }
+
+
+ def generateDataInfo(trust: String = "0.9", invisible: Boolean = false): DataInfo = {
+ val di = new DataInfo
+ di.setDeletedbyinference(false)
+ di.setInferred(false)
+ di.setInvisible(invisible)
+ di.setTrust(trust)
+ di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
+ di
+ }
+
+ def createQualifier(cls: String, sch: String): Qualifier = {
+ createQualifier(cls, cls, sch, sch)
+ }
+
+
+ def createQualifier(classId: String, className: String, schemeId: String, schemeName: String): Qualifier = {
+ val q: Qualifier = new Qualifier
+ q.setClassid(classId)
+ q.setClassname(className)
+ q.setSchemeid(schemeId)
+ q.setSchemename(schemeName)
+ q
+ }
+
+
+ def asField[T](value: T): Field[T] = {
+ val tmp = new Field[T]
+ tmp.setValue(value)
+ tmp
+
+
+ }
+
+ def createSP(value: String, classId: String, className: String, schemeId: String, schemeName: String): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId, className, schemeId, schemeName))
+ sp.setValue(value)
+ sp
+
+ }
+
+
+
+ def createSP(value: String, classId: String, className: String, schemeId: String, schemeName: String, dataInfo: DataInfo): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId, className, schemeId, schemeName))
+ sp.setValue(value)
+ sp.setDataInfo(dataInfo)
+ sp
+
+ }
+
+ def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId, schemeId))
+ sp.setValue(value)
+ sp
+
+ }
+
+
+
+ def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId, schemeId))
+ sp.setValue(value)
+ sp.setDataInfo(dataInfo)
+ sp
+
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 4a58cfd36..0cb0d7801 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -34,7 +34,10 @@ public class EventFactory {
final MappedFields map = createMapFromResult(updateInfo);
final String eventId = calculateEventId(
- updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString());
+ updateInfo.getTopicPath(), updateInfo.getTargetDs().getOpenaireId(), updateInfo
+ .getTarget()
+ .getOpenaireId(),
+ updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
@@ -93,11 +96,13 @@ public class EventFactory {
return map;
}
- private static String calculateEventId(final String topic, final String publicationId, final String value) {
+ private static String calculateEventId(final String topic, final String dsId, final String publicationId,
+ final String value) {
return "event-"
- + DigestUtils.md5Hex(topic).substring(0, 6) + "-"
- + DigestUtils.md5Hex(publicationId).substring(0, 8) + "-"
- + DigestUtils.md5Hex(value).substring(0, 8);
+ + DigestUtils.md5Hex(topic).substring(0, 4) + "-"
+ + DigestUtils.md5Hex(dsId).substring(0, 4) + "-"
+ + DigestUtils.md5Hex(publicationId).substring(0, 7) + "-"
+ + DigestUtils.md5Hex(value).substring(0, 5);
}
private static long calculateExpiryDate(final long now) {
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index 2271a9e0e..80505ca1e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -64,157 +64,12 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
- yarn
- cluster
- JoinStep0
- eu.dnetlib.dhp.broker.oa.JoinStep0Job
- dhp-broker-events-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCores}
- --executor-memory=${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
-
- --graphPath${graphInputPath}
- --workingPath${workingPath}
-
-
-
-
-
-
-
- yarn
- cluster
- JoinStep1
- eu.dnetlib.dhp.broker.oa.JoinStep1Job
- dhp-broker-events-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCores}
- --executor-memory=${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
-
- --graphPath${graphInputPath}
- --workingPath${workingPath}
-
-
-
-
-
-
-
- yarn
- cluster
- JoinStep2
- eu.dnetlib.dhp.broker.oa.JoinStep2Job
- dhp-broker-events-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCores}
- --executor-memory=${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
-
- --graphPath${graphInputPath}
- --workingPath${workingPath}
-
-
-
-
-
-
-
- yarn
- cluster
- JoinStep3
- eu.dnetlib.dhp.broker.oa.JoinStep3Job
- dhp-broker-events-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCores}
- --executor-memory=${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
-
- --graphPath${graphInputPath}
- --workingPath${workingPath}
-
-
-
-
-
-
-
- yarn
- cluster
- JoinStep4
- eu.dnetlib.dhp.broker.oa.JoinStep4Job
- dhp-broker-events-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCores}
- --executor-memory=${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
-
- --graphPath${graphInputPath}
- --workingPath${workingPath}
-
-
-
-
-
-
-
- yarn
- cluster
- PrepareGroupsJob
- eu.dnetlib.dhp.broker.oa.PrepareGroupsJob
- dhp-broker-events-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCores}
- --executor-memory=${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
-
- --graphPath${graphInputPath}
- --workingPath${workingPath}
-
-
-
-
-
yarn
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 03e6674e4..ae5bf9252 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -12,8 +12,6 @@ import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
index 0439c2ba3..5ddcda3fa 100644
--- a/dhp-workflows/dhp-graph-mapper/pom.xml
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -9,6 +9,37 @@
dhp-graph-mapper
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>4.0.1</version>
+				<executions>
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>initialize</phase>
+						<goals>
+							<goal>add-source</goal>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<scalaVersion>${scala.version}</scalaVersion>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
@@ -61,6 +92,13 @@
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
+
+		<dependency>
+			<groupId>org.json4s</groupId>
+			<artifactId>json4s-jackson_2.11</artifactId>
+			<version>3.5.3</version>
+		</dependency>
+
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index c43ee29fe..3e042834f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -266,7 +266,9 @@ public abstract class AbstractMdRecordToOafMapper {
r.setDataInfo(info);
r.setLastupdatetimestamp(lastUpdateTimestamp);
r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
- r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier")));
+
+ r.setOriginalId(Arrays.asList(findOriginalId(doc)));
+
r.setCollectedfrom(Arrays.asList(collectedFrom));
r.setPid(prepareResultPids(doc, info));
r.setDateofcollection(doc.valueOf("//dr:dateOfCollection"));
@@ -429,6 +431,18 @@ public abstract class AbstractMdRecordToOafMapper {
return null;
}
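+ // Prefer the record identifier declared in the OAI provenance originDescription,
+ // falling back to the header identifier when no provenance element is present.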
+ private String findOriginalId(final Document doc) {
+ final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
+ if (n != null) {
+ final String id = n.valueOf("./*[local-name()='identifier']");
+ if (StringUtils.isNotBlank(id)) {
+ return id;
+ }
+ }
+ return doc.valueOf("//*[local-name()='header']/*[local-name()='identifier']");
+
+ }
+
protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId) {
return prepareQualifier(node.valueOf(xpath).trim(), schemeId);
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
new file mode 100644
index 000000000..41fcd2636
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala
@@ -0,0 +1,89 @@
+package eu.dnetlib.dhp.sx.ebi
+import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.expressions.Aggregator
+
+
+
+object EBIAggregator {
+
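+ // Spark SQL Aggregators used to collapse duplicate records sharing the same key:
+ // datasets and publications are merged field by field via mergeFrom(), relations are reduced to a single representative.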
+ def getDatasetAggregator(): Aggregator[(String, OafDataset), OafDataset, OafDataset] = new Aggregator[(String, OafDataset), OafDataset, OafDataset]{
+
+ override def zero: OafDataset = new OafDataset()
+
+ override def reduce(b: OafDataset, a: (String, OafDataset)): OafDataset = {
+ b.mergeFrom(a._2)
+ if (b.getId == null)
+ b.setId(a._2.getId)
+ b
+ }
+
+
+ override def merge(wx: OafDataset, wy: OafDataset): OafDataset = {
+ wx.mergeFrom(wy)
+ if (wx.getId == null && wy.getId != null && wy.getId.nonEmpty)
+ wx.setId(wy.getId)
+ wx
+ }
+ override def finish(reduction: OafDataset): OafDataset = reduction
+
+ override def bufferEncoder: Encoder[OafDataset] =
+ Encoders.kryo(classOf[OafDataset])
+
+ override def outputEncoder: Encoder[OafDataset] =
+ Encoders.kryo(classOf[OafDataset])
+ }
+
+
+ def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
+
+ override def zero: Publication = new Publication()
+
+ override def reduce(b: Publication, a: (String, Publication)): Publication = {
+ b.mergeFrom(a._2)
+ if (b.getId == null)
+ b.setId(a._2.getId)
+ b
+ }
+
+
+ override def merge(wx: Publication, wy: Publication): Publication = {
+ wx.mergeFrom(wy)
+ if (wx.getId == null && wy.getId != null && wy.getId.nonEmpty)
+ wx.setId(wy.getId)
+ wx
+ }
+ override def finish(reduction: Publication): Publication = reduction
+
+ override def bufferEncoder: Encoder[Publication] =
+ Encoders.kryo(classOf[Publication])
+
+ override def outputEncoder: Encoder[Publication] =
+ Encoders.kryo(classOf[Publication])
+ }
+
+
+ def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] = new Aggregator[(String, Relation), Relation, Relation]{
+
+ override def zero: Relation = new Relation()
+
+ override def reduce(b: Relation, a: (String, Relation)): Relation = {
+ a._2
+ }
+
+
+ override def merge(a: Relation, b: Relation): Relation = {
+ if(b!= null) b else a
+ }
+ override def finish(reduction: Relation): Relation = reduction
+
+ override def bufferEncoder: Encoder[Relation] =
+ Encoders.kryo(classOf[Relation])
+
+ override def outputEncoder: Encoder[Relation] =
+ Encoders.kryo(classOf[Relation])
+ }
+
+
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
new file mode 100644
index 000000000..897bbd540
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala
@@ -0,0 +1,138 @@
+package eu.dnetlib.dhp.sx.ebi
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf}
+import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
+import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo}
+import eu.dnetlib.dhp.utils.DHPUtils
+import eu.dnetlib.scholexplorer.relation.RelationMapper
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql._
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.{JField, JObject, JString}
+import org.json4s.jackson.JsonMethods.parse
+
+import scala.collection.JavaConverters._
+
+object SparkAddLinkUpdates {
+
+ val relationMapper = RelationMapper.load
+
+
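+ // One row per link returned by the Europe PMC links payload: relation name, publication date
+ // and the target object (pid, pid type, url, title, publisher).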
+ case class EBILinks(relation: String, pubdate: String, tpid: String, tpidType: String, turl: String, title: String, publisher: String)
+
+
+ def generatePubmedDLICollectedFrom(): KeyValue = {
+ OafUtils.generateKeyValue("dli_________::europe_pmc__", "Europe PMC")
+ }
+
+
+ def ebiLinksToOaf(input:(String, String)):List[Oaf] = {
+ val pmid :String = input._1
+ val input_json :String = input._2
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input_json)
+
+
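+ // Walk the json4s AST of the links response: every Link element found under Category yields
+ // one EBILinks row with its date, relation name and target identifier/title/publisher.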
+ val targets:List[EBILinks] = for {
+ JObject(link) <- json \\ "Category" \\ "Link"
+ JField("PublicationDate", JString(pubdate)) <- link
+ JField("RelationshipType", JObject(relationshipType)) <- link
+ JField("Name", JString(relname)) <- relationshipType
+ JField("Target", JObject(target)) <- link
+ JField("Identifier", JObject(identifier)) <- target
+ JField("ID", JString(tpid)) <- identifier
+ JField("IDScheme", JString(tpidtype)) <- identifier
+ JField("IDURL", JString(turl)) <- identifier
+ JField("Title", JString(title)) <- target
+ JField("Publisher", JObject(pub)) <- target
+ JField("Name", JString(publisher)) <- pub
+ } yield EBILinks(relname, pubdate, tpid, tpidtype, turl,title, publisher)
+
+
+
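+ // Source publication identifier in the OpenAIRE "50|<md5>" form, derived from the PMID.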
+ val dnetPublicationId = s"50|${DHPUtils.md5(s"$pmid::pmid")}"
+
+ targets.flatMap(l => {
+ val relation = new DLIRelation
+ val inverseRelation = new DLIRelation
+ val targetDnetId = s"50|${DHPUtils.md5(s"${l.tpid.toLowerCase.trim}::${l.tpidType.toLowerCase.trim}")}"
+ val relInfo = relationMapper.get(l.relation.toLowerCase)
+ val relationSemantic = relInfo.getOriginal
+ val inverseRelationSemantic = relInfo.getInverse
+
+ relation.setSource(dnetPublicationId)
+ relation.setTarget(targetDnetId)
+ relation.setRelClass("datacite")
+ relation.setRelType(relationSemantic)
+ relation.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
+
+ inverseRelation.setSource(targetDnetId)
+ inverseRelation.setTarget(dnetPublicationId)
+ inverseRelation.setRelClass("datacite")
+ inverseRelation.setRelType(inverseRelationSemantic)
+ inverseRelation.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
+
+
+
+ val d = new DLIDataset
+ d.setId(targetDnetId)
+ d.setDataInfo(OafUtils.generateDataInfo())
+ d.setPid(List(OafUtils.createSP(l.tpid.toLowerCase.trim, l.tpidType.toLowerCase.trim, "dnet:pid_types")).asJava)
+ d.setCompletionStatus("complete")
+ val pi = new ProvenaceInfo
+ pi.setId("dli_________::europe_pmc__")
+ pi.setName( "Europe PMC")
+ pi.setCompletionStatus("complete")
+ pi.setCollectionMode("collected")
+ d.setDlicollectedfrom(List(pi).asJava)
+ d.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava)
+ d.setPublisher(OafUtils.asField(l.publisher))
+ d.setTitle(List(OafUtils.createSP(l.title, "main title", "dnet:dataCite_title")).asJava)
+ d.setDateofacceptance(OafUtils.asField(l.pubdate))
+ val i = new Instance
+ i.setCollectedfrom(generatePubmedDLICollectedFrom())
+ i.setDateofacceptance(d.getDateofacceptance)
+ i.setUrl(List(l.turl).asJava)
+ i.setInstancetype(createQualifier("0021", "Dataset", "dnet:publication_resource", "dnet:publication_resource"))
+ d.setInstance(List(i).asJava)
+ List(relation, inverseRelation, d)
+ })
+ }
+
+
+ def main(args: Array[String]): Unit = {
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+
+ val workingPath = parser.get("workingPath")
+ implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+ implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
+ implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
+
+ val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING))
+
+ ds.flatMap(l =>ebiLinksToOaf(l)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_oaf")
+
+
+
+
+ val oDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/baseline_links_updates_oaf").as[Oaf]
+
+ oDataset.filter(p =>p.isInstanceOf[DLIRelation]).map(p => p.asInstanceOf[DLIRelation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation")
+ oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
+
+
+
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala
new file mode 100644
index 000000000..77e03c9b3
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateBaselineDataFrame.scala
@@ -0,0 +1,49 @@
+package eu.dnetlib.dhp.sx.ebi
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal, PMParser}
+
+
+import scala.io.Source
+import scala.xml.pull.XMLEventReader
+
+object SparkCreateBaselineDataFrame {
+
+
+ def main(args: Array[String]): Unit = {
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+ val sc = spark.sparkContext
+
+ val workingPath = parser.get("workingPath")
+
+ implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
+ implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
+ implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
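+ // Read the PubMed baseline dump as (fileName, content) pairs; only entries whose name ends with .gz
+ // are parsed, each one streamed into PMArticle records by the pull parser.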
+ val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline",2000)
+
+ val ds:Dataset[PMArticle] = spark.createDataset(k.filter(i => i._1.endsWith(".gz")).flatMap(i =>{
+ val xml = new XMLEventReader(Source.fromBytes(i._2.getBytes()))
+ new PMParser(xml)
+
+ } ))
+
+ ds.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
+
+
+
+
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
new file mode 100644
index 000000000..60857f0fc
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala
@@ -0,0 +1,87 @@
+package eu.dnetlib.dhp.sx.ebi
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
+import eu.dnetlib.scholexplorer.relation.RelationMapper
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
+import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.JavaConverters._
+
+object SparkCreateEBIDataFrame {
+
+
+ def main(args: Array[String]): Unit = {
+ val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateEBIDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+ val sc = spark.sparkContext
+
+
+ val workingPath = parser.get("workingPath")
+ val relationMapper = RelationMapper.load
+
+ implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+ implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
+ implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+
+ logger.info("Extract Publication and relation from publication_xml")
+ val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
+ {
+ new ObjectMapper().readValue(s, classOf[String])
+ }).flatMap(s => {
+ val d = new PublicationScholexplorerParser
+ d.parseObject(s, relationMapper).asScala.iterator})
+
+ val mapper = new ObjectMapper()
+ mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
+ spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
+
+ logger.info("Extract Publication and relation from dataset_xml")
+ val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
+ {
+ new ObjectMapper().readValue(s, classOf[String])
+ }).flatMap(s => {
+ val d = new DatasetScholexplorerParser
+ d.parseObject(s, relationMapper).asScala.iterator})
+
+ spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
+ val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset])
+ val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication])
+ val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
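+ // Deduplicate each entity type: group by identifier (relations by source::relType::target)
+ // and merge duplicates with the corresponding EBIAggregator before writing.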
+ publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getPublicationAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")
+
+ dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getDatasetAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")
+
+ relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(EBIAggregator.getRelationAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
+
+
+
+ relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING))
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java
new file mode 100644
index 000000000..75d4628e6
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMArticle.java
@@ -0,0 +1,64 @@
+
+package eu.dnetlib.dhp.sx.ebi.model;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PMArticle implements Serializable {
+
+ private String pmid;
+ private String date;
+ private PMJournal journal;
+ private String title;
+ private String description;
+ private List<PMAuthor> authors = new ArrayList<>();
+
+ public String getPmid() {
+ return pmid;
+ }
+
+ public void setPmid(String pmid) {
+ this.pmid = pmid;
+ }
+
+ public String getDate() {
+ return date;
+ }
+
+ public void setDate(String date) {
+ this.date = date;
+ }
+
+ public PMJournal getJournal() {
+ return journal;
+ }
+
+ public void setJournal(PMJournal journal) {
+ this.journal = journal;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public List<PMAuthor> getAuthors() {
+ return authors;
+ }
+
+ public void setAuthors(List<PMAuthor> authors) {
+ this.authors = authors;
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java
new file mode 100644
index 000000000..4a2198542
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.dhp.sx.ebi.model;
+
+import java.io.Serializable;
+
+public class PMAuthor implements Serializable {
+
+ private String lastName;
+ private String foreName;
+
+ public String getLastName() {
+ return lastName;
+ }
+
+ public void setLastName(String lastName) {
+ this.lastName = lastName;
+ }
+
+ public String getForeName() {
+ return foreName;
+ }
+
+ public void setForeName(String foreName) {
+ this.foreName = foreName;
+ }
+
+ public String getFullName() {
+ return String.format("%s, %s", this.foreName, this.lastName);
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java
new file mode 100644
index 000000000..d4ff5a158
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMJournal.java
@@ -0,0 +1,53 @@
+
+package eu.dnetlib.dhp.sx.ebi.model;
+
+import java.io.Serializable;
+
+public class PMJournal implements Serializable {
+
+ private String issn;
+ private String volume;
+ private String issue;
+ private String date;
+ private String title;
+
+ public String getIssn() {
+ return issn;
+ }
+
+ public void setIssn(String issn) {
+ this.issn = issn;
+ }
+
+ public String getVolume() {
+ return volume;
+ }
+
+ public void setVolume(String volume) {
+ this.volume = volume;
+ }
+
+ public String getIssue() {
+ return issue;
+ }
+
+ public void setIssue(String issue) {
+ this.issue = issue;
+ }
+
+ public String getDate() {
+ return date;
+ }
+
+ public void setDate(String date) {
+ this.date = date;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala
new file mode 100644
index 000000000..903eba134
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMParser.scala
@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.sx.ebi.model
+import scala.xml.pull.{EvElemEnd, EvElemStart, EvText, XMLEventReader}
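+
+// Pull parser exposing a PubMed/Medline XML stream as an Iterator[PMArticle]: fields are
+// accumulated while start/end/text events are consumed and a complete article is emitted
+// at every closing PubmedArticle element.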
+class PMParser(xml:XMLEventReader) extends Iterator[PMArticle] {
+
+ var currentArticle:PMArticle = generateNextArticle()
+
+ override def hasNext: Boolean = currentArticle != null
+
+ override def next(): PMArticle = {
+ val tmp = currentArticle
+ currentArticle = generateNextArticle()
+ tmp
+ }
+
+
+ def generateNextArticle():PMArticle = {
+
+ var currentAuthor: PMAuthor = null
+ var currentJournal: PMJournal = null
+ var currNode: String = null
+ var currentYear = "0"
+ var currentMonth = "01"
+ var currentDay = "01"
+
+ while (xml.hasNext) {
+ xml.next match {
+ case EvElemStart(_, label, _, _) =>
+ currNode = label
+ label match {
+ case "PubmedArticle" => currentArticle = new PMArticle
+ case "Author" => currentAuthor = new PMAuthor
+ case "Journal" => currentJournal = new PMJournal
+ case _ =>
+ }
+ case EvElemEnd(_, label) =>
+ label match {
+ case "PubmedArticle" => return currentArticle
+ case "Author" => currentArticle.getAuthors.add(currentAuthor)
+ case "Journal" => currentArticle.setJournal(currentJournal)
+ case "DateCompleted" => currentArticle.setDate(s"$currentYear-$currentMonth-$currentDay")
+ case "PubDate" => currentJournal.setDate(s"$currentYear-$currentMonth-$currentDay")
+ case _ =>
+ }
+ case EvText(text) =>
+ if (currNode != null && text.trim.nonEmpty)
+ currNode match {
+ case "ArticleTitle" => {
+ if (currentArticle.getTitle==null)
+ currentArticle.setTitle(text.trim)
+ else
+ currentArticle.setTitle(currentArticle.getTitle + text.trim)
+ }
+ case "AbstractText" => {
+ if (currentArticle.getDescription==null)
+ currentArticle.setDescription(text.trim)
+ else
+ currentArticle.setDescription(currentArticle.getDescription + text.trim)
+ }
+ case "PMID" => currentArticle.setPmid(text.trim)
+ case "ISSN" => currentJournal.setIssn(text.trim)
+ case "Year" => currentYear = text.trim
+ case "Month" => currentMonth = text.trim
+ case "Day" => currentDay = text.trim
+ case "Volume" => currentJournal.setVolume( text.trim)
+ case "Issue" => currentJournal.setIssue (text.trim)
+ case "LastName" => {
+ if (currentAuthor != null)
+ currentAuthor.setLastName(text.trim)
+
+ }
+ case "ForeName" => if (currentAuthor != null)
+ currentAuthor.setForeName(text.trim)
+ case "Title" =>
+ if (currentJournal.getTitle==null)
+ currentJournal.setTitle(text.trim)
+ else
+ currentJournal.setTitle(currentJournal.getTitle + text.trim)
+ case _ =>
+
+ }
+ case _ =>
+ }
+
+ }
+ null
+ }
+}
+
+
+
+
+
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
index 0db2b2688..75f28c129 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/AbstractScholexplorerParser.java
@@ -150,6 +150,17 @@ public abstract class AbstractScholexplorerParser {
return uk;
}
+ protected Qualifier generateQualifier(final String classId, final String className, final String schemeId,
+ final String schemeName) {
+ final Qualifier q = new Qualifier();
+ q.setClassid(classId);
+ q.setClassname(className);
+ q.setSchemeid(schemeId);
+ q.setSchemename(schemeName);
+ return q;
+
+ }
+
protected void generateRelations(
RelationMapper relationMapper,
Result parsedObject,
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
index afba57bb8..60371fa53 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java
@@ -64,7 +64,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
currentDate.setQualifier(dateQualifier);
parsedObject.setRelevantdate(Collections.singletonList(currentDate));
}
-
final String completionStatus = VtdUtilityParser
.getSingleValue(ap, vn, "//*[local-name()='completionStatus']");
final String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
@@ -149,6 +148,37 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
inferPid(currentPid);
parsedObject.setPid(Collections.singletonList(currentPid));
+ String resolvedURL = null;
+
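+ // Derive a landing page URL from the PID type; when resolved, it is attached below as the URL of a new Instance.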
+ switch (currentPid.getQualifier().getClassname().toLowerCase()) {
+ case "uniprot":
+ resolvedURL = "https://www.uniprot.org/uniprot/" + currentPid.getValue();
+ break;
+ case "ena":
+ if (StringUtils.isNotBlank(currentPid.getValue()) && currentPid.getValue().length() > 7)
+ resolvedURL = "https://www.ebi.ac.uk/ena/data/view/" + currentPid.getValue().substring(0, 8);
+ break;
+ case "chembl":
+ resolvedURL = "https://www.ebi.ac.uk/chembl/compound_report_card/" + currentPid.getValue();
+ break;
+
+ case "ncbi-n":
+ resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
+ break;
+ case "ncbi-p":
+ resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
+ break;
+ case "genbank":
+ resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
+ break;
+ case "pdb":
+ resolvedURL = "https://www.ncbi.nlm.nih.gov/nuccore/" + currentPid.getValue();
+ break;
+ case "url":
+ resolvedURL = currentPid.getValue();
+ break;
+ }
+
final String sourceId = generateId(
currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset");
parsedObject.setId(sourceId);
@@ -251,6 +281,11 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
t -> {
final StructuredProperty st = new StructuredProperty();
st.setValue(t);
+ st
+ .setQualifier(
+ generateQualifier(
+ "main title", "main title", "dnet:dataCite_title",
+ "dnet:dataCite_title"));
return st;
})
.collect(Collectors.toList()));
@@ -282,6 +317,13 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
.collect(Collectors.toList()));
}
+ if (StringUtils.isNotBlank(resolvedURL)) {
+ Instance i = new Instance();
+ i.setCollectedfrom(parsedObject.getCollectedfrom().get(0));
+ i.setUrl(Collections.singletonList(resolvedURL));
+ parsedObject.setInstance(Collections.singletonList(i));
+ }
+
result.add(parsedObject);
return result;
} catch (Throwable e) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java
index bf59a6f0e..8d76004dc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/PublicationScholexplorerParser.java
@@ -202,6 +202,11 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
t -> {
final StructuredProperty st = new StructuredProperty();
st.setValue(t);
+ st
+ .setQualifier(
+ generateQualifier(
+ "main title", "main title", "dnet:dataCite_title",
+ "dnet:dataCite_title"));
return st;
})
.collect(Collectors.toList()));
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json
new file mode 100644
index 000000000..366f1426e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json
@@ -0,0 +1,4 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
new file mode 100644
index 000000000..cac3cc2bb
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml
@@ -0,0 +1,68 @@
+
+
+
+
+
+
+
+
+ jobTracker
+ yarn
+
+
+ nameNode
+ hdfs://hadoop-rm1.garr-pa1.d4science.org:8020
+
+
+ hive_metastore_uris
+ thrift://hadoop-edge3.garr-pa1.d4science.org:9083
+
+
+ spark2YarnHistoryServerAddress
+ http://hadoop-rm2.garr-pa1.d4science.org:19888
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ spark2EventLogDir
+ /user/spark/spark2ApplicationHistory
+
+
+ spark2ExtraListeners
+ "com.cloudera.spark.lineage.NavigatorAppListener"
+
+
+ spark2SqlQueryExecutionListeners
+ "com.cloudera.spark.lineage.NavigatorQueryListener"
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
new file mode 100644
index 000000000..a5035c56c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml
@@ -0,0 +1,97 @@
+
+
+
+ workingPath
+ the Working Path
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+ yarn-cluster
+ cluster
+ Create Baseline DataSet
+
+ eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=1
+ --driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
+ ${sparkExtraOPT}
+
+ --workingPath${workingPath}
+ --masteryarn
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ Create Baseline DataSet
+
+ eu.dnetlib.dhp.sx.ebi.SparkAddLinkUpdates
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=1
+ --driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
+ ${sparkExtraOPT}
+
+ --workingPath${workingPath}
+ --masteryarn
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ Create EBI DataSet
+
+ eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=1000
+ ${sparkExtraOPT}
+
+ --workingPath${workingPath}
+ --masteryarn
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index b1f0ecf0d..542fd00f3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -68,6 +68,10 @@ public class MappersTest {
final Relation r2 = (Relation) list.get(2);
assertValidId(p.getId());
+
+ assertTrue(p.getOriginalId().size() == 1);
+ assertEquals("10.3897/oneeco.2.e13718", p.getOriginalId().get(0));
+
assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue()));
assertFalse(p.getDataInfo().getInvisible());
@@ -169,6 +173,8 @@ public class MappersTest {
final Relation r2 = (Relation) list.get(2);
assertValidId(d.getId());
+ assertTrue(d.getOriginalId().size() == 1);
+ assertEquals("oai:zenodo.org:3234526", d.getOriginalId().get(0));
assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertTrue(d.getAuthor().size() > 0);
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala
new file mode 100644
index 000000000..fa390a21b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/ebi/TestEBI.scala
@@ -0,0 +1,20 @@
+package eu.dnetlib.dhp.sx.ebi
+
+import org.junit.jupiter.api.Test
+
+class TestEBI {
+
+
+
+ @Test
+ def testEBIData() = {
+ SparkAddLinkUpdates.main("-mt local[*] -w /home/sandro/Downloads".split(" "))
+
+
+
+
+
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json
new file mode 100644
index 000000000..038b84a49
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel1.json
@@ -0,0 +1,55 @@
+{
+ "Category": [
+ {
+ "Section": [
+ {
+ "Linklist": {
+ "Link": [
+ {
+ "LinkProvider": {
+ "Name": "Europe PMC"
+ },
+ "Target": {
+ "Publisher": {
+ "Name": "Altmetric"
+ },
+ "ImageURL": "https://api.altmetric.com/v1/donut/58578459_64.png",
+ "Identifier": {
+ "ID": "https://www.altmetric.com/details/58578459",
+ "IDScheme": "URL",
+ "IDURL": "https://www.altmetric.com/details/58578459"
+ },
+ "Type": {
+ "Name": "dataset"
+ },
+ "Title": "Optical clumped isotope thermometry of carbon dioxide"
+ },
+ "Source": {
+ "Identifier": {
+ "ID": "30886173",
+ "IDScheme": "PMID"
+ },
+ "Type": {
+ "Name": "literature"
+ }
+ },
+ "PublicationDate": "06-04-2019",
+ "RelationshipType": {
+ "Name": "IsReferencedBy"
+ },
+ "ObtainedBy": "ext_links"
+ }
+ ]
+ },
+ "ObtainedBy": "ext_links",
+ "SectionLinkCount": 1,
+ "Tags": [
+ "altmetrics"
+ ]
+ }
+ ],
+ "CategoryLinkCount": 1,
+ "Name": "Altmetric"
+ }
+ ]
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json
new file mode 100644
index 000000000..2ad55861e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/sx/ebi/rel_multiple.json
@@ -0,0 +1,191 @@
+{
+ "version": "6.3",
+ "hitCount": 4,
+ "request": {
+ "id": "28818901",
+ "source": "MED"
+ },
+ "dataLinkList": {
+ "Category": [
+ {
+ "Name": "Nucleotide Sequences",
+ "CategoryLinkCount": 3,
+ "Section": [
+ {
+ "ObtainedBy": "tm_accession",
+ "Tags": [
+ "supporting_data"
+ ],
+ "SectionLinkCount": 1,
+ "Linklist": {
+ "Link": [
+ {
+ "ObtainedBy": "tm_accession",
+ "PublicationDate": "27-02-2020",
+ "LinkProvider": {
+ "Name": "Europe PMC"
+ },
+ "RelationshipType": {
+ "Name": "References"
+ },
+ "Source": {
+ "Type": {
+ "Name": "literature"
+ },
+ "Identifier": {
+ "ID": "28818901",
+ "IDScheme": "MED"
+ }
+ },
+ "Target": {
+ "Type": {
+ "Name": "dataset"
+ },
+ "Identifier": {
+ "ID": "AP008937",
+ "IDScheme": "ENA",
+ "IDURL": "http://identifiers.org/ena.embl/AP008937"
+ },
+ "Title": "AP008937",
+ "Publisher": {
+ "Name": "Europe PMC"
+ }
+ },
+ "Frequency": 1
+ }
+ ]
+ }
+ },
+ {
+ "ObtainedBy": "submission",
+ "Tags": [
+ "related_data"
+ ],
+ "SectionLinkCount": 2,
+ "CollectionURL": "http://www.ebi.ac.uk/ena/data/search?query=28818901",
+ "Linklist": {
+ "Link": [
+ {
+ "ObtainedBy": "submission",
+ "PublicationDate": "25-06-2018",
+ "LinkProvider": {
+ "Name": "Europe PMC"
+ },
+ "RelationshipType": {
+ "Name": "IsReferencedBy"
+ },
+ "Source": {
+ "Type": {
+ "Name": "literature"
+ },
+ "Identifier": {
+ "ID": "28818901",
+ "IDScheme": "PMID"
+ }
+ },
+ "Target": {
+ "Type": {
+ "Name": "dataset"
+ },
+ "Identifier": {
+ "ID": "NIWV01000000",
+ "IDScheme": "ENA",
+ "IDURL": "http://www.ebi.ac.uk/ena/data/view/NIWV01000000"
+ },
+ "Title": "Nucleotide sequences",
+ "Publisher": {
+ "Name": "ENA"
+ }
+ }
+ },
+ {
+ "ObtainedBy": "submission",
+ "PublicationDate": "25-06-2018",
+ "LinkProvider": {
+ "Name": "Europe PMC"
+ },
+ "RelationshipType": {
+ "Name": "IsReferencedBy"
+ },
+ "Source": {
+ "Type": {
+ "Name": "literature"
+ },
+ "Identifier": {
+ "ID": "28818901",
+ "IDScheme": "PMID"
+ }
+ },
+ "Target": {
+ "Type": {
+ "Name": "dataset"
+ },
+ "Identifier": {
+ "ID": "PRJNA390617",
+ "IDScheme": "ENA",
+ "IDURL": "http://www.ebi.ac.uk/ena/data/view/PRJNA390617"
+ },
+ "Title": "Lactobacillus fermentum strain:BFE 6620",
+ "Publisher": {
+ "Name": "ENA"
+ }
+ }
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "Name": "BioStudies: supplemental material and supporting data",
+ "CategoryLinkCount": 1,
+ "Section": [
+ {
+ "ObtainedBy": "ext_links",
+ "Tags": [
+ "supporting_data"
+ ],
+ "SectionLinkCount": 1,
+ "Linklist": {
+ "Link": [
+ {
+ "ObtainedBy": "ext_links",
+ "PublicationDate": "24-07-2018",
+ "LinkProvider": {
+ "Name": "Europe PMC"
+ },
+ "RelationshipType": {
+ "Name": "IsReferencedBy"
+ },
+ "Source": {
+ "Type": {
+ "Name": "literature"
+ },
+ "Identifier": {
+ "ID": "28818901",
+ "IDScheme": "PMID"
+ }
+ },
+ "Target": {
+ "Type": {
+ "Name": "dataset"
+ },
+ "Identifier": {
+ "ID": "http://www.ebi.ac.uk/biostudies/studies/S-EPMC5604774?xr=true",
+ "IDScheme": "URL",
+ "IDURL": "http://www.ebi.ac.uk/biostudies/studies/S-EPMC5604774?xr=true"
+ },
+ "Title": "Draft Genome Sequence of Lactobacillus fermentum BFE 6620, a Potential Starter Culture for African Vegetable Foods, Isolated from Fermented Cassava.",
+ "Publisher": {
+ "Name": "BioStudies: supplemental material and supporting data"
+ }
+ }
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
index 637362acf..86b68fbd2 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/DLIToOAF.scala
@@ -5,11 +5,12 @@ import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.common.PacePerson
import eu.dnetlib.dhp.schema.action.AtomicAction
-import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, StructuredProperty}
+import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import org.codehaus.jackson.map.ObjectMapper
+import eu.dnetlib.dhp.schema.scholexplorer.OafUtils._
import scala.collection.JavaConverters._
@@ -99,6 +100,20 @@ object DLIToOAF {
)
+ def fixInstance(r:Publication) :Publication = {
+ val collectedFrom = r.getCollectedfrom.asScala.head
+ r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom))
+ r
+ }
+
+
+ def fixInstanceDataset(r:Dataset) :Dataset = {
+ val collectedFrom = r.getCollectedfrom.asScala.head
+ r.getInstance().asScala.foreach(i => i.setCollectedfrom(collectedFrom))
+ r
+ }
+
+
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
@@ -412,46 +427,6 @@ object DLIToOAF {
}
- def generateKeyValue(key: String, value: String): KeyValue = {
- val kv: KeyValue = new KeyValue()
- kv.setKey(key)
- kv.setValue(value)
- kv.setDataInfo(generateDataInfo("0.9"))
- kv
- }
- def generateDataInfo(trust: String = "0.9", invisibile: Boolean = false): DataInfo = {
- val di = new DataInfo
- di.setDeletedbyinference(false)
- di.setInferred(false)
- di.setInvisible(false)
- di.setTrust(trust)
- di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
- di
- }
-
- def createQualifier(cls: String, sch: String): Qualifier = {
- createQualifier(cls, cls, sch, sch)
- }
-
-
- def createQualifier(classId: String, className: String, schemeId: String, schemeName: String): Qualifier = {
- val q: Qualifier = new Qualifier
- q.setClassid(classId)
- q.setClassname(className)
- q.setSchemeid(schemeId)
- q.setSchemename(schemeName)
- q
- }
-
-
- def asField[T](value: T): Field[T] = {
- val tmp = new Field[T]
- tmp.setValue(value)
- tmp
-
-
- }
-
}
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
index edf951df4..fd8f2d136 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/SparkExportContentForOpenAire.scala
@@ -1,7 +1,7 @@
package eu.dnetlib.dhp.`export`
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.Text
@@ -166,10 +166,13 @@ object SparkExportContentForOpenAire {
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationAS")
- val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
- val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.toActionSet)
- val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.toActionSet)
+ spark.read.load(s"$workingPath/publicationAS").as[Publication].map(DLIToOAF.fixInstance).write.mode(SaveMode.Overwrite).save(s"$workingPath/publicationAS_fixed")
+ spark.read.load(s"$workingPath/datasetAS").as[OafDataset].map(DLIToOAF.fixInstanceDataset).write.mode(SaveMode.Overwrite).save(s"$workingPath/datasetAS_fixed")
+
+ val fRels:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationAS").as[Relation].map(DLIToOAF.toActionSet)
+ val fpubs:Dataset[(String,String)] = spark.read.load(s"$workingPath/publicationAS_fixed").as[Publication].map(DLIToOAF.toActionSet)
+ val fdats:Dataset[(String,String)] = spark.read.load(s"$workingPath/datasetAS_fixed").as[OafDataset].map(DLIToOAF.toActionSet)
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}
diff --git a/pom.xml b/pom.xml
index 5ece9aaf6..cec3dd75a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,7 +315,7 @@
eu.dnetlib
dnet-pace-core
- 4.0.2
+ 4.0.4
eu.dnetlib