diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index ec7c14e90..c112212ae 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -124,6 +124,12 @@
 		<dependency>
 			<groupId>eu.dnetlib</groupId>
 			<artifactId>cnr-rmi-api</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
index 784e15734..195858b6f 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
@@ -141,11 +141,7 @@ object ScholixUtils extends Serializable {
     s.setRelationship(inverseRelationShip(scholix.getRelationship))
     s.setSource(scholix.getTarget)
     s.setTarget(scholix.getSource)
-    s.setIdentifier(
-      DHPUtils.md5(
-        s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}"
-      )
-    )
+    updateId(s)
     s
   }
 
@@ -184,6 +180,21 @@ object ScholixUtils extends Serializable {
     } else List()
   }
 
+  def updateId(scholix: Scholix): Scholix = {
+    scholix.setIdentifier(
+      generateIdentifier(
+        scholix.getSource.getDnetIdentifier,
+        scholix.getTarget.getDnetIdentifier,
+        scholix.getRelationship.getName
+      )
+    )
+    scholix
+  }
+
+  def generateIdentifier(sourceId: String, targetId: String, relation: String): String = {
+    DHPUtils.md5(s"$sourceId::$relation::$targetId")
+  }
+
   def generateCompleteScholix(scholix: Scholix, target: ScholixSummary): Scholix = {
     val s = new Scholix
     s.setPublicationDate(scholix.getPublicationDate)
@@ -192,11 +203,7 @@ object ScholixUtils extends Serializable {
     s.setRelationship(scholix.getRelationship)
     s.setSource(scholix.getSource)
     s.setTarget(generateScholixResourceFromSummary(target))
-    s.setIdentifier(
-      DHPUtils.md5(
-        s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}"
-      )
-    )
+    updateId(s)
     s
   }
@@ -208,11 +215,7 @@ object ScholixUtils extends Serializable {
     s.setRelationship(scholix.getRelationship)
     s.setSource(scholix.getSource)
     s.setTarget(target)
-    s.setIdentifier(
-      DHPUtils.md5(
-        s"${s.getSource.getIdentifier}::${s.getRelationship.getName}::${s.getTarget.getIdentifier}"
-      )
-    )
+    updateId(s)
     s
   }
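Note on the hunks above: the Scholix identifier is now derived from the dnet identifiers of source and target plus the relation name, rather than from the Scholix identifiers, so every code path that builds a link goes through the same updateId helper. A minimal, self-contained sketch of the scheme, assuming DHPUtils.md5 is a plain MD5-over-UTF-8 hex digest (the helper itself is not shown in this diff); the sample identifiers are hypothetical:

// Sketch only: assumes DHPUtils.md5 is an MD5-over-UTF-8 hex digest.
import java.security.MessageDigest

object ScholixIdSketch {
  def md5(s: String): String =
    MessageDigest
      .getInstance("MD5")
      .digest(s.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString

  // Mirrors the new ScholixUtils.generateIdentifier above.
  def generateIdentifier(sourceId: String, targetId: String, relation: String): String =
    md5(s"$sourceId::$relation::$targetId")

  def main(args: Array[String]): Unit =
    // Hypothetical dnet identifiers, for illustration only.
    println(generateIdentifier("50|sourceExample", "50|targetExample", "IsRelatedTo"))
}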
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadProjects.java
index 58d23d953..f652b3dba 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadProjects.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadProjects.java
@@ -1,11 +1,14 @@
 
 package eu.dnetlib.dhp.actionmanager.project.utils;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.actionmanager.project.PrepareProjects;
-import eu.dnetlib.dhp.actionmanager.project.utils.model.Project;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -15,13 +18,12 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.PrepareProjects;
+import eu.dnetlib.dhp.actionmanager.project.utils.model.Project;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 
 /**
  * @author miriam.baglioni
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala
index c97b8ead2..153039059 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala
@@ -26,8 +26,6 @@ class PMParser(stream: java.io.InputStream) extends Iterator[PMArticle] {
     tmp
   }
 
-
-
   def extractAttributes(attrs: MetaData, key: String): String = {
 
     val res = attrs.get(key)
@@ -68,7 +66,7 @@ class PMParser(stream: java.io.InputStream) extends Iterator[PMArticle] {
 
       val next = reader.nextEvent()
       if (next.isStartElement) {
-        if(insideChar) {
+        if (insideChar) {
           if (sb.nonEmpty)
             println(s"got data ${sb.toString.trim}")
           insideChar = false
@@ -76,8 +74,8 @@ class PMParser(stream: java.io.InputStream) extends Iterator[PMArticle] {
         val name = next.asStartElement().getName.getLocalPart
         println(s"Start Element $name")
         next.asStartElement().getAttributes.forEachRemaining(e => print(e.toString))
-        
-      } else if (next.isEndElement) {
+
+      } else if (next.isEndElement) {
         if (insideChar) {
           if (sb.nonEmpty)
             println(s"got data ${sb.toString.trim}")
@@ -90,18 +88,16 @@ class PMParser(stream: java.io.InputStream) extends Iterator[PMArticle] {
           println("Condizione di uscita")
         }
 
-      } else if (next.isCharacters) {
+      } else if (next.isCharacters) {
         if (!insideChar) {
           insideChar = true
           sb.clear()
         }
         val d = next.asCharacters().getData
         if (d.trim.nonEmpty)
-        sb.append(d.trim)
+          sb.append(d.trim)
       }
 
-
-
 //      next match {
@@ -116,7 +112,7 @@ class PMParser(stream: java.io.InputStream) extends Iterator[PMArticle] {
 //
 //    }
-    //    
+    //
 //
 //    reader.next match {
 //
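The PMParser hunks above are formatting-only; for context, the parser is built on the StAX pull API (XMLEventReader), accumulating character data between a start and an end element. A minimal event-loop sketch in that style; the element names and sample XML are illustrative, not from the codebase:

// Sketch only: a StAX pull loop collecting the text of one element.
import java.io.ByteArrayInputStream
import javax.xml.stream.XMLInputFactory

object StaxSketch {
  def main(args: Array[String]): Unit = {
    val xml = "<PubmedArticle><PMID>12345</PMID></PubmedArticle>"
    val reader = XMLInputFactory
      .newInstance()
      .createXMLEventReader(new ByteArrayInputStream(xml.getBytes("UTF-8")))

    val sb = new StringBuilder
    while (reader.hasNext) {
      val ev = reader.nextEvent()
      if (ev.isStartElement && ev.asStartElement().getName.getLocalPart == "PMID")
        sb.clear() // entering the element of interest: reset the buffer
      else if (ev.isCharacters)
        sb.append(ev.asCharacters().getData.trim) // accumulate text chunks
      else if (ev.isEndElement && ev.asEndElement().getName.getLocalPart == "PMID")
        println(s"PMID = ${sb.toString}") // element closed: emit its text
    }
  }
}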
diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala
index b537ba797..a6ca95073 100644
--- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala
@@ -5,7 +5,7 @@ import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
 import eu.dnetlib.dhp.schema.oaf.utils.PidType
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result}
 import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
-import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PMSubject, PubMedToOaf, PubmedParser}
+import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser, PMSubject, PubMedToOaf}
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.{JField, JObject, JString}
 import org.json4s.jackson.JsonMethods.parse
@@ -19,7 +19,6 @@ import java.util.zip.GZIPInputStream
 import scala.collection.JavaConverters._
 import scala.io.Source
 
-
 @ExtendWith(Array(classOf[MockitoExtension]))
 class BioScholixTest extends AbstractVocabularyTest {
@@ -50,8 +49,7 @@ class BioScholixTest extends AbstractVocabularyTest {
   def testEBIData() = {
     val inputXML = getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed.xml")
 
-
-    //    new PubmedParser(new GZIPInputStream(new FileInputStream("/Users/sandro/Downloads/pubmed23n1078.xml.gz")))
+    //    new PubmedParser(new GZIPInputStream(new FileInputStream("/Users/sandro/Downloads/pubmed23n1078.xml.gz")))
     new PMParser(new GZIPInputStream(new FileInputStream("/Users/sandro/Downloads/pubmed23n1078.xml.gz")))
     print("DONE")
   }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml
index 6fb2a1253..7aef1367a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml
@@ -5,6 +5,6 @@
     </property>
     <property>
        <name>oozie.action.sharelib.for.spark</name>
-       <value>spark2</value>
+       <value>spark342</value>
    </property>
</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
index e46e59cc0..5e36f9d73 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
@@ -10,7 +10,7 @@
-
+
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -78,9 +78,10 @@
             --executor-memory=${sparkExecutorMemory}
             --executor-cores=${sparkExecutorCores}
             --driver-memory=${sparkDriverMemory}
-            --conf spark.extraListeners=${spark2ExtraListeners}
-            --conf spark.sql.shuffle.partitions=30000
-            --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+            --conf spark.sql.shuffle.partitions=6000
+            --conf spark.dynamicAllocation.enabled=true
+            --conf spark.shuffle.service.enabled=true
+            --conf spark.dynamicAllocation.minExecutors=100
             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
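The workflow above drops the fixed listener configuration and the 30000 shuffle partitions in favour of dynamic executor allocation. For local experimentation, the same properties can be set on a SparkSession; the property names are standard Spark settings and the values mirror the --conf flags above, while the app name is hypothetical:

import org.apache.spark.sql.SparkSession

object DynamicAllocSketch {
  def main(args: Array[String]): Unit = {
    // Session-level equivalent of the workflow's --conf flags.
    val spark = SparkSession
      .builder()
      .appName("finalGraphSketch") // hypothetical app name
      .config("spark.sql.shuffle.partitions", "6000")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.shuffle.service.enabled", "true") // needs the external shuffle service on YARN
      .config("spark.dynamicAllocation.minExecutors", "100")
      .getOrCreate()

    println(spark.conf.get("spark.sql.shuffle.partitions"))
    spark.stop()
  }
}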
scholixSource("identifier").equalTo(summaryDS("dnetIdentifier")), "left") @@ -105,17 +97,32 @@ object SparkCreateScholix { val scholix_o_v: Dataset[Scholix] = spark.read.load(s"$targetPath/scholix_one_verse").as[Scholix] - scholix_o_v - .flatMap(s => List(s, ScholixUtils.createInverseScholixRelation(s))) - .as[Scholix] - .map(s => (s.getIdentifier, s)) - .dropDuplicates("identifier") - .write - .option("compression", "lz4") - .mode(SaveMode.Overwrite) - .save(s"$targetPath/scholix") + def scholix_complete(s: Scholix): Boolean = { + if (s == null || s.getIdentifier == null) { + false + } else if (s.getSource == null || s.getTarget == null) { + false + } else if (s.getLinkprovider == null || s.getLinkprovider.isEmpty) + false + else + true + } - val scholix_final: Dataset[Scholix] = spark.read.load(s"$targetPath/scholix").as[Scholix] + val scholix_final: Dataset[Scholix] = scholix_o_v + .filter(s => scholix_complete(s)) + .groupByKey(s => + scala.Ordering.String + .min(s.getSource.getDnetIdentifier, s.getTarget.getDnetIdentifier) + .concat(s.getRelationship.getName) + .concat(scala.Ordering.String.max(s.getSource.getDnetIdentifier, s.getTarget.getDnetIdentifier)) + ) + .flatMapGroups((id, scholixes) => { + val s = scholixes.toList + if (s.size == 1) Seq(s(0), ScholixUtils.createInverseScholixRelation(s(0))) + else s + }) + + scholix_final.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix") val stats: Dataset[(String, String, Long)] = scholix_final .map(s => (s.getSource.getDnetIdentifier, s.getTarget.getObjectType))