merge with beta - resolved conflict in pom

pull/159/head
Miriam Baglioni 2 years ago
commit b3f9370125

@ -70,7 +70,7 @@ case object Crossref2Oaf {
"reference-book" -> "0002 Book",
"monograph" -> "0002 Book",
"journal-article" -> "0001 Article",
"dissertation" -> "0006 Doctoral thesis",
"dissertation" -> "0044 Thesis",
"other" -> "0038 Other literature type",
"peer-review" -> "0015 Review",
"proceedings" -> "0004 Conference object",
@ -206,11 +206,16 @@ case object Crossref2Oaf {
else {
instance.setDateofacceptance(asField(createdDate.getValue))
}
val s: String = (json \ "URL").extract[String]
val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct
if (links.nonEmpty) {
instance.setUrl(links.asJava)
}
val s: List[String] = List("https://doi.org/" + doi)
// val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null && p.toLowerCase().contains(doi.toLowerCase())).distinct
// if (links.nonEmpty) {
// instance.setUrl(links.asJava)
// }
if(s.nonEmpty)
{
instance.setUrl(s.asJava)
}
result.setInstance(List(instance).asJava)
//IMPORTANT

@ -111,26 +111,9 @@ object SparkProcessMAG {
.map(item => ConversionUtil.updatePubsWithConferenceInfo(item))
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/merge_step_2_conference")
magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
logger.info("Phase 5) enrich publication with URL and Instances")
magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
.map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
.write.mode(SaveMode.Overwrite)
.save(s"$workingPath/merge_step_3")
// logger.info("Phase 6) Enrich Publication with description")
// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
val paperAbstract = spark.read.load((s"$workingPath/PaperAbstract")).as[MagPaperAbstract]
@ -162,12 +145,14 @@ object SparkProcessMAG {
.write.mode(SaveMode.Overwrite)
.save(s"$workingPath/mag_publication")
spark.read.load(s"$workingPath/mag_publication").as[Publication]
.filter(p => p.getId == null)
.groupByKey(p => p.getId)
.reduceGroups((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
.map(_._2)
.write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication]
.map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
.map(_._2)
spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
}
}

@ -612,4 +612,26 @@ class CrossrefMappingTest {
}
@Test
def testMultipleURLs(): Unit = {
  // Fixture: a Crossref record whose "link" array holds several URLs for one DOI.
  val record = Source.fromInputStream(getClass.getResourceAsStream("multiple_urls.json")).mkString
  assertNotNull(record)
  assertFalse(record.isEmpty)

  val converted: List[Oaf] = Crossref2Oaf.convert(record)
  assertTrue(converted.nonEmpty)

  // Take the first Result produced by the mapping.
  val result: Result = converted.collect { case r: Result => r }.head

  // The mapping must collapse the multiple link URLs down to the single
  // canonical DOI URL on the one instance.
  assertEquals(1, result.getInstance().size())
  val instanceUrls = result.getInstance().get(0).getUrl()
  assertEquals(1, instanceUrls.size())
  assertEquals("https://doi.org/10.1016/j.jas.2019.105013", instanceUrls.get(0))
}
}

@ -0,0 +1,614 @@
{
"indexed": {
"date-parts": [
[
2021,
10,
31
]
],
"date-time": "2021-10-31T15:48:01Z",
"timestamp": 1635695281393
},
"reference-count": 39,
"publisher": "Elsevier BV",
"license": [
{
"start": {
"date-parts": [
[
2019,
12,
1
]
],
"date-time": "2019-12-01T00:00:00Z",
"timestamp": 1575158400000
},
"content-version": "tdm",
"delay-in-days": 0,
"URL": "https://www.elsevier.com/tdm/userlicense/1.0/"
},
{
"start": {
"date-parts": [
[
2019,
9,
13
]
],
"date-time": "2019-09-13T00:00:00Z",
"timestamp": 1568332800000
},
"content-version": "vor",
"delay-in-days": 0,
"URL": "http://creativecommons.org/licenses/by/4.0/"
}
],
"funder": [
{
"DOI": "10.13039/100001182",
"name": "INSTAP",
"doi-asserted-by": "publisher"
},
{
"DOI": "10.13039/100014440",
"name": "Ministry of Science, Innovation and Universities",
"doi-asserted-by": "publisher",
"award": [
"RYC-2016-19637"
]
},
{
"DOI": "10.13039/100010661",
"name": "European Unions Horizon 2020",
"doi-asserted-by": "publisher",
"award": [
"746446"
]
}
],
"content-domain": {
"domain": [
"elsevier.com",
"sciencedirect.com"
],
"crossmark-restriction": true
},
"short-container-title": [
"Journal of Archaeological Science"
],
"published-print": {
"date-parts": [
[
2019,
12
]
]
},
"DOI": "10.1016/j.jas.2019.105013",
"type": "journal-article",
"created": {
"date-parts": [
[
2019,
9,
25
]
],
"date-time": "2019-09-25T20:05:08Z",
"timestamp": 1569441908000
},
"page": "105013",
"update-policy": "http://dx.doi.org/10.1016/elsevier_cm_policy",
"source": "Crossref",
"is-referenced-by-count": 21,
"title": [
"A brave new world for archaeological survey: Automated machine learning-based potsherd detection using high-resolution drone imagery"
],
"prefix": "10.1016",
"volume": "112",
"author": [
{
"given": "H.A.",
"family": "Orengo",
"sequence": "first",
"affiliation": [
]
},
{
"given": "A.",
"family": "Garcia-Molsosa",
"sequence": "additional",
"affiliation": [
]
}
],
"member": "78",
"reference": [
{
"key": "10.1016/j.jas.2019.105013_bib1",
"doi-asserted-by": "crossref",
"first-page": "85",
"DOI": "10.1080/17538947.2016.1250829",
"article-title": "Remote sensing heritage in a petabyte-scale: satellite data and heritage Earth Engine© applications",
"volume": "10",
"author": "Agapiou",
"year": "2017",
"journal-title": "Int. J. Digit. Earth"
},
{
"key": "10.1016/j.jas.2019.105013_bib2",
"series-title": "Extracting Meaning from Ploughsoil Assemblages",
"first-page": "1",
"article-title": "Extracting meaning from ploughsoil assemblages: assessments of the past, strategies for the future",
"author": "Alcock",
"year": "2000"
},
{
"key": "10.1016/j.jas.2019.105013_bib3",
"series-title": "Side-by-Side Survey. Comparative Regional Studies in the Mediterranean World",
"first-page": "1",
"article-title": "Introduction",
"author": "Alcock",
"year": "2004"
},
{
"key": "10.1016/j.jas.2019.105013_bib4",
"doi-asserted-by": "crossref",
"first-page": "93",
"DOI": "10.1111/j.1538-4632.1995.tb00338.x",
"article-title": "Local indicators of spatial association—LISA",
"volume": "27",
"author": "Anselin",
"year": "1995",
"journal-title": "Geogr. Anal."
},
{
"key": "10.1016/j.jas.2019.105013_bib5",
"series-title": "Archaeological Survey",
"author": "Banning",
"year": "2002"
},
{
"issue": "1/2",
"key": "10.1016/j.jas.2019.105013_bib6",
"doi-asserted-by": "crossref",
"first-page": "123",
"DOI": "10.2307/3181488",
"article-title": "GIS, archaeological survey and landscape archaeology on the island of Kythera, Greece",
"volume": "29",
"author": "Bevan",
"year": "2004",
"journal-title": "J. Field Archaeol."
},
{
"issue": "1",
"key": "10.1016/j.jas.2019.105013_bib8",
"doi-asserted-by": "crossref",
"first-page": "5",
"DOI": "10.1023/A:1010933404324",
"article-title": "Random forests",
"volume": "45",
"author": "Breiman",
"year": "2001",
"journal-title": "Mach. Learn."
},
{
"key": "10.1016/j.jas.2019.105013_bib9",
"series-title": "Sampling in Contemporary British Archaeology",
"author": "Cherry",
"year": "1978"
},
{
"issue": "3",
"key": "10.1016/j.jas.2019.105013_bib10",
"doi-asserted-by": "crossref",
"first-page": "273",
"DOI": "10.1016/0734-189X(84)90197-X",
"article-title": "Segmentation of a high-resolution urban scene using texture operators",
"volume": "25",
"author": "Conners",
"year": "1984",
"journal-title": "Comput. Vis. Graph Image Process"
},
{
"key": "10.1016/j.jas.2019.105013_bib11",
"first-page": "31",
"article-title": "Old land surfaces and modern ploughsoil: implications of recent work at Maxey, Cambridgeshire",
"volume": "2",
"author": "Crowther",
"year": "1983",
"journal-title": "Scott. Archaeol. Rev."
},
{
"key": "10.1016/j.jas.2019.105013_bib12",
"series-title": "Settlement Pattern Studies in the Americas: Fifty Years since Virú",
"first-page": "203",
"article-title": "Conclusions: the settlement pattern concept from an Americanist perspective",
"author": "Fish",
"year": "1999"
},
{
"key": "10.1016/j.jas.2019.105013_bib13",
"doi-asserted-by": "crossref",
"first-page": "21",
"DOI": "10.3390/geosciences9010021",
"article-title": "Remote sensing and historical morphodynamics of alluvial plains. The 1909 indus flood and the city of Dera Gazhi Khan (province of Punjab, Pakistan)",
"volume": "9",
"author": "Garcia",
"year": "2019",
"journal-title": "Geosciences"
},
{
"key": "10.1016/j.jas.2019.105013_bib14",
"unstructured": "Georgiadis, M.; Garcia-Molsosa, A.; Orengo, H.A.; Kefalidou, E. and Kallintzi, K. In Preparation. APAX Project 2015-2018: A Preliminary Report. (Hesperia)."
},
{
"key": "10.1016/j.jas.2019.105013_bib15",
"series-title": "Geographical Information Systems and Landscape Archaeology",
"first-page": "35",
"article-title": "Regional survey and GIS: the boeotia project",
"author": "Gillings",
"year": "1999"
},
{
"key": "10.1016/j.jas.2019.105013_bib16",
"doi-asserted-by": "crossref",
"first-page": "18",
"DOI": "10.1016/j.rse.2017.06.031",
"article-title": "Google Earth engine: planetary-scale geospatial analysis for everyone",
"volume": "202",
"author": "Gorelick",
"year": "2017",
"journal-title": "Remote Sens. Environ."
},
{
"issue": "107",
"key": "10.1016/j.jas.2019.105013_bib17",
"doi-asserted-by": "crossref",
"first-page": "177",
"DOI": "10.1111/j.0031-868X.2004.00278.x",
"article-title": "Photogrammetric reconstruction of the great buddha of Bamiyan, Afghanistan",
"volume": "19",
"author": "Grün",
"year": "2004",
"journal-title": "Photogramm. Rec."
},
{
"issue": "6",
"key": "10.1016/j.jas.2019.105013_bib18",
"doi-asserted-by": "crossref",
"first-page": "610",
"DOI": "10.1109/TSMC.1973.4309314",
"article-title": "Textural features for image classification",
"author": "Haralick",
"year": "1973",
"journal-title": "IEEE Trans. Syst., Man, Cybernet., SMC-3"
},
{
"key": "10.1016/j.jas.2019.105013_bib19",
"doi-asserted-by": "crossref",
"first-page": "76",
"DOI": "10.1558/jmea.v14i1.76",
"article-title": "Excavating to excess? Implications of the last decade of archaeology in Israel",
"volume": "14",
"author": "Kletter",
"year": "2001",
"journal-title": "J. Mediterr. Archaeol."
},
{
"key": "10.1016/j.jas.2019.105013_bib20",
"first-page": "299",
"article-title": "Testing Google Earth Engine for the automatic identification and vectorization of archaeological features: a case study from Faynan, Jordan",
"volume": "15",
"author": "Liss",
"year": "2017",
"journal-title": "J. Archaeol. Sci.: Report"
},
{
"key": "10.1016/j.jas.2019.105013_bib21",
"series-title": "Geographical Information Systems and Landscape Archaeology",
"first-page": "55",
"article-title": "Towards a methodology for modelling surface survey data: the sangro valley project",
"author": "Lock",
"year": "1999"
},
{
"key": "10.1016/j.jas.2019.105013_bib22",
"series-title": "Extracting Meaning from Ploughsoil Assemblages",
"first-page": "5",
"article-title": "Methods of collection recording and quantification",
"author": "Mattingly",
"year": "2000"
},
{
"issue": "14",
"key": "10.1016/j.jas.2019.105013_bib23",
"doi-asserted-by": "crossref",
"first-page": "E778",
"DOI": "10.1073/pnas.1115472109",
"article-title": "Mapping patterns of long-term settlement in Northern Mesopotamia at a large scale",
"volume": "109",
"author": "Menze",
"year": "2012",
"journal-title": "Proc. Natl. Acad. Sci."
},
{
"key": "10.1016/j.jas.2019.105013_bib24",
"doi-asserted-by": "crossref",
"first-page": "80",
"DOI": "10.1016/j.jas.2015.04.002",
"article-title": "A supervised machine-learning approach towards geochemical predictive modelling in archaeology",
"volume": "59",
"author": "Oonk",
"year": "2015",
"journal-title": "J. Archaeol. Sci."
},
{
"key": "10.1016/j.jas.2019.105013_bib25",
"doi-asserted-by": "crossref",
"first-page": "49",
"DOI": "10.1016/j.isprsjprs.2012.07.005",
"article-title": "Combining terrestrial stereophotogrammetry, DGPS and GIS-based 3D voxel modelling in the volumetric recording of archaeological features",
"volume": "76",
"author": "Orengo",
"year": "2013",
"journal-title": "ISPRS J. Photogrammetry Remote Sens."
},
{
"key": "10.1016/j.jas.2019.105013_bib26",
"doi-asserted-by": "crossref",
"first-page": "100",
"DOI": "10.1016/j.jas.2015.10.008",
"article-title": "Photogrammetric re-discovery of the Eastern Thessalian hidden long-term landscapes",
"volume": "64",
"author": "Orengo",
"year": "2015",
"journal-title": "J. Archaeol. Sci."
},
{
"issue": "3",
"key": "10.1016/j.jas.2019.105013_bib27",
"doi-asserted-by": "crossref",
"first-page": "479",
"DOI": "10.3764/aja.122.3.0479",
"article-title": "Towards a definition of Minoan agro-pastoral landscapes: results of the survey at Palaikastro (Crete)",
"volume": "122",
"author": "Orengo",
"year": "2018",
"journal-title": "Am. J. Archaeol."
},
{
"issue": "7",
"key": "10.1016/j.jas.2019.105013_bib28",
"doi-asserted-by": "crossref",
"first-page": "735",
"DOI": "10.3390/rs9070735",
"article-title": "Large-scale, multi-temporal remote sensing of palaeo-river networks: a case study from Northwest India and its implications for the Indus civilisation",
"volume": "9",
"author": "Orengo",
"year": "2017",
"journal-title": "Remote Sens."
},
{
"key": "10.1016/j.jas.2019.105013_bib29",
"doi-asserted-by": "crossref",
"first-page": "1361",
"DOI": "10.1002/esp.4317",
"article-title": "Multi-scale relief model (MSRM): a new algorithm for the visualization of subtle topographic change of variable size in digital elevation models",
"volume": "43",
"author": "Orengo",
"year": "2018",
"journal-title": "Earth Surf. Process. Landforms"
},
{
"key": "10.1016/j.jas.2019.105013_bib30",
"series-title": "Submitted to Proceedings of the National Academy of Sciences",
"article-title": "Living on the edge of the desert: automated detection of archaeological mounds in Cholistan (Pakistan) using machine learning classification of multi-sensor multi-temporal satellite data",
"author": "Orengo",
"year": "2019"
},
{
"key": "10.1016/j.jas.2019.105013_bib31",
"first-page": "154",
"article-title": "How many trees in a random forest?",
"volume": "vol. 7376",
"author": "Oshiro",
"year": "2012"
},
{
"key": "10.1016/j.jas.2019.105013_bib32",
"article-title": "Decision-making in modern surveys",
"volume": "ume 1",
"author": "Plog",
"year": "1978"
},
{
"issue": "4",
"key": "10.1016/j.jas.2019.105013_bib33",
"doi-asserted-by": "crossref",
"first-page": "100",
"DOI": "10.3390/geosciences7040100",
"article-title": "From above and on the ground: geospatial methods for recording endangered archaeology in the Middle East and north africa",
"volume": "7",
"author": "Rayne",
"year": "2017",
"journal-title": "Geosciences"
},
{
"issue": "1",
"key": "10.1016/j.jas.2019.105013_bib34",
"doi-asserted-by": "crossref",
"first-page": "1",
"DOI": "10.1080/00438243.1978.9979712",
"article-title": "The design of archaeological surveys",
"volume": "10",
"author": "Schiffer",
"year": "1978",
"journal-title": "World Archaeol."
},
{
"key": "10.1016/j.jas.2019.105013_bib35",
"series-title": "Experiments in the Collection and Analysis of Archaeological Survey Data: the East Hampshire Survey",
"author": "Shennan",
"year": "1985"
},
{
"key": "10.1016/j.jas.2019.105013_bib36",
"doi-asserted-by": "crossref",
"first-page": "1066",
"DOI": "10.1016/j.culher.2016.06.006",
"article-title": "Drones over Mediterranean landscapes. The potential of small UAV's (drones) for site detection and heritage management in archaeological survey projects: a case study from Le Pianelle in the Tappino Valley, Molise (Italy)",
"volume": "22",
"author": "Stek",
"year": "2016",
"journal-title": "J. Cult. Herit."
},
{
"key": "10.1016/j.jas.2019.105013_bib37",
"series-title": "Side-by-Side Survey. Comparative Regional Studies in the Mediterranean World",
"first-page": "65",
"article-title": "Side-by-side and back to front: exploring intra-regional latitudinal and longitudinal comparability in survey data. Three case studies from Metaponto, southern Italy",
"author": "Thomson",
"year": "2004"
},
{
"key": "10.1016/j.jas.2019.105013_bib38",
"series-title": "Digital Discovery. Exploring New Frontiers in Human Heritage. Computer Applications and Quantitative Methods in Archaeology",
"article-title": "Computer vision and machine learning for archaeology",
"author": "van der Maaten",
"year": "2007"
},
{
"key": "10.1016/j.jas.2019.105013_bib39",
"doi-asserted-by": "crossref",
"first-page": "1114",
"DOI": "10.1111/j.1475-4754.2012.00667.x",
"article-title": "Computer vision-based orthophoto mapping of complex archaeological sites: the ancient quarry of Pitaranha (Portugal-Spain)",
"volume": "54",
"author": "Verhoeven",
"year": "2012",
"journal-title": "Archaeometry"
},
{
"key": "10.1016/j.jas.2019.105013_bib40",
"series-title": "A Guide for Salvage Archeology",
"author": "Wendorf",
"year": "1962"
}
],
"container-title": [
"Journal of Archaeological Science"
],
"original-title": [
],
"language": "en",
"link": [
{
"URL": "https://api.elsevier.com/content/article/PII:S0305440319301001?httpAccept=text/xml",
"content-type": "text/xml",
"content-version": "vor",
"intended-application": "text-mining"
},
{
"URL": "https://api.elsevier.com/content/article/PII:S0305440319301001?httpAccept=text/plain",
"content-type": "text/plain",
"content-version": "vor",
"intended-application": "text-mining"
}
],
"deposited": {
"date-parts": [
[
2019,
11,
25
]
],
"date-time": "2019-11-25T06:46:34Z",
"timestamp": 1574664394000
},
"score": 1,
"subtitle": [
],
"short-title": [
],
"issued": {
"date-parts": [
[
2019,
12
]
]
},
"references-count": 39,
"alternative-id": [
"S0305440319301001"
],
"URL": "http://dx.doi.org/10.1016/j.jas.2019.105013",
"relation": {
},
"ISSN": [
"0305-4403"
],
"issn-type": [
{
"value": "0305-4403",
"type": "print"
}
],
"subject": [
"Archaeology",
"Archaeology"
],
"published": {
"date-parts": [
[
2019,
12
]
]
},
"assertion": [
{
"value": "Elsevier",
"name": "publisher",
"label": "This article is maintained by"
},
{
"value": "A brave new world for archaeological survey: Automated machine learning-based potsherd detection using high-resolution drone imagery",
"name": "articletitle",
"label": "Article Title"
},
{
"value": "Journal of Archaeological Science",
"name": "journaltitle",
"label": "Journal Title"
},
{
"value": "https://doi.org/10.1016/j.jas.2019.105013",
"name": "articlelink",
"label": "CrossRef DOI link to publisher maintained version"
},
{
"value": "article",
"name": "content_type",
"label": "Content Type"
},
{
"value": "© 2019 The Authors. Published by Elsevier Ltd.",
"name": "copyright",
"label": "Copyright"
}
],
"article-number": "105013"
}

@ -0,0 +1,107 @@
package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
// Spark job that resolves "unresolved" result records (records whose id is a
// PID-based placeholder, e.g. "unresolved::<pid>::<type>") against the dnet
// identifiers already present in the graph, then merges the resolved records
// back into the per-type graph tables. Launched from an Oozie spark action.
object SparkResolveEntities {
// Shared Jackson mapper for (de)serialising results as JSON text lines.
val mapper = new ObjectMapper()
// The four result sub-types this job processes.
val entities = List(EntityType.dataset,EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
// Entry point: parse arguments, resolve and merge the entities, then swap the
// merged tables into graphBasePath (keeping the originals under workingPath).
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
val unresolvedPath = parser.get("unresolvedPath")
log.info(s"unresolvedPath -> $unresolvedPath")
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path(workingPath))
resolveEntities(spark, workingPath, unresolvedPath)
generateResolvedEntities(spark, workingPath, graphBasePath)
// TO BE conservative we keep the original entities in the working dir
// and save the resolved entities on the graphBasePath
//In future these lines of code should be removed
entities.foreach {
e =>
fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
}
}
// Inner-joins the unresolved records (newline-delimited JSON at unresolvedPath,
// keyed by their placeholder id) with the lookup table at
// $workingPath/relationResolvedPid, rewrites each matched record's id to the
// resolved dnet id, and saves the result to $workingPath/resolvedEntities.
// Unmatched records are dropped by the inner join.
// NOTE(review): assumes relationResolvedPid rows are (resolvedDnetId, unresolvedId)
// — the join uses column _2 against the record id and takes _1 as the new id; confirm upstream.
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)]
val up: Dataset[(String, Result)] = spark.read.text(unresolvedPath).as[String].map(s => mapper.readValue(s, classOf[Result])).map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder))
rPid.joinWith(up, rPid("_2").equalTo(up("_1")), "inner").map {
r =>
val result = r._2._2
val dnetId = r._1._1
result.setId(dnetId)
result
}.write.mode(SaveMode.Overwrite).save(s"$workingPath/resolvedEntities")
}
// Parses one JSON line into the concrete Result subclass for the given entity
// type. Non-exhaustive on purpose: any EntityType outside the four handled
// cases raises a MatchError (callers iterate only over `entities`).
def deserializeObject(input:String, entity:EntityType ) :Result = {
entity match {
case EntityType.publication => mapper.readValue(input, classOf[Publication])
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
case EntityType.software=> mapper.readValue(input, classOf[Software])
case EntityType.otherresearchproduct=> mapper.readValue(input, classOf[OtherResearchProduct])
}
}
// For each entity type: union the existing graph table with the resolved
// records, merge duplicates by id via mergeFrom, and write the merged table as
// gzip-compressed JSON text to $workingPath/resolvedGraph/<type>.
def generateResolvedEntities(spark:SparkSession, workingPath: String, graphBasePath:String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
val re:Dataset[Result] = spark.read.load(s"$workingPath/resolvedEntities").as[Result]
entities.foreach {
e =>
spark.read.text(s"$graphBasePath/$e").as[String]
.map(s => deserializeObject(s, e))
.union(re)
.groupByKey(_.getId)
.reduceGroups {
(x, y) =>
x.mergeFrom(y)
x
}.map(_._2)
// drops records whose runtime class is the Result base type — presumably
// resolved entries that matched no concrete record of this type; TODO confirm
.filter(r => r.getClass.getSimpleName.toLowerCase != "result")
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e")
}
}
}

@ -96,6 +96,21 @@ object SparkResolveRelation {
.text(s"$graphBasePath/relation")
}
// Extracts, for every element of the record's "instance" array, the pair
// (instancetype.classname, collectedfrom.value), preserving document order.
def extractInstanceCF(input: String): List[(String, String)] = {
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
  lazy val record: json4s.JValue = parse(input)

  val pairs: List[(String, String)] = for {
    JObject(instanceFields) <- record \ "instance"
    JField("collectedfrom", JObject(collectedFromFields)) <- instanceFields
    JField("instancetype", JObject(instanceTypeFields)) <- instanceFields
    JField("value", JString(collectedFromValue)) <- collectedFromFields
    JField("classname", JString(instanceTypeName)) <- instanceTypeFields
  } yield (instanceTypeName, collectedFromValue)

  pairs
}
def extractPidsFromRecord(input: String): (String, List[(String, String)]) = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@ -108,14 +123,7 @@ object SparkResolveRelation {
JField("classid", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
val alternateIds: List[(String, String)] = for {
JObject(pids) <- json \\ "alternateIdentifier"
JField("value", JString(pidValue)) <- pids
JField("qualifier", JObject(qualifier)) <- pids
JField("classid", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
(id, result ::: alternateIds)
(id, result)
}
@ -128,7 +136,7 @@ object SparkResolveRelation {
source != null
}
private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
import spark.implicits._
val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*")

@ -4,6 +4,10 @@
<name>graphBasePath</name>
<description>the path of the graph</description>
</property>
<property>
<name>unresolvedPath</name>
<description>the path of the unresolved Entities</description>
</property>
</parameters>
<start to="ResolveRelations"/>
@ -36,5 +40,33 @@
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="ResolveEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Entities in raw graph</name>
<class>eu.dnetlib.dhp.oa.graph.resolution.SparkResolveEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=10000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--unresolvedPath</arg><arg>${unresolvedPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the path of the working directory", "paramRequired": true},
{"paramName":"u", "paramLongName":"unresolvedPath", "paramDescription": "the path of the unresolved entities", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]

@ -0,0 +1,190 @@
package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Result, StructuredProperty}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.TestInstance.Lifecycle
import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, TestInstance}
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._
import scala.io.Source
// Integration test for SparkResolveEntities: builds a small local graph plus a
// set of fake "unresolved" update records, runs the resolution, and verifies
// that resolved records are merged into the right per-type output tables.
@TestInstance(Lifecycle.PER_CLASS)
class ResolveEntitiesTest extends Serializable {
// Temp dir holding the generated graph, updates and job outputs for one run.
var workingDir:Path = null
// Marker values injected into the fake update records, asserted on later.
val FAKE_TITLE = "FAKETITLE"
val FAKE_SUBJECT = "FAKESUBJECT"
// Local Spark session shared across all tests (PER_CLASS lifecycle).
var sparkSession:Option[SparkSession] = None
// Creates the temp dir and local session, then materialises test inputs.
@BeforeAll
def setUp() :Unit = {
workingDir = Files.createTempDirectory(getClass.getSimpleName)
val conf = new SparkConf()
sparkSession = Some(SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master("local[*]").getOrCreate())
populateDatasets(sparkSession.get)
generateUpdates(sparkSession.get)
}
// Tears down the temp dir and stops Spark after all tests ran.
@AfterAll
def tearDown():Unit = {
FileUtils.deleteDirectory(workingDir.toFile)
sparkSession.get.stop()
}
// Builds one fake Result per id listed in the "updates" resource (4 lines),
// each carrying FAKE_SUBJECT/FAKE_TITLE, writes them as JSON text to
// $workingDir/updates, then reads them back and sanity-checks the round trip.
// NOTE(review): `template.lines` relies on Predef's StringOps; on Scala 2.13+/JDK 11
// this can resolve to java.lang.String#lines (a java Stream) — verify cross-version.
def generateUpdates(spark:SparkSession):Unit = {
val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString
val pids:List[String] = template.lines.map{id =>
val r = new Result
r.setId(id.toLowerCase.trim)
r.setSubject(List(OafMapperUtils.structuredProperty(FAKE_SUBJECT, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
r.setTitle(List(OafMapperUtils.structuredProperty(FAKE_TITLE, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
r
}.map{r =>
val mapper = new ObjectMapper()
mapper.writeValueAsString(r)}.toList
val sc =spark.sparkContext
println(sc.parallelize(pids).count())
spark.createDataset(sc.parallelize(pids))(Encoders.STRING).write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingDir/updates")
import spark.implicits._
implicit val resEncoder: Encoder[Result] = Encoders.bean(classOf[Result])
// Round-trip sanity check: 4 records, each with exactly one fake subject/title.
val ds = spark.read.text(s"$workingDir/updates").as[String].map{s => val mapper = new ObjectMapper()
mapper.readValue(s, classOf[Result])}.collect()
assertEquals(4, ds.length)
ds.foreach{r => assertNotNull(r.getSubject)}
ds.foreach{r => assertEquals(1,r.getSubject.size())}
ds.foreach{r => assertNotNull(r.getTitle)}
ds.foreach{r => assertEquals(1,r.getTitle.size())}
ds.flatMap(r => r.getTitle.asScala.map(t => t.getValue)).foreach(t => assertEquals(FAKE_TITLE,t))
ds.flatMap(r => r.getSubject.asScala.map(t => t.getValue)).foreach(t => assertEquals(FAKE_SUBJECT,t))
println("generated Updates")
}
// Copies one resource file per entity type into $workingDir/graph/<type> and
// builds the pid-resolution lookup table the job under test depends on.
def populateDatasets(spark:SparkSession):Unit = {
import spark.implicits._
val entities =SparkResolveEntities.entities
entities.foreach{
e =>
val template = Source.fromInputStream(this.getClass.getResourceAsStream(s"$e")).mkString
spark.createDataset(spark.sparkContext.parallelize(template.lines.toList)).as[String].write.option("compression", "gzip").text(s"$workingDir/graph/$e")
println(s"Created Dataset $e")
}
SparkResolveRelation.extractPidResolvedTableFromJsonRDD(spark, s"$workingDir/graph", s"$workingDir/work")
}
// 3 of the 4 fake updates must resolve (the "fake" DOI has no match), and every
// resolved record must carry a result ("50|") prefixed dnet id.
@Test
def testResolution():Unit = {
val spark:SparkSession = sparkSession.get
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
val ds = spark.read.load(s"$workingDir/work/resolvedEntities").as[Result]
assertEquals(3, ds.count())
ds.collect().foreach{
r =>
assertTrue(r.getId.startsWith("50"))
}
}
// True if any structured property in the list equals the expected value
// (case-insensitive, nulls skipped).
private def structuredPContainsValue(l:java.util.List[StructuredProperty], exptectedValue:String):Boolean = {
l.asScala.exists(p =>p.getValue!= null && p.getValue.equalsIgnoreCase(exptectedValue))
}
// End-to-end: resolve, merge into the graph, then count per entity type how
// many records received the fake title — expected 0/2/1/0 for
// publication/dataset/software/otherresearchproduct given the test fixtures.
@Test
def testUpdate():Unit = {
val spark:SparkSession = sparkSession.get
import spark.implicits._
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
val m = new ObjectMapper()
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph" )
val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication))
val t = pubDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
assertEquals(0, t)
assertEquals(2, td)
assertEquals(1, ts)
assertEquals(0, to)
}
}

@ -0,0 +1,4 @@
unresolved::10.17026/dans-x3z-fsq5::doi
unresolved::10.17026/dans-xsw-qtnx::doi
unresolved::10.5281/zenodo.1473694::doi
unresolved::10.17632/fake::doi

@ -753,7 +753,7 @@
<mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[2.8.22-SNAPSHOT]</dhp-schemas.version>
<dhp-schemas.version>[2.8.22]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>

Loading…
Cancel
Save