diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java index 0b2cd571fa..654fdd5ac8 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java @@ -28,7 +28,7 @@ public class HdfsSupport { * @param configuration Configuration of hadoop env */ public static boolean exists(String path, Configuration configuration) { - logger.info("Removing path: {}", path); + logger.info("Checking existence for path: {}", path); return rethrowAsRuntimeException( () -> { Path f = new Path(path); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 6a86f30df7..5a59bc0dfe 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -4,19 +4,19 @@ package eu.dnetlib.dhp.utils; import java.io.*; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; +import java.util.*; +import java.util.stream.Collectors; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.Base64OutputStream; import org.apache.commons.codec.binary.Hex; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SaveMode; import org.slf4j.Logger; @@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.jayway.jsonpath.JsonPath; +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import net.minidev.json.JSONArray; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -52,10 +54,56 @@ public class DHPUtils { } } + /** + * Retrieves from the metadata store manager application the list of paths associated with mdstores characterized + * by he given format, layout, interpretation + * @param mdstoreManagerUrl the URL of the mdstore manager service + * @param format the mdstore format + * @param layout the mdstore layout + * @param interpretation the mdstore interpretation + * @param includeEmpty include Empty mdstores + * @return the set of hdfs paths + * @throws IOException in case of HTTP communication issues + */ + public static Set mdstorePaths(final String mdstoreManagerUrl, + final String format, + final String layout, + final String interpretation, + boolean includeEmpty) throws IOException { + final String url = mdstoreManagerUrl + "/mdstores/"; + final ObjectMapper objectMapper = new ObjectMapper(); + + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String json = IOUtils.toString(response.getEntity().getContent()); + final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, 
MDStoreWithInfo[].class); + return Arrays + .stream(mdstores) + .filter(md -> md.getFormat().equalsIgnoreCase(format)) + .filter(md -> md.getLayout().equalsIgnoreCase(layout)) + .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) + .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) + .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) + .filter(md -> includeEmpty || md.getSize() > 0) + .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") + .collect(Collectors.toSet()); + } + } + } + public static String generateIdentifier(final String originalId, final String nsPrefix) { return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId)); } + public static String generateUnresolvedIdentifier(final String pid, final String pidType) { + + final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid); + + return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim()); + } + public static String getJPathString(final String jsonPath, final String json) { try { Object o = JsonPath.read(json, jsonPath); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala deleted file mode 100644 index 9f0d257359..0000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ExportActionSetJobNode.scala +++ /dev/null @@ -1,41 +0,0 @@ -package eu.dnetlib.dhp.actionmanager.datacite - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.Oaf -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} - -import scala.io.Source - -object ExportActionSetJobNode { - - val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass) - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString) - parser.parseArgument(args) - val master = parser.get("master") - val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") - - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(ExportActionSetJobNode.getClass.getSimpleName) - .master(master) - .getOrCreate() - implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING) - - spark.read.load(sourcePath).as[Oaf] - .map(o =>DataciteToOAFTransformation.toActionSet(o)) - .filter(o => o!= null) - .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) - - - } - -} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala deleted file mode 100644 index 5860c50aca..0000000000 --- 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/FilterCrossrefEntitiesSpark.scala +++ /dev/null @@ -1,46 +0,0 @@ -package eu.dnetlib.dhp.actionmanager.datacite - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup -import eu.dnetlib.dhp.schema.mdstore.MetadataRecord -import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} -import eu.dnetlib.dhp.utils.ISLookupClientFactory -import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} - -import scala.io.Source - -object FilterCrossrefEntitiesSpark { - - val log: Logger = LoggerFactory.getLogger(getClass.getClass) - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString) - parser.parseArgument(args) - val master = parser.get("master") - val sourcePath = parser.get("sourcePath") - log.info("sourcePath: {}", sourcePath) - val targetPath = parser.get("targetPath") - log.info("targetPath: {}", targetPath) - - - - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(getClass.getSimpleName) - .master(master) - .getOrCreate() - - - - implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result] - - val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf] - - d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath) - - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala new file mode 100644 index 0000000000..11ecfd6cb6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala @@ -0,0 +1,49 @@ +package eu.dnetlib.dhp.collection + +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation} + +object CollectionUtils { + + /** + * This method in pipeline to the transformation phase, + * generates relations in both verse, typically it should be a phase of flatMap + * + * @param i input OAF + * @return + * If the input OAF is an entity -> List(i) + * If the input OAF is a relation -> List(relation, inverseRelation) + * + */ + + def fixRelations(i: Oaf): List[Oaf] = { + if (i.isInstanceOf[OafEntity]) + return List(i) + else { + val r: Relation = i.asInstanceOf[Relation] + val currentRel = ModelSupport.findRelation(r.getRelClass) + if (currentRel != null) { + + // Cleaning relation + r.setRelType(currentRel.getRelType) + r.setSubRelType(currentRel.getSubReltype) + r.setRelClass(currentRel.getRelClass) + val inverse = new Relation + inverse.setSource(r.getTarget) + inverse.setTarget(r.getSource) + inverse.setRelType(currentRel.getRelType) + inverse.setSubRelType(currentRel.getSubReltype) + inverse.setRelClass(currentRel.getInverseRelClass) + inverse.setCollectedfrom(r.getCollectedfrom) + inverse.setDataInfo(r.getDataInfo) + inverse.setProperties(r.getProperties) + inverse.setLastupdatetimestamp(r.getLastupdatetimestamp) + inverse.setValidated(r.getValidated) + inverse.setValidationDate(r.getValidationDate) + return List(r, inverse) + } + } + List() + } + +} diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala similarity index 90% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala index bae41b218b..6a9b8e3e54 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/AbstractRestClient.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala @@ -1,12 +1,10 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite import org.apache.commons.io.IOUtils import org.apache.http.client.config.RequestConfig -import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest} +import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest} import org.apache.http.entity.StringEntity -import org.apache.http.impl.client.{HttpClientBuilder, HttpClients} - -import java.io.IOException +import org.apache.http.impl.client.HttpClientBuilder abstract class AbstractRestClient extends Iterator[String] { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala similarity index 95% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala index 36ec9e8c33..7ec44a6ff0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteAPIImporter.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala @@ -1,7 +1,7 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite -import org.json4s.{DefaultFormats, JValue} import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.json4s.{DefaultFormats, JValue} class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala similarity index 98% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala index cfdd98d30c..6ce4920edf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup @@ -325,8 +325,9 @@ object DataciteToOAFTransformation { val grantId = m.matcher(awardUri).replaceAll("$2") val targetId = s"$p${DHPUtils.md5(grantId)}" List( - generateRelation(sourceId, targetId, "isProducedBy", 
DATACITE_COLLECTED_FROM, dataInfo), - generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) + generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) +// REMOVED INVERSE RELATION since there is a specific method that should generate later +// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) ) } else @@ -580,11 +581,11 @@ object DataciteToOAFTransformation { rel.setProperties(List(dateProps).asJava) rel.setSource(id) - rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}") + rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType)) rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava) - rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut) + rel.getCollectedfrom.asScala.map(c => c.getValue).toList rel - })(collection breakOut) + }).toList } def generateDataInfo(trust: String): DataInfo = { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala similarity index 61% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala index 2cabc78799..a63627d1c7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala @@ -1,9 +1,14 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations +import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH +import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup -import eu.dnetlib.dhp.schema.mdstore.MetadataRecord +import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord} import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -17,11 +22,10 @@ object GenerateDataciteDatasetSpark { def main(args: Array[String]): Unit = { val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString) parser.parseArgument(args) val master = parser.get("master") val sourcePath = parser.get("sourcePath") - val targetPath = parser.get("targetPath") val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks")) val isLookupUrl: String = parser.get("isLookupUrl") log.info("isLookupUrl: {}", isLookupUrl) @@ -33,16 +37,28 @@ object GenerateDataciteDatasetSpark { .master(master) .getOrCreate() + import spark.implicits._ + implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord] implicit val 
resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - import spark.implicits._ + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + val mapper = new ObjectMapper() + val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + + log.info("outputBasePath: {}", outputBasePath) + val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH" spark.read.load(sourcePath).as[DataciteType] .filter(d => d.isActive) .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks)) .filter(d => d != null) + .flatMap(i => fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) + + val total_items = spark.read.load(targetPath).as[Oaf].count() + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala similarity index 93% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala index 2b73d29559..018b4958ad 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala @@ -1,6 +1,5 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite -import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path} @@ -9,14 +8,14 @@ import org.apache.hadoop.io.{IntWritable, SequenceFile, Text} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.functions.max import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse -import org.apache.spark.sql.functions.max import org.slf4j.{Logger, LoggerFactory} -import java.time.format.DateTimeFormatter._ -import java.time.{LocalDate, LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter.ISO_DATE_TIME +import java.time.{LocalDateTime, ZoneOffset} import scala.io.Source object ImportDatacite { @@ -138,11 +137,11 @@ object ImportDatacite { } } - private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = { - var from:Long = timestamp * 1000 - val delta:Long = 100000000L + private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs: Int): Long = { + var from: Long = timestamp * 1000 + val delta: Long = 100000000L var client: DataciteAPIImporter = null - val now :Long =System.currentTimeMillis() + val now: Long = System.currentTimeMillis() var i = 0 try { val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text])) @@ -168,7 +167,7 @@ object ImportDatacite { start = System.currentTimeMillis } } - println(s"updating from 
value: $from -> ${from+delta}") + println(s"updating from value: $from -> ${from + delta}") from = from + delta } } catch { @@ -183,4 +182,4 @@ object ImportDatacite { i } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala similarity index 68% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala index 0b459fcf68..d46e5423d3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/SparkDownloadUpdateDatacite.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala @@ -1,18 +1,14 @@ -package eu.dnetlib.dhp.actionmanager.datacite - +package eu.dnetlib.dhp.datacite import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Oaf, Result} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.LocalFileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.spark.SparkConf -import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.functions.max +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.slf4j.{Logger, LoggerFactory} import java.text.SimpleDateFormat -import java.util.{Date, Locale} +import java.util.Locale import scala.io.Source object SparkDownloadUpdateDatacite { @@ -21,7 +17,7 @@ object SparkDownloadUpdateDatacite { def main(args: Array[String]): Unit = { val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString) + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString) parser.parseArgument(args) val master = parser.get("master") val sourcePath = parser.get("sourcePath") @@ -42,9 +38,9 @@ object SparkDownloadUpdateDatacite { import spark.implicits._ - val maxDate:String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0) + val maxDate: String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0) val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US) - val string_to_date =ISO8601FORMAT.parse(maxDate) + val string_to_date = ISO8601FORMAT.parse(maxDate) val ts = string_to_date.getTime diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala index 7a62437a36..8ae8285e3f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.sx.bio import eu.dnetlib.dhp.application.ArgumentApplicationParser import 
eu.dnetlib.dhp.schema.oaf.Oaf import BioDBToOAF.ScholixResolved +import eu.dnetlib.dhp.collection.CollectionUtils import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} @@ -35,13 +36,13 @@ object SparkTransformBioDatabaseToOAF { import spark.implicits._ database.toUpperCase() match { case "UNIPROT" => - spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "PDB" => - spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "SCHOLIX" => - spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath) + spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) case "CROSSREF_LINKS" => - spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala index b19bfc23a5..8da617ca07 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala @@ -5,6 +5,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.sx.bio.BioDBToOAF import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem import BioDBToOAF.EBILinkItem +import eu.dnetlib.dhp.collection.CollectionUtils import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql._ @@ -37,6 +38,7 @@ object SparkEBILinksToOaf { ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links)) .filter(p => BioDBToOAF.EBITargetLinksFilter(p)) .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)) + .flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml deleted file mode 100644 index c6332ff7d5..0000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml +++ /dev/null @@ -1,81 +0,0 @@ - - - - mainPath - the working path of Datacite stores - - - isLookupUrl - The IS lookUp service endopoint - - - blocksize - 100 - The request block size - - - - - - - - Action failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - yarn-cluster - cluster - ImportDatacite - eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --targetPath${mainPath}/datacite_update - --dataciteDumpPath${mainPath}/datacite_dump - --namenode${nameNode} - --masteryarn-cluster - --blocksize${blocksize} - - - - - - - - - yarn-cluster - cluster - TransformJob - eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${mainPath}/datacite_dump - --targetPath${mainPath}/datacite_oaf - --isLookupUrl${isLookupUrl} - --exportLinksfalse - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml deleted file mode 100644 index dd3c32c620..0000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/config-default.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - - - oozie.launcher.mapreduce.user.classpath.first - true - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml deleted file mode 100644 index 397288c694..0000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/scholix/oozie_app/workflow.xml +++ /dev/null @@ -1,84 +0,0 @@ - - - - datacitePath - the path of Datacite spark dataset - - - isLookupUrl - The IS lookUp service endopoint - - - crossrefPath - the path of Crossref spark dataset - - - - targetPath - the path of Crossref spark dataset - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - yarn-cluster - cluster - ImportDatacite - eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf 
spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${datacitePath} - --targetPath${targetPath}/datacite_oaf - --isLookupUrl${isLookupUrl} - --exportLinkstrue - --masteryarn-cluster - - - - - - - - - yarn-cluster - cluster - FilterCrossrefEntities - eu.dnetlib.dhp.actionmanager.datacite.FilterCrossrefEntitiesSpark - dhp-aggregation-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePath${crossrefPath} - --targetPath${targetPath}/crossref_oaf - --masteryarn-cluster - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/workflow.xml similarity index 59% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/workflow.xml index 3c58ace7bf..6989eed66e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/actionset/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/collection/oozie_app/workflow.xml @@ -1,46 +1,52 @@ - + - sourcePath + mainPath the working path of Datacite stores - outputPath - the path of Datacite ActionSet + isLookupUrl + The IS lookUp service endopoint + + blocksize + 100 + The request block size + + - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn-cluster cluster - ExportDataset - eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode + ImportDatacite + eu.dnetlib.dhp.datacite.ImportDatacite dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${sourcePath} - --targetPath${outputPath} + --targetPath${mainPath}/datacite_update + --dataciteDumpPath${mainPath}/datacite_dump + --namenode${nameNode} --masteryarn-cluster + --blocksize${blocksize} - \ No newline at end of file diff --git 
a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/datacite_filter b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/datacite_filter similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/datacite_filter rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/datacite_filter diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/exportDataset_parameters.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/exportDataset_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/filter_crossref_param.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/filter_crossref_param.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_params.json similarity index 90% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_params.json index 67e7f37dcb..04dc8b942c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/generate_dataset_params.json @@ -7,8 +7,8 @@ }, { - "paramName": "t", - "paramLongName": "targetPath", + "paramName": "mo", + "paramLongName": "mdstoreOutputVersion", "paramDescription": "the target mdstore path", "paramRequired": true }, diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/hostedBy_map.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/hostedBy_map.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/import_from_api.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/import_from_api.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/config-default.xml 
similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/workflow.xml new file mode 100644 index 0000000000..d5bae7305d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/transformation/oozie_app/workflow.xml @@ -0,0 +1,126 @@ + + + + mainPath + the working path of Datacite stores + + + isLookupUrl + The IS lookUp service endopoint + + + mdStoreOutputId + the identifier of the cleaned MDStore + + + mdStoreManagerURI + the path of the cleaned mdstore + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionNEW_VERSION + --mdStoreID${mdStoreOutputId} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + yarn-cluster + cluster + TransformJob + eu.dnetlib.dhp.datacite.GenerateDataciteDatasetSpark + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${mainPath}/datacite_dump + --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --isLookupUrl${isLookupUrl} + --exportLinkstrue + --masteryarn-cluster + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionCOMMIT + --namenode${nameNode} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionREAD_UNLOCK + --mdStoreManagerURI${mdStoreManagerURI} + --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']} + + + + + + + + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode + --actionROLLBACK + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --mdStoreManagerURI${mdStoreManagerURI} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml index 73b1a3b608..4b47ae38e4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml @@ -52,7 +52,7 @@ yarn-cluster cluster Incremental Download EBI Links - eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks + eu.dnetlib.dhp.sx.bio.ebi.SparkDownloadEBILinks 
dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -85,7 +85,7 @@ yarn-cluster cluster Create OAF DataSet - eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf + eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index 21fd2d1535..8915a090bd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -30,7 +30,7 @@ yarn cluster Convert Baseline to OAF Dataset - eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame + eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala similarity index 88% rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala index a795a910da..f21e9eab1c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala @@ -1,8 +1,7 @@ -package eu.dnetlib.dhp.actionmanager.datacite +package eu.dnetlib.dhp.datacite -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest import eu.dnetlib.dhp.schema.oaf.Oaf import org.junit.jupiter.api.extension.ExtendWith diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index ef419a042e..474944260b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -127,13 +127,6 @@ public class MergeGraphTableSparkJob { } }, Encoders.bean(p_clazz)) .filter((FilterFunction

<P>) Objects::nonNull) - .filter((FilterFunction<P>
) o -> { - HashSet collectedFromNames = Optional - .ofNullable(o.getCollectedfrom()) - .map(c -> c.stream().map(KeyValue::getValue).collect(Collectors.toCollection(HashSet::new))) - .orElse(new HashSet<>()); - return !collectedFromNames.contains("Datacite"); - }) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java deleted file mode 100644 index 4cff88d820..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ /dev/null @@ -1,136 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.raw; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.*; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.clearspring.analytics.util.Lists; -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; - -public class CopyHdfsOafApplication extends AbstractMigrationApplication { - - private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - CopyHdfsOafApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json"))); - parser.parseArgument(args); - - final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl"); - log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl); - - final String mdFormat = parser.get("mdFormat"); - log.info("mdFormat: {}", mdFormat); - - final String mdLayout = parser.get("mdLayout"); - log.info("mdLayout: {}", mdLayout); - - final String mdInterpretation = parser.get("mdInterpretation"); - log.info("mdInterpretation: {}", mdInterpretation); - - final String hdfsPath = parser.get("hdfsPath"); - log.info("hdfsPath: {}", hdfsPath); - - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - - final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); - final VocabularyGroup vocs 
= VocabularyGroup.loadVocsFromIS(isLookupService); - - final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); - - final SparkConf conf = new SparkConf(); - runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, vocs, hdfsPath, paths)); - } - - public static void processPaths(final SparkSession spark, - final VocabularyGroup vocs, - final String outputPath, - final Set paths) { - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - log.info("Found {} mdstores", paths.size()); - paths.forEach(log::info); - - final String[] validPaths = paths - .stream() - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .toArray(String[]::new); - log.info("Non empty mdstores {}", validPaths.length); - - if (validPaths.length > 0) { - // load the dataset - Dataset oaf = spark - .read() - .load(validPaths) - .as(Encoders.kryo(Oaf.class)); - - // dispatch each entity type individually in the respective graph subdirectory in append mode - for (Map.Entry e : ModelSupport.oafTypes.entrySet()) { - oaf - .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey())) - .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue())) - .write() - .option("compression", "gzip") - .mode(SaveMode.Append) - .text(outputPath + "/" + e.getKey()); - } - } - } - - private static Relation getInverse(Relation rel, VocabularyGroup vocs) { - final Relation inverse = new Relation(); - - inverse.setProperties(rel.getProperties()); - inverse.setValidated(rel.getValidated()); - inverse.setValidationDate(rel.getValidationDate()); - inverse.setCollectedfrom(rel.getCollectedfrom()); - inverse.setDataInfo(rel.getDataInfo()); - inverse.setLastupdatetimestamp(rel.getLastupdatetimestamp()); - - inverse.setSource(rel.getTarget()); - inverse.setTarget(rel.getSource()); - inverse.setRelType(rel.getRelType()); - inverse.setSubRelType(rel.getSubRelType()); - - return inverse; - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala new file mode 100644 index 0000000000..c7ad1890de --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala @@ -0,0 +1,74 @@ +package eu.dnetlib.dhp.oa.graph.raw + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClients +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ +import scala.io.Source + +object CopyHdfsOafSparkApplication { + + def main(args: Array[String]): Unit = { + val log = LoggerFactory.getLogger(getClass) + val conf = new SparkConf() + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")).mkString) + 
parser.parseArgument(args)
+
+    val spark =
+      SparkSession
+        .builder()
+        .config(conf)
+        .appName(getClass.getSimpleName)
+        .master(parser.get("master")).getOrCreate()
+
+    val sc: SparkContext = spark.sparkContext
+
+    val mdstoreManagerUrl = parser.get("mdstoreManagerUrl")
+    log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl)
+
+    val mdFormat = parser.get("mdFormat")
+    log.info("mdFormat: {}", mdFormat)
+
+    val mdLayout = parser.get("mdLayout")
+    log.info("mdLayout: {}", mdLayout)
+
+    val mdInterpretation = parser.get("mdInterpretation")
+    log.info("mdInterpretation: {}", mdInterpretation)
+
+    val hdfsPath = parser.get("hdfsPath")
+    log.info("hdfsPath: {}", hdfsPath)
+
+    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+
+    val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
+
+    val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
+
+    if (validPaths.nonEmpty) {
+      val oaf = spark.read.load(validPaths: _*).as[Oaf]
+      val mapper = new ObjectMapper()
+      val l = ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList
+      l.foreach(
+        e =>
+          oaf.filter(o => o.getClass.getSimpleName.equalsIgnoreCase(e))
+            .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
+            .write
+            .option("compression", "gzip")
+            .mode(SaveMode.Append)
+            .text(s"$hdfsPath/${e}")
+      )
+    }
+  }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
index 8cd495e08a..cba64899b5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class AbstractMigrationApplication implements Closeable {
 
@@ -71,27 +72,7 @@ public class AbstractMigrationApplication implements Closeable {
 		final String format,
 		final String layout,
 		final String interpretation) throws IOException {
-		final String url = mdstoreManagerUrl + "/mdstores/";
-		final ObjectMapper objectMapper = new ObjectMapper();
-
-		final HttpGet req = new HttpGet(url);
-
-		try (final CloseableHttpClient client = HttpClients.createDefault()) {
-			try (final CloseableHttpResponse response = client.execute(req)) {
-				final String json = IOUtils.toString(response.getEntity().getContent());
-				final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
-				return Arrays
-					.stream(mdstores)
-					.filter(md -> md.getFormat().equalsIgnoreCase(format))
-					.filter(md -> md.getLayout().equalsIgnoreCase(layout))
-					.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
-					.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
-					.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
-					.filter(md -> md.getSize() > 0)
-					.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
-					.collect(Collectors.toSet());
-			}
-		}
+		return DHPUtils.mdstorePaths(mdstoreManagerUrl, format, layout, interpretation, false);
 	}
 
 	private Configuration getConf() {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala
new file mode 100644
index 0000000000..5ca7d9782d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveRelation.scala
@@ -0,0 +1,152 @@
+package eu.dnetlib.dhp.oa.graph.resolution
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.HdfsSupport
+import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
+import eu.dnetlib.dhp.utils.DHPUtils
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.{JField, JObject, JString}
+import org.json4s.jackson.JsonMethods.parse
+import org.slf4j.{Logger, LoggerFactory}
+
+object SparkResolveRelation {
+  def main(args: Array[String]): Unit = {
+    val log: Logger = LoggerFactory.getLogger(getClass)
+    val conf: SparkConf = new SparkConf()
+    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json")))
+    parser.parseArgument(args)
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .config(conf)
+        .appName(getClass.getSimpleName)
+        .master(parser.get("master")).getOrCreate()
+
+
+    val graphBasePath = parser.get("graphBasePath")
+    log.info(s"graphBasePath -> $graphBasePath")
+    val workingPath = parser.get("workingPath")
+    log.info(s"workingPath -> $workingPath")
+
+    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+    import spark.implicits._
+
+
+    //CLEANING TEMPORARY FOLDER
+    HdfsSupport.remove(workingPath, spark.sparkContext.hadoopConfiguration)
+    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    fs.mkdirs(new Path(workingPath))
+
+    extractPidResolvedTableFromJsonRDD(spark, graphBasePath, workingPath)
+
+    val mapper: ObjectMapper = new ObjectMapper()
+
+    val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)]
+
+    val relationDs: Dataset[(String, Relation)] = spark.read.text(s"$graphBasePath/relation").as[String]
+      .map(s => mapper.readValue(s, classOf[Relation])).as[Relation]
+      .map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+
+    relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map {
+      m =>
+        val sourceResolved = m._2
+        val currentRelation = m._1._2
+        if (sourceResolved != null && sourceResolved._1 != null && sourceResolved._1.nonEmpty)
+          currentRelation.setSource(sourceResolved._1)
+        currentRelation
+    }.write
+      .mode(SaveMode.Overwrite)
+      .save(s"$workingPath/relationResolvedSource")
+
+
+    val relationSourceResolved: Dataset[(String, Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation]
+      .map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+    relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map {
+      m =>
+        val targetResolved = m._2
+        val currentRelation = m._1._2
+        if (targetResolved != null && targetResolved._1.nonEmpty)
+          currentRelation.setTarget(targetResolved._1)
+        currentRelation
+    }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$workingPath/relation_resolved")
+
+
+    // TO BE conservative we keep the original relation in the working dir
+    // and save the relation resolved on the graphBasePath
+    //In future this two line of code should be removed
+
+    fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
+
+    spark.read.load(s"$workingPath/relation_resolved").as[Relation]
+      .map(r => mapper.writeValueAsString(r))
+      .write
+      .option("compression", "gzip")
+      .mode(SaveMode.Overwrite)
+      .text(s"$graphBasePath/relation")
+  }
+
+
+  def extractPidsFromRecord(input: String): (String, List[(String, String)]) = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(input)
+    val id: String = (json \ "id").extract[String]
+    val result: List[(String, String)] = for {
+      JObject(pids) <- json \\ "instance" \ "pid"
+      JField("value", JString(pidValue)) <- pids
+      JField("qualifier", JObject(qualifier)) <- pids
+      JField("classid", JString(pidType)) <- qualifier
+    } yield (pidValue, pidType)
+
+    val alternateIds: List[(String, String)] = for {
+      JObject(pids) <- json \\ "alternateIdentifier"
+      JField("value", JString(pidValue)) <- pids
+      JField("qualifier", JObject(qualifier)) <- pids
+      JField("classid", JString(pidType)) <- qualifier
+    } yield (pidValue, pidType)
+
+    (id, result ::: alternateIds)
+  }
+
+
+  private def isRelation(input: String): Boolean = {
+
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: json4s.JValue = parse(input)
+    val source = (json \ "source").extractOrElse[String](null)
+
+    source != null
+  }
+
+  private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
+    import spark.implicits._
+
+    val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*")
+      .filter(i => !isRelation(i))
+      .map(i => extractPidsFromRecord(i))
+      .filter(s => s != null && s._1 != null && s._2 != null && s._2.nonEmpty)
+      .flatMap { p =>
+        p._2.map(pid =>
+          (p._1, DHPUtils.generateUnresolvedIdentifier(pid._1, pid._2))
+        )
+      }.filter(r => r._1 != null || r._2 != null)
+
+    spark.createDataset(d)
+      .groupByKey(_._2)
+      .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
+      .map(s => s._2)
+      .write
+      .mode(SaveMode.Overwrite)
+      .save(s"$workingPath/relationResolvedPid")
+  }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
deleted file mode 100644
index 1b13b81c77..0000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-package eu.dnetlib.dhp.sx.graph
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.json4s
-import org.json4s.DefaultFormats
-import org.json4s.JsonAST.{JField, JObject, JString}
-import org.json4s.jackson.JsonMethods.parse
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConverters._
-object SparkResolveRelation {
-  def main(args: Array[String]): Unit = {
-    val log: Logger = LoggerFactory.getLogger(getClass)
-    val conf: SparkConf = new SparkConf()
-    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json")))
-    parser.parseArgument(args)
-    val spark: SparkSession =
-      SparkSession
-        .builder()
-        .config(conf)
-        .appName(getClass.getSimpleName)
-        .master(parser.get("master")).getOrCreate()
-
-
-    val relationPath = parser.get("relationPath")
-    log.info(s"sourcePath -> $relationPath")
-    val entityPath = parser.get("entityPath")
-    log.info(s"entityPath -> $entityPath")
-    val workingPath = parser.get("workingPath")
-    log.info(s"workingPath -> $workingPath")
-
-    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
-    import spark.implicits._
-
-
-    extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)
-
-    val mappper = new ObjectMapper()
-
-    val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]
-
-    val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
-
-    relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{
-      m =>
-        val sourceResolved = m._2
-        val currentRelation = m._1._2
-        if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty)
-          currentRelation.setSource(sourceResolved._1)
-        currentRelation
-    }.write
-      .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/relationResolvedSource")
-
-
-    val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
-    relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
-      m =>
-        val targetResolved = m._2
-        val currentRelation = m._1._2
-        if (targetResolved!=null && targetResolved._1.nonEmpty)
-          currentRelation.setTarget(targetResolved._1)
-        currentRelation
-    }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
-      .write
-      .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/relation_resolved")
-
-    spark.read.load(s"$workingPath/relation_resolved").as[Relation]
-      .map(r => mappper.writeValueAsString(r))
-      .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
-
-  }
-
-
-  def extractPidsFromRecord(input:String):(String,List[(String,String)]) = {
-    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-    lazy val json: json4s.JValue = parse(input)
-    val id:String = (json \ "id").extract[String]
-    val result: List[(String,String)] = for {
-      JObject(pids) <- json \ "pid"
-      JField("value", JString(pidValue)) <- pids
-      JField("qualifier", JObject(qualifier)) <- pids
-      JField("classname", JString(pidType)) <- qualifier
-    } yield (pidValue, pidType)
-
-    val alternateIds: List[(String,String)] = for {
-      JObject(pids) <- json \\ "alternateIdentifier"
-      JField("value", JString(pidValue)) <- pids
-      JField("qualifier", JObject(qualifier)) <- pids
-      JField("classname", JString(pidType)) <- qualifier
-    } yield (pidValue, pidType)
-
-    (id,result:::alternateIds)
-  }
-
-  private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = {
-    import spark.implicits._
-
-    val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*")
-      .map(i => extractPidsFromRecord(i))
-      .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty)
-      .flatMap{ p =>
-        p._2.map(pid =>
-          (p._1, convertPidToDNETIdentifier(pid._1, pid._2))
-        )
-      }.filter(r =>r._1 != null || r._2 != null)
-
-    spark.createDataset(d)
-      .groupByKey(_._2)
-      .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
-      .map(s => s._2)
-      .write
-      .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/relationResolvedPid")
-  }
-
-
-  /*
-    This method should be used once we finally convert everythings in Kryo dataset
-    instead of using rdd of json
-   */
-  private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = {
-    import spark.implicits._
-    implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
-    val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
-    entities.flatMap(e => e.getPid.asScala
-      .map(p =>
-        convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
-      .filter(s => s != null)
-      .map(s => (s, e.getId))
-    ).groupByKey(_._1)
-      .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
-      .map(s => s._2)
-      .write
-      .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/relationResolvedPid")
-  }
-
-  def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
-    if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)
-      null
-    else
-      s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}"
-  }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json
index 1e862198f1..d1b16b09a6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json
@@ -23,16 +23,16 @@
     "paramDescription": "metadata layout",
     "paramRequired": true
   },
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "should be yarn or local",
+    "paramRequired": true
+  },
   {
     "paramName": "i",
     "paramLongName": "mdInterpretation",
     "paramDescription": "metadata interpretation",
     "paramRequired": true
-  },
-  {
-    "paramName": "isu",
-    "paramLongName": "isLookupUrl",
-    "paramDescription": "the url of the ISLookupService",
-    "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index c4cca52f62..307e262672 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -553,7 +553,7 @@
             yarn
             cluster
             ImportOAF_hdfs_graph
-            eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication
+            eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication
             dhp-graph-mapper-${projectVersion}.jar
             --executor-memory ${sparkExecutorMemory}
@@ -568,8 +568,8 @@
             --mdstoreManagerUrl${mdstoreManagerUrl}
             --mdFormatOAF
             --mdLayoutstore
+            --masteryarn
             --mdInterpretationgraph
-            --isLookupUrl${isLookupUrl}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
similarity index 52%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
index 7683ff94cd..31cc53ae3c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/oozie_app/workflow.xml
@@ -1,59 +1,37 @@
-            entityPath
-            the path of deduplicate Entities
+            graphBasePath
+            the path of the graph
-
-            relationPath
-            the path of relation unresolved
-
-            targetPath
-            the path of relation unresolved
-
-
+
         Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
-
-
-
-
-
-
-
            yarn
            cluster
            Resolve Relations in raw graph
-            eu.dnetlib.dhp.sx.graph.SparkResolveRelation
+            eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation
            dhp-graph-mapper-${projectVersion}.jar
            --executor-memory=${sparkExecutorMemory}
            --executor-cores=${sparkExecutorCores}
            --driver-memory=${sparkDriverMemory}
            --conf spark.extraListeners=${spark2ExtraListeners}
-            --conf spark.sql.shuffle.partitions=3000
+            --conf spark.sql.shuffle.partitions=8000
            --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
            --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
            --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            --masteryarn
-            --relationPath${relationPath}
-            --workingPath${targetPath}
-            --entityPath${entityPath}
+            --graphBasePath${graphBasePath}
+            --workingPath${workingDir}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
similarity index 50%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
index f211adb9a4..1fbe206481 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json
@@ -1,6 +1,5 @@
 [
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-  {"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true},
   {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
-  {"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
"paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala index 5b7fbe1cf5..bd7e4fd094 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala @@ -1,10 +1,10 @@ package eu.dnetlib.dhp.sx.graph.scholix import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature} +import eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import eu.dnetlib.dhp.schema.sx.scholix.Scholix import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary -import eu.dnetlib.dhp.sx.graph.SparkResolveRelation import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest import org.json4s import org.json4s.DefaultFormats diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh index dc19f84b4b..03aa535e10 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/updateCache.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash curl --request GET $1/cache/updateCache -sleep 20h \ No newline at end of file +sleep 6h \ No newline at end of file