Merge pull request 'import_new_mdstores' (#112) from import_new_mdstores into stable_ids

Reviewed-on: #112
Claudio Atzori 2021-06-14 09:23:55 +02:00
commit dd19c4ac5a
10 changed files with 813 additions and 27 deletions

View File: AbstractMdRecordToOafMapper.java

@@ -1,24 +1,65 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.journal;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.keyValue;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.util.*;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.AccessRight;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
@@ -35,7 +76,7 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/";
protected static final Qualifier ORCID_PID_TYPE = qualifier(
ORCID_PENDING, ORCID_CLASSNAME, DNET_PID_TYPES, DNET_PID_TYPES);
"ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES);
protected static final Qualifier MAG_PID_TYPE = qualifier(
"MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
@@ -43,6 +84,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final Map<String, String> nsContext = new HashMap<>();
private static final Logger log = LoggerFactory.getLogger(AbstractMdRecordToOafMapper.class);
static {
nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
@@ -61,6 +104,9 @@ public abstract class AbstractMdRecordToOafMapper {
}
public List<Oaf> processMdRecord(final String xml) {
// log.info("Processing record: " + xml);
try {
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
@@ -100,10 +146,10 @@ public abstract class AbstractMdRecordToOafMapper {
}
protected String getResultType(final Document doc, final List<Instance> instances) {
String type = doc.valueOf("//dr:CobjCategory/@type");
final String type = doc.valueOf("//dr:CobjCategory/@type");
if (StringUtils.isBlank(type) && vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
String instanceType = instances
final String instanceType = instances
.stream()
.map(i -> i.getInstancetype().getClassid())
.findFirst()
@@ -158,8 +204,12 @@ public abstract class AbstractMdRecordToOafMapper {
return oafs;
}
private OafEntity createEntity(Document doc, String type, List<Instance> instances, KeyValue collectedFrom,
DataInfo info, long lastUpdateTimestamp) {
private OafEntity createEntity(final Document doc,
final String type,
final List<Instance> instances,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
switch (type.toLowerCase()) {
case "publication":
final Publication p = new Publication();
@@ -219,9 +269,7 @@
getRelation(
docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate));
res
.add(
getRelation(
projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate));
.add(getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate));
}
}
@@ -411,10 +459,10 @@
return Lists.newArrayList(id);
}
}
List<String> idList = doc
final List<String> idList = doc
.selectNodes(
"normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())");
Set<String> originalIds = Sets.newHashSet(idList);
final Set<String> originalIds = Sets.newHashSet(idList);
if (originalIds.isEmpty()) {
throw new IllegalStateException("missing originalID on " + doc.asXML());
@@ -423,8 +471,8 @@
}
protected AccessRight prepareAccessRight(final Node node, final String xpath, final String schemeId) {
Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId);
AccessRight accessRight = new AccessRight();
final Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId);
final AccessRight accessRight = new AccessRight();
accessRight.setClassid(qualifier.getClassid());
accessRight.setClassname(qualifier.getClassname());
accessRight.setSchemeid(qualifier.getSchemeid());
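
The mapper resolves prefixed XPath expressions such as //dr:CobjCategory/@type through the namespace map registered on dom4j's DocumentFactory in processMdRecord. A minimal self-contained sketch of that mechanism (the record literal is illustrative, not taken from the PR):

import java.util.HashMap;
import java.util.Map;

import org.dom4j.Document;
import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;

public class XPathNamespaceSketch {
	public static void main(final String[] args) throws Exception {
		final Map<String, String> ns = new HashMap<>();
		ns.put("dr", "http://www.driver-repository.eu/namespace/dr");

		// dom4j consults this global map when evaluating prefixed XPath expressions.
		DocumentFactory.getInstance().setXPathNamespaceURIs(ns);

		final Document doc = DocumentHelper.parseText(
			"<record xmlns:dr=\"http://www.driver-repository.eu/namespace/dr\">"
				+ "<dr:CobjCategory type=\"dataset\">0021</dr:CobjCategory>"
				+ "</record>");

		// Without the registered prefix this XPath would yield an empty string.
		System.out.println(doc.valueOf("//dr:CobjCategory/@type")); // dataset
	}
}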

View File: MigrateHdfsMdstoresApplication.java

@@ -0,0 +1,159 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import scala.Tuple2;
public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication {
private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class);
private static final Namespace DRI_NS_PREFIX = new Namespace("dri",
"http://www.driver-repository.eu/namespace/dri");
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
MigrateHdfsMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
final String mdFormat = parser.get("mdFormat");
final String mdLayout = parser.get("mdLayout");
final String mdInterpretation = parser.get("mdInterpretation");
final String hdfsPath = parser.get("hdfsPath");
final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration());
processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation));
});
}
public static void processPaths(final SparkSession spark,
final String outputPath,
final Set<String> paths,
final String type) throws Exception {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
log.info("Found " + paths.size() + " not empty mdstores");
paths.forEach(log::info);
final String[] validPaths = paths
.stream()
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.toArray(String[]::new);
spark
.read()
.parquet(validPaths)
.map((MapFunction<Row, String>) r -> enrichRecord(r), Encoders.STRING())
.toJavaRDD()
.mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
// .coalesce(1)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
/*
* .foreach(xml -> { try { writer.append(new Text(UUID.randomUUID() + ":" + type), new Text(xml)); } catch
* (final Exception e) { throw new RuntimeException(e); } });
*/
}
private static String enrichRecord(final Row r) {
final String xml = r.getAs("body");
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
final String collDate = dateFormat.format(new Date((Long) r.getAs("dateOfCollection")));
final String tranDate = dateFormat.format(new Date((Long) r.getAs("dateOfTransformation")));
try {
final Document doc = new SAXReader().read(new StringReader(xml));
final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']");
head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id"));
head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate);
head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate);
return doc.asXML();
} catch (final Exception e) {
log.error("Error patching record: " + xml);
throw new RuntimeException("Error patching record: " + xml, e);
}
}
private static Set<String> mdstorePaths(final String mdstoreManagerUrl,
final String format,
final String layout,
final String interpretation)
throws Exception {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
}
}
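
Each parquet row carries the record XML in its body column plus epoch-millisecond timestamps; enrichRecord re-parses the body and appends the dri: header fields before the pairs are written out as a SequenceFile. A trimmed, runnable sketch of the same dom4j patching (record and timestamp values are illustrative):

import java.io.StringReader;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.io.SAXReader;

public class EnrichHeaderSketch {

	private static final Namespace DRI_NS_PREFIX = new Namespace("dri",
		"http://www.driver-repository.eu/namespace/dri");

	public static void main(final String[] args) throws Exception {
		final Document doc = new SAXReader().read(new StringReader("<record><header/><metadata/></record>"));

		// Locate the header regardless of its namespace, as enrichRecord does.
		final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']");
		head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText("test________::0000");

		// The mdstore parquet schema stores the dates as epoch milliseconds.
		final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
		head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX))
			.addText(dateFormat.format(new Date(1591282021000L)));

		System.out.println(doc.asXML());
	}
}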

View File: MigrateMongoMdstoresApplication.java

@@ -10,9 +10,6 @@ import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.MdstoreClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;

View File: migrate_hdfs_mstores_parameters.json

@@ -0,0 +1,32 @@
[
{
"paramName": "p",
"paramLongName": "hdfsPath",
"paramDescription": "the path where storing the sequential file",
"paramRequired": true
},
{
"paramName": "u",
"paramLongName": "mdstoreManagerUrl",
"paramDescription": "the MdstoreManager url",
"paramRequired": true
},
{
"paramName": "f",
"paramLongName": "mdFormat",
"paramDescription": "metadata format",
"paramRequired": true
},
{
"paramName": "l",
"paramLongName": "mdLayout",
"paramDescription": "metadata layout",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "mdInterpretation",
"paramDescription": "metadata interpretation",
"paramRequired": true
}
]
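
These parameters map one-to-one onto the command line parsed by MigrateHdfsMdstoresApplication.main. A hypothetical invocation (all values are illustrative; it assumes a Spark master is configured, e.g. spark.master=local[*], and that an mdstore manager is reachable at the given URL):

import eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication;

public class MigrateHdfsMdstoresInvocationSketch {
	public static void main(final String[] args) throws Exception {
		MigrateHdfsMdstoresApplication.main(new String[] {
			"--hdfsPath", "/tmp/graph/odf_records_hdfs",    // -p, output sequence file path
			"--mdstoreManagerUrl", "http://localhost:8080", // -u, assumed manager endpoint
			"--mdFormat", "ODF",                            // -f
			"--mdLayout", "store",                          // -l
			"--mdInterpretation", "cleaned"                 // -i
		});
	}
}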

View File: workflow.xml (raw_all)

@@ -40,6 +40,16 @@
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>reuseODF_hdfs</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>reuseOAF_hdfs</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
@@ -289,7 +299,7 @@
<decision name="reuse_oaf">
<switch>
<case to="ImportOAF">${wf:conf('reuseOAF') eq false}</case>
<case to="wait_import">${wf:conf('reuseOAF') eq true}</case>
<case to="reuse_odf_hdfs">${wf:conf('reuseOAF') eq true}</case>
<default to="ImportOAF"/>
</switch>
</decision>
@@ -324,10 +334,78 @@
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>intersection</arg>
</java>
<ok to="wait_import"/>
<ok to="ImportODF_hdfs"/>
<error to="Kill"/>
</action>
<decision name="reuse_odf_hdfs">
<switch>
<case to="ImportODF_hdfs">${wf:conf('reuseODF_hdfs') eq false}</case>
<case to="reuse_oaf_hdfs">${wf:conf('reuseODF_hdfs') eq true}</case>
<default to="ImportODF_hdfs"/>
</switch>
</decision>
<action name="ImportODF_hdfs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ImportODF_hdfs</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hdfsPath</arg><arg>${contentPath}/odf_records_hdfs</arg>
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>ODF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
</spark>
<ok to="reuse_oaf_hdfs"/>
<error to="Kill"/>
</action>
<decision name="reuse_oaf_hdfs">
<switch>
<case to="ImportOAF_hdfs">${wf:conf('reuseOAF_hdfs') eq false}</case>
<case to="wait_import">${wf:conf('reuseOAF_hdfs') eq true}</case>
<default to="ImportOAF_hdfs"/>
</switch>
</decision>
<action name="ImportOAF_hdfs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ImportOAF_hdfs</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hdfsPath</arg><arg>${contentPath}/oaf_records_hdfs</arg>
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>OAF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
</spark>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<decision name="reuse_db_openorgs">
<switch>
<case to="ImportDB_openorgs">${wf:conf('reuseDBOpenorgs') eq false}</case>
@@ -426,7 +504,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg>

View File: config-default.xml

@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File: workflow.xml (Test Import of Hdfs Stores)

@@ -0,0 +1,157 @@
<workflow-app name="Test Import of Hdfs Stores" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>mdstoreManagerUrl</name>
<description>the address of the Mdstore Manager</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="ImportODF_hdfs"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportODF_hdfs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ImportODF_hdfs</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hdfsPath</arg><arg>${contentPath}/odf_records_hdfs</arg>
<arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
<arg>--mdFormat</arg><arg>ODF</arg>
<arg>--mdLayout</arg><arg>store</arg>
<arg>--mdInterpretation</arg><arg>cleaned</arg>
</spark>
<ok to="GenerateEntities"/>
<error to="Kill"/>
</action>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/odf_records_hdfs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
<arg>--graphRawPath</arg><arg>${workingDir}/graph_raw</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File: MappersTest.java

@@ -1,7 +1,11 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
@@ -21,7 +25,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -409,7 +421,10 @@ public class MappersTest {
assertEquals(1, d.getTitle().size());
assertEquals(
"Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia",
d.getTitle().get(0).getValue());
d
.getTitle()
.get(0)
.getValue());
assertNotNull(d.getDescription());
assertEquals(1, d.getDescription().size());
@@ -435,7 +450,7 @@
assertNotNull(d.getInstance());
assertTrue(d.getInstance().size() == 1);
Instance i = d.getInstance().get(0);
final Instance i = d.getInstance().get(0);
assertNotNull(i.getAccessright());
assertEquals(ModelConstants.DNET_ACCESS_MODES, i.getAccessright().getSchemeid());
@@ -633,7 +648,55 @@
System.out.println(p.getTitle().get(0).getValue());
}
@Test
void testOdfFromHdfs() throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream("odf_from_hdfs.xml"));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
assertEquals(1, list.size());
System.out.println(list.get(0).getClass());
assertTrue(list.get(0) instanceof Dataset);
final Dataset p = (Dataset) list.get(0);
assertValidId(p.getId());
assertTrue(p.getOriginalId().size() == 1);
assertEquals("df76e73f-0483-49a4-a9bb-63f2f985574a", p.getOriginalId().get(0));
assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(p.getAuthor().size() > 0);
final Optional<Author> author = p
.getAuthor()
.stream()
.findFirst();
assertTrue(author.isPresent());
assertEquals("Museum Sønderjylland", author.get().getFullname());
assertTrue(p.getSubject().size() > 0);
assertTrue(p.getInstance().size() > 0);
assertNotNull(p.getTitle());
assertFalse(p.getTitle().isEmpty());
assertNotNull(p.getInstance());
assertTrue(p.getInstance().size() > 0);
p
.getInstance()
.stream()
.forEach(i -> {
assertNotNull(i.getAccessright());
assertEquals("UNKNOWN", i.getAccessright().getClassid());
});
assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
}
private void assertValidId(final String id) {
System.out.println(id);
assertEquals(49, id.length());
assertEquals('|', id.charAt(2));
assertEquals(':', id.charAt(15));
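
assertValidId encodes the OpenAIRE identifier layout checked throughout these tests: a two-character entity-type prefix, '|', a twelve-character datasource namespace, '::', and a 32-character hash, 49 characters in all. A hypothetical value that satisfies those checks (the "50|" result prefix is an assumption; the hash is borrowed from the test fixture below):

public class OpenaireIdSketch {
	public static void main(final String[] args) {
		// 2-char type prefix + "|" + 12-char namespace + "::" + 32-char hash = 49 chars
		final String id = "50|test________::92fe3efa47883b2f3401e6a4bd92e9d7";
		System.out.println(id.length());   // 49
		System.out.println(id.charAt(2));  // |
		System.out.println(id.charAt(15)); // :
	}
}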

View File: odf_from_hdfs.xml

@@ -0,0 +1,77 @@
<record xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:datacite="http://datacite.org/schema/kernel-3"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri">
<header xmlns="http://www.openarchives.org/OAI/2.0/">
<identifier>df76e73f-0483-49a4-a9bb-63f2f985574a</identifier>
<datestamp>2020-09-30T08:17:54Z</datestamp>
<setSpec>eudat-b2find</setSpec>
<dr:dateOfTransformation>2021-05-20T13:43:52.888Z</dr:dateOfTransformation>
<dri:objIdentifier>test________::92fe3efa47883b2f3401e6a4bd92e9d7</dri:objIdentifier>
<dri:dateOfCollection>2020-05-21T05:26:15.93Z</dri:dateOfCollection>
<dri:dateOfTransformation>2020-08-01T11:06:26.977Z</dri:dateOfTransformation>
</header>
<metadata>
<resource xmlns="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4.3/metadata.xsd">
<creators>
<creator>
<creatorName>Museum Sønderjylland</creatorName>
</creator>
</creators>
<titles>
<title>200202-124 Hjelmvrå</title>
</titles>
<descriptions>
<description descriptionType="Abstract">This record describes
ancient sites and monuments as well archaeological excavations
undertaken by Danish museums. Excerpt of the Danish description of
events: 1995-04-26: Ved en besigtigelse ud for stedet fandt Nørgård
en større mængde skår i skovens udkant, liggende i nogle
drængrøfter1995-04-26: Leif Nørgård, der er leder af Sønderjyllands
Amatørarkæologer, havde ved en samtale med en tidligere ansat på
motorvejsprojektet gennem Sønderjylland fået at vide, at man på
dette sted havde fundet "urner".1995-04-26: Ved en besigtigelse ud
for stedet fandt Nørgård en større mængde skår i skovens udkant,
liggende i nogle drængrøfter1995-04-26: Leif Nørgård, der er leder
af Sønderjyllands Amatørarkæologer, havde ved en samtale med en
tidligere ansat på motorvejsprojektet gennem Sønderjylland fået at
vide, at man på dette sted havde fundet "urner".</description>
</descriptions>
<geoLocations>
<geoLocation>
<geoLocationPlace>(9.376 LON, 55.220 LAT)</geoLocationPlace>
</geoLocation>
</geoLocations>
<subjects>
<subject>Enkeltfund</subject>
<subject>Settlement</subject>
<subject>Single find</subject>
<subject>Archaeology</subject>
</subjects>
<alternateIdentifiers
xmlns="http://datacite.org/schema/kernel-3">
<alternateIdentifier
xmlns="http://datacite.org/schema/kernel-4"
alternateIdentifierType="URL">http://www.kulturarv.dk/fundogfortidsminder/Lokalitet/136540/</alternateIdentifier>
</alternateIdentifiers>
<publicationYear>2020</publicationYear>
<publisher>Slots- og Kulturstyrelsen (www.slks.dk)</publisher>
<language>Danish</language>
<rightsList>
<rights>Public</rights>
</rightsList>
<resourceType resourceTypeGeneral="Other">Dataset</resourceType>
</resource>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
<oaf:dateAccepted>2020-01-01</oaf:dateAccepted>
<oaf:accessrights>UNKNOWN</oaf:accessrights>
<oaf:language>Danish</oaf:language>
<oaf:hostedBy name="B2FIND"
id="re3data_____::r3d100012377" />
<oaf:collectedFrom name="B2FIND"
id="re3data_____::r3d100012377" />
</metadata>
</record>