diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java
new file mode 100644
index 000000000..2e0611475
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafApplication.java
@@ -0,0 +1,143 @@
+
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import com.clearspring.analytics.util.Lists;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class CopyHdfsOafApplication extends AbstractMigrationApplication {
+
+    private static final Logger log = LoggerFactory.getLogger(CopyHdfsOafApplication.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(final String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    CopyHdfsOafApplication.class
+                        .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")));
+        parser.parseArgument(args);
+
+        final Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
+        log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl);
+
+        final String mdFormat = parser.get("mdFormat");
+        log.info("mdFormat: {}", mdFormat);
+
+        final String mdLayout = parser.get("mdLayout");
+        log.info("mdLayout: {}", mdLayout);
+
+        final String mdInterpretation = parser.get("mdInterpretation");
+        log.info("mdInterpretation: {}", mdInterpretation);
+
+        final String hdfsPath = parser.get("hdfsPath");
+        log.info("hdfsPath: {}", hdfsPath);
+
+        final String isLookupUrl = parser.get("isLookupUrl");
+        log.info("isLookupUrl: {}", isLookupUrl);
+
+        final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+        final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
+
+        final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
+
+        final SparkConf conf = new SparkConf();
+        runWithSparkSession(conf, isSparkSessionManaged, spark -> processPaths(spark, vocs, hdfsPath, paths));
+    }
+
+    public static void processPaths(final SparkSession spark,
+        final VocabularyGroup vocs,
+        final String outputPath,
+        final Set<String> paths) {
+
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+        log.info("Found {} mdstores", paths.size());
+        paths.forEach(log::info);
+
+        final String[] validPaths = paths
+            .stream()
+            .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
+            .toArray(String[]::new);
+        log.info("Non empty mdstores {}", validPaths.length);
+
+        if (validPaths.length > 0) {
+            // load the dataset
+            Dataset<Oaf> oaf = spark
+                .read()
+                .load(validPaths)
+                .as(Encoders.kryo(Oaf.class));
+
+            // dispatch each entity type individually in the respective graph subdirectory in append mode
+            for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
+                oaf
+                    .filter((FilterFunction<Oaf>) o -> o.getClass().getSimpleName().toLowerCase().equals(e.getKey().toString()))
+                    .map((MapFunction<Oaf, String>) OBJECT_MAPPER::writeValueAsString, Encoders.bean(e.getValue()))
+                    .write()
+                    .option("compression", "gzip")
+                    .mode(SaveMode.Append)
+                    .text(outputPath + "/" + e.getKey());
+            }
+
+            oaf
+                .flatMap((FlatMapFunction<Oaf, Relation>) o -> {
+                    Relation rel = (Relation) o;
+                    List<Relation> rels = Lists.newArrayList();
+                    rels.add(getInverse(rel, vocs));
+
+                    return rels.iterator();
+                }, Encoders.bean(Relation.class));
+        }
+    }
+
+    private static Relation getInverse(Relation rel, VocabularyGroup vocs) {
+        final Relation inverse = new Relation();
+
+        inverse.setProperties(rel.getProperties());
+        inverse.setValidated(rel.getValidated());
+        inverse.setValidationDate(rel.getValidationDate());
+        inverse.setCollectedfrom(rel.getCollectedfrom());
+        inverse.setDataInfo(rel.getDataInfo());
+        inverse.setLastupdatetimestamp(rel.getLastupdatetimestamp());
+
+        inverse.setSource(rel.getTarget());
+        inverse.setTarget(rel.getSource());
+        inverse.setRelType(rel.getRelType());
+        inverse.setSubRelType(rel.getSubRelType());
+
+        return inverse;
+    }
+
+}
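The per-entity dispatch above hinges on a naming convention: each `EntityType` key in `ModelSupport.entityTypes` equals the lowercase simple name of the corresponding `Oaf` subclass, which is why `o.getClass().getSimpleName().toLowerCase()` doubles as both filter predicate and output subdirectory. (The trailing `flatMap` deriving inverse relations via `getInverse` is only defined in this version; no write action is attached to it, so the sketch below focuses on the dispatch rule.) A minimal, Spark-free sketch; the nested entity classes are illustrative stand-ins, not the actual `eu.dnetlib.dhp.schema.oaf` types:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DispatchRuleSketch {

    // illustrative stand-ins for the eu.dnetlib.dhp.schema.oaf entity classes
    static class Publication {}
    static class Datasource {}
    static class Organization {}

    public static void main(String[] args) {
        final List<Object> mixed = Arrays
            .asList(new Publication(), new Datasource(), new Organization(), new Publication());

        // group by the same key processPaths compares against EntityType:
        // the lowercase simple class name of each record
        final Map<String, List<Object>> byType = mixed
            .stream()
            .collect(Collectors.groupingBy(o -> o.getClass().getSimpleName().toLowerCase()));

        // each group would be appended under outputPath + "/" + type
        byType.forEach((type, records) -> System.out.println("graph_raw/" + type + " <- " + records.size() + " record(s)"));
    }
}
```

Running it prints one line per type, e.g. `graph_raw/publication <- 2 record(s)`, mirroring the per-type subdirectories the Spark job appends to.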
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
index 4110bd806..6c72e4dfc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java
@@ -135,30 +135,4 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication
         }
     }
 
-    private static Set<String> mdstorePaths(final String mdstoreManagerUrl,
-        final String format,
-        final String layout,
-        final String interpretation) throws IOException {
-        final String url = mdstoreManagerUrl + "/mdstores/";
-        final ObjectMapper objectMapper = new ObjectMapper();
-
-        final HttpGet req = new HttpGet(url);
-
-        try (final CloseableHttpClient client = HttpClients.createDefault()) {
-            try (final CloseableHttpResponse response = client.execute(req)) {
-                final String json = IOUtils.toString(response.getEntity().getContent());
-                final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
-                return Arrays
-                    .stream(mdstores)
-                    .filter(md -> md.getFormat().equalsIgnoreCase(format))
-                    .filter(md -> md.getLayout().equalsIgnoreCase(layout))
-                    .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
-                    .filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
-                    .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
-                    .filter(md -> md.getSize() > 0)
-                    .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
-                    .collect(Collectors.toSet());
-            }
-        }
-    }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
index 5d32fe926..7c88dbd9d 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
@@ -3,8 +3,14 @@ package eu.dnetlib.dhp.oa.graph.raw.common;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -16,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 
 public class AbstractMigrationApplication implements Closeable {
 
@@ -47,6 +57,43 @@ public class AbstractMigrationApplication implements Closeable {
             SequenceFile.Writer.valueClass(Text.class));
     }
 
+    /**
+     * Retrieves from the metadata store manager application the list of paths associated with mdstores characterized
+     * by the given format, layout, interpretation
+     * @param mdstoreManagerUrl the URL of the mdstore manager service
+     * @param format the mdstore format
+     * @param layout the mdstore layout
+     * @param interpretation the mdstore interpretation
+     * @return the set of hdfs paths
+     * @throws IOException in case of HTTP communication issues
+     */
+    protected static Set<String> mdstorePaths(final String mdstoreManagerUrl,
+        final String format,
+        final String layout,
+        final String interpretation) throws IOException {
+        final String url = mdstoreManagerUrl + "/mdstores/";
+        final ObjectMapper objectMapper = new ObjectMapper();
+
+        final HttpGet req = new HttpGet(url);
+
+        try (final CloseableHttpClient client = HttpClients.createDefault()) {
+            try (final CloseableHttpResponse response = client.execute(req)) {
+                final String json = IOUtils.toString(response.getEntity().getContent());
+                final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
+                return Arrays
+                    .stream(mdstores)
+                    .filter(md -> md.getFormat().equalsIgnoreCase(format))
+                    .filter(md -> md.getLayout().equalsIgnoreCase(layout))
+                    .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
+                    .filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
+                    .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
+                    .filter(md -> md.getSize() > 0)
+                    .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
+                    .collect(Collectors.toSet());
+            }
+        }
+    }
+
     private Configuration getConf() {
         return new Configuration();
         /*
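To make the selection logic concrete, here is a self-contained sketch of the same filter chain applied to a made-up `/mdstores/` payload. `MdStoreInfo` is a simplified local stand-in for `MDStoreWithInfo`, and the JSON field names are assumptions inferred from the getters used above; the actual payload served by the mdstore manager may differ:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;

public class MdstorePathsSketch {

    // simplified stand-in for eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo
    public static class MdStoreInfo {
        public String format;
        public String layout;
        public String interpretation;
        public String hdfsPath;
        public String currentVersion;
        public long size;
    }

    public static void main(String[] args) throws Exception {
        // made-up response: one usable mdstore and one empty one
        final String json = "[" +
            "{\"format\":\"OAF\",\"layout\":\"store\",\"interpretation\":\"graph\"," +
            "\"hdfsPath\":\"/data/mdstore/md-1\",\"currentVersion\":\"md-1-v2\",\"size\":42}," +
            "{\"format\":\"OAF\",\"layout\":\"store\",\"interpretation\":\"graph\"," +
            "\"hdfsPath\":\"/data/mdstore/md-2\",\"currentVersion\":\"md-2-v1\",\"size\":0}]";

        final MdStoreInfo[] mdstores = new ObjectMapper().readValue(json, MdStoreInfo[].class);

        // same selection as mdstorePaths: match format/layout/interpretation,
        // require a non-empty path and version (the real code uses
        // StringUtils.isNotBlank) and skip empty stores
        final Set<String> paths = Arrays
            .stream(mdstores)
            .filter(md -> "OAF".equalsIgnoreCase(md.format))
            .filter(md -> "store".equalsIgnoreCase(md.layout))
            .filter(md -> "graph".equalsIgnoreCase(md.interpretation))
            .filter(md -> md.hdfsPath != null && !md.hdfsPath.isEmpty())
            .filter(md -> md.currentVersion != null && !md.currentVersion.isEmpty())
            .filter(md -> md.size > 0)
            .map(md -> md.hdfsPath + "/" + md.currentVersion + "/store")
            .collect(Collectors.toSet());

        System.out.println(paths); // [/data/mdstore/md-1/md-1-v2/store]
    }
}
```

The second entry is dropped by the `size > 0` guard, so only `/data/mdstore/md-1/md-1-v2/store` survives, matching the `<hdfsPath>/<currentVersion>/store` path shape the helper constructs.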
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json
new file mode 100644
index 000000000..1e862198f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json
@@ -0,0 +1,38 @@
+[
+  {
+    "paramName": "p",
+    "paramLongName": "hdfsPath",
+    "paramDescription": "the path where storing the sequential file",
+    "paramRequired": true
+  },
+  {
+    "paramName": "u",
+    "paramLongName": "mdstoreManagerUrl",
+    "paramDescription": "the MdstoreManager url",
+    "paramRequired": true
+  },
+  {
+    "paramName": "f",
+    "paramLongName": "mdFormat",
+    "paramDescription": "metadata format",
+    "paramRequired": true
+  },
+  {
+    "paramName": "l",
+    "paramLongName": "mdLayout",
+    "paramDescription": "metadata layout",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "mdInterpretation",
+    "paramDescription": "metadata interpretation",
+    "paramRequired": true
+  },
+  {
+    "paramName": "isu",
+    "paramLongName": "isLookupUrl",
+    "paramDescription": "the url of the ISLookupService",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index 321ca4090..563923a5a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -544,6 +544,33 @@
             <arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
             <arg>--graphRawPath</arg><arg>${workingDir}/graph_raw</arg>
+
+        <action name="ImportOAF_hdfs_graph">
+            <spark xmlns="uri:oozie:spark-action:0.2">
+                <master>yarn</master>
+                <mode>cluster</mode>
+                <name>ImportOAF_hdfs_graph</name>
+                <class>eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication</class>
+                <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+                <spark-opts>
+                    --executor-memory ${sparkExecutorMemory}
+                    --executor-cores ${sparkExecutorCores}
+                    --driver-memory=${sparkDriverMemory}
+                    --conf spark.extraListeners=${spark2ExtraListeners}
+                    --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                    --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                    --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                </spark-opts>
+                <arg>--hdfsPath</arg><arg>${workingDir}/graph_raw</arg>
+                <arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
+                <arg>--mdFormat</arg><arg>OAF</arg>
+                <arg>--mdLayout</arg><arg>store</arg>
+                <arg>--mdInterpretation</arg><arg>graph</arg>
+                <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            </spark>
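For orientation, the Oozie action above amounts to invoking the application with the six parameters declared in copy_hdfs_oaf_parameters.json. A hypothetical local driver sketch follows; every URL and path in it is a placeholder, and actually running it needs a Spark/Hadoop environment plus reachable mdstore manager and IS lookup services:

```java
// Hypothetical local driver mirroring the arguments the Oozie action passes;
// the endpoints and output path below are placeholders, not real deployments.
public class CopyHdfsOafLocalRun {

    public static void main(String[] args) throws Exception {
        eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafApplication.main(new String[] {
            "--hdfsPath", "/tmp/graph_raw",                                // output graph directory (placeholder)
            "--mdstoreManagerUrl", "http://localhost:8280/mdstoremanager", // placeholder service URL
            "--mdFormat", "OAF",
            "--mdLayout", "store",
            "--mdInterpretation", "graph",
            "--isLookupUrl", "http://localhost:8280/is/services/isLookUp"  // placeholder service URL
        });
    }
}
```

Since `isSparkSessionManaged` is not passed, the application falls back to its `Boolean.TRUE` default and lets `runWithSparkSession` create and close the session.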