diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 6c2c704198..2d3b9a43a2 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -378,13 +378,13 @@ case object Crossref2Oaf { val page = (json \ "page").extractOrElse[String](null) if (page != null) { val pp = page.split("-") - journal.setSp(pp.head) + if (pp.nonEmpty) + journal.setSp(pp.head) if (pp.size > 1) journal.setEp(pp(1)) } publication.setJournal(journal) } - } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index 8cf9223f64..d69415f4a6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -28,9 +28,9 @@ object SparkMapDumpIntoOAF { .appName(SparkMapDumpIntoOAF.getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo(classOf[Publication]) - implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo(classOf[Relation]) - implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo(classOf[eu.dnetlib.dhp.schema.oaf.Dataset]) + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication]) + implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.bean(classOf[Relation]) + implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset]) val sc = spark.sparkContext val targetPath = parser.get("targetPath") @@ -42,17 +42,7 @@ object SparkMapDumpIntoOAF { val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null) - val total = inputRDD.count() - val totalPub = inputRDD.filter(p => p.isInstanceOf[Publication]).count() - val totalDat = inputRDD.filter(p => p.isInstanceOf[eu.dnetlib.dhp.schema.oaf.Dataset]).count() - val totalRel = inputRDD.filter(p => p.isInstanceOf[eu.dnetlib.dhp.schema.oaf.Relation]).count() - - - logger.info(s"Created $total") - logger.info(s"totalPub $totalPub") - logger.info(s"totalDat $totalDat") - logger.info(s"totalRel $totalRel") val pubs: Dataset[Publication] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Publication]) .map(k => k.asInstanceOf[Publication])) pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication") @@ -64,10 +54,6 @@ object SparkMapDumpIntoOAF { val rels: Dataset[Relation] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Relation]) .map(k => k.asInstanceOf[Relation])) rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations") - - - - } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala new file mode 100644 index 0000000000..82ea48f33f --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -0,0 +1,92 @@ +package eu.dnetlib.doiboost.mag + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.apache.spark.sql.types._ +import org.slf4j.{Logger, LoggerFactory} +import org.apache.spark.sql.functions._ + +object SparkImportMagIntoDataset { + val datatypedict = Map( + "int" -> IntegerType, + "uint" -> IntegerType, + "long" -> LongType, + "ulong" -> LongType, + "float" -> FloatType, + "string" -> StringType, + "DateTime" -> DateType + ) + + + val stream = Map( + "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), + "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), + "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")), + "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")), + "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")), + "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")), + "PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")), + "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")), + "PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")), + "PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")), + "PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")), + "PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")), + "PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")), + "PaperUrls" -> Tuple2("mag/PaperUrls.txt", Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")), + "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "CreatedDate:DateTime")), + "RelatedFieldOfStudy" -> Tuple2("advanced/RelatedFieldOfStudy.txt", Seq("FieldOfStudyId1:long", "Type1:string", "FieldOfStudyId2:long", "Type2:string", "Rank:float")) + ) + + + def getSchema(streamName: String): StructType = { + var schema = new StructType() + val d: Seq[String] = stream(streamName)._2 + d.foreach { case t => + val currentType = t.split(":") + val fieldName: String = currentType.head + var fieldType: String = currentType.last + val nullable: Boolean = fieldType.endsWith("?") + if (nullable) + fieldType = fieldType.replace("?", "") + schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable)) + } + schema + } + + + def main(args: Array[String]): Unit = { + val logger: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + + stream.foreach { case (k, v) => + val s: StructType = getSchema(k) + val df = spark.read + .option("header", "false") + .option("charset", "UTF8") + .option("delimiter", "\t") + .schema(s) + .csv(s"${parser.get("sourcePath")}/${v._1}") + logger.info(s"Converting $k") + + df.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/$k") + } + + } + +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/application/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/application/oozie_app/config-default.xml deleted file mode 100644 index 9f009a781e..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/application/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.action.sharelib.for.java - spark2 - - - oozie.launcher.mapreduce.user.classpath.first - true - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/application/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/application/oozie_app/workflow.xml deleted file mode 100644 index a11d01c244..0000000000 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/application/oozie_app/workflow.xml +++ /dev/null @@ -1,39 +0,0 @@ - - - - workingPath - the working dir base path - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.doiboost.crossref.CrossrefImporter - -t${workingPath}/input/crossref/index_dump - -n${nameNode} - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json new file mode 100644 index 0000000000..bf0b80f69f --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json @@ -0,0 +1,6 @@ +[ + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml new file mode 100644 index 0000000000..cf617a84c4 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/config-default.xml @@ -0,0 +1,38 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml new file mode 100644 index 0000000000..16dbfa5388 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml @@ -0,0 +1,75 @@ + + + + workingPath + the working dir base path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.CrossrefImporter + -t${workingPath}/input/crossref/index_dump + -n${nameNode} + + + + + + + + + + yarn-cluster + cluster + ExtractCrossrefToOAF + eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + --sourcePath${workingPath}/index_dump + --targetPath${workingPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml new file mode 100644 index 0000000000..cf617a84c4 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/config-default.xml @@ -0,0 +1,38 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml new file mode 100644 index 0000000000..801dca6122 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml @@ -0,0 +1,63 @@ + + + + sourcePath + the working dir base path + + + targetPath + the working dir base path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + --sourcePath${sourcePath} + --targetPath${targetPath} + --masteryarn-cluster + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala index ad3f595fa9..75a63d70f1 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala @@ -2,7 +2,9 @@ package eu.dnetlib.doiboost import com.fasterxml.jackson.databind.SerializationFeature import eu.dnetlib.dhp.schema.oaf.{Dataset, KeyValue, Oaf, Publication, Relation, Result} +import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.doiboost.crossref.{Crossref2Oaf, SparkMapDumpIntoOAF} +import eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset import org.apache.spark.{SparkConf, sql} import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.codehaus.jackson.map.ObjectMapper @@ -13,6 +15,7 @@ import org.junit.jupiter.api.Assertions._ import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ +import scala.util.matching.Regex class CrossrefMappingTest { @@ -22,30 +25,8 @@ class CrossrefMappingTest { - //@Test - def testRelSpark() :Unit = { - val conf: SparkConf = new SparkConf() - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - - val spark: SparkSession = - SparkSession - .builder() - .config(conf) - .appName(SparkMapDumpIntoOAF.getClass.getSimpleName) - .master("local[*]").getOrCreate() - - import spark.implicits._ - implicit val mapEncoderRelations: Encoder[Relation] = Encoders.kryo(classOf[Relation]) - implicit val mapEncoderPublication: Encoder[Publication] = Encoders.kryo(classOf[Publication]) - implicit val mapEncoderTupleJoinPubs: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPublication) - implicit val mapEncoderTupleJoinRels: Encoder[(String, Relation)] = Encoders.tuple(Encoders.STRING, mapEncoderRelations) - - val relations:sql.Dataset[Relation] = spark.read.load("/data/doiboost/relations").as[Relation] - val publications :sql.Dataset[Publication] = spark.read.load("/data/doiboost/publication").as[Publication] - val ds1 = publications.map(p => Tuple2(p.getId, p)) - val ds2 = relations.map(p => Tuple2(p.getSource, p)) - val total =ds1.joinWith(ds2, ds1.col("_1")===ds2.col("_1")).count() - println(s"total : $total") + def testMAGCSV() :Unit = { + SparkImportMagIntoDataset.main(null) } @@ -86,6 +67,45 @@ class CrossrefMappingTest { }) + } + + def extractECAward(award: String): String = { + val awardECRegex: Regex = "[0-9]{4,9}".r + if (awardECRegex.findAllIn(award).hasNext) + return awardECRegex.findAllIn(award).max + null + } + + + @Test + def extractECTest(): Unit = { + val s = "FP7/2007-2013" + val awardExtracted = extractECAward(s) + println(awardExtracted) + + println(DHPUtils.md5(awardExtracted)) + + + } + + @Test + def testJournalRelation(): Unit = { + val json = Source.fromInputStream(getClass.getResourceAsStream("awardTest.json")).mkString + assertNotNull(json) + + assertFalse(json.isEmpty) + + val resultList: List[Oaf] = Crossref2Oaf.convert(json) + + assertTrue(resultList.nonEmpty) + val rels:List[Relation] = resultList.filter(p => p.isInstanceOf[Relation]).map(r=> r.asInstanceOf[Relation]) + + + assertEquals(rels.size, 4) + rels.foreach(s => logger.info(s.getTarget)) + + + } diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala new file mode 100644 index 0000000000..2da4708c4f --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -0,0 +1,53 @@ +package eu.dnetlib.doiboost.mag + +import org.apache.spark.SparkConf +import org.apache.spark.api.java.function.ReduceFunction +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession} +import org.codehaus.jackson.map.ObjectMapper +import org.junit.jupiter.api.Test +import org.slf4j.{Logger, LoggerFactory} +import org.apache.spark.sql.functions._ + + +class MAGMappingTest { + + val logger: Logger = LoggerFactory.getLogger(getClass) + val mapper = new ObjectMapper() + + + //@Test + def testMAGCSV(): Unit = { + + val conf: SparkConf = new SparkConf() + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master("local[*]").getOrCreate() + + + import spark.implicits._ + val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers] + logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}") + implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Papers] + val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) => + var r = if (p1==null) p2 else p1 + if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate)) + r = p1 + else + r = p2 + r + }.map(_._2) + + + val distinctPaper:Dataset[Papers] = spark.createDataset(result) + distinctPaper.write.mode(SaveMode.Overwrite).save("/data/doiboost/mag/datasets/Papers_d") + logger.info(s"Total number of element: ${result.count()}") + + + } + + +} diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json new file mode 100644 index 0000000000..f570b3bdbc --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json @@ -0,0 +1,192 @@ +{ + "DOI": "10.1016/j.infbeh.2016.11.001", + "issued": { + "date-parts": [ + [ + 2017, + 8 + ] + ] + }, + "update-policy": "http://dx.doi.org/10.1016/elsevier_cm_policy", + "prefix": "10.1016", + "subject": [ + "Developmental and Educational Psychology" + ], + "author": [ + { + "affiliation": [], + "given": "Dora", + "family": "Kampis", + "sequence": "first" + }, + { + "affiliation": [], + "given": "D\u00f3ra", + "family": "Fogd", + "sequence": "additional" + }, + { + "affiliation": [], + "given": "\u00c1gnes Melinda", + "family": "Kov\u00e1cs", + "sequence": "additional" + } + ], + "reference-count": 109, + "ISSN": [ + "0163-6383" + ], + "assertion": [ + { + "name": "publisher", + "value": "Elsevier", + "label": "This article is maintained by" + }, + { + "name": "articletitle", + "value": "Nonverbal components of Theory of Mind in typical and atypical development", + "label": "Article Title" + }, + { + "name": "journaltitle", + "value": "Infant Behavior and Development", + "label": "Journal Title" + }, + { + "name": "articlelink", + "value": "https://doi.org/10.1016/j.infbeh.2016.11.001", + "label": "CrossRef DOI link to publisher maintained version" + }, + { + "name": "content_type", + "value": "article", + "label": "Content Type" + }, + { + "name": "copyright", + "value": "\u00a9 2016 Elsevier Inc. All rights reserved.", + "label": "Copyright" + } + ], + "member": "78", + "source": "Crossref", + "score": 1.0, + "deposited": { + "timestamp": 1565383284000, + "date-parts": [ + [ + 2019, + 8, + 9 + ] + ], + "date-time": "2019-08-09T20:41:24Z" + }, + "indexed": { + "timestamp": 1565385055278, + "date-parts": [ + [ + 2019, + 8, + 9 + ] + ], + "date-time": "2019-08-09T21:10:55Z" + }, + "type": "journal-article", + "URL": "http://dx.doi.org/10.1016/j.infbeh.2016.11.001", + "is-referenced-by-count": 1, + "volume": "48", + "issn-type": [ + { + "type": "print", + "value": "0163-6383" + } + ], + "link": [ + { + "URL": "https://api.elsevier.com/content/article/PII:S0163638315300059?httpAccept=text/xml", + "intended-application": "text-mining", + "content-version": "vor", + "content-type": "text/xml" + }, + { + "URL": "https://api.elsevier.com/content/article/PII:S0163638315300059?httpAccept=text/plain", + "intended-application": "text-mining", + "content-version": "vor", + "content-type": "text/plain" + } + ], + "published-print": { + "date-parts": [ + [ + 2017, + 8 + ] + ] + }, + "references-count": 109, + "short-container-title": [ + "Infant Behavior and Development" + ], + "publisher": "Elsevier BV", + "content-domain": { + "domain": [ + "elsevier.com", + "sciencedirect.com" + ], + "crossmark-restriction": true + }, + "license": [ + { + "URL": "https://www.elsevier.com/tdm/userlicense/1.0/", + "start": { + "timestamp": 1501545600000, + "date-parts": [ + [ + 2017, + 8, + 1 + ] + ], + "date-time": "2017-08-01T00:00:00Z" + }, + "content-version": "tdm", + "delay-in-days": 0 + } + ], + "language": "en", + "created": { + "timestamp": 1479142046000, + "date-parts": [ + [ + 2016, + 11, + 14 + ] + ], + "date-time": "2016-11-14T16:47:26Z" + }, + "title": [ + "Nonverbal components of Theory of Mind in typical and atypical development" + ], + "alternative-id": [ + "S0163638315300059" + ], + "container-title": [ + "Infant Behavior and Development" + ], + "funder": [ + { + "doi-asserted-by": "publisher", + "DOI": "10.13039/501100000781", + "name": "European Research Council", + "award": [ + "284236-REPCOLLAB", + "FP7/2007-2013" + ] + } + ], + "page": "54-62" +} diff --git a/pom.xml b/pom.xml index 4838732190..0c55bff7d0 100644 --- a/pom.xml +++ b/pom.xml @@ -1,376 +1,376 @@ - + - 4.0.0 - eu.dnetlib.dhp - dhp - 1.1.7-SNAPSHOT - pom + 4.0.0 + eu.dnetlib.dhp + dhp + 1.1.7-SNAPSHOT + pom - - - GNU Affero General Public License v3.0 or later - https://spdx.org/licenses/AGPL-3.0-or-later.html#licenseText - repo - This program is free software: you can redistribute it and/or modify it under the terms of the - GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - + + + GNU Affero General Public License v3.0 or later + https://spdx.org/licenses/AGPL-3.0-or-later.html#licenseText + repo + This program is free software: you can redistribute it and/or modify it under the terms of the + GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + + - - dhp-build - dhp-schemas - dhp-common - dhp-workflows - + + dhp-build + dhp-schemas + dhp-common + dhp-workflows + - - Redmine - https://issue.openaire.research-infrastructures.eu/projects/openaire - + + Redmine + https://issue.openaire.research-infrastructures.eu/projects/openaire + - - jenkins - https://jenkins-dnet.d4science.org/ - + + jenkins + https://jenkins-dnet.d4science.org/ + - - scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git - scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git - https://code-repo.d4science.org/D-Net/dnet-hadoop/ - HEAD - + + scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git + scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git + https://code-repo.d4science.org/D-Net/dnet-hadoop/ + HEAD + - This module is the root descriptor for the dnet-hadoop project + This module is the root descriptor for the dnet-hadoop project - - + + - - - dnet45-releases - D-Net 45 releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases - default - - false - - - true - - - - cloudera - Cloudera Repository - https://repository.cloudera.com/artifactory/cloudera-repos - - true - - - false - - - + + + dnet45-releases + D-Net 45 releases + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + default + + false + + + true + + + + cloudera + Cloudera Repository + https://repository.cloudera.com/artifactory/cloudera-repos + + true + + + false + + + - - - org.junit.jupiter - junit-jupiter - ${junit-jupiter.version} - test - + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + - - org.mockito - mockito-core - ${mockito-core.version} - test - + + org.mockito + mockito-core + ${mockito-core.version} + test + - - org.mockito - mockito-junit-jupiter - ${mockito-core.version} - test - + + org.mockito + mockito-junit-jupiter + ${mockito-core.version} + test + + + + + + + org.apache.hadoop + hadoop-hdfs + ${dhp.hadoop.version} + provided + + + org.apache.hadoop + hadoop-common + ${dhp.hadoop.version} + provided + + + org.apache.hadoop + hadoop-client + ${dhp.hadoop.version} + provided + + + org.apache.hadoop + hadoop-distcp + ${dhp.hadoop.version} + provided + + + org.apache.spark + spark-core_2.11 + ${dhp.spark.version} + provided + + + org.apache.spark + spark-sql_2.11 + ${dhp.spark.version} + provided + + + org.apache.spark + spark-graphx_2.11 + ${dhp.spark.version} + provided + + + org.apache.spark + spark-hive_2.11 + ${dhp.spark.version} + test + + + + org.slf4j + jcl-over-slf4j + 1.7.25 + provided + + + + org.apache.commons + commons-lang3 + ${dhp.commons.lang.version} + + + + com.google.guava + guava + ${dhp.guava.version} + - + + commons-codec + commons-codec + 1.9 + - - - - org.apache.hadoop - hadoop-hdfs - ${dhp.hadoop.version} - provided - - - org.apache.hadoop - hadoop-common - ${dhp.hadoop.version} - provided - - - org.apache.hadoop - hadoop-client - ${dhp.hadoop.version} - provided - - - org.apache.hadoop - hadoop-distcp - ${dhp.hadoop.version} - provided - - - org.apache.spark - spark-core_2.11 - ${dhp.spark.version} - provided - - - org.apache.spark - spark-sql_2.11 - ${dhp.spark.version} - provided - - - org.apache.spark - spark-graphx_2.11 - ${dhp.spark.version} - provided - - - org.apache.spark - spark-hive_2.11 - ${dhp.spark.version} - test - + + commons-io + commons-io + 2.4 + - - org.slf4j - jcl-over-slf4j - 1.7.25 - provided - + + commons-cli + commons-cli + 1.2 + provided + - - org.apache.commons - commons-lang3 - ${dhp.commons.lang.version} - + + net.sf.saxon + Saxon-HE + 9.9.1-6 + - - com.google.guava - guava - ${dhp.guava.version} - + + dom4j + dom4j + 1.6.1 + + + xml-apis + xml-apis + 1.4.01 + - - commons-codec - commons-codec - 1.9 - + + jaxen + jaxen + 1.1.6 + - - commons-io - commons-io - 2.4 - + + com.mycila.xmltool + xmltool + 3.3 + - - commons-cli - commons-cli - 1.2 - provided - + + org.apache.solr + solr-solrj + 7.5.0 + + + * + * + + + + + com.lucidworks.spark + spark-solr + 3.6.0 + + + * + * + + + - - net.sf.saxon - Saxon-HE - 9.9.1-6 - + + org.apache.httpcomponents + httpclient + 4.5.3 + + + org.apache.httpcomponents + httpmime + 4.5.3 + + + org.noggit + noggit + 0.8 + + + org.apache.zookeeper + zookeeper + 3.4.11 + - - dom4j - dom4j - 1.6.1 - + + net.schmizz + sshj + 0.10.0 + test + - - xml-apis - xml-apis - 1.4.01 - + + com.fasterxml.jackson.core + jackson-core + ${dhp.jackson.version} + provided + - - jaxen - jaxen - 1.1.6 - + + com.fasterxml.jackson.core + jackson-annotations + ${dhp.jackson.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + ${dhp.jackson.version} + provided + - - com.mycila.xmltool - xmltool - 3.3 - + + eu.dnetlib + dnet-actionmanager-common + 6.0.5 + + + org.apache.hadoop + hadoop-common + + + + + eu.dnetlib + dnet-actionmanager-api + [4.0.1,5.0.0) + - - org.apache.solr - solr-solrj - 7.5.0 - - - * - * - - - - - com.lucidworks.spark - spark-solr - 3.6.0 - - - * - * - - - + + eu.dnetlib + dnet-openaire-data-protos + 3.9.8-proto250 + + + eu.dnetlib + dnet-pace-core + 4.0.1 + + + eu.dnetlib + cnr-rmi-api + [2.0.0,3.0.0) + - - org.apache.httpcomponents - httpclient - 4.5.3 - - - org.apache.httpcomponents - httpmime - 4.5.3 - - - org.noggit - noggit - 0.8 - - - org.apache.zookeeper - zookeeper - 3.4.11 - + + org.apache.cxf + cxf-rt-transports-http + 3.1.5 + + + javax.persistence + javax.persistence-api + 2.2 + provided + - - net.schmizz - sshj - 0.10.0 - test - + + com.rabbitmq + amqp-client + 5.6.0 + + + com.jayway.jsonpath + json-path + 2.4.0 + + + com.arakelian + java-jq + 0.10.1 + + + edu.cmu + secondstring + 1.0.0 + + + org.mongodb + mongo-java-driver + ${mongodb.driver.version} + + + org.postgresql + postgresql + 42.2.10 + - - com.fasterxml.jackson.core - jackson-core - ${dhp.jackson.version} - provided - - - - com.fasterxml.jackson.core - jackson-annotations - ${dhp.jackson.version} - provided - - - com.fasterxml.jackson.core - jackson-databind - ${dhp.jackson.version} - provided - - - - eu.dnetlib - dnet-actionmanager-common - 6.0.5 - - - org.apache.hadoop - hadoop-common - - - - - eu.dnetlib - dnet-actionmanager-api - [4.0.1,5.0.0) - - - - eu.dnetlib - dnet-openaire-data-protos - 3.9.8-proto250 - - - eu.dnetlib - dnet-pace-core - 4.0.1 - - - eu.dnetlib - cnr-rmi-api - [2.0.0,3.0.0) - - - - org.apache.cxf - cxf-rt-transports-http - 3.1.5 - - - javax.persistence - javax.persistence-api - 2.2 - provided - - - - com.rabbitmq - amqp-client - 5.6.0 - - - com.jayway.jsonpath - json-path - 2.4.0 - - - com.arakelian - java-jq - 0.10.1 - - - edu.cmu - secondstring - 1.0.0 - - - org.mongodb - mongo-java-driver - ${mongodb.driver.version} - - - org.postgresql - postgresql - 42.2.10 - - - - org.antlr - stringtemplate - 4.0 - + + org.antlr + stringtemplate + 4.0 + com.ximpleware @@ -386,237 +386,237 @@ - org.apache.oozie - oozie-client - ${dhp.oozie.version} - provided - - - - slf4j-simple - org.slf4j - - - - - + org.apache.oozie + oozie-client + ${dhp.oozie.version} + provided + + + + slf4j-simple + org.slf4j + + + + + - - target - target/classes - ${project.artifactId}-${project.version} - target/test-classes - - + + target + target/classes + ${project.artifactId}-${project.version} + target/test-classes + + - - org.apache.maven.plugins - maven-project-info-reports-plugin - 3.0.0 - - - org.apache.maven.plugins - maven-site-plugin - 3.7.1 - + + org.apache.maven.plugins + maven-project-info-reports-plugin + 3.0.0 + + + org.apache.maven.plugins + maven-site-plugin + 3.7.1 + - - org.apache.maven.plugins - maven-compiler-plugin - ${maven.compiler.plugin.version} - - 1.8 - 1.8 - ${project.build.sourceEncoding} - - + + org.apache.maven.plugins + maven-compiler-plugin + ${maven.compiler.plugin.version} + + 1.8 + 1.8 + ${project.build.sourceEncoding} + + - - org.apache.maven.plugins - maven-jar-plugin - 3.0.2 - + + org.apache.maven.plugins + maven-jar-plugin + 3.0.2 + - - org.apache.maven.plugins - maven-source-plugin - 3.0.1 - - - attach-sources - verify - - jar-no-fork - - - - + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + verify + + jar-no-fork + + + + - - org.apache.maven.plugins - maven-surefire-plugin - 3.0.0-M4 - - true - - - - org.apache.maven.plugins - maven-javadoc-plugin - 3.2.0 - - true - none - - - - org.apache.maven.plugins - maven-dependency-plugin - 3.0.0 - + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M4 + + true + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + true + none + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.0 + - - net.revelc.code.formatter - formatter-maven-plugin - 2.11.0 - - - eu.dnetlib.dhp - dhp-code-style - ${project.version} - - - - - - - - org.apache.maven.plugins - maven-site-plugin - - - org.apache.maven.plugins - maven-project-info-reports-plugin - - - net.revelc.code.formatter - formatter-maven-plugin - - - - format - - - eclipse/formatter_dnet.xml - - - - - - net.revelc.code - impsort-maven-plugin - 1.4.1 - - java.,javax.,org.,com. - java,* - - **/thrift/*.java - - - - - sort-imports - - sort - - - - - - org.apache.maven.plugins - maven-release-plugin - 2.5.3 - - - org.jacoco - jacoco-maven-plugin - 0.7.9 - - - **/schemas/* - **/com/cloudera/**/* - **/org/apache/avro/io/**/* - - - - - default-prepare-agent - - prepare-agent - - - - default-report - prepare-package - - report - - - - + + net.revelc.code.formatter + formatter-maven-plugin + 2.11.0 + + + eu.dnetlib.dhp + dhp-code-style + ${project.version} + + + + + + + + org.apache.maven.plugins + maven-site-plugin + + + org.apache.maven.plugins + maven-project-info-reports-plugin + + + net.revelc.code.formatter + formatter-maven-plugin + + + + format + + + eclipse/formatter_dnet.xml + + + + + + net.revelc.code + impsort-maven-plugin + 1.4.1 + + java.,javax.,org.,com. + java,* + + **/thrift/*.java + + + + + sort-imports + + sort + + + + + + org.apache.maven.plugins + maven-release-plugin + 2.5.3 + + + org.jacoco + jacoco-maven-plugin + 0.7.9 + + + **/schemas/* + **/com/cloudera/**/* + **/org/apache/avro/io/**/* + + + + + default-prepare-agent + + prepare-agent + + + + default-report + prepare-package + + report + + + + - + - - - org.apache.maven.wagon - wagon-ssh - 2.10 - - - - - - dnet45-snapshots - DNet45 Snapshots - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots - default - - - dnet45-releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - true - none - - - - + + + org.apache.maven.wagon + wagon-ssh + 2.10 + + + + + + dnet45-snapshots + DNet45 Snapshots + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + default + + + dnet45-releases + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + true + none + + + + - - UTF-8 - UTF-8 - 3.6.0 - 2.22.2 - 2.0.1 - cdh5.9.2 - 2.6.0-${dhp.cdh.version} - 4.1.0-${dhp.cdh.version} - 2.4.0.cloudera2 - 2.9.6 - 3.5 - 11.0.2 - 2.11.12 - 5.6.1 - 3.3.3 - 3.4.2 - [2.12,3.0) - + + UTF-8 + UTF-8 + 3.6.0 + 2.22.2 + 2.0.1 + cdh5.9.2 + 2.6.0-${dhp.cdh.version} + 4.1.0-${dhp.cdh.version} + 2.4.0.cloudera2 + 2.9.6 + 3.5 + 11.0.2 + 2.11.12 + 5.6.1 + 3.3.3 + 3.4.2 + [2.12,3.0) +