diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
index 687f0de66..f579a7d2b 100644
--- a/dhp-workflows/dhp-graph-mapper/pom.xml
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -57,6 +57,11 @@
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-validator</groupId>
+ <artifactId>commons-validator</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index da1c764e8..a8d09e4a7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -10,10 +10,16 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.*;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.validator.routines.UrlValidator;
import org.dom4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -50,6 +56,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final Map<String, String> nsContext = new HashMap<>();
+ private static final Logger log = LoggerFactory.getLogger(AbstractMdRecordToOafMapper.class);
+
static {
nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
@@ -76,40 +84,44 @@ public abstract class AbstractMdRecordToOafMapper {
this.forceOriginalId = false;
}
- public List<Oaf> processMdRecord(final String xml) throws DocumentException {
+ public List<Oaf> processMdRecord(final String xml) {
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
+ try {
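+ // normalise the different DataCite schema declarations (kernel-4, kernel-4/, kernel-3/) to kernel-3 before parsing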
+ final Document doc = DocumentHelper
+ .parseText(
+ xml
+ .replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)
+ .replaceAll(DATACITE_SCHEMA_KERNEL_4_SLASH, DATACITE_SCHEMA_KERNEL_3)
+ .replaceAll(DATACITE_SCHEMA_KERNEL_3_SLASH, DATACITE_SCHEMA_KERNEL_3));
- final Document doc = DocumentHelper
- .parseText(
- xml
- .replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)
- .replaceAll(DATACITE_SCHEMA_KERNEL_4_SLASH, DATACITE_SCHEMA_KERNEL_3)
- .replaceAll(DATACITE_SCHEMA_KERNEL_3_SLASH, DATACITE_SCHEMA_KERNEL_3));
+ final KeyValue collectedFrom = getProvenanceDatasource(
+ doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
- final KeyValue collectedFrom = getProvenanceDatasource(
- doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
+ if (collectedFrom == null) {
+ return Lists.newArrayList();
+ }
- if (collectedFrom == null) {
+ final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id"))
+ ? collectedFrom
+ : getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name");
+
+ if (hostedBy == null) {
+ return Lists.newArrayList();
+ }
+
+ final DataInfo info = prepareDataInfo(doc, invisible);
+ final long lastUpdateTimestamp = new Date().getTime();
+
+ final List<Instance> instances = prepareInstances(doc, info, collectedFrom, hostedBy);
+
+ final String type = getResultType(doc, instances);
+
+ return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
+ } catch (DocumentException e) {
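+ // the record is not valid XML: log it and skip it by returning an empty list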
+ log.error("Error with record:\n" + xml);
return Lists.newArrayList();
}
-
- final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id"))
- ? collectedFrom
- : getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name");
-
- if (hostedBy == null) {
- return Lists.newArrayList();
- }
-
- final DataInfo info = prepareDataInfo(doc, invisible);
- final long lastUpdateTimestamp = new Date().getTime();
-
- final List<Instance> instances = prepareInstances(doc, info, collectedFrom, hostedBy);
-
- final String type = getResultType(doc, instances);
-
- return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
}
protected String getResultType(final Document doc, final List<Instance> instances) {
@@ -609,4 +621,15 @@ public abstract class AbstractMdRecordToOafMapper {
return res;
}
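+ // filters the given URLs with commons-validator, keeping only the syntactically valid ones;
+ // a null input yields an empty set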
+ protected Set<String> validateUrl(Collection<String> url) {
+ UrlValidator urlValidator = UrlValidator.getInstance();
+ if (Objects.isNull(url)) {
+ return new HashSet<>();
+ }
+ return url
+ .stream()
+ .filter(u -> urlValidator.isValid(u))
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index 6bb18c375..5f9d98073 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -16,6 +16,9 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
@@ -127,8 +130,8 @@ public class GenerateEntitiesApplication {
.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> convertToListOaf(k._1(), k._2(), shouldHashId, vocs))
- .filter(Objects::nonNull)
- .flatMap(List::iterator));
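+ // flatten each record's list of Oaf entities first, then drop null entries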
+ .flatMap(List::iterator)
+ .filter(Objects::nonNull));
}
switch (mode) {
@@ -155,11 +158,11 @@ public class GenerateEntitiesApplication {
.saveAsTextFile(targetPath, GzipCodec.class);
}
- private static List<Oaf> convertToListOaf(
+ public static List<Oaf> convertToListOaf(
final String id,
final String s,
final boolean shouldHashId,
- final VocabularyGroup vocs) throws DocumentException {
+ final VocabularyGroup vocs) {
final String type = StringUtils.substringAfter(id, ":");
switch (type.toLowerCase()) {
@@ -200,8 +203,7 @@ public class GenerateEntitiesApplication {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (final Exception e) {
- log.error("Error parsing object of class: {}", clazz);
- log.error(s);
+ log.error("Error parsing object of class: {}:\n{}", clazz, s);
throw new IllegalArgumentException(e);
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
index 9225e174d..30f3935f5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
@@ -159,22 +159,25 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
final List<Node> nodes = Lists.newArrayList(doc.selectNodes("//dc:identifier"));
- instance
- .setUrl(
- nodes
- .stream()
- .filter(n -> StringUtils.isNotBlank(n.getText()))
- .map(n -> n.getText().trim())
- .filter(u -> u.startsWith("http"))
- .map(s -> {
- try {
- return URLDecoder.decode(s, "UTF-8");
- } catch (Throwable t) {
- return s;
- }
- })
- .distinct()
- .collect(Collectors.toCollection(ArrayList::new)));
+ final List<String> url = nodes
+ .stream()
+ .filter(n -> StringUtils.isNotBlank(n.getText()))
+ .map(n -> n.getText().trim())
+ .filter(u -> u.startsWith("http"))
+ .map(s -> {
+ try {
+ return URLDecoder.decode(s, "UTF-8");
+ } catch (Throwable t) {
+ return s;
+ }
+ })
+ .distinct()
+ .collect(Collectors.toCollection(ArrayList::new));
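+ // keep only the URLs that pass validation; leave the instance URL unset when none survive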
+ final Set<String> validUrl = validateUrl(url);
+ if (!validUrl.isEmpty()) {
+ instance.setUrl(new ArrayList<>());
+ instance.getUrl().addAll(validUrl);
+ }
return Lists.newArrayList(instance);
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
index d6bfe6714..5781988e6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
@@ -6,11 +6,14 @@ import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.net.URLDecoder;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.validator.routines.UrlValidator;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Node;
@@ -171,23 +174,31 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
for (final Object o : doc.selectNodes("//*[local-name()='identifier' and ./@identifierType='landingPage']")) {
url.add(trimAndDecodeUrl(((Node) o).getText().trim()));
}
- for (final Object o : doc
- .selectNodes("//*[local-name()='alternateIdentifier' and ./@alternateIdentifierType='DOI']")) {
- url.add(HTTP_DOI_PREIFX + ((Node) o).getText().trim());
+
+ Set<String> validUrl = validateUrl(url);
+
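+ // fall back to a DOI-based URL only when no doi.org link passed validation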
+ if (validUrl.stream().noneMatch(s -> s.contains("doi.org"))) {
+ for (final Object o : doc
+ .selectNodes("//*[local-name()='alternateIdentifier' and ./@alternateIdentifierType='DOI']")) {
+ validUrl.add(HTTP_DOI_PREIFX + ((Node) o).getText().trim());
+ }
+ for (final Object o : doc.selectNodes("//*[local-name()='identifier' and ./@identifierType='DOI']")) {
+ validUrl.add(HTTP_DOI_PREIFX + ((Node) o).getText().trim());
+ }
}
- for (final Object o : doc.selectNodes("//*[local-name()='identifier' and ./@identifierType='DOI']")) {
- url.add(HTTP_DOI_PREIFX + ((Node) o).getText().trim());
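+ // likewise, fall back to a Handle-based URL only when no hdl.handle.net link passed validation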
+ if (validUrl.stream().noneMatch(s -> s.contains("hdl.handle.net"))) {
+ for (final Object o : doc
+ .selectNodes("//*[local-name()='alternateIdentifier' and ./@alternateIdentifierType='Handle']")) {
+ validUrl.add(HTTP_HANDLE_PREIFX + ((Node) o).getText().trim());
+ }
+ for (final Object o : doc.selectNodes("//*[local-name()='identifier' and ./@identifierType='Handle']")) {
+ validUrl.add(HTTP_HANDLE_PREIFX + ((Node) o).getText().trim());
+ }
}
- for (final Object o : doc
- .selectNodes("//*[local-name()='alternateIdentifier' and ./@alternateIdentifierType='Handle']")) {
- url.add(HTTP_HANDLE_PREIFX + ((Node) o).getText().trim());
- }
- for (final Object o : doc.selectNodes("//*[local-name()='identifier' and ./@identifierType='Handle']")) {
- url.add(HTTP_HANDLE_PREIFX + ((Node) o).getText().trim());
- }
- if (!url.isEmpty()) {
+
+ if (!validUrl.isEmpty()) {
instance.setUrl(new ArrayList<>());
- instance.getUrl().addAll(url);
+ instance.getUrl().addAll(validUrl);
}
return Arrays.asList(instance);
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java
new file mode 100644
index 000000000..a8eb871c8
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java
@@ -0,0 +1,108 @@
+
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import scala.Tuple2;
+
+public class VerifyRecordsApplication {
+
+ private static final Logger log = LoggerFactory.getLogger(VerifyRecordsApplication.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ VerifyRecordsApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json")));
+
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String sourcePaths = parser.get("sourcePaths");
+ log.info("sourcePaths: {}", sourcePaths);
+
+ final String invalidPath = parser.get("invalidPath");
+ log.info("invalidPath: {}", invalidPath);
+
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+ final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupService);
+
+ final SparkConf conf = new SparkConf();
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+ HdfsSupport.remove(invalidPath, spark.sparkContext().hadoopConfiguration());
+ validateRecords(spark, sourcePaths, invalidPath, vocs);
+ });
+ }
+
+ private static void validateRecords(SparkSession spark, String sourcePaths, String invalidPath,
+ VocabularyGroup vocs) {
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ final List<String> existingSourcePaths = Arrays
+ .stream(sourcePaths.split(","))
+ .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
+ .collect(Collectors.toList());
+
+ log.info("Verify records in files:");
+ existingSourcePaths.forEach(log::info);
+
+ for (final String sp : existingSourcePaths) {
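+ // collect the raw XML of the records that could not be mapped to any Oaf entity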
+ RDD<String> invalidRecords = sc
+ .sequenceFile(sp, Text.class, Text.class)
+ .map(k -> tryApplyMapping(k._1().toString(), k._2().toString(), true, vocs))
+ .filter(Objects::nonNull)
+ .rdd();
+ spark
+ .createDataset(invalidRecords, Encoders.STRING())
+ .write()
+ .mode(SaveMode.Append)
+ .option("compression", "gzip")
+ .text(invalidPath);
+ }
+ }
+
+ private static String tryApplyMapping(
+ final String id,
+ final String xmlRecord,
+ final boolean shouldHashId,
+ final VocabularyGroup vocs) {
+
+ final List<Oaf> oaf = GenerateEntitiesApplication.convertToListOaf(id, xmlRecord, shouldHashId, vocs);
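+ // an empty mapping result marks the record as invalid: return it so it ends up in the invalidPath output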
+ if (Optional.ofNullable(oaf).map(List::isEmpty).orElse(false)) {
+ return xmlRecord;
+ }
+ return null;
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
index cba64899b..6f63e9327 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.raw.common;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -24,8 +25,11 @@ import org.apache.http.impl.client.HttpClients;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.oa.graph.raw.OafToOafMapper;
+import eu.dnetlib.dhp.oa.graph.raw.OdfToOafMapper;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils;
public class AbstractMigrationApplication implements Closeable {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index c6cc46c0f..8262c6923 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -446,10 +446,34 @@
- <join name="wait_import_claims" to="GenerateEntities_claim"/>
-
+ <join name="wait_import_claims" to="VerifyRecords_claim"/>
+
+ <action name="VerifyRecords_claim">
+ <spark xmlns="uri:oozie:spark-action:0.2">
+ <master>yarn</master>
+ <mode>cluster</mode>
+ <name>VerifyRecords_claim</name>
+ <class>eu.dnetlib.dhp.oa.graph.raw.VerifyRecordsApplication</class>
+ <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+ <spark-opts>
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ </spark-opts>
+ <arg>--sourcePaths</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims</arg>
+ <arg>--invalidPath</arg><arg>${workingDir}/invalid_records_claim</arg>
+ <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+ </spark>
+ <ok to="GenerateEntities_claim"/>
+ <error to="Kill"/>
+ </action>
+
<master>yarn</master>
@@ -499,6 +523,30 @@
+ <action name="VerifyRecords">
+ <spark xmlns="uri:oozie:spark-action:0.2">
+ <master>yarn</master>
+ <mode>cluster</mode>
+ <name>VerifyRecords</name>
+ <class>eu.dnetlib.dhp.oa.graph.raw.VerifyRecordsApplication</class>
+ <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+ <spark-opts>
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ </spark-opts>
+ <arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs,${contentPath}/oaf_records_invisible</arg>
+ <arg>--invalidPath</arg><arg>${workingDir}/invalid_records</arg>
+ <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+ </spark>
+ <ok to="GenerateEntities"/>
+ <error to="Kill"/>
+ </action>
+
<master>yarn</master>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json
new file mode 100644
index 000000000..eb00e7609
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/verify_records_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "s",
+ "paramLongName": "sourcePaths",
+ "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "invalidPath",
+ "paramDescription": "the path of the invalid records file",
+ "paramRequired": false
+ },
+ {
+ "paramName": "isu",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "the url of the ISLookupService",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index 67406e794..64b68e6af 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -21,7 +21,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
@@ -948,6 +947,15 @@ class MappersTest {
}
+ @Test
+ void testNotWellFormed() throws IOException {
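+ // a record that is not well-formed XML must be skipped, yielding an empty (but non-null) result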
+ final String xml = IOUtils
+ .toString(Objects.requireNonNull(getClass().getResourceAsStream("oaf_notwellformed.xml")));
+ final List<Oaf> actual = new OafToOafMapper(vocs, false, true).processMdRecord(xml);
+ assertNotNull(actual);
+ assertTrue(actual.isEmpty());
+ }
+
private void assertValidId(final String id) {
// System.out.println(id);
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
index 069edc5a6..27304ec06 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
@@ -251,6 +251,18 @@ class MigrateDbEntitiesApplicationTest {
assertValidId(r2.getSource());
assertEquals(r1.getSource(), r2.getTarget());
assertEquals(r2.getSource(), r1.getTarget());
+
+ assertTrue(r1.getSource().startsWith("10|"));
+ assertTrue(r1.getTarget().startsWith("20|"));
+
+ assertEquals(ModelConstants.DATASOURCE_ORGANIZATION, r1.getRelType());
+ assertEquals(ModelConstants.DATASOURCE_ORGANIZATION, r2.getRelType());
+
+ assertEquals(ModelConstants.PROVISION, r1.getSubRelType());
+ assertEquals(ModelConstants.PROVISION, r2.getSubRelType());
+
+ assertEquals(ModelConstants.IS_PROVIDED_BY, r1.getRelClass());
+ assertEquals(ModelConstants.PROVIDES, r2.getRelClass());
}
@Test
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_notwellformed.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_notwellformed.xml
new file mode 100644
index 000000000..09384054e
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_notwellformed.xml
@@ -0,0 +1,70 @@
+
+
+
+ jairo_______::000012e58ed836576ef2a0d38b0f726f
+ oai:irdb.nii.ac.jp:01221:0000010198
+
+
+
+
+
+ 2021-05-10T11:31:09.424Z
+ 2021-06-03T01:45:42.536Z
+ jairo_______
+
+
+ 多項式GCDを用いた復号法に関する研究
+ 上原, 剛
+ 甲斐, 博
+ 野田, 松太郎
+ application/pdf
+ http://hdl.handle.net/2433/25934
+ jpn
+ 京都大学数理解析研究所
+ 410
+ Departmental Bulletin Paper
+ 0014
+ 2004-10-01
+
+ openaire____::554c7c2873
+ OPEN
+
+
+ 2433/25934
+ AN00061013
+ http://hdl.handle.net/2433/25934
+ http://repository.kulib.kyoto-u.ac.jp/dspace/bitstream/2433/25934/1/1395-16.pdf
+ 数理解析研究所講究録
+
+
+
+
+ https%3A%2F%2Firdb.nii.ac.jp%2Foai
+ oai:irdb.nii.ac.jp:01221:0000010198
+ 2021-04-13T13:36:29Z
+
+
+ http://repository.kulib.kyoto-u.ac.jp/dspace-oai/request
+ oai:repository.kulib.kyoto-u.ac.jp:2433/25934
+ 2012-07-12T14:15:41Z
+ http://irdb.nii.ac.jp/oai
+
+
+
+
+ false
+ false
+ 0.9
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/log4j.properties b/dhp-workflows/dhp-graph-mapper/src/test/resources/log4j.properties
new file mode 100644
index 000000000..71255bb77
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index ab59e7be3..a1b26966e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,12 @@
<version>${dhp.commons.lang.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-validator</groupId>
+ <artifactId>commons-validator</artifactId>
+ <version>1.7</version>
+ </dependency>
+
<dependency>
<groupId>com.github.sisyphsu</groupId>
<artifactId>dateparser</artifactId>