1
0
Fork 0

start implementing MAG mapping

This commit is contained in:
Sandro La Bruzzo 2020-05-11 09:38:27 +02:00
parent 1e06bbaee8
commit 4cebca09d2
14 changed files with 1169 additions and 663 deletions

View File

@ -378,13 +378,13 @@ case object Crossref2Oaf {
val page = (json \ "page").extractOrElse[String](null) val page = (json \ "page").extractOrElse[String](null)
if (page != null) { if (page != null) {
val pp = page.split("-") val pp = page.split("-")
journal.setSp(pp.head) if (pp.nonEmpty)
journal.setSp(pp.head)
if (pp.size > 1) if (pp.size > 1)
journal.setEp(pp(1)) journal.setEp(pp(1))
} }
publication.setJournal(journal) publication.setJournal(journal)
} }
} }

View File

@ -28,9 +28,9 @@ object SparkMapDumpIntoOAF {
.appName(SparkMapDumpIntoOAF.getClass.getSimpleName) .appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo(classOf[Publication]) implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication])
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo(classOf[Relation]) implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.bean(classOf[Relation])
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo(classOf[eu.dnetlib.dhp.schema.oaf.Dataset]) implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset])
val sc = spark.sparkContext val sc = spark.sparkContext
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
@ -42,17 +42,7 @@ object SparkMapDumpIntoOAF {
val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null) val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null)
val total = inputRDD.count()
val totalPub = inputRDD.filter(p => p.isInstanceOf[Publication]).count()
val totalDat = inputRDD.filter(p => p.isInstanceOf[eu.dnetlib.dhp.schema.oaf.Dataset]).count()
val totalRel = inputRDD.filter(p => p.isInstanceOf[eu.dnetlib.dhp.schema.oaf.Relation]).count()
logger.info(s"Created $total")
logger.info(s"totalPub $totalPub")
logger.info(s"totalDat $totalDat")
logger.info(s"totalRel $totalRel")
val pubs: Dataset[Publication] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Publication]) val pubs: Dataset[Publication] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Publication])
.map(k => k.asInstanceOf[Publication])) .map(k => k.asInstanceOf[Publication]))
pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication") pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")
@ -64,10 +54,6 @@ object SparkMapDumpIntoOAF {
val rels: Dataset[Relation] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Relation]) val rels: Dataset[Relation] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Relation])
.map(k => k.asInstanceOf[Relation])) .map(k => k.asInstanceOf[Relation]))
rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations") rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations")
} }

View File

@ -0,0 +1,92 @@
package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
object SparkImportMagIntoDataset {
val datatypedict = Map(
"int" -> IntegerType,
"uint" -> IntegerType,
"long" -> LongType,
"ulong" -> LongType,
"float" -> FloatType,
"string" -> StringType,
"DateTime" -> DateType
)
val stream = Map(
"Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
"FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
"FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
"FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
"PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")),
"PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
"PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")),
"PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")),
"PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")),
"PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")),
"PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")),
"PaperUrls" -> Tuple2("mag/PaperUrls.txt", Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")),
"Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "CreatedDate:DateTime")),
"RelatedFieldOfStudy" -> Tuple2("advanced/RelatedFieldOfStudy.txt", Seq("FieldOfStudyId1:long", "Type1:string", "FieldOfStudyId2:long", "Type2:string", "Rank:float"))
)
def getSchema(streamName: String): StructType = {
var schema = new StructType()
val d: Seq[String] = stream(streamName)._2
d.foreach { case t =>
val currentType = t.split(":")
val fieldName: String = currentType.head
var fieldType: String = currentType.last
val nullable: Boolean = fieldType.endsWith("?")
if (nullable)
fieldType = fieldType.replace("?", "")
schema = schema.add(StructField(fieldName, datatypedict(fieldType), nullable))
}
schema
}
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_mag_to_oaf_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
stream.foreach { case (k, v) =>
val s: StructType = getSchema(k)
val df = spark.read
.option("header", "false")
.option("charset", "UTF8")
.option("delimiter", "\t")
.schema(s)
.csv(s"${parser.get("sourcePath")}/${v._1}")
logger.info(s"Converting $k")
df.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/$k")
}
}
}

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.action.sharelib.for.java</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,39 +0,0 @@
<workflow-app name="import Crossref from index into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}/input/crossref'/>
</fs>
<ok to="ImportCrossRef"/>
<error to="Kill"/>
</action>
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg>
<arg>-n</arg><arg>${nameNode}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,6 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

View File

@ -0,0 +1,38 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -0,0 +1,75 @@
<workflow-app name="import Crossref from index into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ExtractCrossrefToOAF"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}/input/crossref'/>
</fs>
<ok to="ImportCrossRef"/>
<error to="Kill"/>
</action>
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg>
<arg>-n</arg><arg>${nameNode}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="ExtractCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/index_dump</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,38 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -0,0 +1,63 @@
<workflow-app name="import MAG into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${targetPath}'/>
<mkdir path='${targetPath}'/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -2,7 +2,9 @@ package eu.dnetlib.doiboost
import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.SerializationFeature
import eu.dnetlib.dhp.schema.oaf.{Dataset, KeyValue, Oaf, Publication, Relation, Result} import eu.dnetlib.dhp.schema.oaf.{Dataset, KeyValue, Oaf, Publication, Relation, Result}
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.crossref.{Crossref2Oaf, SparkMapDumpIntoOAF} import eu.dnetlib.doiboost.crossref.{Crossref2Oaf, SparkMapDumpIntoOAF}
import eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset
import org.apache.spark.{SparkConf, sql} import org.apache.spark.{SparkConf, sql}
import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper import org.codehaus.jackson.map.ObjectMapper
@ -13,6 +15,7 @@ import org.junit.jupiter.api.Assertions._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.matching.Regex
class CrossrefMappingTest { class CrossrefMappingTest {
@ -22,30 +25,8 @@ class CrossrefMappingTest {
//@Test def testMAGCSV() :Unit = {
def testRelSpark() :Unit = { SparkImportMagIntoDataset.main(null)
val conf: SparkConf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
.master("local[*]").getOrCreate()
import spark.implicits._
implicit val mapEncoderRelations: Encoder[Relation] = Encoders.kryo(classOf[Relation])
implicit val mapEncoderPublication: Encoder[Publication] = Encoders.kryo(classOf[Publication])
implicit val mapEncoderTupleJoinPubs: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPublication)
implicit val mapEncoderTupleJoinRels: Encoder[(String, Relation)] = Encoders.tuple(Encoders.STRING, mapEncoderRelations)
val relations:sql.Dataset[Relation] = spark.read.load("/data/doiboost/relations").as[Relation]
val publications :sql.Dataset[Publication] = spark.read.load("/data/doiboost/publication").as[Publication]
val ds1 = publications.map(p => Tuple2(p.getId, p))
val ds2 = relations.map(p => Tuple2(p.getSource, p))
val total =ds1.joinWith(ds2, ds1.col("_1")===ds2.col("_1")).count()
println(s"total : $total")
} }
@ -86,6 +67,45 @@ class CrossrefMappingTest {
}) })
}
def extractECAward(award: String): String = {
val awardECRegex: Regex = "[0-9]{4,9}".r
if (awardECRegex.findAllIn(award).hasNext)
return awardECRegex.findAllIn(award).max
null
}
@Test
def extractECTest(): Unit = {
val s = "FP7/2007-2013"
val awardExtracted = extractECAward(s)
println(awardExtracted)
println(DHPUtils.md5(awardExtracted))
}
@Test
def testJournalRelation(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("awardTest.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty)
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val rels:List[Relation] = resultList.filter(p => p.isInstanceOf[Relation]).map(r=> r.asInstanceOf[Relation])
assertEquals(rels.size, 4)
rels.foreach(s => logger.info(s.getTarget))
} }

View File

@ -0,0 +1,53 @@
package eu.dnetlib.doiboost.mag
import org.apache.spark.SparkConf
import org.apache.spark.api.java.function.ReduceFunction
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
class MAGMappingTest {
val logger: Logger = LoggerFactory.getLogger(getClass)
val mapper = new ObjectMapper()
//@Test
def testMAGCSV(): Unit = {
val conf: SparkConf = new SparkConf()
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master("local[*]").getOrCreate()
import spark.implicits._
val d: Dataset[Papers] = spark.read.load("/data/doiboost/mag/datasets/Papers").as[Papers]
logger.info(s"Total number of element: ${d.where(col("Doi").isNotNull).count()}")
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Papers]
val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey {case (p1:Papers, p2:Papers) =>
var r = if (p1==null) p2 else p1
if (p1!=null && p2!=null ) if (p1.CreatedDate.before(p2.CreatedDate))
r = p1
else
r = p2
r
}.map(_._2)
val distinctPaper:Dataset[Papers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save("/data/doiboost/mag/datasets/Papers_d")
logger.info(s"Total number of element: ${result.count()}")
}
}

View File

@ -0,0 +1,192 @@
{
"DOI": "10.1016/j.infbeh.2016.11.001",
"issued": {
"date-parts": [
[
2017,
8
]
]
},
"update-policy": "http://dx.doi.org/10.1016/elsevier_cm_policy",
"prefix": "10.1016",
"subject": [
"Developmental and Educational Psychology"
],
"author": [
{
"affiliation": [],
"given": "Dora",
"family": "Kampis",
"sequence": "first"
},
{
"affiliation": [],
"given": "D\u00f3ra",
"family": "Fogd",
"sequence": "additional"
},
{
"affiliation": [],
"given": "\u00c1gnes Melinda",
"family": "Kov\u00e1cs",
"sequence": "additional"
}
],
"reference-count": 109,
"ISSN": [
"0163-6383"
],
"assertion": [
{
"name": "publisher",
"value": "Elsevier",
"label": "This article is maintained by"
},
{
"name": "articletitle",
"value": "Nonverbal components of Theory of Mind in typical and atypical development",
"label": "Article Title"
},
{
"name": "journaltitle",
"value": "Infant Behavior and Development",
"label": "Journal Title"
},
{
"name": "articlelink",
"value": "https://doi.org/10.1016/j.infbeh.2016.11.001",
"label": "CrossRef DOI link to publisher maintained version"
},
{
"name": "content_type",
"value": "article",
"label": "Content Type"
},
{
"name": "copyright",
"value": "\u00a9 2016 Elsevier Inc. All rights reserved.",
"label": "Copyright"
}
],
"member": "78",
"source": "Crossref",
"score": 1.0,
"deposited": {
"timestamp": 1565383284000,
"date-parts": [
[
2019,
8,
9
]
],
"date-time": "2019-08-09T20:41:24Z"
},
"indexed": {
"timestamp": 1565385055278,
"date-parts": [
[
2019,
8,
9
]
],
"date-time": "2019-08-09T21:10:55Z"
},
"type": "journal-article",
"URL": "http://dx.doi.org/10.1016/j.infbeh.2016.11.001",
"is-referenced-by-count": 1,
"volume": "48",
"issn-type": [
{
"type": "print",
"value": "0163-6383"
}
],
"link": [
{
"URL": "https://api.elsevier.com/content/article/PII:S0163638315300059?httpAccept=text/xml",
"intended-application": "text-mining",
"content-version": "vor",
"content-type": "text/xml"
},
{
"URL": "https://api.elsevier.com/content/article/PII:S0163638315300059?httpAccept=text/plain",
"intended-application": "text-mining",
"content-version": "vor",
"content-type": "text/plain"
}
],
"published-print": {
"date-parts": [
[
2017,
8
]
]
},
"references-count": 109,
"short-container-title": [
"Infant Behavior and Development"
],
"publisher": "Elsevier BV",
"content-domain": {
"domain": [
"elsevier.com",
"sciencedirect.com"
],
"crossmark-restriction": true
},
"license": [
{
"URL": "https://www.elsevier.com/tdm/userlicense/1.0/",
"start": {
"timestamp": 1501545600000,
"date-parts": [
[
2017,
8,
1
]
],
"date-time": "2017-08-01T00:00:00Z"
},
"content-version": "tdm",
"delay-in-days": 0
}
],
"language": "en",
"created": {
"timestamp": 1479142046000,
"date-parts": [
[
2016,
11,
14
]
],
"date-time": "2016-11-14T16:47:26Z"
},
"title": [
"Nonverbal components of Theory of Mind in typical and atypical development"
],
"alternative-id": [
"S0163638315300059"
],
"container-title": [
"Infant Behavior and Development"
],
"funder": [
{
"doi-asserted-by": "publisher",
"DOI": "10.13039/501100000781",
"name": "European Research Council",
"award": [
"284236-REPCOLLAB",
"FP7/2007-2013"
]
}
],
"page": "54-62"
}

1126
pom.xml

File diff suppressed because it is too large Load Diff