diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 27c5f52c92..1beb616ba0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -1,24 +1,65 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.schema.common.ModelConstants.*; -import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; +import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES; +import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY; +import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME; +import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES; +import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS; +import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT; +import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.journal; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.keyValue; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listFields; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.oaiIProvenance; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty; -import java.util.*; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.dom4j.Document; import org.dom4j.DocumentFactory; import org.dom4j.DocumentHelper; import org.dom4j.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.AccessRight; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.GeoLocation; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Journal; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OAIProvenance; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -35,7 +76,7 @@ public abstract class AbstractMdRecordToOafMapper { protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3"; protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/"; protected static final Qualifier ORCID_PID_TYPE = qualifier( - ORCID_PENDING, ORCID_CLASSNAME, DNET_PID_TYPES, DNET_PID_TYPES); + "ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES); protected static final Qualifier MAG_PID_TYPE = qualifier( "MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES); @@ -43,6 +84,8 @@ public abstract class AbstractMdRecordToOafMapper { protected static final Map nsContext = new HashMap<>(); + private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesApplication.class); + static { nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); @@ -61,6 +104,9 @@ public abstract class AbstractMdRecordToOafMapper { } public List processMdRecord(final String xml) { + + // log.info("Processing record: " + xml); + try { DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); @@ -100,10 +146,10 @@ public abstract class AbstractMdRecordToOafMapper { } protected String getResultType(final Document doc, final List instances) { - String type = doc.valueOf("//dr:CobjCategory/@type"); + final String type = doc.valueOf("//dr:CobjCategory/@type"); if (StringUtils.isBlank(type) & vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { - String instanceType = instances + final String instanceType = instances .stream() .map(i -> i.getInstancetype().getClassid()) .findFirst() @@ -158,8 +204,12 @@ public abstract class AbstractMdRecordToOafMapper { return oafs; } - private OafEntity createEntity(Document doc, String type, List instances, KeyValue collectedFrom, - DataInfo info, long lastUpdateTimestamp) { + private OafEntity createEntity(final Document doc, + final String type, + final List instances, + final KeyValue collectedFrom, + final DataInfo info, + final long lastUpdateTimestamp) { switch (type.toLowerCase()) { case "publication": final Publication p = new Publication(); @@ -219,9 +269,7 @@ public abstract class AbstractMdRecordToOafMapper { getRelation( docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate)); res - .add( - getRelation( - projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate)); + .add(getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate)); } } @@ -411,10 +459,10 @@ public abstract class AbstractMdRecordToOafMapper { return Lists.newArrayList(id); } } - List idList = doc + final List idList = doc .selectNodes( "normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())"); - Set originalIds = Sets.newHashSet(idList); + final Set originalIds = Sets.newHashSet(idList); if (originalIds.isEmpty()) { throw new IllegalStateException("missing originalID on " + doc.asXML()); @@ -423,8 +471,8 @@ public abstract class AbstractMdRecordToOafMapper { } protected AccessRight prepareAccessRight(final Node node, final String xpath, final String schemeId) { - Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId); - AccessRight accessRight = new AccessRight(); + final Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId); + final AccessRight accessRight = new AccessRight(); accessRight.setClassid(qualifier.getClassid()); accessRight.setClassname(qualifier.getClassname()); accessRight.setSchemeid(qualifier.getSchemeid()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java new file mode 100644 index 0000000000..77fd28d773 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -0,0 +1,159 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.StringReader; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.Element; +import org.dom4j.Namespace; +import org.dom4j.QName; +import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; +import scala.Tuple2; + +public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication { + + private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class); + private static final Namespace DRI_NS_PREFIX = new Namespace("dri", + "http://www.driver-repository.eu/namespace/dri"); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + MigrateHdfsMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl"); + final String mdFormat = parser.get("mdFormat"); + final String mdLayout = parser.get("mdLayout"); + final String mdInterpretation = parser.get("mdInterpretation"); + + final String hdfsPath = parser.get("hdfsPath"); + + final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); + + final SparkConf conf = new SparkConf(); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration()); + processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation)); + }); + } + + public static void processPaths(final SparkSession spark, + final String outputPath, + final Set paths, + final String type) throws Exception { + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + log.info("Found " + paths.size() + " not empty mdstores"); + paths.forEach(log::info); + + final String[] validPaths = paths + .stream() + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) + .toArray(size -> new String[size]); + + spark + .read() + .parquet(validPaths) + .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) + .toJavaRDD() + .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) + // .coalesce(1) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + + /* + * .foreach(xml -> { try { writer.append(new Text(UUID.randomUUID() + ":" + type), new Text(xml)); } catch + * (final Exception e) { throw new RuntimeException(e); } }); + */ + } + + private static String enrichRecord(final Row r) { + final String xml = r.getAs("body"); + + final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"); + final String collDate = dateFormat.format(new Date((Long) r.getAs("dateOfCollection"))); + final String tranDate = dateFormat.format(new Date((Long) r.getAs("dateOfTransformation"))); + + try { + final Document doc = new SAXReader().read(new StringReader(xml)); + final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']"); + head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id")); + head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate); + head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate); + return doc.asXML(); + } catch (final Exception e) { + log.error("Error patching record: " + xml); + throw new RuntimeException("Error patching record: " + xml, e); + } + } + + private static Set mdstorePaths(final String mdstoreManagerUrl, + final String format, + final String layout, + final String interpretation) + throws Exception { + final String url = mdstoreManagerUrl + "/mdstores/"; + final ObjectMapper objectMapper = new ObjectMapper(); + + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String json = IOUtils.toString(response.getEntity().getContent()); + final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); + return Arrays + .stream(mdstores) + .filter(md -> md.getFormat().equalsIgnoreCase(format)) + .filter(md -> md.getLayout().equalsIgnoreCase(layout)) + .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) + .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) + .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) + .filter(md -> md.getSize() > 0) + .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") + .collect(Collectors.toSet()); + } + } + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index 9acdabb37f..3f6afbeac7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -10,9 +10,6 @@ import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.MdstoreClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json new file mode 100644 index 0000000000..1d89017c52 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the path where storing the sequential file", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "mdstoreManagerUrl", + "paramDescription": "the MdstoreManager url", + "paramRequired": true + }, + { + "paramName": "f", + "paramLongName": "mdFormat", + "paramDescription": "metadata format", + "paramRequired": true + }, + { + "paramName": "l", + "paramLongName": "mdLayout", + "paramDescription": "metadata layout", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "mdInterpretation", + "paramDescription": "metadata interpretation", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 80f33bd53f..0821f04ea9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -40,6 +40,16 @@ false should import content from the aggregator or reuse a previous version + + reuseODF_hdfs + false + should import content from the aggregator or reuse a previous version + + + reuseOAF_hdfs + false + should import content from the aggregator or reuse a previous version + contentPath path location to store (or reuse) content from the aggregator @@ -289,7 +299,7 @@ ${wf:conf('reuseOAF') eq false} - ${wf:conf('reuseOAF') eq true} + ${wf:conf('reuseOAF') eq true} @@ -324,10 +334,78 @@ --mdLayoutstore --mdInterpretationintersection - + + + + + ${wf:conf('reuseODF_hdfs') eq false} + ${wf:conf('reuseODF_hdfs') eq true} + + + + + + yarn + cluster + ImportODF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/odf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatODF + --mdLayoutstore + --mdInterpretationcleaned + + + + + + + + ${wf:conf('reuseOAF_hdfs') eq false} + ${wf:conf('reuseOAF_hdfs') eq true} + + + + + + + yarn + cluster + ImportOAF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/oaf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatOAF + --mdLayoutstore + --mdInterpretationcleaned + + + + + ${wf:conf('reuseDBOpenorgs') eq false} @@ -426,7 +504,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records + --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs --targetPath${workingDir}/entities --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml new file mode 100644 index 0000000000..2e0ed9aeea --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml new file mode 100644 index 0000000000..bfe2dff0ba --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml @@ -0,0 +1,157 @@ + + + + + graphOutputPath + the target path to store raw graph + + + contentPath + path location to store (or reuse) content from the aggregator + + + mdstoreManagerUrl + the address of the Mdstore Manager + + + isLookupUrl + the address of the lookUp service + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + ImportODF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/odf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatODF + --mdLayoutstore + --mdInterpretationcleaned + + + + + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/odf_records_hdfs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + --shouldHashId${shouldHashId} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${workingDir}/graph_raw + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml new file mode 100644 index 0000000000..bfe2dff0ba --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml @@ -0,0 +1,157 @@ + + + + + graphOutputPath + the target path to store raw graph + + + contentPath + path location to store (or reuse) content from the aggregator + + + mdstoreManagerUrl + the address of the Mdstore Manager + + + isLookupUrl + the address of the lookUp service + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + ImportODF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/odf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatODF + --mdLayoutstore + --mdInterpretationcleaned + + + + + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/odf_records_hdfs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + --shouldHashId${shouldHashId} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${workingDir}/graph_raw + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 819edfde36..08a596db5c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -1,7 +1,11 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.lenient; import java.io.IOException; @@ -21,7 +25,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -409,7 +421,10 @@ public class MappersTest { assertEquals(1, d.getTitle().size()); assertEquals( "Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia", - d.getTitle().get(0).getValue()); + d + .getTitle() + .get(0) + .getValue()); assertNotNull(d.getDescription()); assertEquals(1, d.getDescription().size()); @@ -435,7 +450,7 @@ public class MappersTest { assertNotNull(d.getInstance()); assertTrue(d.getInstance().size() == 1); - Instance i = d.getInstance().get(0); + final Instance i = d.getInstance().get(0); assertNotNull(i.getAccessright()); assertEquals(ModelConstants.DNET_ACCESS_MODES, i.getAccessright().getSchemeid()); @@ -633,7 +648,55 @@ public class MappersTest { System.out.println(p.getTitle().get(0).getValue()); } + @Test + void testOdfFromHdfs() throws IOException { + final String xml = IOUtils.toString(getClass().getResourceAsStream("odf_from_hdfs.xml")); + + final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); + + assertEquals(1, list.size()); + + System.out.println(list.get(0).getClass()); + + assertTrue(list.get(0) instanceof Dataset); + + final Dataset p = (Dataset) list.get(0); + + assertValidId(p.getId()); + assertTrue(p.getOriginalId().size() == 1); + assertEquals("df76e73f-0483-49a4-a9bb-63f2f985574a", p.getOriginalId().get(0)); + assertValidId(p.getCollectedfrom().get(0).getKey()); + assertTrue(p.getAuthor().size() > 0); + + final Optional author = p + .getAuthor() + .stream() + .findFirst(); + assertTrue(author.isPresent()); + + assertEquals("Museum Sønderjylland", author.get().getFullname()); + + assertTrue(p.getSubject().size() > 0); + assertTrue(p.getInstance().size() > 0); + + assertNotNull(p.getTitle()); + assertFalse(p.getTitle().isEmpty()); + + assertNotNull(p.getInstance()); + assertTrue(p.getInstance().size() > 0); + p + .getInstance() + .stream() + .forEach(i -> { + assertNotNull(i.getAccessright()); + assertEquals("UNKNOWN", i.getAccessright().getClassid()); + }); + assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid()); + } + private void assertValidId(final String id) { + System.out.println(id); + assertEquals(49, id.length()); assertEquals('|', id.charAt(2)); assertEquals(':', id.charAt(15)); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml new file mode 100644 index 0000000000..b27e5930b3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml @@ -0,0 +1,77 @@ + +
+ df76e73f-0483-49a4-a9bb-63f2f985574a + 2020-09-30T08:17:54Z + eudat-b2find + 2021-05-20T13:43:52.888Z + test________::92fe3efa47883b2f3401e6a4bd92e9d7 + 2020-05-21T05:26:15.93Z + 2020-08-01T11:06:26.977Z +
+ + + + + Museum Sønderjylland + + + + 200202-124 Hjelmvrå + + + This record describes + ancient sites and monuments as well archaeological excavations + undertaken by Danish museums. Excerpt of the Danish description of + events: 1995-04-26: Ved en besigtigelse ud for stedet fandt Nørgård + en større mængde skår i skovens udkant, liggende i nogle + drængrøfter1995-04-26: Leif Nørgård, der er leder af Sønderjyllands + Amatørarkæologer, havde ved en samtale med en tidligere ansat på + motorvejsprojektet gennem Sønderjylland fået at vide, at man på + dette sted havde fundet "urner".1995-04-26: Ved en besigtigelse ud + for stedet fandt Nørgård en større mængde skår i skovens udkant, + liggende i nogle drængrøfter1995-04-26: Leif Nørgård, der er leder + af Sønderjyllands Amatørarkæologer, havde ved en samtale med en + tidligere ansat på motorvejsprojektet gennem Sønderjylland fået at + vide, at man på dette sted havde fundet "urner". + + + + (9.376 LON, 55.220 LAT) + + + + Enkeltfund + Settlement + Single find + Archaeology + + + http://www.kulturarv.dk/fundogfortidsminder/Lokalitet/136540/ + + 2020 + Slots- og Kulturstyrelsen (www.slks.dk) + Danish + + Public + + Dataset + + 0021 + 2020-01-01 + UNKNOWN + Danish + + + +
\ No newline at end of file