[DOIBoost Refactor]

- Testing:
    Added a common method to retrieve the mock vocabulary in the test classes
    Fixed tests
- Mapping Crossref:
    Using the vocabularies from the IS to map the Crossref type, instead of a hard-coded map in the code
Sandro La Bruzzo 2022-06-27 16:23:28 +02:00
parent eaf9385ae5
commit 8b9f70d977
14 changed files with 4334 additions and 1689 deletions

View File

@ -0,0 +1,27 @@
package eu.dnetlib.dhp.common.test;

import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

public class TestUtils {

    /**
     * Reads the mock vocabulary terms bundled with the test resources.
     */
    public static List<String> getVocabulariesMock() throws IOException {
        return IOUtils
            .readLines(
                Objects
                    .requireNonNull(
                        TestUtils.class.getResourceAsStream("/eu/dnetlib/dhp/vocabulary/terms.txt")));
    }

    /**
     * Reads the mock vocabulary synonyms bundled with the test resources.
     */
    public static List<String> getSynonymsMock() throws IOException {
        return IOUtils
            .readLines(
                Objects
                    .requireNonNull(
                        TestUtils.class.getResourceAsStream("/eu/dnetlib/dhp/vocabulary/synonyms.txt")));
    }
}
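
For reference, a minimal Scala sketch of how these mocks are meant to be wired into a test; this is the same setup the AbstractVocabularyTest class added later in this commit performs via @Mock, written out here with a hand-built Mockito stub for illustration:

    import eu.dnetlib.dhp.common.test.TestUtils
    import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
    import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
    import org.mockito.Mockito.{lenient, mock}

    // Stub the IS lookup calls with the bundled test vocabularies,
    // then build the VocabularyGroup from the stubbed service.
    val isLookUpService = mock(classOf[ISLookUpService])
    lenient()
      .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY))
      .thenReturn(TestUtils.getVocabulariesMock())
    lenient()
      .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
      .thenReturn(TestUtils.getSynonymsMock())
    val vocabularies: VocabularyGroup = VocabularyGroup.loadVocsFromIS(isLookUpService)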

View File

@ -7,9 +7,14 @@ import java.security.MessageDigest;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -32,6 +37,9 @@ import net.minidev.json.JSONArray;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
 * Utility methods used across the DHP modules.
 */
public class DHPUtils {

    private static final Logger log = LoggerFactory.getLogger(DHPUtils.class);
@ -39,10 +47,22 @@ public class DHPUtils {
    private DHPUtils() {
    }

    /**
     * Converts a Java List into a Scala Seq.
     *
     * @param list the list to convert
     * @return the corresponding Scala Seq
     */
    public static Seq<String> toSeq(List<String> list) {
        return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
    }

    /**
     * Computes the MD5 hash of the given string.
     *
     * @param s the input string
     * @return the hex-encoded MD5 digest
     */
    public static String md5(final String s) {
        try {
            final MessageDigest md = MessageDigest.getInstance("MD5");
@ -54,9 +74,66 @@ public class DHPUtils {
        }
    }

    private static Pair<Qualifier, Qualifier> searchTypeInVocabularies(final String aType, final VocabularyGroup vocabularies) {
        if (StringUtils.isNotBlank(aType)) {
            final Qualifier typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, aType);
            if (typeQualifier != null)
                return new ImmutablePair<>(typeQualifier, vocabularies.getSynonymAsQualifier(
                    ModelConstants.DNET_RESULT_TYPOLOGIES,
                    typeQualifier.getClassid()));
        }
        return null;
    }
    /**
     * Retrieves the OAF type from the vocabularies as a pair of qualifiers.
     *
     * This method tries to find the correct OAF result type and instance type in the
     * vocabularies, searching in this order:
     * 1 - a vocabulary synonym of subResourceType
     * 2 - a vocabulary synonym of otherResourceType
     * 3 - a vocabulary synonym of resourceTypeGeneral
     *
     * @param resourceTypeGeneral the resource type general
     * @param subResourceType the sub resource type
     * @param otherResourceType the other resource type
     * @param vocabularies the vocabularies
     * @return the pair (instance type qualifier, result typology qualifier), or null when no match is found
     */
    public static Pair<Qualifier, Qualifier> retrieveOAFTypeFromVocabulary(final String resourceTypeGeneral, final String subResourceType, final String otherResourceType, final VocabularyGroup vocabularies) {
        if (StringUtils.isNotBlank(subResourceType)) {
            Pair<Qualifier, Qualifier> result = searchTypeInVocabularies(subResourceType, vocabularies);
            if (result != null)
                return result;
        }
        if (StringUtils.isNotBlank(otherResourceType)) {
            Pair<Qualifier, Qualifier> result = searchTypeInVocabularies(otherResourceType, vocabularies);
            if (result != null)
                return result;
        }
        if (StringUtils.isNotBlank(resourceTypeGeneral)) {
            return searchTypeInVocabularies(resourceTypeGeneral, vocabularies);
        }
        return null;
    }
    /**
     * Retrieves from the metadata store manager application the list of paths associated with mdstores characterized
     * by the given format, layout, interpretation
     *
     * @param mdstoreManagerUrl the URL of the mdstore manager service
     * @param format the mdstore format
     * @param layout the mdstore layout
@ -93,10 +170,24 @@ public class DHPUtils {
        }
    }
    /**
     * Generates an identifier in the form nsPrefix::md5(originalId).
     *
     * @param originalId the original id
     * @param nsPrefix the namespace prefix
     * @return the generated identifier
     */
    public static String generateIdentifier(final String originalId, final String nsPrefix) {
        return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId));
    }

    /**
     * Generates an unresolved identifier from a pid and its type.
     *
     * @param pid the pid
     * @param pidType the pid type
     * @return the generated identifier
     */
    public static String generateUnresolvedIdentifier(final String pid, final String pidType) {
        final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid);
@ -104,6 +195,13 @@ public class DHPUtils {
return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim()); return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim());
} }
/**
* Gets j path string.
*
* @param jsonPath the json path
* @param json the json
* @return the j path string
*/
public static String getJPathString(final String jsonPath, final String json) { public static String getJPathString(final String jsonPath, final String json) {
try { try {
Object o = JsonPath.read(json, jsonPath); Object o = JsonPath.read(json, jsonPath);
@ -117,8 +215,19 @@ public class DHPUtils {
        }
    }

    /**
     * The shared Jackson ObjectMapper instance.
     */
    public static final ObjectMapper MAPPER = new ObjectMapper();
    /**
     * Writes the given content to a file on HDFS.
     *
     * @param conf the Hadoop configuration
     * @param content the content to write
     * @param path the destination path
     * @throws IOException in case of failure
     */
    public static void writeHdfsFile(final Configuration conf, final String content, final String path)
        throws IOException {
@ -130,6 +239,14 @@ public class DHPUtils {
        }
    }

    /**
     * Reads the content of an HDFS file as a string.
     *
     * @param conf the Hadoop configuration
     * @param path the path to read
     * @return the file content
     * @throws IOException in case of failure
     */
    public static String readHdfsFile(Configuration conf, String path) throws IOException {
        log.info("reading file {}", path);
@ -142,10 +259,27 @@ public class DHPUtils {
        }
    }

    /**
     * Reads an HDFS file and deserializes its JSON content into an instance of the given class.
     *
     * @param <T> the target type
     * @param conf the Hadoop configuration
     * @param path the path to read
     * @param clazz the target class
     * @return the deserialized instance
     * @throws IOException in case of failure
     */
    public static <T> T readHdfsFileAs(Configuration conf, String path, Class<T> clazz) throws IOException {
        return MAPPER.readValue(readHdfsFile(conf, path), clazz);
    }

    /**
     * Saves the given Spark Dataset to the target path in overwrite mode.
     *
     * @param <T> the dataset type
     * @param mdstore the mdstore dataset
     * @param targetPath the target path
     */
    public static <T> void saveDataset(final Dataset<T> mdstore, final String targetPath) {
        log.info("saving dataset in: {}", targetPath);
        mdstore
@ -155,6 +289,12 @@ public class DHPUtils {
            .save(targetPath);
    }

    /**
     * Creates a Hadoop configuration pointing at the given name node.
     *
     * @param nameNode the name node URL
     * @return the Hadoop configuration
     */
    public static Configuration getHadoopConfiguration(String nameNode) {
        // ====== Init HDFS File System Object
        Configuration conf = new Configuration();
@ -168,6 +308,12 @@ public class DHPUtils {
        return conf;
    }

    /**
     * Writes the given report entries to the Oozie action output properties file.
     *
     * @param report the report entries
     * @throws IOException in case of failure
     */
    public static void populateOOZIEEnv(final Map<String, String> report) throws IOException {
        File file = new File(System.getProperty("oozie.action.output.properties"));
        Properties props = new Properties();
@ -178,6 +324,13 @@ public class DHPUtils {
        }
    }

    /**
     * Writes a single name/value pair to the Oozie action output properties file.
     *
     * @param paramName the property name
     * @param value the property value
     * @throws IOException in case of failure
     */
    public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
        Map<String, String> report = Maps.newHashMap();
        report.put(paramName, value);
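
As a side note, the HDFS helpers documented above compose into a simple round trip; a minimal Scala sketch, assuming a reachable name node (the URL and path below are placeholders):

    import eu.dnetlib.dhp.utils.DHPUtils

    // Write a small JSON report to HDFS and read it back.
    val conf = DHPUtils.getHadoopConfiguration("hdfs://namenode:8020")
    DHPUtils.writeHdfsFile(conf, """{"status": "ok"}""", "/tmp/report.json")
    val content: String = DHPUtils.readHdfsFile(conf, "/tmp/report.json")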

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.datacite
package eu.dnetlib.dhp.client

import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.datacite

import eu.dnetlib.dhp.client.AbstractRestClient
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.json4s.{DefaultFormats, JValue}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils}
@ -7,6 +8,7 @@ import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang3.tuple
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
@ -35,60 +37,60 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S
case object Crossref2Oaf {
  val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
  val mappingCrossrefType = Map(
    "book-section" -> "publication",
    "book" -> "publication",
    "book-chapter" -> "publication",
    "book-part" -> "publication",
    "book-series" -> "publication",
    "book-set" -> "publication",
    "book-track" -> "publication",
    "edited-book" -> "publication",
    "reference-book" -> "publication",
    "monograph" -> "publication",
    "journal-article" -> "publication",
    "dissertation" -> "publication",
    "other" -> "publication",
    "peer-review" -> "publication",
    "proceedings" -> "publication",
    "proceedings-article" -> "publication",
    "reference-entry" -> "publication",
    "report" -> "publication",
    "report-series" -> "publication",
    "standard" -> "publication",
    "standard-series" -> "publication",
    "posted-content" -> "publication",
    "dataset" -> "dataset"
  )

  val mappingCrossrefSubType = Map(
    "book-section" -> "0013 Part of book or chapter of book",
    "book" -> "0002 Book",
    "book-chapter" -> "0013 Part of book or chapter of book",
    "book-part" -> "0013 Part of book or chapter of book",
    "book-series" -> "0002 Book",
    "book-set" -> "0002 Book",
    "book-track" -> "0002 Book",
    "edited-book" -> "0002 Book",
    "reference-book" -> "0002 Book",
    "monograph" -> "0002 Book",
    "journal-article" -> "0001 Article",
    "dissertation" -> "0044 Thesis",
    "other" -> "0038 Other literature type",
    "peer-review" -> "0015 Review",
    "proceedings" -> "0004 Conference object",
    "proceedings-article" -> "0004 Conference object",
    "reference-entry" -> "0013 Part of book or chapter of book",
    "report" -> "0017 Report",
    "report-series" -> "0017 Report",
    "standard" -> "0038 Other literature type",
    "standard-series" -> "0038 Other literature type",
    "dataset" -> "0021 Dataset",
    "preprint" -> "0016 Preprint",
    "report" -> "0017 Report"
  )

  def mappingResult(result: Result, json: JValue, cobjCategory: String): Result = {

//  val mappingCrossrefType = Map(
//    "book-section" -> "publication",
//    "book" -> "publication",
//    "book-chapter" -> "publication",
//    "book-part" -> "publication",
//    "book-series" -> "publication",
//    "book-set" -> "publication",
//    "book-track" -> "publication",
//    "edited-book" -> "publication",
//    "reference-book" -> "publication",
//    "monograph" -> "publication",
//    "journal-article" -> "publication",
//    "dissertation" -> "publication",
//    "other" -> "publication",
//    "peer-review" -> "publication",
//    "proceedings" -> "publication",
//    "proceedings-article" -> "publication",
//    "reference-entry" -> "publication",
//    "report" -> "publication",
//    "report-series" -> "publication",
//    "standard" -> "publication",
//    "standard-series" -> "publication",
//    "posted-content" -> "publication",
//    "dataset" -> "dataset"
//  )
//
//  val mappingCrossrefSubType = Map(
//    "book-section" -> "0013 Part of book or chapter of book",
//    "book" -> "0002 Book",
//    "book-chapter" -> "0013 Part of book or chapter of book",
//    "book-part" -> "0013 Part of book or chapter of book",
//    "book-series" -> "0002 Book",
//    "book-set" -> "0002 Book",
//    "book-track" -> "0002 Book",
//    "edited-book" -> "0002 Book",
//    "reference-book" -> "0002 Book",
//    "monograph" -> "0002 Book",
//    "journal-article" -> "0001 Article",
//    "dissertation" -> "0044 Thesis",
//    "other" -> "0038 Other literature type",
//    "peer-review" -> "0015 Review",
//    "proceedings" -> "0004 Conference object",
//    "proceedings-article" -> "0004 Conference object",
//    "reference-entry" -> "0013 Part of book or chapter of book",
//    "report" -> "0017 Report",
//    "report-series" -> "0017 Report",
//    "standard" -> "0038 Other literature type",
//    "standard-series" -> "0038 Other literature type",
//    "dataset" -> "0021 Dataset",
//    "preprint" -> "0016 Preprint",
//    "report" -> "0017 Report"
//  )

  def mappingResult(result: Result, json: JValue, resourceType: Qualifier, instanceType: Qualifier): Result = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

    //MAPPING Crossref DOI into PID
@ -256,22 +258,8 @@ case object Crossref2Oaf {
      instance.setAccessright(
        decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue)
      )
      instance.setInstancetype(
        OafMapperUtils.qualifier(
          cobjCategory.substring(0, 4),
          cobjCategory.substring(5),
          ModelConstants.DNET_PUBLICATION_RESOURCE,
          ModelConstants.DNET_PUBLICATION_RESOURCE
        )
      )
      result.setResourcetype(
        OafMapperUtils.qualifier(
          cobjCategory.substring(0, 4),
          cobjCategory.substring(5),
          ModelConstants.DNET_PUBLICATION_RESOURCE,
          ModelConstants.DNET_PUBLICATION_RESOURCE
        )
      )

      instance.setInstancetype(instanceType)
      result.setResourcetype(resourceType)

      instance.setCollectedfrom(createCrossrefCollectedFrom())
      if (StringUtils.isNotBlank(issuedDate)) {
@ -330,7 +318,7 @@ case object Crossref2Oaf {
    a
  }

  def convert(input: String): List[Oaf] = {
  def convert(input: String, vocabularies: VocabularyGroup): List[Oaf] = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: json4s.JValue = parse(input)
@ -341,14 +329,14 @@ case object Crossref2Oaf {
    if (objectType == null)
      return resultList

    val result = generateItemFromType(objectType, objectSubType)
    val result = generateItemFromType(objectType, objectSubType, vocabularies)
    if (result == null)
      return List()

    val cOBJCategory = mappingCrossrefSubType.getOrElse(
      objectType,
      mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type")
    )
    val (resourceType, instanceType) = getTypeQualifier(objectType, objectSubType, vocabularies)

    mappingResult(result, json, cOBJCategory)
    mappingResult(result, json, resourceType, instanceType)
    if (result == null || result.getId == null)
      return List()
@ -366,7 +354,7 @@ case object Crossref2Oaf {
    }
    result match {
      case publication: Publication => convertPublication(publication, json, cOBJCategory)
      case publication: Publication => convertPublication(publication, json)
      case dataset: Dataset => convertDataset(dataset)
    }
@ -521,12 +509,14 @@ case object Crossref2Oaf {
    // TODO check if there are other info to map into the Dataset
  }

  def convertPublication(publication: Publication, json: JValue, cobjCategory: String): Unit = {
  def convertPublication(publication: Publication, json: JValue): Unit = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    val containerTitles = for { JString(ct) <- json \ "container-title" } yield ct
    val className = publication.getInstance().asScala.map(i => i.getInstancetype.getClassname).head

    //Mapping book
    if (cobjCategory.toLowerCase.contains("book")) {
    if ("book".equalsIgnoreCase(className)) {
      val ISBN = for { JString(isbn) <- json \ "ISBN" } yield isbn
      if (ISBN.nonEmpty && containerTitles.nonEmpty) {
        val source = s"${containerTitles.head} ISBN: ${ISBN.head}"
@ -607,12 +597,27 @@ case object Crossref2Oaf {
    null
  }

  def generateItemFromType(objectType: String, objectSubType: String): Result = {
    if (mappingCrossrefType.contains(objectType)) {
      if (mappingCrossrefType(objectType).equalsIgnoreCase("publication"))
        return new Publication()
      if (mappingCrossrefType(objectType).equalsIgnoreCase("dataset"))
        return new Dataset()
    }
    null
  }

  def getTypeQualifier(objectType: String, objectSubType: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = {
    val result: tuple.Pair[Qualifier, Qualifier] = DHPUtils.retrieveOAFTypeFromVocabulary(objectType, objectSubType, null, vocabularies)

    if (result != null)
      (result.getValue, result.getKey)
    else
      null
  }

  def generateItemFromType(objectType: String, objectSubType: String, vocabularies: VocabularyGroup): Result = {
    val result = getTypeQualifier(objectType, objectSubType, vocabularies)
    if (result != null) {
      if ("publication".equalsIgnoreCase(result._1.getClassname)) {
        return new Publication
      }
      if ("dataset".equalsIgnoreCase(result._1.getClassname))
        return new Dataset
    }
    null
  }
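
In short, the hard-coded cobjCategory lookup is replaced by two vocabulary lookups. Spelled out, getTypeQualifier boils down to the following sketch (see DHPUtils.searchTypeInVocabularies above; "journal-article" is an example Crossref type and the null checks are omitted):

    // First resolve the instance type from dnet:publication_resource,
    // then resolve the result typology from its classid.
    val instanceType = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, "journal-article")
    val resourceType = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType.getClassid)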

View File

@ -0,0 +1,37 @@
package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.client.AbstractRestClient
import org.json4s.{DefaultFormats, JValue}
import org.json4s.jackson.JsonMethods.{compact, parse, render}

class CrossrefFunderRetriever(var cursor: String = "*") extends AbstractRestClient {

  override def extractInfo(input: String): Unit = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: org.json4s.JValue = parse(input)
    buffer = (json \\ "items").extract[List[JValue]].map(s => compact(render(s)))
    cursor = (json \ "message" \ "next-cursor").extractOrElse[String](null)
    if (cursor.isEmpty)
      complete = true
    current_index = 0
  }

  def get_url(): String = {
    println(s"cursor is $cursor")
    s"https://api.crossref.org/funders?rows=1000&cursor=$cursor"
  }

  override def getBufferData(): Unit = {
    if (!complete) {
      val response =
        if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get)
        else doHTTPGETRequest(get_url())
      extractInfo(response)
    }
  }
}
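
A usage sketch, assuming AbstractRestClient exposes the retrieved records as an iterator of JSON strings (the CrossrefFunderTest below consumes it the same way):

    // Stream funder records from the Crossref /funders endpoint
    // and print the first three.
    val retriever = new CrossrefFunderRetriever()
    retriever.toIterator.take(3).foreach(println)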

View File

@ -1,9 +1,9 @@
package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.application.{AbstractScalaApplication, ArgumentApplicationParser, SparkScalaApplication}
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s
import org.json4s.DefaultFormats
@ -12,59 +12,103 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source

object GenerateCrossrefDataset {
  val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass)

class SparkGenerateCrossrefDataset(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log: Logger) {

  implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]

  def crossrefElement(meta: String): CrossrefDT = {

  /**
   * Converts the Crossref JSON into the CrossrefDT class
   *
   * @param metadata the json metadata
   * @return the CrossrefDT
   */
  def crossrefElement(metadata: String): CrossrefDT = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: json4s.JValue = parse(meta)
    lazy val json: json4s.JValue = parse(metadata)
    val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String])
    val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
    CrossrefDT(doi, meta, timestamp)
    CrossrefDT(doi, metadata, timestamp)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    val parser = new ArgumentApplicationParser(
      Source
        .fromInputStream(
          getClass.getResourceAsStream(
            "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json"
          )
        )
        .mkString
    )
    parser.parseArgument(args)
    val master = parser.get("master")
    val sourcePath = parser.get("sourcePath")
    val targetPath = parser.get("targetPath")

    val spark: SparkSession = SparkSession
      .builder()
      .config(conf)
      .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
      .master(master)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    val tmp: RDD[String] = sc.textFile(sourcePath, 6000)

    spark
      .createDataset(tmp)
      .map(entry => crossrefElement(entry))
      .write
      .mode(SaveMode.Overwrite)
      .save(targetPath)
//      .map(meta => crossrefElement(meta))
//      .toDS.as[CrossrefDT]
//      .write.mode(SaveMode.Overwrite).save(targetPath)
  }

  def convertDataset(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
    import spark.implicits._
    spark.read.text(sourcePath).as[String].map(entry => crossrefElement(entry))
      .write
      .mode(SaveMode.Overwrite)
      .save(targetPath)
  }

  /**
   * All the Spark applications run this method,
   * where the whole logic of the Spark node is defined
   */
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath is $sourcePath")
    val targetPath = parser.get("targetPath")
    log.info(s"targetPath is $targetPath")
    convertDataset(spark, sourcePath, targetPath)
  }
}
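
A hedged sketch of how such a class is typically launched from a companion object, assuming the initialize()/run() pattern of the other AbstractScalaApplication subclasses and reusing the property path from the old main:

    object SparkGenerateCrossrefDataset {
      val log: Logger = LoggerFactory.getLogger(SparkGenerateCrossrefDataset.getClass)

      def main(args: Array[String]): Unit = {
        // initialize() is assumed to parse the arguments and create the
        // Spark session before run() executes.
        new SparkGenerateCrossrefDataset(
          "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json",
          args,
          log
        ).initialize().run()
      }
    }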
//object GenerateCrossrefDataset {
//
// val log: Logger = LoggerFactory.getLogger(GenerateCrossrefDataset.getClass)
//
// implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
//
//
//
// def main(args: Array[String]): Unit = {
// val conf = new SparkConf
// val parser = new ArgumentApplicationParser(
// Source
// .fromInputStream(
// getClass.getResourceAsStream(
// "/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json"
// )
// )
// .mkString
// )
// parser.parseArgument(args)
// val master = parser.get("master")
// val sourcePath = parser.get("sourcePath")
// val targetPath = parser.get("targetPath")
//
// val spark: SparkSession = SparkSession
// .builder()
// .config(conf)
// .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
// .master(master)
// .getOrCreate()
// val sc: SparkContext = spark.sparkContext
//
// import spark.implicits._
//
// val tmp: RDD[String] = sc.textFile(sourcePath, 6000)
//
// spark
// .createDataset(tmp)
// .map(entry => crossrefElement(entry))
// .write
// .mode(SaveMode.Overwrite)
// .save(targetPath)
// }
//
//}

View File

@ -1,8 +1,10 @@
package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -39,12 +41,19 @@ object SparkMapDumpIntoOAF {
    implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
    implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]

    val isLookupUrl: String = parser.get("isLookupUrl")
    val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
    val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
    require(vocabularies != null)

    val targetPath = parser.get("targetPath")

    spark.read
      .load(parser.get("sourcePath"))
      .as[CrossrefDT]
      .flatMap(k => Crossref2Oaf.convert(k.json))
      .flatMap(k => Crossref2Oaf.convert(k.json, vocabularies))
      .filter(o => o != null)
      .write
      .mode(SaveMode.Overwrite)

View File

@ -0,0 +1,42 @@
package eu.dnetlib.doiboost;

import static org.mockito.Mockito.lenient;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.apache.commons.io.IOUtils;
import org.mockito.Mock;

import eu.dnetlib.dhp.common.test.TestUtils;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

public abstract class AbstractVocabularyTest {

    @Mock
    protected ISLookUpService isLookUpService;

    protected VocabularyGroup vocabularies;

    public void setUpVocabulary() throws ISLookUpException, IOException {
        lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
        lenient()
            .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
            .thenReturn(synonyms());

        vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
    }

    private static List<String> vocs() throws IOException {
        return TestUtils.getVocabulariesMock();
    }

    private static List<String> synonyms() throws IOException {
        return TestUtils.getSynonymsMock();
    }
}

View File

@ -0,0 +1,62 @@
package eu.dnetlib.dhp.doiboost.crossref

import eu.dnetlib.doiboost.crossref.CrossrefFunderRetriever
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods.parse
import org.junit.jupiter.api.Test

class CrossrefFunderTest {

  def parse_funder(input: String): (String, String, List[String]) = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: org.json4s.JValue = parse(input)

    val l: List[String] = for { JString(desc) <- json \ "descendants" } yield desc

    ((json \ "name").extract[String], (json \ "uri").extract[String], l)
  }

  @Test
  def testFunderRelationshipsMapping(): Unit = {
    val cf = new CrossrefFunderRetriever()

    var i = 0

//    val w = new PrintWriter("/tmp/funder_names")

    val data = cf.toIterator.next()

    println(data)

//    cf.map(s => parse_funder(s)).foreach(s => w.write(s"${s._1} \t${s._2} \t${s._3.mkString("--")}\t\n"))
//    w.close()

//    cf.foreach { _ =>
//      i = i + 1
//      if (i % 1000 == 0)
//        println(s"get $i documents")
//    }

    println(s"Total item $i")
  }
}

View File

@ -2,21 +2,32 @@ package eu.dnetlib.dhp.doiboost.crossref
import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.AbstractVocabularyTest
import eu.dnetlib.doiboost.crossref.Crossref2Oaf
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.{BeforeAll, BeforeEach, Test}
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.junit.jupiter.MockitoExtension
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.matching.Regex

class CrossrefMappingTest {

@ExtendWith(Array(classOf[MockitoExtension]))
class CrossrefMappingTest extends AbstractVocabularyTest {

  val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
  val mapper = new ObjectMapper()

  @BeforeEach
  def setUp(): Unit = {
    setUpVocabulary()
  }

  @Test
  def testFunderRelationshipsMapping(): Unit = {
    val template = Source
@ -33,13 +44,13 @@ class CrossrefMappingTest {
    for (line <- funder_doi.lines) {
      val json = template.replace("%s", line)
      val resultList: List[Oaf] = Crossref2Oaf.convert(json)
      val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
      assertTrue(resultList.nonEmpty)
      checkRelation(resultList)
    }
    for (line <- funder_name.lines) {
      val json = template.replace("%s", line)
      val resultList: List[Oaf] = Crossref2Oaf.convert(json)
      val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
      assertTrue(resultList.nonEmpty)
      checkRelation(resultList)
    }
@ -79,7 +90,7 @@ class CrossrefMappingTest {
      Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/issue_date.json")).mkString
    assertNotNull(json)
    assertFalse(json.isEmpty)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
    val items = resultList.filter(p => p.isInstanceOf[Result])
@ -98,7 +109,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -120,7 +131,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -141,7 +152,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -177,7 +188,7 @@ class CrossrefMappingTest {
    assertFalse(json.isEmpty)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
    val rels: List[Relation] =
@ -197,7 +208,7 @@ class CrossrefMappingTest {
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -273,7 +284,7 @@ class CrossrefMappingTest {
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -357,7 +368,7 @@ class CrossrefMappingTest {
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -405,7 +416,7 @@ class CrossrefMappingTest {
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -485,7 +496,7 @@ class CrossrefMappingTest {
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -508,7 +519,7 @@ class CrossrefMappingTest {
    val line: String =
      "\"funder\": [{\"name\": \"Wellcome Trust Masters Fellowship\",\"award\": [\"090633\"]}],"
    val json = template.replace("%s", line)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
    val items = resultList.filter(p => p.isInstanceOf[Publication])
    val result: Result = items.head.asInstanceOf[Publication]
@ -527,7 +538,7 @@ class CrossrefMappingTest {
      .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article.json"))
      .mkString
    val resultList: List[Oaf] = Crossref2Oaf.convert(template)
    val resultList: List[Oaf] = Crossref2Oaf.convert(template, vocabularies)
    assertTrue(resultList.nonEmpty)
    val items = resultList.filter(p => p.isInstanceOf[Publication])
    val result: Result = items.head.asInstanceOf[Publication]
@ -551,7 +562,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -581,7 +592,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -616,7 +627,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -651,7 +662,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -686,7 +697,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)
@ -719,7 +730,7 @@ class CrossrefMappingTest {
    assertNotNull(json)
    assertFalse(json.isEmpty);
    val resultList: List[Oaf] = Crossref2Oaf.convert(json)
    val resultList: List[Oaf] = Crossref2Oaf.convert(json, vocabularies)
    assertTrue(resultList.nonEmpty)