diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index 4fee87ed7..66e4cb780 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -1,36 +1,35 @@ package eu.dnetlib.dhp.utils; -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.Base64OutputStream; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SaveMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.jayway.jsonpath.JsonPath; - +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import net.minidev.json.JSONArray; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; import scala.collection.Seq; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.*; +import java.util.stream.Collectors; + public class DHPUtils { private static final Logger log = LoggerFactory.getLogger(DHPUtils.class); @@ -53,6 +52,45 @@ public class DHPUtils { } } + /** + * Retrieves from the metadata store manager application the list of paths associated with mdstores characterized + * by he given format, layout, interpretation + * @param mdstoreManagerUrl the URL of the mdstore manager service + * @param format the mdstore format + * @param layout the mdstore layout + * @param interpretation the mdstore interpretation + * @param includeEmpty include Empty mdstores + * @return the set of hdfs paths + * @throws IOException in case of HTTP communication issues + */ + public static Set mdstorePaths(final String mdstoreManagerUrl, + final String format, + final String layout, + final String interpretation, + boolean includeEmpty) throws IOException { + final String url = mdstoreManagerUrl + "/mdstores/"; + final ObjectMapper objectMapper = new ObjectMapper(); + + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String json = IOUtils.toString(response.getEntity().getContent()); + final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); + return Arrays + .stream(mdstores) + .filter(md -> md.getFormat().equalsIgnoreCase(format)) + .filter(md -> md.getLayout().equalsIgnoreCase(layout)) + .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) + .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) + .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) + .filter(md -> includeEmpty || md.getSize() > 0) + .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") + .collect(Collectors.toSet()); + } + } + } + public static String generateIdentifier(final String originalId, final String nsPrefix) { return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId)); } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml index 73b1a3b60..4b47ae38e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/oozie_app/workflow.xml @@ -52,7 +52,7 @@ yarn-cluster cluster Incremental Download EBI Links - eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks + eu.dnetlib.dhp.sx.bio.ebi.SparkDownloadEBILinks dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -85,7 +85,7 @@ yarn-cluster cluster Create OAF DataSet - eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf + eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml index 21fd2d153..8915a090b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml @@ -30,7 +30,7 @@ yarn cluster Convert Baseline to OAF Dataset - eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame + eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java deleted file mode 100644 index 31ebcbc6e..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java +++ /dev/null @@ -1,118 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.raw; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.*; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.clearspring.analytics.util.Lists; -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import scala.Tuple2; - -public class CopyHdfsOafApplication extends AbstractMigrationApplication { - - private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - CopyHdfsOafApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json"))); - parser.parseArgument(args); - - final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl"); - log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl); - - final String mdFormat = parser.get("mdFormat"); - log.info("mdFormat: {}", mdFormat); - - final String mdLayout = parser.get("mdLayout"); - log.info("mdLayout: {}", mdLayout); - - final String mdInterpretation = parser.get("mdInterpretation"); - log.info("mdInterpretation: {}", mdInterpretation); - - final String hdfsPath = parser.get("hdfsPath"); - log.info("hdfsPath: {}", hdfsPath); - - final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); - - final SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - - final List oafTypes = Lists.newArrayList(ModelSupport.oafTypes.keySet()); - - runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, oafTypes, hdfsPath, paths)); - } - - public static void processPaths(final SparkSession spark, - final List oafTypes, - final String outputPath, - final Set paths) { - - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - log.info("Found {} mdstores", paths.size()); - paths.forEach(log::info); - - final String[] validPaths = paths - .stream() - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) - .toArray(String[]::new); - log.info("Non empty mdstores {}", validPaths.length); - - if (validPaths.length > 0) { - // load the dataset - Dataset oaf = spark - .read() - .load(validPaths) - .as(Encoders.kryo(Oaf.class)); - - // dispatch each entity type individually in the respective graph subdirectory in append mode - for (String type : oafTypes) { - oaf - .filter((FilterFunction) o -> o.getClass().getSimpleName().toLowerCase().equals(type)) - .map((MapFunction) OBJECT_MAPPER::writeValueAsString, Encoders.STRING()) - .write() - .option("compression", "gzip") - .mode(SaveMode.Append) - .text(outputPath + "/" + type); - } - } - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala new file mode 100644 index 000000000..c7ad1890d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala @@ -0,0 +1,74 @@ +package eu.dnetlib.dhp.oa.graph.raw + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.common.HdfsSupport +import eu.dnetlib.dhp.schema.common.ModelSupport +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.commons.io.IOUtils +import org.apache.commons.lang3.StringUtils +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClients +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ +import scala.io.Source + +object CopyHdfsOafSparkApplication { + + def main(args: Array[String]): Unit = { + val log = LoggerFactory.getLogger(getClass) + val conf = new SparkConf() + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")).mkString) + parser.parseArgument(args) + + val spark = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc: SparkContext = spark.sparkContext + + val mdstoreManagerUrl = parser.get("mdstoreManagerUrl") + log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl) + + val mdFormat = parser.get("mdFormat") + log.info("mdFormat: {}", mdFormat) + + val mdLayout = parser.get("mdLayout") + log.info("mdLayout: {}", mdLayout) + + val mdInterpretation = parser.get("mdInterpretation") + log.info("mdInterpretation: {}", mdInterpretation) + + val hdfsPath = parser.get("hdfsPath") + log.info("hdfsPath: {}", hdfsPath) + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + + val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala + + val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList + + if (validPaths.nonEmpty) { + val oaf = spark.read.load(validPaths: _*).as[Oaf] + val mapper = new ObjectMapper() + val l =ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList + l.foreach( + e => + oaf.filter(o => o.getClass.getSimpleName.equalsIgnoreCase(e)) + .map(s => mapper.writeValueAsString(s))(Encoders.STRING) + .write + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(s"$hdfsPath/${e}") + ) + } + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java index 8cd495e08..cba64899b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.utils.DHPUtils; public class AbstractMigrationApplication implements Closeable { @@ -71,27 +72,7 @@ public class AbstractMigrationApplication implements Closeable { final String format, final String layout, final String interpretation) throws IOException { - final String url = mdstoreManagerUrl + "/mdstores/"; - final ObjectMapper objectMapper = new ObjectMapper(); - - final HttpGet req = new HttpGet(url); - - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - final String json = IOUtils.toString(response.getEntity().getContent()); - final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); - return Arrays - .stream(mdstores) - .filter(md -> md.getFormat().equalsIgnoreCase(format)) - .filter(md -> md.getLayout().equalsIgnoreCase(layout)) - .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) - .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) - .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) - .filter(md -> md.getSize() > 0) - .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") - .collect(Collectors.toSet()); - } - } + return DHPUtils.mdstorePaths(mdstoreManagerUrl, format, layout, interpretation, false); } private Configuration getConf() { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json index 1d89017c5..d1b16b09a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json @@ -23,6 +23,12 @@ "paramDescription": "metadata layout", "paramRequired": true }, + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "should be yarn or local", + "paramRequired": true + }, { "paramName": "i", "paramLongName": "mdInterpretation", diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 137c69ed8..307e26267 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -553,7 +553,7 @@ yarn cluster ImportOAF_hdfs_graph - eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication + eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} @@ -568,6 +568,7 @@ --mdstoreManagerUrl${mdstoreManagerUrl} --mdFormatOAF --mdLayoutstore + --masteryarn --mdInterpretationgraph