diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index e5c8a4606..47aab1d20 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -25,6 +25,7 @@ public class DedupRecordFactory {

     public static <T extends OafEntity> Dataset<T> createDedupRecord(
             final SparkSession spark,
+            final DataInfo dataInfo,
             final String mergeRelsInputPath,
             final String entitiesInputPath,
             final Class<T> clazz) {
@@ -67,41 +68,39 @@ public class DedupRecordFactory {
                         Encoders.STRING())
                 .mapGroups(
                         (MapGroupsFunction<String, Tuple2<String, T>, T>)
-                                (key, values) -> entityMerger(key, values, ts, clazz),
+                                (key, values) -> entityMerger(key, values, ts, dataInfo),
                         Encoders.bean(clazz));
     }

     private static <T extends OafEntity> T entityMerger(
-            String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
-        try {
-            T entity = clazz.newInstance();
-            entity.setId(id);
-            entity.setDataInfo(new DataInfo());
-            entity.getDataInfo().setTrust("0.9");
-            entity.setLastupdatetimestamp(ts);
+            String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {

-            final Collection<String> dates = Lists.newArrayList();
-            entities.forEachRemaining(
-                    t -> {
-                        T duplicate = t._2();
-                        entity.mergeFrom(duplicate);
-                        if (ModelSupport.isSubClass(duplicate, Result.class)) {
-                            Result r1 = (Result) duplicate;
-                            Result er = (Result) entity;
-                            er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
+        T entity = entities.next()._2();

-                            if (er.getDateofacceptance() != null) {
-                                dates.add(r1.getDateofacceptance().getValue());
-                            }
+        final Collection<String> dates = Lists.newArrayList();
+        entities.forEachRemaining(
+                t -> {
+                    T duplicate = t._2();
+                    entity.mergeFrom(duplicate);
+                    if (ModelSupport.isSubClass(duplicate, Result.class)) {
+                        Result r1 = (Result) duplicate;
+                        Result er = (Result) entity;
+                        er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
+
+                        if (r1.getDateofacceptance() != null) {
+                            dates.add(r1.getDateofacceptance().getValue());
                         }
-                    });
+                    }
+                });

-            if (ModelSupport.isSubClass(entity, Result.class)) {
-                ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
-            }
-            return entity;
-        } catch (IllegalAccessException | InstantiationException e) {
-            throw new RuntimeException(e);
+        if (ModelSupport.isSubClass(entity, Result.class)) {
+            ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
         }
+
+        entity.setId(id);
+        entity.setLastupdatetimestamp(ts);
+        entity.setDataInfo(dataInfo);
+
+        return entity;
     }
 }
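Review note on DedupRecordFactory: the rewrite drops reflective instantiation (`clazz.newInstance()` plus its checked-exception handling) and seeds the root record from the first duplicate in the group; it also fixes a latent NPE, since the old code null-checked `er.getDateofacceptance()` but then read `r1.getDateofacceptance().getValue()`. A minimal sketch of the resulting merge flow, using hypothetical simplified types rather than the dhp schema:

```scala
// Hypothetical stand-ins for OafEntity/DataInfo, just to show the flow.
case class Info(trust: String)
class Entity(var id: String = null,
             var dataInfo: Info = null,
             var lastUpdate: Long = 0L) {
  def mergeFrom(other: Entity): Unit = { /* field-wise merge, as in OafEntity */ }
}

def entityMerger(id: String, duplicates: Iterator[Entity], ts: Long,
                 dataInfo: Info): Entity = {
  val root = duplicates.next()        // seed from the first duplicate: no reflection
  duplicates.foreach(root.mergeFrom)  // fold the remaining duplicates into the seed
  root.id = id                        // identity, timestamp and provenance are
  root.lastUpdate = ts                // stamped last, so no duplicate's values
  root.dataInfo = dataInfo            // can overwrite them during the merge
  root
}
```

Note that `duplicates.next()` assumes a non-empty group, which `mapGroups` guarantees.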
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index c46464ffd..42a0cff8a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup;

 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {

     private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);

+    public static final String ROOT_TRUST = "0.8";
+    public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
+    public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
+
     public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
@@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
                     DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
             final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
-            Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
-
-            DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
+            final Class<OafEntity> clazz =
+                    ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+            final DataInfo dataInfo = getDataInfo(dedupConf);
+            DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
                     .write()
                     .mode(SaveMode.Overwrite)
                     .option("compression", "gzip")
                     .json(outputPath);
         }
     }
+
+    private static DataInfo getDataInfo(DedupConfig dedupConf) {
+        DataInfo info = new DataInfo();
+        info.setDeletedbyinference(false);
+        info.setInferred(true);
+        info.setInvisible(false);
+        info.setTrust(ROOT_TRUST);
+        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
+        Qualifier provenance = new Qualifier();
+        provenance.setClassid(PROVENANCE_ACTION_CLASS);
+        provenance.setClassname(PROVENANCE_ACTION_CLASS);
+        provenance.setSchemeid(PROVENANCE_ACTIONS);
+        provenance.setSchemename(PROVENANCE_ACTIONS);
+        info.setProvenanceaction(provenance);
+        return info;
+    }
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index 4baac0229..b89a0e7e2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
         if (docIds.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
+            ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
             return ccId;
         } else {
             return docIds.iterator().next();
diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
index 41d53944f..a5aa94e09 100644
--- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java
@@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
         if (docIds.size() > 1) {
             final String s = getMin();
             String prefix = s.split("\\|")[0];
-            ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
+            ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
             return ccId;
         } else {
             return docIds.iterator().next();
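Review note on the dedup provenance: the inline `trust = "0.9"` in the merger is replaced by a `DataInfo` built once per run (trust `0.8`, provenance action `sysimport:dedup`, inference provenance taken from the dedup workflow's configuration id), and both `ConnectedComponent` variants switch the root-id namespace from `dedup_______` to `dedup_wf_001`. A sketch of how a root id is minted after this change, assuming `getMin()` returns the lexicographically smallest member id:

```scala
import eu.dnetlib.dhp.oa.dedup.DedupUtility

// Root ids now look like "50|dedup_wf_001::<md5 of the smallest member id>".
def rootId(memberIds: Set[String]): String =
  if (memberIds.size > 1) {
    val min = memberIds.min              // stands in for getMin() in the patch
    val prefix = min.split("\\|")(0)     // e.g. "50" for results
    s"$prefix|dedup_wf_001::${DedupUtility.md5(min)}"
  } else memberIds.head                  // singleton components keep their id
```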
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index eee0a4a72..157373db6 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -10,10 +10,11 @@ import org.json4s.jackson.JsonMethods._
 import org.slf4j.Logger
 import scala.collection.JavaConverters._

-case class mappingAffiliation(name:String)
-case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation:Option[mappingAffiliation]) {}
+case class mappingAffiliation(name: String)
+case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {}

-class Crossref2Oaf {
+
+case object Crossref2Oaf {

   //STATIC STRING
   val MAG = "MAG"
@@ -28,7 +29,6 @@ case object Crossref2Oaf {
   val DNET_LANGUAGES = "dnet:languages"
   val PID_TYPES = "dnet:pid_types"

-
   val mappingCrossrefType = Map(
     "book-section" -> "publication",
     "book" -> "publication",
@@ -84,7 +84,7 @@ case object Crossref2Oaf {
   )

-  def mappingResult(result: Result, json: JValue, cobjCategory:String): Result = {
+  def mappingResult(result: Result, json: JValue, cobjCategory: String): Result = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

     //MAPPING Crossref DOI into PID
@@ -111,7 +111,7 @@ case object Crossref2Oaf {
     result.setCollectedfrom(List(createCollectedFrom()).asJava)

     // Publisher ( Name of work's publisher mapped into  Result/Publisher)
-    val publisher = (json \ "publisher").extract[String]
+    val publisher = (json \ "publisher").extractOrElse[String](null)
     result.setPublisher(asField(publisher))

     // TITLE
@@ -144,7 +144,7 @@ case object Crossref2Oaf {

     //Mapping AUthor

-    val authorList:List[mappingAuthor] = (json \ "author").extract[List[mappingAuthor]]
+    val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List())

     result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava)

@@ -152,8 +152,8 @@ case object Crossref2Oaf {
     val instance = new Instance()
     val license = for {
-      JString(lic) <- json \ "license" \ "URL"
-    } yield asField(lic)
+        JString(lic) <- json \ "license" \ "URL"
+      } yield asField(lic)
     val l = license.filter(d => StringUtils.isNotBlank(d.getValue))
     if (l.nonEmpty)
       instance.setLicense(l.head)
@@ -161,24 +161,22 @@ case object Crossref2Oaf {
     instance.setAccessright(createQualifier("Restricted", "dnet:access_modes"))
     result.setInstance(List(instance).asJava)
-    instance.setInstancetype(createQualifier(cobjCategory.substring(0,4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
+    instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
     instance.setCollectedfrom(createCollectedFrom())
     if (StringUtils.isNotBlank(issuedDate)) {
       instance.setDateofacceptance(asField(issuedDate))
     }
-    val s: String =(json \ "URL").extract[String]
-    val links:List[String] = ((for {JString(url) <-json \ "link" \ "URL"} yield url) ::: List(s)).filter(p =>p != null).distinct
+    val s: String = (json \ "URL").extract[String]
+    val links: List[String] = ((for {JString(url) <- json \ "link" \ "URL"} yield url) ::: List(s)).filter(p => p != null).distinct
     if (links.nonEmpty)
       instance.setUrl(links.asJava)
     result
   }

-
-
-  def generateAuhtor(given:String, family:String, orcid:String):Author = {
-    val a =new Author
+  def generateAuhtor(given: String, family: String, orcid: String): Author = {
+    val a = new Author
     a.setName(given)
     a.setSurname(family)
     a.setFullname(s"${given} ${family}")
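Review note on the extraction changes in the hunks above: json4s' `extract[T]` throws a `MappingException` when the path is absent, so a single Crossref record without a `publisher` or `author` field would previously have failed the whole Spark task; `extractOrElse` degrades to a default instead. A self-contained illustration (the JSON literal is ad hoc, not a real dump record):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats

val json = parse("""{"DOI": "10.1000/example"}""") // no publisher, no author

// (json \ "publisher").extract[String]                  // would throw MappingException
val publisher = (json \ "publisher").extractOrElse[String](null)     // -> null
val authors = (json \ "author").extractOrElse[List[String]](List()) // -> Nil
```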
@@ -202,30 +200,28 @@ case object Crossref2Oaf {
     if (result == null)
       return result
     val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"));
-    logger.debug(mappingCrossrefType(objectType))
-    logger.debug(cOBJCategory)
+//    logger.debug(mappingCrossrefType(objectType))
+//    logger.debug(cOBJCategory)
     mappingResult(result, json, cOBJCategory)

     result match {
-      case publication: Publication => convertPublication(publication)
+      case publication: Publication => convertPublication(publication, json, cOBJCategory)
       case dataset: Dataset => convertDataset(dataset)
     }
-
-
-
     result
   }

   def convertDataset(dataset: Dataset): Unit = {
-
+    //TODO probably we need to add relation and other stuff here
   }

-  def convertPublication(publication: Publication, json: JValue, cobjCategory:String): Unit = {
+  def convertPublication(publication: Publication, json: JValue, cobjCategory: String): Unit = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
     val containerTitles = for {JString(ct) <- json \ "container-title"} yield ct
@@ -243,12 +239,44 @@ case object Crossref2Oaf {
         publication.setSource(List(asField(source)).asJava)
       }
     } else {
-      val issn =
+      // Mapping Journal
+
+      val issnInfos = for {JArray(issn_types) <- json \ "issn-type"
+                           JObject(issn_type) <- issn_types
+                           JField("type", JString(tp)) <- issn_type
+                           JField("value", JString(vl)) <- issn_type
+      } yield Tuple2(tp, vl)
+
+      val volume = (json \ "volume").extractOrElse[String] (null)
+      if (containerTitles.nonEmpty) {
+        val journal = new Journal
+        journal.setName(containerTitles.head)
+        if (issnInfos.nonEmpty) {
+
+          issnInfos.foreach(tp => {
+            tp._1 match {
+              case "electronic" => journal.setIssnOnline(tp._2)
+              case "print" => journal.setIssnPrinted(tp._2)
+            }
+          })
+
+        }
+        journal.setVol(volume)
+
+        val page = (json \ "page").extractOrElse[String] (null)
+        if(page!= null ) {
+          val pp = page.split("-")
+          journal.setSp(pp.head)
+          if (pp.size > 1)
+            journal.setEp(pp(1))
+        }
+
+        publication.setJournal(journal)
+      }
+    }
-
-      // Mapping other types of publications
-
-  }
   }
@@ -322,7 +350,7 @@ case object Crossref2Oaf {
   }

-  def createQualifier(clsName: String,clsValue: String, schName: String, schValue: String): Qualifier = {
+  def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
     val q = new Qualifier
     q.setClassid(clsName)
     q.setClassname(clsValue)
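Review note on the new journal mapping: `issn-type` entries are routed to `issnOnline`/`issnPrinted`, `volume` is copied, and `page` is split on `-` into start/end page. One caveat: the `match` in the patch has no default case, so an issn type other than `electronic` or `print` would raise a `MatchError`. The sketch below (toy JSON, real `Journal` setters from the dhp schema) adds one:

```scala
import eu.dnetlib.dhp.schema.oaf.Journal
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats

val json = parse(
  """{"container-title": ["Some Journal"],
    |"issn-type": [{"type": "print", "value": "0100-4670"}],
    |"volume": "44", "page": "41-50"}""".stripMargin)

val journal = new Journal
val containerTitles = for {JString(ct) <- json \ "container-title"} yield ct
journal.setName(containerTitles.head)

val issnInfos = for {
  JArray(issnTypes)            <- json \ "issn-type"
  JObject(issnType)            <- issnTypes
  JField("type", JString(tp))  <- issnType
  JField("value", JString(vl)) <- issnType
} yield (tp, vl)

issnInfos.foreach {
  case ("electronic", vl) => journal.setIssnOnline(vl)
  case ("print", vl)      => journal.setIssnPrinted(vl)
  case _                  => // ignore unknown issn types instead of MatchError
}

journal.setVol((json \ "volume").extractOrElse[String](null))
val page = (json \ "page").extractOrElse[String](null)
if (page != null) {
  val pp = page.split("-")
  journal.setSp(pp.head)                 // "41"
  if (pp.size > 1) journal.setEp(pp(1))  // "50"
}
```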
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index 284106f81..3374f2969 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -1,22 +1,22 @@
 package eu.dnetlib.doiboost.crossref

 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.Publication
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}

-
-case class Reference(author:String, firstPage:String) {}
+case class Reference(author: String, firstPage: String) {}

 object SparkMapDumpIntoOAF {

   def main(args: Array[String]): Unit = {
-    val logger:Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
+    val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
     val conf: SparkConf = new SparkConf()
     val parser = new ArgumentApplicationParser(IOUtils.toString(SparkMapDumpIntoOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
     parser.parseArgument(args)
@@ -26,33 +26,42 @@ object SparkMapDumpIntoOAF {
       .config(conf)
       .appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
       .master(parser.get("master")).getOrCreate()
+    import spark.implicits._

+    implicit val mapEncoder = Encoders.bean(classOf[Publication])
     val sc = spark.sparkContext

-    val x: String = sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
-      .map(k => k._2.toString).first()
-    val item =CrossrefImporter.decompressBlob(x)
+    val total = sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
+      .map(k => k._2.toString).map(CrossrefImporter.decompressBlob)
+      .map(k => Crossref2Oaf.convert(k, logger))
+      .filter(k => k != null && k.isInstanceOf[Publication])
+      .map(k => k.asInstanceOf[Publication])

-    logger.info(item)
+    val ds: Dataset[Publication] = spark.createDataset(total)
+    val targetPath = parser.get("targetPath")
+    ds.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")

-//    lazy val json: json4s.JValue = parse(item)
-//
-//
-//    val references = for {
-//      JArray(references) <- json \\ "reference"
-//      JObject(reference) <- references
-//      JField("first-page", JString(firstPage)) <- reference
-//      JField("author", JString(author)) <- reference
-//    } yield Reference(author, firstPage)
-//
-//
-//
-//
-//    logger.info((json \ "created" \ "timestamp").extractOrElse("missing"))
-//    logger.info(references.toString())
-//
-//    logger.info((json \ "type").extractOrElse("missing"))
+
+    logger.info(s"total Item :${total}")
+
+    //    lazy val json: json4s.JValue = parse(item)
+    //
+    //
+    //    val references = for {
+    //      JArray(references) <- json \\ "reference"
+    //      JObject(reference) <- references
+    //      JField("first-page", JString(firstPage)) <- reference
+    //      JField("author", JString(author)) <- reference
+    //    } yield Reference(author, firstPage)
+    //
+    //
+    //
+    //
+    //    logger.info((json \ "created" \ "timestamp").extractOrElse("missing"))
+    //    logger.info(references.toString())
+    //
+    //    logger.info((json \ "type").extractOrElse("missing"))
   }
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json
index 8bac47123..312bd0751 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json
@@ -1,5 +1,6 @@
 [
     {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
+    {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
     {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
 ]
\ No newline at end of file
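Review note on SparkMapDumpIntoOAF: `logger.info(s"total Item :${total}")` interpolates the RDD itself, so the log line prints the RDD's `toString` (something like `MapPartitionsRDD[4] at map ...`), not a record count. If a count is actually wanted, it has to be forced explicitly; a hypothetical fix, to slot in right after the write:

```scala
// Count the converted publications (this triggers a Spark job) and log
// the number rather than the RDD's toString representation.
val publicationCount = ds.count()
logger.info(s"total Items: $publicationCount")
```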
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/DoiBoostTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/DoiBoostTest.java
index 9658c8858..49f9ef912 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/DoiBoostTest.java
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/DoiBoostTest.java
@@ -26,18 +26,98 @@ public class DoiBoostTest {
         // CrossrefImporter.main("-n file:///tmp -t file:///tmp/p.seq -ts 1586110000749".split("
         // "));
         SparkMapDumpIntoOAF.main(
-                "-m local[*] -s file:///data/doiboost/crossref_dump.seq".split(" "));
+                "-m local[*] -s file:///data/doiboost/crossref_dump.seq -t /data/doiboost"
+                        .split(" "));
+    }
+
+    @Test
+    public void testConvertDatasetCrossRef2Oaf() throws IOException {
+        final String json = IOUtils.toString(getClass().getResourceAsStream("dataset.json"));
+        ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+        assertNotNull(json);
+        assertFalse(StringUtils.isBlank(json));
+        final Result result = Crossref2Oaf.convert(json, logger);
+
+        logger.info(mapper.writeValueAsString(result));
     }

     @Test
     public void testConvertPreprintCrossRef2Oaf() throws IOException {

-        final String json = IOUtils.toString(getClass().getResourceAsStream("article.json"));
+        final String json = IOUtils.toString(getClass().getResourceAsStream("preprint.json"));
         ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
         assertNotNull(json);
         assertFalse(StringUtils.isBlank(json));
-        Crossref2Oaf cf = new Crossref2Oaf();
-        final Result result = cf.convert(json, logger);
+
+        final Result result = Crossref2Oaf.convert(json, logger);
+        assertNotNull(result);
+
+        assertNotNull(result.getDataInfo(), "Datainfo test not null Failed");
+        assertNotNull(
+                result.getDataInfo().getProvenanceaction(),
+                "DataInfo/Provenance test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getClassid()),
+                "DataInfo/Provenance/classId test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getClassname()),
+                "DataInfo/Provenance/className test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getSchemeid()),
+                "DataInfo/Provenance/SchemeId test not null Failed");
+        assertFalse(
+                StringUtils.isBlank(result.getDataInfo().getProvenanceaction().getSchemename()),
+                "DataInfo/Provenance/SchemeName test not null Failed");
+
+        assertNotNull(result.getCollectedfrom(), "CollectedFrom test not null Failed");
+        assertTrue(result.getCollectedfrom().size() > 0);
+        assertTrue(
+                result.getCollectedfrom().stream()
+                        .anyMatch(
+                                c ->
+                                        c.getKey()
+                                                .equalsIgnoreCase(
+                                                        "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")));
+        assertTrue(
+                result.getCollectedfrom().stream()
+                        .anyMatch(c -> c.getValue().equalsIgnoreCase("crossref")));
+
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(d -> d.getQualifier().getClassid().equalsIgnoreCase("created")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(
+                                d -> d.getQualifier().getClassid().equalsIgnoreCase("available")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(d -> d.getQualifier().getClassid().equalsIgnoreCase("accepted")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(
+                                d ->
+                                        d.getQualifier()
+                                                .getClassid()
+                                                .equalsIgnoreCase("published-online")));
+        assertTrue(
+                result.getRelevantdate().stream()
+                        .anyMatch(
+                                d ->
+                                        d.getQualifier()
+                                                .getClassid()
+                                                .equalsIgnoreCase("published-print")));
+
+        logger.info(mapper.writeValueAsString(result));
+    }
+
+    @Test
+    public void testConvertArticleCrossRef2Oaf() throws IOException {
+
+        final String json = IOUtils.toString(getClass().getResourceAsStream("article.json"));
+        ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+        assertNotNull(json);
+        assertFalse(StringUtils.isBlank(json));
+        final Result result = Crossref2Oaf.convert(json, logger);
         assertNotNull(result);

         assertNotNull(result.getDataInfo(), "Datainfo test not null Failed");
@@ -73,15 +153,6 @@ public class DoiBoostTest {
         assertTrue(
                 result.getRelevantdate().stream()
                         .anyMatch(d -> d.getQualifier().getClassid().equalsIgnoreCase("created")));
-        //        assertTrue(
-        //                result.getRelevantdate().stream()
-        //                        .anyMatch(
-        //                                d ->
-        //                                        d.getQualifier().getClassid().equalsIgnoreCase("available")));
-        //        assertTrue(
-        //                result.getRelevantdate().stream()
-        //                        .anyMatch(d ->
-        //                                d.getQualifier().getClassid().equalsIgnoreCase("accepted")));
         assertTrue(
                 result.getRelevantdate().stream()
                         .anyMatch(
@@ -107,8 +178,7 @@ public class DoiBoostTest {
         ObjectMapper mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
         assertNotNull(json);
         assertFalse(StringUtils.isBlank(json));
-        Crossref2Oaf cf = new Crossref2Oaf();
-        final Result result = cf.convert(json, logger);
+        final Result result = Crossref2Oaf.convert(json, logger);
         assertNotNull(result);

         logger.info(mapper.writeValueAsString(result));
diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/article.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/article.json
index e89d41ecd..afef13b69 100644
--- a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/article.json
+++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/article.json
@@ -170,5 +170,5 @@
   "container-title": [
     "Ecl\u00e9tica Qu\u00edmica Journal"
   ],
-  "page": "41"
+  "page": "41-50"
 }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/dataset.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/dataset.json
new file mode 100644
index 000000000..5c4b8c5a2
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/dataset.json
@@ -0,0 +1,105 @@
+{
+  "DOI": "10.1037/e522512014-096",
+  "subtitle": [
+    "(522512014-096)"
+  ],
+  "issued": {
+    "date-parts": [
+      [
+        2012
+      ]
+    ]
+  },
+  "prefix": "10.1037",
+  "author": [
+    {
+      "affiliation": [],
+      "given": "Jessica",
+      "family": "Trudeau",
+      "sequence": "first"
+    },
+    {
+      "affiliation": [],
+      "given": "Amy",
+      "family": "McShane",
+      "sequence": "additional"
+    },
+    {
+      "affiliation": [],
+      "given": "Renee",
+      "family": "McDonald",
+      "sequence": "additional"
+    }
+  ],
+  "reference-count": 0,
+  "member": "15",
+  "source": "Crossref",
+  "score": 1.0,
+  "deposited": {
+    "timestamp": 1413827035000,
+    "date-parts": [
+      [
+        2014,
+        10,
+        20
+      ]
+    ],
+    "date-time": "2014-10-20T17:43:55Z"
+  },
+  "indexed": {
+    "timestamp": 1550142454710,
+    "date-parts": [
+      [
+        2019,
+        2,
+        14
+      ]
+    ],
+    "date-time": "2019-02-14T11:07:34Z"
+  },
+  "type": "dataset",
+  "URL": "http://dx.doi.org/10.1037/e522512014-096",
+  "is-referenced-by-count": 0,
+  "published-print": {
+    "date-parts": [
+      [
+        2012
+      ]
+    ]
+  },
+  "references-count": 0,
+  "institution": {
+    "acronym": [
+      "APA"
+    ],
+    "place": [
+      "-"
+    ],
+    "name": "American Psychological Association"
+  },
+  "publisher": "American Psychological Association (APA)",
+  "content-domain": {
+    "domain": [],
+    "crossmark-restriction": false
+  },
+  "created": {
+    "timestamp": 1413826121000,
+    "date-parts": [
+      [
+        2014,
+        10,
+        20
+      ]
+    ],
+    "date-time": "2014-10-20T17:28:41Z"
+  },
+  "title": [
+    "Project Support: A Randomized Control Study to Evaluate the Translation of an Evidence- Based Program"
+  ],
+  "alternative-id": [
+    "522512014-096"
+  ],
+  "container-title": [
+    "PsycEXTRA Dataset"
+  ]
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
index d0fe95289..bec3810f9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJob.java
@@ -19,6 +19,8 @@ public class GraphHiveImporterJob {

     private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);

+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public static void main(String[] args) throws Exception {

         final ArgumentApplicationParser parser =
@@ -37,12 +39,12 @@ public class GraphHiveImporterJob {
         String inputPath = parser.get("inputPath");
         log.info("inputPath: {}", inputPath);

-        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
-        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
-
         String hiveDbName = parser.get("hiveDbName");
         log.info("hiveDbName: {}", hiveDbName);

+        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", hiveMetastoreUris);
@@ -58,13 +60,13 @@ public class GraphHiveImporterJob {
                     spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
                     spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));

-                    final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+                    final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
                     // Read the input file and convert it into RDD of serializable object
                     ModelSupport.oafTypes.forEach(
                             (name, clazz) ->
                                     spark.createDataset(
                                                     sc.textFile(inputPath + "/" + name)
-                                                            .map(s -> new ObjectMapper().readValue(s, clazz))
+                                                            .map(s -> OBJECT_MAPPER.readValue(s, clazz))
                                                             .rdd(),
                                                     Encoders.bean(clazz))
                                             .write()
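Review note on GraphHiveImporterJob: hoisting the `ObjectMapper` into a static constant means one mapper per JVM instead of one allocated per input line inside the Spark closure, and Jackson's `ObjectMapper` is thread-safe once configured. A minimal sketch of the pattern (the object and method names are illustrative, not from the patch):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object JsonSupport {
  // One shared, configured mapper per JVM/executor.
  private val MAPPER = new ObjectMapper()

  def read[T](s: String, clazz: Class[T]): T = MAPPER.readValue(s, clazz)
}

// Usage inside the Spark closure, instead of `new ObjectMapper()` per record:
//   sc.textFile(inputPath + "/" + name).map(s => JsonSupport.read(s, clazz))
```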
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
index 8d8766283..9608732ed 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/config-default.xml
@@ -12,19 +12,15 @@
         <value>true</value>
     </property>
     <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
+        <name>hiveMetastoreUris</name>
         <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
     </property>
     <property>
-        <name>hive_jdbc_url</name>
+        <name>hiveJdbcUrl</name>
        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
     </property>
     <property>
-        <name>hive_db_name</name>
+        <name>hiveDbName</name>
         <value>openaire</value>
     </property>
</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
index c92f8d1af..6c49679cd 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/lib/scripts/postprocessing.sql
@@ -1,10 +1,10 @@
-DROP VIEW IF EXISTS ${hive_db_name}.result;
+DROP VIEW IF EXISTS ${hiveDbName}.result;

 CREATE VIEW IF NOT EXISTS result as
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.publication p
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.publication p
     union all
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.dataset d
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.dataset d
     union all
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.software s
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.software s
     union all
-    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hive_db_name}.otherresearchproduct o;
+    select id, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, embargoenddate, resourcetype, context, instance from ${hiveDbName}.otherresearchproduct o;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
index 67ca6a64a..dc1fa092d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
@@ -2,13 +2,21 @@
    <parameters>
        <property>
-            <name>sourcePath</name>
+            <name>inputPath</name>
            <description>the source path</description>
        </property>
        <property>
-            <name>hive_db_name</name>
+            <name>hiveDbName</name>
            <description>the target hive database name</description>
        </property>
+        <property>
+            <name>hiveJdbcUrl</name>
+            <description>hive server jdbc url</description>
+        </property>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>hive server metastore URIs</description>
+        </property>
        <property>
            <name>sparkDriverMemory</name>
            <description>memory for driver process</description>
@@ -87,9 +95,9 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
-            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
-            <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
+            <arg>--inputPath</arg><arg>${inputPath}</arg>
+            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
+            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
        </spark>
@@ -102,12 +110,12 @@
                <property>
                    <name>hive.metastore.uris</name>
-                    <value>${hive_metastore_uris}</value>
+                    <value>${hiveMetastoreUris}</value>
                </property>
            </configuration>
-            <jdbc-url>${hive_jdbc_url}/${hive_db_name}</jdbc-url>
+            <jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
            <script>lib/scripts/postprocessing.sql</script>
-            <param>hive_db_name=${hive_db_name}</param>
+            <param>hiveDbName=${hiveDbName}</param>
        </hive2>
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
index 20786582f..eaa18ad0b 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -216,6 +216,7 @@ public class CreateRelatedEntitiesJob_phase2 {
                        (MapFunction<String, E>)
                                value -> OBJECT_MAPPER.readValue(value, entityClazz),
                        Encoders.bean(entityClazz))
+                .filter("dataInfo.invisible == false")
                .map(
                        (MapFunction) value ->
diff --git a/pom.xml b/pom.xml
index f4dfc2c0d..25e84a424 100644
--- a/pom.xml
+++ b/pom.xml
@@ -292,6 +292,12 @@
            <dependency>
                <groupId>eu.dnetlib</groupId>
                <artifactId>dnet-actionmanager-common</artifactId>
                <version>6.0.5</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-common</artifactId>
+                    </exclusion>
+                </exclusions>
            </dependency>
            <dependency>
                <groupId>eu.dnetlib</groupId>
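Review note on the last two hunks: `.filter("dataInfo.invisible == false")` uses Spark's SQL expression syntax, which resolves nested fields of the encoded schema, so invisible records are discarded before the join; the pom change excludes the `hadoop-common` jar pulled in transitively by `dnet-actionmanager-common`, presumably to avoid clashing with the cluster-provided Hadoop classes. A self-contained sketch of the filter on a toy schema (the type and field names are illustrative, not the dhp beans):

```scala
import org.apache.spark.sql.SparkSession

case class Info(invisible: Boolean)        // stands in for DataInfo
case class Entity(id: String, dataInfo: Info)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(
  Entity("kept", Info(invisible = false)),
  Entity("dropped", Info(invisible = true))
).toDS()

// SQL condition strings can address nested fields of the Dataset schema.
ds.filter("dataInfo.invisible == false").show() // only "kept" survives
```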