conflict resolved on merge

This commit is contained in:
Sandro La Bruzzo 2021-10-26 09:40:47 +02:00
parent 6b34ba737e
commit 034304b33a
8 changed files with 147 additions and 165 deletions

View File

@ -1,36 +1,35 @@
package eu.dnetlib.dhp.utils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import net.minidev.json.JSONArray;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;
import java.util.stream.Collectors;
public class DHPUtils {
private static final Logger log = LoggerFactory.getLogger(DHPUtils.class);
@ -53,6 +52,45 @@ public class DHPUtils {
}
}
/**
* Retrieves from the metadata store manager application the list of paths associated with mdstores characterized
* by he given format, layout, interpretation
* @param mdstoreManagerUrl the URL of the mdstore manager service
* @param format the mdstore format
* @param layout the mdstore layout
* @param interpretation the mdstore interpretation
* @param includeEmpty include Empty mdstores
* @return the set of hdfs paths
* @throws IOException in case of HTTP communication issues
*/
public static Set<String> mdstorePaths(final String mdstoreManagerUrl,
final String format,
final String layout,
final String interpretation,
boolean includeEmpty) throws IOException {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> includeEmpty || md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
}
public static String generateIdentifier(final String originalId, final String nsPrefix) {
return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId));
}

View File

@ -52,7 +52,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Incremental Download EBI Links</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkDownloadEBILinks</class>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkDownloadEBILinks</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -85,7 +85,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create OAF DataSet</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkEBILinksToOaf</class>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}

View File

@ -30,7 +30,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Convert Baseline to OAF Dataset</name>
<class>eu.dnetllib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<class>eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}

View File

@ -1,118 +0,0 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
public class CopyHdfsOafApplication extends AbstractMigrationApplication {
private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
CopyHdfsOafApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl);
final String mdFormat = parser.get("mdFormat");
log.info("mdFormat: {}", mdFormat);
final String mdLayout = parser.get("mdLayout");
log.info("mdLayout: {}", mdLayout);
final String mdInterpretation = parser.get("mdInterpretation");
log.info("mdInterpretation: {}", mdInterpretation);
final String hdfsPath = parser.get("hdfsPath");
log.info("hdfsPath: {}", hdfsPath);
final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
final SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
final List<String> oafTypes = Lists.newArrayList(ModelSupport.oafTypes.keySet());
runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, oafTypes, hdfsPath, paths));
}
public static void processPaths(final SparkSession spark,
final List<String> oafTypes,
final String outputPath,
final Set<String> paths) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
log.info("Found {} mdstores", paths.size());
paths.forEach(log::info);
final String[] validPaths = paths
.stream()
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.toArray(String[]::new);
log.info("Non empty mdstores {}", validPaths.length);
if (validPaths.length > 0) {
// load the dataset
Dataset<Oaf> oaf = spark
.read()
.load(validPaths)
.as(Encoders.kryo(Oaf.class));
// dispatch each entity type individually in the respective graph subdirectory in append mode
for (String type : oafTypes) {
oaf
.filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(type))
.map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.STRING())
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(outputPath + "/" + type);
}
}
}
}

View File

@ -0,0 +1,74 @@
package eu.dnetlib.dhp.oa.graph.raw
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.common.ModelSupport
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.io.Source
object CopyHdfsOafSparkApplication {
def main(args: Array[String]): Unit = {
val log = LoggerFactory.getLogger(getClass)
val conf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")).mkString)
parser.parseArgument(args)
val spark =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sc: SparkContext = spark.sparkContext
val mdstoreManagerUrl = parser.get("mdstoreManagerUrl")
log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl)
val mdFormat = parser.get("mdFormat")
log.info("mdFormat: {}", mdFormat)
val mdLayout = parser.get("mdLayout")
log.info("mdLayout: {}", mdLayout)
val mdInterpretation = parser.get("mdInterpretation")
log.info("mdInterpretation: {}", mdInterpretation)
val hdfsPath = parser.get("hdfsPath")
log.info("hdfsPath: {}", hdfsPath)
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
if (validPaths.nonEmpty) {
val oaf = spark.read.load(validPaths: _*).as[Oaf]
val mapper = new ObjectMapper()
val l =ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList
l.foreach(
e =>
oaf.filter(o => o.getClass.getSimpleName.equalsIgnoreCase(e))
.map(s => mapper.writeValueAsString(s))(Encoders.STRING)
.write
.option("compression", "gzip")
.mode(SaveMode.Append)
.text(s"$hdfsPath/${e}")
)
}
}
}

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.utils.DHPUtils;
public class AbstractMigrationApplication implements Closeable {
@ -71,27 +72,7 @@ public class AbstractMigrationApplication implements Closeable {
final String format,
final String layout,
final String interpretation) throws IOException {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
return DHPUtils.mdstorePaths(mdstoreManagerUrl, format, layout, interpretation, false);
}
private Configuration getConf() {

View File

@ -23,6 +23,12 @@
"paramDescription": "metadata layout",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "should be yarn or local",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "mdInterpretation",

View File

@ -553,7 +553,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ImportOAF_hdfs_graph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication</class>
<class>eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
@ -568,6 +568,7 @@
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>OAF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--master</arg><arg>yarn</arg>
<arg>--mdInterpretation</arg><arg>graph</arg>
</spark>
<ok to="wait_graphs"/>