Merge branch 'beta' into hierarchical_orgs_relations

This commit is contained in:
Claudio Atzori 2021-10-27 15:36:29 +02:00
commit d02caef185
42 changed files with 559 additions and 706 deletions

View File

@ -28,7 +28,7 @@ public class HdfsSupport {
* @param configuration Configuration of hadoop env
*/
public static boolean exists(String path, Configuration configuration) {
logger.info("Removing path: {}", path);
logger.info("Checking existence for path: {}", path);
return rethrowAsRuntimeException(
() -> {
Path f = new Path(path);

View File

@ -4,19 +4,19 @@ package eu.dnetlib.dhp.utils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import net.minidev.json.JSONArray;
import scala.collection.JavaConverters;
import scala.collection.Seq;
@ -52,10 +54,56 @@ public class DHPUtils {
}
}
/**
* Retrieves from the metadata store manager application the list of paths associated with mdstores characterized
* by he given format, layout, interpretation
* @param mdstoreManagerUrl the URL of the mdstore manager service
* @param format the mdstore format
* @param layout the mdstore layout
* @param interpretation the mdstore interpretation
* @param includeEmpty include Empty mdstores
* @return the set of hdfs paths
* @throws IOException in case of HTTP communication issues
*/
public static Set<String> mdstorePaths(final String mdstoreManagerUrl,
final String format,
final String layout,
final String interpretation,
boolean includeEmpty) throws IOException {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> includeEmpty || md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
}
public static String generateIdentifier(final String originalId, final String nsPrefix) {
return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId));
}
public static String generateUnresolvedIdentifier(final String pid, final String pidType) {
final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid);
return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim());
}
public static String getJPathString(final String jsonPath, final String json) {
try {
Object o = JsonPath.read(json, jsonPath);

View File

@ -1,41 +0,0 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object ExportActionSetJobNode {
val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(ExportActionSetJobNode.getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o =>DataciteToOAFTransformation.toActionSet(o))
.filter(o => o!= null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}
}

View File

@ -1,46 +0,0 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object FilterCrossrefEntitiesSpark {
val log: Logger = LoggerFactory.getLogger(getClass.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
log.info("sourcePath: {}", sourcePath)
val targetPath = parser.get("targetPath")
log.info("targetPath: {}", targetPath)
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result]
val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf]
d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.collection
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
object CollectionUtils {
/**
* This method in pipeline to the transformation phase,
* generates relations in both verse, typically it should be a phase of flatMap
*
* @param i input OAF
* @return
* If the input OAF is an entity -> List(i)
* If the input OAF is a relation -> List(relation, inverseRelation)
*
*/
def fixRelations(i: Oaf): List[Oaf] = {
if (i.isInstanceOf[OafEntity])
return List(i)
else {
val r: Relation = i.asInstanceOf[Relation]
val currentRel = ModelSupport.findRelation(r.getRelClass)
if (currentRel != null) {
// Cleaning relation
r.setRelType(currentRel.getRelType)
r.setSubRelType(currentRel.getSubReltype)
r.setRelClass(currentRel.getRelClass)
val inverse = new Relation
inverse.setSource(r.getTarget)
inverse.setTarget(r.getSource)
inverse.setRelType(currentRel.getRelType)
inverse.setSubRelType(currentRel.getSubReltype)
inverse.setRelClass(currentRel.getInverseRelClass)
inverse.setCollectedfrom(r.getCollectedfrom)
inverse.setDataInfo(r.getDataInfo)
inverse.setProperties(r.getProperties)
inverse.setLastupdatetimestamp(r.getLastupdatetimestamp)
inverse.setValidated(r.getValidated)
inverse.setValidationDate(r.getValidationDate)
return List(r, inverse)
}
}
List()
}
}

View File

@ -1,12 +1,10 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest}
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{HttpClientBuilder, HttpClients}
import java.io.IOException
import org.apache.http.impl.client.HttpClientBuilder
abstract class AbstractRestClient extends Iterator[String] {

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import org.json4s.{DefaultFormats, JValue}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.json4s.{DefaultFormats, JValue}
class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
@ -325,8 +325,9 @@ object DataciteToOAFTransformation {
val grantId = m.matcher(awardUri).replaceAll("$2")
val targetId = s"$p${DHPUtils.md5(grantId)}"
List(
generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo),
generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo)
// REMOVED INVERSE RELATION since there is a specific method that should generate later
// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
)
}
else
@ -580,11 +581,11 @@ object DataciteToOAFTransformation {
rel.setProperties(List(dateProps).asJava)
rel.setSource(id)
rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}")
rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType))
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut)
rel.getCollectedfrom.asScala.map(c => c.getValue).toList
rel
})(collection breakOut)
}).toList
}
def generateDataInfo(trust: String): DataInfo = {

View File

@ -1,9 +1,14 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@ -17,11 +22,10 @@ object GenerateDataciteDatasetSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString)
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
@ -33,16 +37,28 @@ object GenerateDataciteDatasetSpark {
.master(master)
.getOrCreate()
import spark.implicits._
implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
import spark.implicits._
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info("outputBasePath: {}", outputBasePath)
val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
spark.read.load(sourcePath).as[DataciteType]
.filter(d => d.isActive)
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
.filter(d => d != null)
.flatMap(i => fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
val total_items = spark.read.load(targetPath).as[Oaf].count()
writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
}
}
}

View File

@ -1,6 +1,5 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@ -9,14 +8,14 @@ import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.sql.functions.max
import org.slf4j.{Logger, LoggerFactory}
import java.time.format.DateTimeFormatter._
import java.time.{LocalDate, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.{LocalDateTime, ZoneOffset}
import scala.io.Source
object ImportDatacite {
@ -138,11 +137,11 @@ object ImportDatacite {
}
}
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = {
var from:Long = timestamp * 1000
val delta:Long = 100000000L
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs: Int): Long = {
var from: Long = timestamp * 1000
val delta: Long = 100000000L
var client: DataciteAPIImporter = null
val now :Long =System.currentTimeMillis()
val now: Long = System.currentTimeMillis()
var i = 0
try {
val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
@ -168,7 +167,7 @@ object ImportDatacite {
start = System.currentTimeMillis
}
}
println(s"updating from value: $from -> ${from+delta}")
println(s"updating from value: $from -> ${from + delta}")
from = from + delta
}
} catch {
@ -183,4 +182,4 @@ object ImportDatacite {
i
}
}
}

View File

@ -1,18 +1,14 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import java.util.Locale
import scala.io.Source
object SparkDownloadUpdateDatacite {
@ -21,7 +17,7 @@ object SparkDownloadUpdateDatacite {
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString)
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
@ -42,9 +38,9 @@ object SparkDownloadUpdateDatacite {
import spark.implicits._
val maxDate:String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0)
val maxDate: String = spark.read.load(workingPath).as[Oaf].filter(s => s.isInstanceOf[Result]).map(r => r.asInstanceOf[Result].getDateofcollection).select(max("value")).first().getString(0)
val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
val string_to_date =ISO8601FORMAT.parse(maxDate)
val string_to_date = ISO8601FORMAT.parse(maxDate)
val ts = string_to_date.getTime

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import BioDBToOAF.ScholixResolved
import eu.dnetlib.dhp.collection.CollectionUtils
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@ -35,13 +36,13 @@ object SparkTransformBioDatabaseToOAF {
import spark.implicits._
database.toUpperCase() match {
case "UNIPROT" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).write.mode(SaveMode.Overwrite).save(targetPath)
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "PDB" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "SCHOLIX" =>
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).write.mode(SaveMode.Overwrite).save(targetPath)
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "CROSSREF_LINKS" =>
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).write.mode(SaveMode.Overwrite).save(targetPath)
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -5,6 +5,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import BioDBToOAF.EBILinkItem
import eu.dnetlib.dhp.collection.CollectionUtils
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -37,6 +38,7 @@ object SparkEBILinksToOaf {
ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
.flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -1,81 +0,0 @@
<workflow-app name="Import_Datacite_and_transform_to_OAF" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mainPath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
<property>
<name>blocksize</name>
<value>100</value>
<description>The request block size</description>
</property>
</parameters>
<start to="ImportDatacite"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportDatacite">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ImportDatacite</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--targetPath</arg><arg>${mainPath}/datacite_update</arg>
<arg>--dataciteDumpPath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--blocksize</arg><arg>${blocksize}</arg>
</spark>
<ok to="TransformJob"/>
<error to="Kill"/>
</action>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>TransformJob</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--targetPath</arg><arg>${mainPath}/datacite_oaf</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--exportLinks</arg><arg>false</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,23 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,84 +0,0 @@
<workflow-app name="Generate_Datacite_and_Crossref_dump_for_Scholexplorer" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>datacitePath</name>
<description>the path of Datacite spark dataset</description>
</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
<property>
<name>crossrefPath</name>
<description>the path of Crossref spark dataset</description>
</property>
<property>
<name>targetPath</name>
<description>the path of Crossref spark dataset</description>
</property>
</parameters>
<start to="ImportDatacite"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportDatacite">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ImportDatacite</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${datacitePath}</arg>
<arg>--targetPath</arg><arg>${targetPath}/datacite_oaf</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--exportLinks</arg><arg>true</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="FilterCrossrefEntities"/>
<error to="Kill"/>
</action>
<action name="FilterCrossrefEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>FilterCrossrefEntities</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.FilterCrossrefEntitiesSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${crossrefPath}</arg>
<arg>--targetPath</arg><arg>${targetPath}/crossref_oaf</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,46 +1,52 @@
<workflow-app name="Datacite_to_ActionSet_Workflow" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Collect_Datacite" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<name>mainPath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>outputPath</name>
<description>the path of Datacite ActionSet</description>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
<property>
<name>blocksize</name>
<value>100</value>
<description>The request block size</description>
</property>
</parameters>
<start to="ExportDataset"/>
<start to="ImportDatacite"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ExportDataset">
<action name="ImportDatacite">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExportDataset</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
<name>ImportDatacite</name>
<class>eu.dnetlib.dhp.datacite.ImportDatacite</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--targetPath</arg><arg>${mainPath}/datacite_update</arg>
<arg>--dataciteDumpPath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--blocksize</arg><arg>${blocksize}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -7,8 +7,8 @@
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramName": "mo",
"paramLongName": "mdstoreOutputVersion",
"paramDescription": "the target mdstore path",
"paramRequired": true
},

View File

@ -0,0 +1,126 @@
<workflow-app name="transform_Datacite" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mainPath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
<property>
<name>mdStoreOutputId</name>
<description>the identifier of the cleaned MDStore</description>
</property>
<property>
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property>
</parameters>
<start to="StartTransaction"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="StartTransaction">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="TransformJob"/>
<error to="EndReadRollBack"/>
</action>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>TransformJob</name>
<class>eu.dnetlib.dhp.datacite.GenerateDataciteDatasetSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--exportLinks</arg><arg>true</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="CommitVersion"/>
<error to="Kill"/>
</action>
<action name="CommitVersion">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="EndReadRollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="RollBack"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>ROLLBACK</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -52,7 +52,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Incremental Download EBI Links</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks</class>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkDownloadEBILinks</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -85,7 +85,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create OAF DataSet</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf</class>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}

View File

@ -30,7 +30,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Convert Baseline to OAF Dataset</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}

View File

@ -1,8 +1,7 @@
package eu.dnetlib.dhp.actionmanager.datacite
package eu.dnetlib.dhp.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.junit.jupiter.api.extension.ExtendWith

View File

@ -127,13 +127,6 @@ public class MergeGraphTableSparkJob {
}
}, Encoders.bean(p_clazz))
.filter((FilterFunction<P>) Objects::nonNull)
.filter((FilterFunction<P>) o -> {
HashSet<String> collectedFromNames = Optional
.ofNullable(o.getCollectedfrom())
.map(c -> c.stream().map(KeyValue::getValue).collect(Collectors.toCollection(HashSet::new)))
.orElse(new HashSet<>());
return !collectedFromNames.contains("Datacite");
})
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -1,136 +0,0 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class CopyHdfsOafApplication extends AbstractMigrationApplication {
private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
CopyHdfsOafApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl);
final String mdFormat = parser.get("mdFormat");
log.info("mdFormat: {}", mdFormat);
final String mdLayout = parser.get("mdLayout");
log.info("mdLayout: {}", mdLayout);
final String mdInterpretation = parser.get("mdInterpretation");
log.info("mdInterpretation: {}", mdInterpretation);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, vocs, hdfsPath, paths));
}
public static void processPaths(final SparkSession spark,
final VocabularyGroup vocs,
final String outputPath,
final Set<String> paths) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
log.info("Found {} mdstores", paths.size());
paths.forEach(log::info);
final String[] validPaths = paths
.stream()
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.toArray(String[]::new);
log.info("Non empty mdstores {}", validPaths.length);
if (validPaths.length > 0) {
// load the dataset
Dataset<Oaf> oaf = spark
.read()
.load(validPaths)
.as(Encoders.kryo(Oaf.class));
// dispatch each entity type individually in the respective graph subdirectory in append mode
for (Map.Entry<String, Class> e : ModelSupport.oafTypes.entrySet()) {
oaf
.filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey()))
.map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue()))
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(outputPath + "/" + e.getKey());
}
}
}
private static Relation getInverse(Relation rel, VocabularyGroup vocs) {
final Relation inverse = new Relation();
inverse.setProperties(rel.getProperties());
inverse.setValidated(rel.getValidated());
inverse.setValidationDate(rel.getValidationDate());
inverse.setCollectedfrom(rel.getCollectedfrom());
inverse.setDataInfo(rel.getDataInfo());
inverse.setLastupdatetimestamp(rel.getLastupdatetimestamp());
inverse.setSource(rel.getTarget());
inverse.setTarget(rel.getSource());
inverse.setRelType(rel.getRelType());
inverse.setSubRelType(rel.getSubRelType());
return inverse;
}
}

View File

@ -0,0 +1,74 @@
package eu.dnetlib.dhp.oa.graph.raw
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.io.Source
object CopyHdfsOafSparkApplication {
def main(args: Array[String]): Unit = {
val log = LoggerFactory.getLogger(getClass)
val conf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")).mkString)
parser.parseArgument(args)
val spark =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc: SparkContext = spark.sparkContext
val mdstoreManagerUrl = parser.get("mdstoreManagerUrl")
log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl)
val mdFormat = parser.get("mdFormat")
log.info("mdFormat: {}", mdFormat)
val mdLayout = parser.get("mdLayout")
log.info("mdLayout: {}", mdLayout)
val mdInterpretation = parser.get("mdInterpretation")
log.info("mdInterpretation: {}", mdInterpretation)
val hdfsPath = parser.get("hdfsPath")
log.info("hdfsPath: {}", hdfsPath)
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
if (validPaths.nonEmpty) {
val oaf = spark.read.load(validPaths: _*).as[Oaf]
val mapper = new ObjectMapper()
val l =ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList
l.foreach(
e =>
oaf.filter(o => o.getClass.getSimpleName.equalsIgnoreCase(e))
.map(s => mapper.writeValueAsString(s))(Encoders.STRING)
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(s"$hdfsPath/${e}")
)
}
}
}

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.utils.DHPUtils;
public class AbstractMigrationApplication implements Closeable {
@ -71,27 +72,7 @@ public class AbstractMigrationApplication implements Closeable {
final String format,
final String layout,
final String interpretation) throws IOException {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
return DHPUtils.mdstorePaths(mdstoreManagerUrl, format, layout, interpretation, false);
}
private Configuration getConf() {

View File

@ -0,0 +1,152 @@
package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
object SparkResolveRelation {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_relations_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
//CLEANING TEMPORARY FOLDER
HdfsSupport.remove(workingPath, spark.sparkContext.hadoopConfiguration)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path(workingPath))
extractPidResolvedTableFromJsonRDD(spark, graphBasePath, workingPath)
val mapper: ObjectMapper = new ObjectMapper()
val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)]
val relationDs: Dataset[(String, Relation)] = spark.read.text(s"$graphBasePath/relation").as[String]
.map(s => mapper.readValue(s, classOf[Relation])).as[Relation]
.map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map {
m =>
val sourceResolved = m._2
val currentRelation = m._1._2
if (sourceResolved != null && sourceResolved._1 != null && sourceResolved._1.nonEmpty)
currentRelation.setSource(sourceResolved._1)
currentRelation
}.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedSource")
val relationSourceResolved: Dataset[(String, Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation]
.map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map {
m =>
val targetResolved = m._2
val currentRelation = m._1._2
if (targetResolved != null && targetResolved._1.nonEmpty)
currentRelation.setTarget(targetResolved._1)
currentRelation
}.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relation_resolved")
// TO BE conservative we keep the original relation in the working dir
// and save the relation resolved on the graphBasePath
//In future this two line of code should be removed
fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
.map(r => mapper.writeValueAsString(r))
.write
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(s"$graphBasePath/relation")
}
def extractPidsFromRecord(input: String): (String, List[(String, String)]) = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val id: String = (json \ "id").extract[String]
val result: List[(String, String)] = for {
JObject(pids) <- json \\ "instance" \ "pid"
JField("value", JString(pidValue)) <- pids
JField("qualifier", JObject(qualifier)) <- pids
JField("classid", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
val alternateIds: List[(String, String)] = for {
JObject(pids) <- json \\ "alternateIdentifier"
JField("value", JString(pidValue)) <- pids
JField("qualifier", JObject(qualifier)) <- pids
JField("classid", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
(id, result ::: alternateIds)
}
private def isRelation(input: String): Boolean = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val source = (json \ "source").extractOrElse[String](null)
source != null
}
private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
import spark.implicits._
val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*")
.filter(i => !isRelation(i))
.map(i => extractPidsFromRecord(i))
.filter(s => s != null && s._1 != null && s._2 != null && s._2.nonEmpty)
.flatMap { p =>
p._2.map(pid =>
(p._1, DHPUtils.generateUnresolvedIdentifier(pid._1, pid._2))
)
}.filter(r => r._1 != null || r._2 != null)
spark.createDataset(d)
.groupByKey(_._2)
.reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
.map(s => s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedPid")
}
}

View File

@ -1,154 +0,0 @@
package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkResolveRelation {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/resolve_relations_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val relationPath = parser.get("relationPath")
log.info(s"sourcePath -> $relationPath")
val entityPath = parser.get("entityPath")
log.info(s"entityPath -> $entityPath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)
val mappper = new ObjectMapper()
val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]
val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{
m =>
val sourceResolved = m._2
val currentRelation = m._1._2
if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty)
currentRelation.setSource(sourceResolved._1)
currentRelation
}.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedSource")
val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
m =>
val targetResolved = m._2
val currentRelation = m._1._2
if (targetResolved!=null && targetResolved._1.nonEmpty)
currentRelation.setTarget(targetResolved._1)
currentRelation
}.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relation_resolved")
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
.map(r => mappper.writeValueAsString(r))
.rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
}
def extractPidsFromRecord(input:String):(String,List[(String,String)]) = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val id:String = (json \ "id").extract[String]
val result: List[(String,String)] = for {
JObject(pids) <- json \ "pid"
JField("value", JString(pidValue)) <- pids
JField("qualifier", JObject(qualifier)) <- pids
JField("classname", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
val alternateIds: List[(String,String)] = for {
JObject(pids) <- json \\ "alternateIdentifier"
JField("value", JString(pidValue)) <- pids
JField("qualifier", JObject(qualifier)) <- pids
JField("classname", JString(pidType)) <- qualifier
} yield (pidValue, pidType)
(id,result:::alternateIds)
}
private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = {
import spark.implicits._
val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*")
.map(i => extractPidsFromRecord(i))
.filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty)
.flatMap{ p =>
p._2.map(pid =>
(p._1, convertPidToDNETIdentifier(pid._1, pid._2))
)
}.filter(r =>r._1 != null || r._2 != null)
spark.createDataset(d)
.groupByKey(_._2)
.reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
.map(s => s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedPid")
}
/*
This method should be used once we finally convert everythings in Kryo dataset
instead of using rdd of json
*/
private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = {
import spark.implicits._
implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
entities.flatMap(e => e.getPid.asScala
.map(p =>
convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
.filter(s => s != null)
.map(s => (s, e.getId))
).groupByKey(_._1)
.reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
.map(s => s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relationResolvedPid")
}
def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)
null
else
s"unresolved::${pid.toLowerCase}::${pidType.toLowerCase}"
}
}

View File

@ -23,16 +23,16 @@
"paramDescription": "metadata layout",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "should be yarn or local",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "mdInterpretation",
"paramDescription": "metadata interpretation",
"paramRequired": true
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramDescription": "the url of the ISLookupService",
"paramRequired": true
}
]

View File

@ -553,7 +553,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ImportOAF_hdfs_graph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication</class>
<class>eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
@ -568,8 +568,8 @@
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>OAF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--master</arg><arg>yarn</arg>
<arg>--mdInterpretation</arg><arg>graph</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_graphs"/>
<error to="Kill"/>

View File

@ -1,59 +1,37 @@
<workflow-app name="Resolve Relation" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>entityPath</name>
<description>the path of deduplicate Entities</description>
<name>graphBasePath</name>
<description>the path of the graph</description>
</property>
<property>
<name>relationPath</name>
<description>the path of relation unresolved</description>
</property>
<property>
<name>targetPath</name>
<description>the path of relation unresolved</description>
</property>
</parameters>
<start to="DropRelFolder"/>
<start to="ResolveRelations"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropRelFolder">
<fs>
<delete path='${targetPath}/relation'/>
<delete path='${targetPath}/relation_resolved'/>
<delete path='${targetPath}/resolvedSource'/>
<delete path='${targetPath}/resolvedPid'/>
</fs>
<ok to="ResolveRelations"/>
<error to="Kill"/>
</action>
<action name="ResolveRelations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Resolve Relations in raw graph</name>
<class>eu.dnetlib.dhp.sx.graph.SparkResolveRelation</class>
<class>eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.shuffle.partitions=3000
--conf spark.sql.shuffle.partitions=8000
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn</arg>
<arg>--relationPath</arg><arg>${relationPath}</arg>
<arg>--workingPath</arg><arg>${targetPath}</arg>
<arg>--entityPath</arg><arg>${entityPath}</arg>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -1,6 +1,5 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"r", "paramLongName":"relationPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"e", "paramLongName":"entityPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.sx.graph.scholix
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import eu.dnetlib.dhp.schema.sx.scholix.Scholix
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.SparkResolveRelation
import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest
import org.json4s
import org.json4s.DefaultFormats

View File

@ -1,4 +1,4 @@
#!/usr/bin/env bash
curl --request GET $1/cache/updateCache
sleep 20h
sleep 6h