code refactor

This commit is contained in:
Sandro La Bruzzo 2020-05-28 09:57:46 +02:00
parent 79c26382da
commit 7d29b61c62
14 changed files with 405 additions and 183 deletions

View File

@ -314,7 +314,8 @@ public class Result extends OafEntity implements Serializable {
} }
private StructuredProperty getMainTitle(List<StructuredProperty> titles) { private StructuredProperty getMainTitle(List<StructuredProperty> titles) {
//need to check if the list of titles contains more than 1 main title? (in that case, we should chose which main title select in the list) // need to check if the list of titles contains more than 1 main title? (in that case, we should chose which
// main title select in the list)
for (StructuredProperty title : titles) { for (StructuredProperty title : titles) {
if (title.getQualifier() != null && title.getQualifier().getClassid() != null) if (title.getQualifier() != null && title.getQualifier().getClassid() != null)
if (title.getQualifier().getClassid().equals("main title")) if (title.getQualifier().getClassid().equals("main title"))

View File

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.1-SNAPSHOT</version> <version>1.2.2-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -41,6 +41,9 @@
</build> </build>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>

View File

@ -3,12 +3,9 @@ package eu.dnetlib.doiboost
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import org.apache.hadoop.io.Text
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.{SequenceFileOutputFormat, TextOutputFormat}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{ Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkGenerateDOIBoostActionSet { object SparkGenerateDOIBoostActionSet {
@ -25,45 +22,43 @@ object SparkGenerateDOIBoostActionSet {
.appName(getClass.getSimpleName) .appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
// implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
//
// implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]] implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]]
//
// val dbPublicationPath = parser.get("dbPublicationPath")
// val dbDatasetPath = parser.get("dbDatasetPath")
// val dbPublicationPath = parser.get("dbPublicationPath") val crossRefRelation = parser.get("crossRefRelation")
// val dbDatasetPath = parser.get("dbDatasetPath") val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
// val crossRefRelation = parser.get("crossRefRelation")
// val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
val workingDirPath = parser.get("targetPath") val workingDirPath = parser.get("targetPath")
spark.read.load(dbDatasetPath).as[OafDataset]
.map(d =>DoiBoostMappingUtil.fixResult(d))
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
spark.read.load(dbPublicationPath).as[Publication]
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
spark.read.load(crossRefRelation).as[Relation]
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
spark.read.load(dbaffiliationRelationPath).as[Relation]
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
// implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
// //
// spark.read.load(dbDatasetPath).as[OafDataset] // val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
// .map(d =>DoiBoostMappingUtil.fixResult(d))
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
// //
// spark.read.load(dbPublicationPath).as[Publication]
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
// //
// spark.read.load(crossRefRelation).as[Relation]
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
// //
// spark.read.load(dbaffiliationRelationPath).as[Relation] // d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec])
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
SequenceFileOutputFormat
d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec])

View File

@ -1,9 +1,8 @@
package eu.dnetlib.doiboost.crossref package eu.dnetlib.doiboost.crossref
import java.util
import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import org.apache.commons.lang.StringUtils import org.apache.commons.lang.StringUtils
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
@ -14,7 +13,6 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
import scala.util.matching.Regex import scala.util.matching.Regex
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
case class mappingAffiliation(name: String) {} case class mappingAffiliation(name: String) {}
@ -26,8 +24,6 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S
case object Crossref2Oaf { case object Crossref2Oaf {
val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass) val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
val mappingCrossrefType = Map( val mappingCrossrefType = Map(
"book-section" -> "publication", "book-section" -> "publication",
"book" -> "publication", "book" -> "publication",
@ -113,17 +109,18 @@ case object Crossref2Oaf {
result.setPublisher(asField(publisher)) result.setPublisher(asField(publisher))
// TITLE // TITLE
val mainTitles = for {JString(title) <- json \ "title"} yield createSP(title, "main title", "dnet:dataCite_title") val mainTitles = for {JString(title) <- json \ "title" if title.nonEmpty} yield createSP(title, "main title", "dnet:dataCite_title")
val originalTitles = for {JString(title) <- json \ "original-title"} yield createSP(title, "alternative title", "dnet:dataCite_title") val originalTitles = for {JString(title) <- json \ "original-title" if title.nonEmpty} yield createSP(title, "alternative title", "dnet:dataCite_title")
val shortTitles = for {JString(title) <- json \ "short-title"} yield createSP(title, "alternative title", "dnet:dataCite_title") val shortTitles = for {JString(title) <- json \ "short-title" if title.nonEmpty} yield createSP(title, "alternative title", "dnet:dataCite_title")
val subtitles = for {JString(title) <- json \ "subtitle"} yield createSP(title, "subtitle", "dnet:dataCite_title") val subtitles = for {JString(title) <- json \ "subtitle" if title.nonEmpty} yield createSP(title, "subtitle", "dnet:dataCite_title")
result.setTitle((mainTitles ::: originalTitles ::: shortTitles ::: subtitles).asJava) result.setTitle((mainTitles ::: originalTitles ::: shortTitles ::: subtitles).asJava)
// DESCRIPTION // DESCRIPTION
val descriptionList = for {JString(description) <- json \ "abstract"} yield asField(description) val descriptionList = for {JString(description) <- json \ "abstract"} yield asField(description)
result.setDescription(descriptionList.asJava) result.setDescription(descriptionList.asJava)
// Source // Source
val sourceList = for {JString(source) <- json \ "source"} yield asField(source) val sourceList = for {JString(source) <- json \ "source" if source.nonEmpty} yield asField(source)
result.setSource(sourceList.asJava) result.setSource(sourceList.asJava)
//RELEVANT DATE Mapping //RELEVANT DATE Mapping
@ -142,7 +139,6 @@ case object Crossref2Oaf {
} }
result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava) result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava)
//Mapping Subject //Mapping Subject
val subjectList:List[String] = (json \ "subject").extractOrElse[List[String]](List()) val subjectList:List[String] = (json \ "subject").extractOrElse[List[String]](List())
@ -152,7 +148,7 @@ case object Crossref2Oaf {
//Mapping AUthor //Mapping Author
val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List()) val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List())
result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava) result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava)
@ -173,7 +169,6 @@ case object Crossref2Oaf {
instance.setAccessright(createQualifier("Restricted", "dnet:access_modes")) instance.setAccessright(createQualifier("Restricted", "dnet:access_modes"))
result.setInstance(List(instance).asJava) result.setInstance(List(instance).asJava)
instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource")) instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
@ -405,11 +400,8 @@ case object Crossref2Oaf {
publication.setJournal(journal) publication.setJournal(journal)
} }
} }
} }
def extractDate(dt: String, datePart: List[List[Int]]): String = { def extractDate(dt: String, datePart: List[List[Int]]): String = {
if (StringUtils.isNotBlank(dt)) if (StringUtils.isNotBlank(dt))
return dt return dt
@ -427,18 +419,12 @@ case object Crossref2Oaf {
} }
def generateDate(dt: String, datePart: List[List[Int]], classId: String, schemeId: String): StructuredProperty = { def generateDate(dt: String, datePart: List[List[Int]], classId: String, schemeId: String): StructuredProperty = {
val dp = extractDate(dt, datePart) val dp = extractDate(dt, datePart)
if (StringUtils.isNotBlank(dp)) if (StringUtils.isNotBlank(dp))
return createSP(dp, classId, schemeId) return createSP(dp, classId, schemeId)
null null
} }
def generateItemFromType(objectType: String, objectSubType: String): Result = { def generateItemFromType(objectType: String, objectSubType: String): Result = {
if (mappingCrossrefType.contains(objectType)) { if (mappingCrossrefType.contains(objectType)) {
if (mappingCrossrefType(objectType).equalsIgnoreCase("publication")) if (mappingCrossrefType(objectType).equalsIgnoreCase("publication"))

View File

@ -27,20 +27,20 @@ public class CrossrefImporter {
CrossrefImporter.class CrossrefImporter.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/doiboost/import_from_es.json"))); "/eu/dnetlib/dhp/doiboost/import_from_es.json")));
Logger logger = LoggerFactory.getLogger(CrossrefImporter.class);
parser.parseArgument(args); parser.parseArgument(args);
final String hdfsuri = parser.get("namenode"); final String hdfsuri = parser.get("namenode");
logger.info("HDFS URI" + hdfsuri); System.out.println("HDFS URI" + hdfsuri);
Path hdfswritepath = new Path(parser.get("targetPath")); Path hdfswritepath = new Path(parser.get("targetPath"));
logger.info("TargetPath: " + hdfsuri); System.out.println("TargetPath: " + hdfsuri);
final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp")) final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp"))
? Long.parseLong(parser.get("timestamp")) ? Long.parseLong(parser.get("timestamp"))
: -1; : -1;
if (timestamp > 0) if (timestamp > 0)
logger.info("Timestamp added " + timestamp); System.out.println("Timestamp added " + timestamp);
// ====== Init HDFS File System Object // ====== Init HDFS File System Object
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -70,11 +70,11 @@ public class CrossrefImporter {
key.set(i++); key.set(i++);
value.set(client.next()); value.set(client.next());
writer.append(key, value); writer.append(key, value);
if (i % 1000000 == 0) { if (i % 100000 == 0) {
end = System.currentTimeMillis(); end = System.currentTimeMillis();
final float time = (end - start) / 1000.0F; final float time = (end - start) / 1000.0F;
logger System.out
.info( .println(
String.format("Imported %d records last 100000 imported in %f seconds", i, time)); String.format("Imported %d records last 100000 imported in %f seconds", i, time));
start = System.currentTimeMillis(); start = System.currentTimeMillis();
} }

View File

@ -2,10 +2,11 @@ package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf import eu.dnetlib.dhp.schema.oaf
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result} import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
@ -28,9 +29,9 @@ object SparkMapDumpIntoOAF {
.appName(SparkMapDumpIntoOAF.getClass.getSimpleName) .appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication]) implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.bean(classOf[Relation]) implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.bean(classOf[eu.dnetlib.dhp.schema.oaf.Dataset]) implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
val sc = spark.sparkContext val sc = spark.sparkContext
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
@ -40,19 +41,56 @@ object SparkMapDumpIntoOAF {
.map(k => k._2.toString).map(CrossrefImporter.decompressBlob) .map(k => k._2.toString).map(CrossrefImporter.decompressBlob)
.flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject") .flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject")
val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null) val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null)
val pubs: Dataset[Publication] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Publication]) val distinctPubs:RDD[Publication] = inputRDD.filter(k => k != null && k.isInstanceOf[Publication])
.map(k => k.asInstanceOf[Publication])) .map(k => k.asInstanceOf[Publication]).map { p: Publication => Tuple2(p.getId, p) }.reduceByKey { case (p1: Publication, p2: Publication) =>
var r = if (p1 == null) p2 else p1
if (p1 != null && p2 != null) {
if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
r = p2
else
r = p1
} else {
r = if (p1.getLastupdatetimestamp == null) p2 else p1
}
}
r
}.map(_._2)
val pubs:Dataset[Publication] = spark.createDataset(distinctPubs)
pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication") pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")
val ds: Dataset[eu.dnetlib.dhp.schema.oaf.Dataset] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[eu.dnetlib.dhp.schema.oaf.Dataset])
.map(k => k.asInstanceOf[eu.dnetlib.dhp.schema.oaf.Dataset]))
ds.write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset")
val rels: Dataset[Relation] = spark.createDataset(inputRDD.filter(k => k != null && k.isInstanceOf[Relation]) val distincDatasets:RDD[OafDataset] = inputRDD.filter(k => k != null && k.isInstanceOf[OafDataset])
.map(k => k.asInstanceOf[Relation])) .map(k => k.asInstanceOf[OafDataset]).map(p => Tuple2(p.getId, p)).reduceByKey { case (p1: OafDataset, p2: OafDataset) =>
var r = if (p1 == null) p2 else p1
if (p1 != null && p2 != null) {
if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
r = p2
else
r = p1
} else {
r = if (p1.getLastupdatetimestamp == null) p2 else p1
}
}
r
}.map(_._2)
spark.createDataset(distincDatasets).write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset")
val distinctRels =inputRDD.filter(k => k != null && k.isInstanceOf[Relation])
.map(k => k.asInstanceOf[Relation]).map(r=> (s"${r.getSource}::${r.getTarget}",r))
.reduceByKey { case (p1: Relation, p2: Relation) =>
if (p1 == null) p2 else p1
}.map(_._2)
val rels: Dataset[Relation] = spark.createDataset(distinctRels)
rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations") rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations")
} }

View File

@ -1,7 +1,8 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication} import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty}
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
@ -63,12 +64,98 @@ case object ConversionUtil {
} }
def mergePublication(a: Publication, b:Publication) : Publication = {
if ((a != null) && (b != null)) {
a.mergeFrom(b)
a
} else {
if (a == null) b else a
}
}
def choiceLatestMagArtitcle(p1: MagPapers, p2:MagPapers) :MagPapers = {
var r = if (p1 == null) p2 else p1
if (p1 != null && p2 != null) {
if (p1.CreatedDate != null && p2.CreatedDate != null) {
if (p1.CreatedDate.before(p2.CreatedDate))
r = p2
else
r = p1
} else {
r = if (p1.CreatedDate == null) p2 else p1
}
}
r
}
def updatePubsWithDescription(inputItem:((String, Publication), MagPaperAbstract)) : Publication = {
val pub = inputItem._1._2
val abst = inputItem._2
if (abst != null) {
pub.setDescription(List(asField(abst.IndexedAbstract)).asJava)
}
pub
}
def updatePubsWithConferenceInfo(inputItem:((String, Publication), MagConferenceInstance)) : Publication = {
val publication:Publication= inputItem._1._2
val ci:MagConferenceInstance = inputItem._2
if (ci!= null){
val j:Journal = new Journal
if (ci.Location.isDefined)
j.setConferenceplace(ci.Location.get)
j.setName(ci.DisplayName.get)
if (ci.StartDate.isDefined && ci.EndDate.isDefined)
{
j.setConferencedate(s"${ci.StartDate.get.toString} - ${ci.EndDate.get.toString}")
}
publication.setJournal(j)
}
publication
}
def updatePubsWithSubject(item:((String, Publication), MagFieldOfStudy)) : Publication = {
val publication = item._1._2
val fieldOfStudy = item._2
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies")
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
var resList: List[StructuredProperty] = List(s1)
if (s.MainType.isDefined) {
val maintp = s.MainType.get
val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies")
s2.setDataInfo(di)
resList = resList ::: List(s2)
if (maintp.contains(".")) {
val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies")
s3.setDataInfo(di)
resList = resList ::: List(s3)
}
}
resList
})
publication.setSubject(p.asJava)
}
publication
}
def addInstances(a: (Publication, MagUrl)): Publication = { def addInstances(a: (Publication, MagUrl)): Publication = {
val pub = a._1 val pub = a._1
val urls = a._2 val urls = a._2
val i = new Instance val i = new Instance

View File

@ -1,15 +1,13 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Journal, Publication, StructuredProperty} import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{asField, createSP}
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
@ -36,25 +34,15 @@ object SparkPreProcessMAG {
val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers]
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => val result: RDD[MagPapers] = d.where(col("Doi").isNotNull)
var r = if (p1 == null) p2 else p1 .rdd
if (p1 != null && p2 != null) { .map{ p: MagPapers => Tuple2(p.Doi, p) }
if (p1.CreatedDate != null && p2.CreatedDate != null) { .reduceByKey((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2))
if (p1.CreatedDate.before(p2.CreatedDate)) .map(_._2)
r = p1
else
r = p2
} else {
r = if (p1.CreatedDate == null) p2 else p1
}
}
r
}.map(_._2)
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
logger.info(s"Total number of element: ${result.count()}")
logger.info("Phase 3) Group Author by PaperId") logger.info("Phase 3) Group Author by PaperId")
val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor]
@ -85,46 +73,35 @@ object SparkPreProcessMAG {
val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left") val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left")
firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left")
.map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") .map { a => ConversionUtil.createOAFFromJournalAuthorPaper(a) }
.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2")
var magPubs: Dataset[(String, Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] var magPubs: Dataset[(String, Publication)] =
spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val conference = spark.read.load(s"$sourcePath/ConferenceInstances").select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" ) val conference = spark.read.load(s"$sourcePath/ConferenceInstances")
val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci"))).select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" )
val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci")))
.select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance]
magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left") magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left")
.map(p => { .map(item => ConversionUtil.updatePubsWithConferenceInfo(item))
val publication:Publication= p._1._2 .write
val ci:MagConferenceInstance = p._2 .mode(SaveMode.Overwrite)
.save(s"${parser.get("targetPath")}/merge_step_2_conference")
if (ci!= null){
val j:Journal = new Journal
if (ci.Location.isDefined)
j.setConferenceplace(ci.Location.get)
j.setName(ci.DisplayName.get)
if (ci.StartDate.isDefined && ci.EndDate.isDefined)
{
j.setConferencedate(s"${ci.StartDate.get.toString} - ${ci.EndDate.get.toString}")
}
publication.setJournal(j)
}
publication
}).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2_conference")
magPubs= spark.read.load(s"${parser.get("targetPath")}/merge_step_2_conference").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] magPubs= spark.read.load(s"${parser.get("targetPath")}/merge_step_2_conference").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
logger.info("Phase 5) enrich publication with URL and Instances") logger.info("Phase 5) enrich publication with URL and Instances")
magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
.map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) } .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
.write.mode(SaveMode.Overwrite) .write.mode(SaveMode.Overwrite)
@ -138,22 +115,18 @@ object SparkPreProcessMAG {
val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract]
magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left").map(p => { magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left")
val pub = p._1._2 .map(item => ConversionUtil.updatePubsWithDescription(item)
val abst = p._2
if (abst != null) {
pub.setDescription(List(asField(abst.IndexedAbstract)).asJava)
}
pub
}
).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4")
logger.info("Phase 7) Enrich Publication with FieldOfStudy") logger.info("Phase 7) Enrich Publication with FieldOfStudy")
magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType")
@ -164,31 +137,18 @@ object SparkPreProcessMAG {
.groupBy($"PaperId").agg(collect_list(struct($"FieldOfStudyId", $"DisplayName", $"MainType", $"Score")).as("subjects")) .groupBy($"PaperId").agg(collect_list(struct($"FieldOfStudyId", $"DisplayName", $"MainType", $"Score")).as("subjects"))
.as[MagFieldOfStudy] .as[MagFieldOfStudy]
magPubs.joinWith(paperField, col("_1").equalTo(paperField("PaperId")), "left"). magPubs.joinWith(paperField, col("_1")
map(item => { .equalTo(paperField("PaperId")), "left")
val publication = item._1._2 .map(item => ConversionUtil.updatePubsWithSubject(item))
val fieldOfStudy = item._2 .write.mode(SaveMode.Overwrite)
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { .save(s"${parser.get("targetPath")}/mag_publication")
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies")
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) val s:RDD[Publication] = spark.read.load(s"${parser.get("targetPath")}/mag_publication").as[Publication]
var resList: List[StructuredProperty] = List(s1) .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
if (s.MainType.isDefined) { .map(_._2)
val maintp = s.MainType.get
val s2 = createSP(s.MainType.get, "keywords", "dnet:subject_classification_typologies") spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication_u")
s2.setDataInfo(di)
resList = resList ::: List(s2)
if (maintp.contains(".")) {
val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies")
s3.setDataInfo(di)
resList = resList ::: List(s3)
}
}
resList
})
publication.setSubject(p.asJava)
}
publication
}).map { s: Publication => s }(Encoders.bean(classOf[Publication])).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication")
} }
} }

View File

@ -15,6 +15,10 @@
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>
</property> </property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property> <property>
<name>hive_metastore_uris</name> <name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value> <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>

View File

@ -16,6 +16,11 @@
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<description>number of cores used by single executor</description> <description>number of cores used by single executor</description>
</property> </property>
<property>
<name>timestamp</name>
<description>Timestamp for incremental Harvesting</description>
</property>
</parameters> </parameters>
<start to="ExtractCrossrefToOAF"/> <start to="ExtractCrossrefToOAF"/>
@ -27,8 +32,8 @@
<action name="ResetWorkingPath"> <action name="ResetWorkingPath">
<fs> <fs>
<delete path='${workingPath}'/> <delete path='${workingPath}/input/crossref/index_dump'/>
<mkdir path='${workingPath}/input/crossref'/> <!-- <mkdir path='${workingPath}/input/crossref'/>-->
</fs> </fs>
<ok to="ImportCrossRef"/> <ok to="ImportCrossRef"/>
<error to="Kill"/> <error to="Kill"/>
@ -43,13 +48,13 @@
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class> <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg> <arg>-t</arg><arg>${workingPath}/input/crossref/index_dump</arg>
<arg>-n</arg><arg>${nameNode}</arg> <arg>-n</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg>
</java> </java>
<ok to="End"/> <ok to="ExtractCrossrefToOAF"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="ExtractCrossrefToOAF"> <action name="ExtractCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master> <master>yarn-cluster</master>
@ -63,7 +68,7 @@
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
${sparkExtraOPT} ${sparkExtraOPT}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/crossref/index_dump</arg> <arg>--sourcePath</arg><arg>${workingPath}/input/crossref/index_dump,${workingPath}/crossref/index_dump</arg>
<arg>--targetPath</arg><arg>${workingPath}/input/crossref</arg> <arg>--targetPath</arg><arg>${workingPath}/input/crossref</arg>
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>
</spark> </spark>

View File

@ -22,7 +22,7 @@
</property> </property>
</parameters> </parameters>
<start to="ResetWorkingPath"/> <start to="PreprocessMag"/>
<kill name="Kill"> <kill name="Kill">
@ -31,8 +31,8 @@
<action name="ResetWorkingPath"> <action name="ResetWorkingPath">
<fs> <fs>
<delete path='${targetPath}'/> <delete path='${targetPath}/preprocess'/>
<mkdir path='${targetPath}'/> <mkdir path='${targetPath}/preprocess'/>
</fs> </fs>
<ok to="PreprocessMag"/> <ok to="PreprocessMag"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -58,6 +58,28 @@ class CrossrefMappingTest {
} }
@Test
def testEmptyTitle() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString
assertNotNull(json)
assertFalse(json.isEmpty);
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
assertTrue(resultList.nonEmpty)
val items = resultList.filter(p => p.isInstanceOf[Result])
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
items.foreach(p => println(mapper.writeValueAsString(p)))
}
@Test @Test
def testPeerReviewed(): Unit = { def testPeerReviewed(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("prwTest.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("prwTest.json")).mkString

View File

@ -0,0 +1,121 @@
{
"indexed": {
"date-parts": [
[
2020,
4,
7
]
],
"date-time": "2020-04-07T15:54:28Z",
"timestamp": 1586274868901
},
"reference-count": 0,
"publisher": "Japan Society of Mechanical Engineers",
"issue": "432",
"content-domain": {
"domain": [],
"crossmark-restriction": false
},
"short-container-title": [
"JSMET"
],
"published-print": {
"date-parts": [
[
1982
]
]
},
"DOI": "10.1299\/kikaib.48.1474",
"type": "journal-article",
"created": {
"date-parts": [
[
2011,
9,
13
]
],
"date-time": "2011-09-13T05:59:01Z",
"timestamp": 1315893541000
},
"page": "1474-1482",
"source": "Crossref",
"is-referenced-by-count": 0,
"title": [
""
],
"prefix": "10.1299",
"volume": "48",
"author": [
{
"given": "Hiroshi",
"family": "KATO",
"sequence": "first",
"affiliation": []
},
{
"given": "Yoshichika",
"family": "MIZUNO",
"sequence": "additional",
"affiliation": []
}
],
"member": "124",
"container-title": [
"Transactions of the Japan Society of Mechanical Engineers Series B"
],
"original-title": [
"\u5e0c\u8584\u9ad8\u5206\u5b50\u6eb6\u6db2\u4e2d\u306e\u6709\u9650\u9577\u5186\u67f1\u306e\u62b5\u6297"
],
"language": "ja",
"deposited": {
"date-parts": [
[
2011,
9,
13
]
],
"date-time": "2011-09-13T06:01:33Z",
"timestamp": 1315893693000
},
"score": 1.0,
"subtitle": [],
"short-title": [],
"issued": {
"date-parts": [
[
1982
]
]
},
"references-count": 0,
"journal-issue": {
"published-print": {
"date-parts": [
[
1982
]
]
},
"issue": "432"
},
"URL": "http:\/\/dx.doi.org\/10.1299\/kikaib.48.1474",
"relation": {},
"ISSN": [
"0387-5016",
"1884-8346"
],
"issn-type": [
{
"value": "0387-5016",
"type": "print"
},
{
"value": "1884-8346",
"type": "electronic"
}
]
}