From 7d29b61c629f34b04882af65dbb83364c9c2257b Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 28 May 2020 09:57:46 +0200 Subject: [PATCH] code refactor --- .../eu/dnetlib/dhp/schema/oaf/Result.java | 13 +- .../dhp/oa/dedup/EntityMergerTest.java | 4 +- dhp-workflows/dhp-doiboost/pom.xml | 5 +- .../SparkGenerateDOIBoostActionSet.scala | 77 +++++------ .../doiboost/crossref/Crossref2Oaf.scala | 30 ++--- .../doiboost/crossref/CrossrefImporter.java | 14 +- .../crossref/SparkMapDumpIntoOAF.scala | 62 +++++++-- .../dnetlib/doiboost/mag/MagDataModel.scala | 89 ++++++++++++- .../doiboost/mag/SparkPreProcessMAG.scala | 126 ++++++------------ .../crossref/oozie_app/config-default.xml | 4 + .../doiboost/crossref/oozie_app/workflow.xml | 15 ++- .../dhp/doiboost/mag/oozie_app/workflow.xml | 6 +- .../crossref/CrossrefMappingTest.scala | 22 +++ .../doiboost/crossref/empty_title.json | 121 +++++++++++++++++ 14 files changed, 405 insertions(+), 183 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/empty_title.json diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java index 213a585a8..11fdaa4f9 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java @@ -244,20 +244,20 @@ public class Result extends OafEntity implements Serializable { subject = mergeLists(subject, r.getSubject()); - //merge title lists: main title with higher trust and distinct between the others + // merge title lists: main title with higher trust and distinct between the others StructuredProperty baseMainTitle = null; - if(title != null) { + if (title != null) { baseMainTitle = getMainTitle(title); title.remove(baseMainTitle); } StructuredProperty newMainTitle = null; - if(r.getTitle() != null) { + if (r.getTitle() != null) { newMainTitle = getMainTitle(r.getTitle()); r.getTitle().remove(newMainTitle); } - if (newMainTitle != null && compareTrust(this, r) < 0 ) + if (newMainTitle != null && compareTrust(this, r) < 0) baseMainTitle = newMainTitle; title = mergeLists(title, r.getTitle()); @@ -314,8 +314,9 @@ public class Result extends OafEntity implements Serializable { } private StructuredProperty getMainTitle(List titles) { - //need to check if the list of titles contains more than 1 main title? (in that case, we should chose which main title select in the list) - for (StructuredProperty title: titles) { + // need to check if the list of titles contains more than 1 main title? (in that case, we should chose which + // main title select in the list) + for (StructuredProperty title : titles) { if (title.getQualifier() != null && title.getQualifier().getClassid() != null) if (title.getQualifier().getClassid().equals("main title")) return title; diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index f4b2c2252..b8ccb038d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -91,9 +91,9 @@ public class EntityMergerTest implements Serializable { assertEquals(pub_merged.getAuthor().size(), 9); assertEquals(AuthorMerger.countAuthorsPids(pub_merged.getAuthor()), 4); - //verify title + // verify title int count = 0; - for (StructuredProperty title: pub_merged.getTitle()){ + for (StructuredProperty title : pub_merged.getTitle()) { if (title.getQualifier().getClassid().equals("main title")) count++; } diff --git a/dhp-workflows/dhp-doiboost/pom.xml b/dhp-workflows/dhp-doiboost/pom.xml index 01ac60b3a..168442942 100644 --- a/dhp-workflows/dhp-doiboost/pom.xml +++ b/dhp-workflows/dhp-doiboost/pom.xml @@ -4,7 +4,7 @@ dhp-workflows eu.dnetlib.dhp - 1.2.1-SNAPSHOT + 1.2.2-SNAPSHOT 4.0.0 @@ -41,6 +41,9 @@ + + + org.apache.hadoop diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala index 6c7627d82..686d1438c 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala @@ -3,12 +3,9 @@ package eu.dnetlib.doiboost import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} -import org.apache.hadoop.io.Text import org.apache.commons.io.IOUtils -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.hadoop.mapred.{SequenceFileOutputFormat, TextOutputFormat} import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.{ Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} object SparkGenerateDOIBoostActionSet { @@ -25,45 +22,43 @@ object SparkGenerateDOIBoostActionSet { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() -// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] -// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] -// implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] -// -// implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]] -// -// -// -// val dbPublicationPath = parser.get("dbPublicationPath") -// val dbDatasetPath = parser.get("dbDatasetPath") -// val crossRefRelation = parser.get("crossRefRelation") -// val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") - val workingDirPath = parser.get("targetPath") -// -// spark.read.load(dbDatasetPath).as[OafDataset] -// .map(d =>DoiBoostMappingUtil.fixResult(d)) -// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) -// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") -// -// spark.read.load(dbPublicationPath).as[Publication] -// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) -// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") -// -// spark.read.load(crossRefRelation).as[Relation] -// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) -// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") -// -// spark.read.load(dbaffiliationRelationPath).as[Relation] -// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) -// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] + implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] + implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] + + implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]] + + val dbPublicationPath = parser.get("dbPublicationPath") + val dbDatasetPath = parser.get("dbDatasetPath") + val crossRefRelation = parser.get("crossRefRelation") + val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") + val workingDirPath = parser.get("targetPath") + + spark.read.load(dbDatasetPath).as[OafDataset] + .map(d =>DoiBoostMappingUtil.fixResult(d)) + .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + + spark.read.load(dbPublicationPath).as[Publication] + .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + + spark.read.load(crossRefRelation).as[Relation] + .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + + spark.read.load(dbaffiliationRelationPath).as[Relation] + .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING) - - val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] - - SequenceFileOutputFormat - - d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec]) +// implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING) +// +// val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] +// +// +// +// d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec]) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 690d937bc..0887e5811 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -1,9 +1,8 @@ package eu.dnetlib.doiboost.crossref -import java.util - import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.utils.DHPUtils +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.apache.commons.lang.StringUtils import org.json4s import org.json4s.DefaultFormats @@ -14,7 +13,6 @@ import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex -import eu.dnetlib.doiboost.DoiBoostMappingUtil._ case class mappingAffiliation(name: String) {} @@ -26,8 +24,6 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S case object Crossref2Oaf { val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass) - - val mappingCrossrefType = Map( "book-section" -> "publication", "book" -> "publication", @@ -113,17 +109,18 @@ case object Crossref2Oaf { result.setPublisher(asField(publisher)) // TITLE - val mainTitles = for {JString(title) <- json \ "title"} yield createSP(title, "main title", "dnet:dataCite_title") - val originalTitles = for {JString(title) <- json \ "original-title"} yield createSP(title, "alternative title", "dnet:dataCite_title") - val shortTitles = for {JString(title) <- json \ "short-title"} yield createSP(title, "alternative title", "dnet:dataCite_title") - val subtitles = for {JString(title) <- json \ "subtitle"} yield createSP(title, "subtitle", "dnet:dataCite_title") + val mainTitles = for {JString(title) <- json \ "title" if title.nonEmpty} yield createSP(title, "main title", "dnet:dataCite_title") + val originalTitles = for {JString(title) <- json \ "original-title" if title.nonEmpty} yield createSP(title, "alternative title", "dnet:dataCite_title") + val shortTitles = for {JString(title) <- json \ "short-title" if title.nonEmpty} yield createSP(title, "alternative title", "dnet:dataCite_title") + val subtitles = for {JString(title) <- json \ "subtitle" if title.nonEmpty} yield createSP(title, "subtitle", "dnet:dataCite_title") result.setTitle((mainTitles ::: originalTitles ::: shortTitles ::: subtitles).asJava) // DESCRIPTION val descriptionList = for {JString(description) <- json \ "abstract"} yield asField(description) result.setDescription(descriptionList.asJava) + // Source - val sourceList = for {JString(source) <- json \ "source"} yield asField(source) + val sourceList = for {JString(source) <- json \ "source" if source.nonEmpty} yield asField(source) result.setSource(sourceList.asJava) //RELEVANT DATE Mapping @@ -142,7 +139,6 @@ case object Crossref2Oaf { } result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava) - //Mapping Subject val subjectList:List[String] = (json \ "subject").extractOrElse[List[String]](List()) @@ -152,7 +148,7 @@ case object Crossref2Oaf { - //Mapping AUthor + //Mapping Author val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List()) result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava) @@ -173,7 +169,6 @@ case object Crossref2Oaf { instance.setAccessright(createQualifier("Restricted", "dnet:access_modes")) - result.setInstance(List(instance).asJava) instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource")) @@ -405,11 +400,8 @@ case object Crossref2Oaf { publication.setJournal(journal) } } - - } - def extractDate(dt: String, datePart: List[List[Int]]): String = { if (StringUtils.isNotBlank(dt)) return dt @@ -427,18 +419,12 @@ case object Crossref2Oaf { } def generateDate(dt: String, datePart: List[List[Int]], classId: String, schemeId: String): StructuredProperty = { - val dp = extractDate(dt, datePart) if (StringUtils.isNotBlank(dp)) return createSP(dp, classId, schemeId) null } - - - - - def generateItemFromType(objectType: String, objectSubType: String): Result = { if (mappingCrossrefType.contains(objectType)) { if (mappingCrossrefType(objectType).equalsIgnoreCase("publication")) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java index 5e63ad847..f69a05da1 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java @@ -27,20 +27,20 @@ public class CrossrefImporter { CrossrefImporter.class .getResourceAsStream( "/eu/dnetlib/dhp/doiboost/import_from_es.json"))); - Logger logger = LoggerFactory.getLogger(CrossrefImporter.class); + parser.parseArgument(args); final String hdfsuri = parser.get("namenode"); - logger.info("HDFS URI" + hdfsuri); + System.out.println("HDFS URI" + hdfsuri); Path hdfswritepath = new Path(parser.get("targetPath")); - logger.info("TargetPath: " + hdfsuri); + System.out.println("TargetPath: " + hdfsuri); final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp")) ? Long.parseLong(parser.get("timestamp")) : -1; if (timestamp > 0) - logger.info("Timestamp added " + timestamp); + System.out.println("Timestamp added " + timestamp); // ====== Init HDFS File System Object Configuration conf = new Configuration(); @@ -70,11 +70,11 @@ public class CrossrefImporter { key.set(i++); value.set(client.next()); writer.append(key, value); - if (i % 1000000 == 0) { + if (i % 100000 == 0) { end = System.currentTimeMillis(); final float time = (end - start) / 1000.0F; - logger - .info( + System.out + .println( String.format("Imported %d records last 100000 imported in %f seconds", i, time)); start = System.currentTimeMillis(); } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index d69415f4a..fac4c90b4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -2,10 +2,11 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf -import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result} +import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.hadoop.io.{IntWritable, Text} import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -28,9 +29,9 @@ object SparkMapDumpIntoOAF { .appName(SparkMapDumpIntoOAF.getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication]) - implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.bean(classOf[Relation]) - implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset]) + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] + implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation] + implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset] val sc = spark.sparkContext val targetPath = parser.get("targetPath") @@ -40,19 +41,56 @@ object SparkMapDumpIntoOAF { .map(k => k._2.toString).map(CrossrefImporter.decompressBlob) .flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject") - val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null) - val pubs: Dataset[Publication] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Publication]) - .map(k => k.asInstanceOf[Publication])) + val distinctPubs:RDD[Publication] = inputRDD.filter(k => k != null && k.isInstanceOf[Publication]) + .map(k => k.asInstanceOf[Publication]).map { p: Publication => Tuple2(p.getId, p) }.reduceByKey { case (p1: Publication, p2: Publication) => + var r = if (p1 == null) p2 else p1 + if (p1 != null && p2 != null) { + if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) { + if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp) + r = p2 + else + r = p1 + } else { + r = if (p1.getLastupdatetimestamp == null) p2 else p1 + } + } + r + }.map(_._2) + + val pubs:Dataset[Publication] = spark.createDataset(distinctPubs) pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication") - val ds: Dataset[eu.dnetlib.dhp.schema.oaf.Dataset] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[eu.dnetlib.dhp.schema.oaf.Dataset]) - .map(k => k.asInstanceOf[eu.dnetlib.dhp.schema.oaf.Dataset])) - ds.write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset") - val rels: Dataset[Relation] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Relation]) - .map(k => k.asInstanceOf[Relation])) + val distincDatasets:RDD[OafDataset] = inputRDD.filter(k => k != null && k.isInstanceOf[OafDataset]) + .map(k => k.asInstanceOf[OafDataset]).map(p => Tuple2(p.getId, p)).reduceByKey { case (p1: OafDataset, p2: OafDataset) => + var r = if (p1 == null) p2 else p1 + if (p1 != null && p2 != null) { + if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) { + if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp) + r = p2 + else + r = p1 + } else { + r = if (p1.getLastupdatetimestamp == null) p2 else p1 + } + } + r + }.map(_._2) + + spark.createDataset(distincDatasets).write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset") + + + + val distinctRels =inputRDD.filter(k => k != null && k.isInstanceOf[Relation]) + .map(k => k.asInstanceOf[Relation]).map(r=> (s"${r.getSource}::${r.getTarget}",r)) + .reduceByKey { case (p1: Relation, p2: Relation) => + if (p1 == null) p2 else p1 + }.map(_._2) + + val rels: Dataset[Relation] = spark.createDataset(distinctRels) + rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations") } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index c8c725e07..6d0ac2c2d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -1,7 +1,8 @@ package eu.dnetlib.doiboost.mag -import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication} +import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty} +import eu.dnetlib.doiboost.DoiBoostMappingUtil import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -63,12 +64,98 @@ case object ConversionUtil { } + def mergePublication(a: Publication, b:Publication) : Publication = { + if ((a != null) && (b != null)) { + a.mergeFrom(b) + a + } else { + if (a == null) b else a + } + } + + def choiceLatestMagArtitcle(p1: MagPapers, p2:MagPapers) :MagPapers = { + var r = if (p1 == null) p2 else p1 + if (p1 != null && p2 != null) { + if (p1.CreatedDate != null && p2.CreatedDate != null) { + if (p1.CreatedDate.before(p2.CreatedDate)) + r = p2 + else + r = p1 + } else { + r = if (p1.CreatedDate == null) p2 else p1 + } + } + r + + } + + + def updatePubsWithDescription(inputItem:((String, Publication), MagPaperAbstract)) : Publication = { + val pub = inputItem._1._2 + val abst = inputItem._2 + if (abst != null) { + pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) + } + pub + + } + + + def updatePubsWithConferenceInfo(inputItem:((String, Publication), MagConferenceInstance)) : Publication = { + val publication:Publication= inputItem._1._2 + val ci:MagConferenceInstance = inputItem._2 + + if (ci!= null){ + + val j:Journal = new Journal + if (ci.Location.isDefined) + j.setConferenceplace(ci.Location.get) + j.setName(ci.DisplayName.get) + if (ci.StartDate.isDefined && ci.EndDate.isDefined) + { + j.setConferencedate(s"${ci.StartDate.get.toString} - ${ci.EndDate.get.toString}") + } + + publication.setJournal(j) + } + publication + } + + def updatePubsWithSubject(item:((String, Publication), MagFieldOfStudy)) : Publication = { + + val publication = item._1._2 + val fieldOfStudy = item._2 + if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { + val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { + val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies") + val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) + var resList: List[StructuredProperty] = List(s1) + if (s.MainType.isDefined) { + val maintp = s.MainType.get + val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies") + s2.setDataInfo(di) + resList = resList ::: List(s2) + if (maintp.contains(".")) { + val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies") + s3.setDataInfo(di) + resList = resList ::: List(s3) + } + } + resList + }) + publication.setSubject(p.asJava) + } + publication + } + + def addInstances(a: (Publication, MagUrl)): Publication = { val pub = a._1 val urls = a._2 + val i = new Instance diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index 087669c27..b3e1d6caa 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -1,15 +1,13 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Journal, Publication, StructuredProperty} -import eu.dnetlib.doiboost.DoiBoostMappingUtil -import eu.dnetlib.doiboost.DoiBoostMappingUtil.{asField, createSP} +import eu.dnetlib.dhp.schema.oaf.Publication import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} import org.apache.spark.sql.functions._ +import org.apache.spark.sql._ +import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ @@ -36,25 +34,15 @@ object SparkPreProcessMAG { val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one - val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => - var r = if (p1 == null) p2 else p1 - if (p1 != null && p2 != null) { - if (p1.CreatedDate != null && p2.CreatedDate != null) { - if (p1.CreatedDate.before(p2.CreatedDate)) - r = p1 - else - r = p2 - } else { - r = if (p1.CreatedDate == null) p2 else p1 - } - } - r - }.map(_._2) - + val result: RDD[MagPapers] = d.where(col("Doi").isNotNull) + .rdd + .map{ p: MagPapers => Tuple2(p.Doi, p) } + .reduceByKey((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2)) + .map(_._2) val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") - logger.info(s"Total number of element: ${result.count()}") + logger.info("Phase 3) Group Author by PaperId") val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] @@ -85,46 +73,35 @@ object SparkPreProcessMAG { val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left") firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") - .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") + .map { a => ConversionUtil.createOAFFromJournalAuthorPaper(a) } + .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") - var magPubs: Dataset[(String, Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + var magPubs: Dataset[(String, Publication)] = + spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] - val conference = spark.read.load(s"$sourcePath/ConferenceInstances").select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" ) - val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci"))).select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] + val conference = spark.read.load(s"$sourcePath/ConferenceInstances") + .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" ) + val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci"))) + .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left") - .map(p => { - val publication:Publication= p._1._2 - val ci:MagConferenceInstance = p._2 - - if (ci!= null){ - - val j:Journal = new Journal - if (ci.Location.isDefined) - j.setConferenceplace(ci.Location.get) - j.setName(ci.DisplayName.get) - if (ci.StartDate.isDefined && ci.EndDate.isDefined) - { - j.setConferencedate(s"${ci.StartDate.get.toString} - ${ci.EndDate.get.toString}") - } - - publication.setJournal(j) - } - publication - - }).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2_conference") + .map(item => ConversionUtil.updatePubsWithConferenceInfo(item)) + .write + .mode(SaveMode.Overwrite) + .save(s"${parser.get("targetPath")}/merge_step_2_conference") - magPubs= spark.read.load(s"${parser.get("targetPath")}/merge_step_2_conference").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + magPubs= spark.read.load(s"${parser.get("targetPath")}/merge_step_2_conference").as[Publication] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] logger.info("Phase 5) enrich publication with URL and Instances") - magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) } .write.mode(SaveMode.Overwrite) @@ -138,22 +115,18 @@ object SparkPreProcessMAG { val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] - magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] - magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left").map(p => { - val pub = p._1._2 - val abst = p._2 - if (abst != null) { - pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) - } - pub - } + magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left") + .map(item => ConversionUtil.updatePubsWithDescription(item) ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") logger.info("Phase 7) Enrich Publication with FieldOfStudy") - magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") @@ -164,31 +137,18 @@ object SparkPreProcessMAG { .groupBy($"PaperId").agg(collect_list(struct($"FieldOfStudyId", $"DisplayName", $"MainType", $"Score")).as("subjects")) .as[MagFieldOfStudy] - magPubs.joinWith(paperField, col("_1").equalTo(paperField("PaperId")), "left"). - map(item => { - val publication = item._1._2 - val fieldOfStudy = item._2 - if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { - val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { - val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies") - val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) - var resList: List[StructuredProperty] = List(s1) - if (s.MainType.isDefined) { - val maintp = s.MainType.get - val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies") - s2.setDataInfo(di) - resList = resList ::: List(s2) - if (maintp.contains(".")) { - val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies") - s3.setDataInfo(di) - resList = resList ::: List(s3) - } - } - resList - }) - publication.setSubject(p.asJava) - } - publication - }).map { s: Publication => s }(Encoders.bean(classOf[Publication])).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication") + magPubs.joinWith(paperField, col("_1") + .equalTo(paperField("PaperId")), "left") + .map(item => ConversionUtil.updatePubsWithSubject(item)) + .write.mode(SaveMode.Overwrite) + .save(s"${parser.get("targetPath")}/mag_publication") + + + val s:RDD[Publication] = spark.read.load(s"${parser.get("targetPath")}/mag_publication").as[Publication] + .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b)) + .map(_._2) + + spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication_u") + } } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml index cf617a84c..508202e30 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml @@ -15,6 +15,10 @@ oozie.action.sharelib.for.spark spark2 + + oozie.launcher.mapreduce.user.classpath.first + true + hive_metastore_uris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml index 782e0fed4..db4ac96f9 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml @@ -16,6 +16,11 @@ sparkExecutorCores number of cores used by single executor + + timestamp + Timestamp for incremental Harvesting + + @@ -27,8 +32,8 @@ - - + + @@ -43,13 +48,13 @@ eu.dnetlib.doiboost.crossref.CrossrefImporter -t${workingPath}/input/crossref/index_dump -n${nameNode} + -ts${timestamp} - + - yarn-cluster @@ -63,7 +68,7 @@ --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - --sourcePath${workingPath}/crossref/index_dump + --sourcePath${workingPath}/input/crossref/index_dump,${workingPath}/crossref/index_dump --targetPath${workingPath}/input/crossref --masteryarn-cluster diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml index 2277b79b0..56d99f14f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml @@ -22,7 +22,7 @@ - + @@ -31,8 +31,8 @@ - - + + diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala index 5d4587a1c..d31f80248 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala @@ -58,6 +58,28 @@ class CrossrefMappingTest { } + @Test + def testEmptyTitle() :Unit = { + val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString + + + assertNotNull(json) + assertFalse(json.isEmpty); + + val resultList: List[Oaf] = Crossref2Oaf.convert(json) + + assertTrue(resultList.nonEmpty) + + val items = resultList.filter(p => p.isInstanceOf[Result]) + + + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) + items.foreach(p => println(mapper.writeValueAsString(p))) + + + } + + @Test def testPeerReviewed(): Unit = { val json = Source.fromInputStream(getClass.getResourceAsStream("prwTest.json")).mkString diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/empty_title.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/empty_title.json new file mode 100644 index 000000000..2d274f885 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/empty_title.json @@ -0,0 +1,121 @@ +{ + "indexed": { + "date-parts": [ + [ + 2020, + 4, + 7 + ] + ], + "date-time": "2020-04-07T15:54:28Z", + "timestamp": 1586274868901 + }, + "reference-count": 0, + "publisher": "Japan Society of Mechanical Engineers", + "issue": "432", + "content-domain": { + "domain": [], + "crossmark-restriction": false + }, + "short-container-title": [ + "JSMET" + ], + "published-print": { + "date-parts": [ + [ + 1982 + ] + ] + }, + "DOI": "10.1299\/kikaib.48.1474", + "type": "journal-article", + "created": { + "date-parts": [ + [ + 2011, + 9, + 13 + ] + ], + "date-time": "2011-09-13T05:59:01Z", + "timestamp": 1315893541000 + }, + "page": "1474-1482", + "source": "Crossref", + "is-referenced-by-count": 0, + "title": [ + "" + ], + "prefix": "10.1299", + "volume": "48", + "author": [ + { + "given": "Hiroshi", + "family": "KATO", + "sequence": "first", + "affiliation": [] + }, + { + "given": "Yoshichika", + "family": "MIZUNO", + "sequence": "additional", + "affiliation": [] + } + ], + "member": "124", + "container-title": [ + "Transactions of the Japan Society of Mechanical Engineers Series B" + ], + "original-title": [ + "\u5e0c\u8584\u9ad8\u5206\u5b50\u6eb6\u6db2\u4e2d\u306e\u6709\u9650\u9577\u5186\u67f1\u306e\u62b5\u6297" + ], + "language": "ja", + "deposited": { + "date-parts": [ + [ + 2011, + 9, + 13 + ] + ], + "date-time": "2011-09-13T06:01:33Z", + "timestamp": 1315893693000 + }, + "score": 1.0, + "subtitle": [], + "short-title": [], + "issued": { + "date-parts": [ + [ + 1982 + ] + ] + }, + "references-count": 0, + "journal-issue": { + "published-print": { + "date-parts": [ + [ + 1982 + ] + ] + }, + "issue": "432" + }, + "URL": "http:\/\/dx.doi.org\/10.1299\/kikaib.48.1474", + "relation": {}, + "ISSN": [ + "0387-5016", + "1884-8346" + ], + "issn-type": [ + { + "value": "0387-5016", + "type": "print" + }, + { + "value": "1884-8346", + "type": "electronic" + } + ] +} \ No newline at end of file