report) {
+ sendMessage(new Message(MessageType.REPORT, workflowId, report));
+ }
+
+ /**
+ * Builds an ONGOING message for the current workflow carrying the progress counters.
+ *
+ * @param current the current progress value; it is dereferenced unconditionally
+ * @param total the expected total; when null no TOTAL_PARAM entry is added
+ * @return the newly created message
+ */
+ private Message createOngoingMessage(final Long current, final Long total) {
+ final Message ongoing = new Message(MessageType.ONGOING, workflowId);
+ ongoing.getBody().put(Message.CURRENT_PARAM, current.toString());
+ if (total == null) {
+ // the total is optional: without it only the current counter is reported
+ return ongoing;
+ }
+ ongoing.getBody().put(Message.TOTAL_PARAM, total.toString());
+ return ongoing;
+ }
+
+ /**
+ * Serializes the given message to JSON and sends it with an HTTP PUT to the configured message endpoint.
+ * <p>
+ * Delivery is best-effort: serialization errors, transport errors and non-2xx responses are logged and never
+ * propagated to the caller.
+ *
+ * @param message the message to deliver
+ */
+ private void _sendMessage(final Message message) {
+ try {
+ final String json = objectMapper.writeValueAsString(message);
+
+ final HttpPut req = new HttpPut(dnetMessageEndpoint);
+ req.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
+
+ final RequestConfig requestConfig = RequestConfig
+ .custom()
+ .setConnectTimeout(CONNTECTION_TIMEOUT_MS)
+ .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
+ .setSocketTimeout(SOCKET_TIMEOUT_MS)
+ .build();
+
+ try (final CloseableHttpClient client = HttpClients
+ .custom()
+ .setDefaultRequestConfig(requestConfig)
+ .build();
+ final CloseableHttpResponse response = client.execute(req)) {
+ // a completed HTTP exchange is not necessarily a delivered message: check the status code
+ final int status = response.getStatusLine().getStatusCode();
+ if (status >= 200 && status < 300) {
+ log.debug("Sent Message to " + dnetMessageEndpoint);
+ log.debug("MESSAGE:" + message);
+ } else {
+ log
+ .error(
+ "Error sending message to " + dnetMessageEndpoint + ", HTTP status: " + status
+ + ", message content: " + message);
+ }
+ } catch (final Throwable e) {
+ // deliberately broad: a failed notification must never interrupt the caller
+ log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
+ }
+ } catch (final JsonProcessingException e) {
+ log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
+ }
+ }
+
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java
new file mode 100644
index 000000000..75ffb8ef5
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java
@@ -0,0 +1,21 @@
+
+package eu.dnetlib.dhp.message;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Kinds of message supported by the messaging layer: {@code ONGOING} and {@code REPORT}.
+ */
+public enum MessageType implements Serializable {
+
+ ONGOING, REPORT;
+
+ /**
+ * Resolves a message type from its name, ignoring case.
+ * <p>
+ * The method is static so that no pre-existing constant is needed to parse a value; invocations made through an
+ * instance keep compiling. Upper-casing uses {@link java.util.Locale#ROOT} so that the lookup does not depend on
+ * the JVM default locale (with a Turkish locale "ongoing" would otherwise be upper-cased with a dotted capital I
+ * and never match).
+ *
+ * @param value the name of the message type, in any case
+ * @return the matching constant
+ * @throws IllegalArgumentException if {@code value} is null or does not name a message type
+ */
+ public static MessageType from(String value) {
+ if (value == null) {
+ throw new IllegalArgumentException("unknown message type: " + value);
+ }
+ try {
+ return valueOf(value.toUpperCase(java.util.Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ // valueOf only reports the upper-cased constant name: rethrow mentioning the caller's original value
+ throw new IllegalArgumentException("unknown message type: " + value, e);
+ }
+ }
+
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
deleted file mode 100644
index ce65e710f..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
+++ /dev/null
@@ -1,121 +0,0 @@
-
-package eu.dnetlib.dhp.model.mdstore;
-
-import java.io.Serializable;
-
-import eu.dnetlib.dhp.utils.DHPUtils;
-
-/** This class models a record inside the new Metadata store collection on HDFS * */
-public class MetadataRecord implements Serializable {
-
- /** The D-Net Identifier associated to the record */
- private String id;
-
- /** The original Identifier of the record */
- private String originalId;
-
- /** The encoding of the record, should be JSON or XML */
- private String encoding;
-
- /**
- * The information about the provenance of the record see @{@link Provenance} for the model of this information
- */
- private Provenance provenance;
-
- /** The content of the metadata */
- private String body;
-
- /** the date when the record has been stored */
- private long dateOfCollection;
-
- /** the date when the record has been stored */
- private long dateOfTransformation;
-
- public MetadataRecord() {
- this.dateOfCollection = System.currentTimeMillis();
- }
-
- public MetadataRecord(
- String originalId,
- String encoding,
- Provenance provenance,
- String body,
- long dateOfCollection) {
-
- this.originalId = originalId;
- this.encoding = encoding;
- this.provenance = provenance;
- this.body = body;
- this.dateOfCollection = dateOfCollection;
- this.id = DHPUtils.generateIdentifier(originalId, this.provenance.getNsPrefix());
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getOriginalId() {
- return originalId;
- }
-
- public void setOriginalId(String originalId) {
- this.originalId = originalId;
- }
-
- public String getEncoding() {
- return encoding;
- }
-
- public void setEncoding(String encoding) {
- this.encoding = encoding;
- }
-
- public Provenance getProvenance() {
- return provenance;
- }
-
- public void setProvenance(Provenance provenance) {
- this.provenance = provenance;
- }
-
- public String getBody() {
- return body;
- }
-
- public void setBody(String body) {
- this.body = body;
- }
-
- public long getDateOfCollection() {
- return dateOfCollection;
- }
-
- public void setDateOfCollection(long dateOfCollection) {
- this.dateOfCollection = dateOfCollection;
- }
-
- public long getDateOfTransformation() {
- return dateOfTransformation;
- }
-
- public void setDateOfTransformation(long dateOfTransformation) {
- this.dateOfTransformation = dateOfTransformation;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof MetadataRecord)) {
- return false;
- }
- return ((MetadataRecord) o).getId().equalsIgnoreCase(id);
- }
-
- @Override
- public int hashCode() {
- return id.hashCode();
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/Provenance.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/Provenance.java
deleted file mode 100644
index 556535022..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/Provenance.java
+++ /dev/null
@@ -1,52 +0,0 @@
-
-package eu.dnetlib.dhp.model.mdstore;
-
-import java.io.Serializable;
-
-/**
- * @author Sandro La Bruzzo
- *
- * Provenace class models the provenance of the record in the metadataStore It contains the identifier and the
- * name of the datasource that gives the record
- */
-public class Provenance implements Serializable {
-
- private String datasourceId;
-
- private String datasourceName;
-
- private String nsPrefix;
-
- public Provenance() {
- }
-
- public Provenance(String datasourceId, String datasourceName, String nsPrefix) {
- this.datasourceId = datasourceId;
- this.datasourceName = datasourceName;
- this.nsPrefix = nsPrefix;
- }
-
- public String getDatasourceId() {
- return datasourceId;
- }
-
- public void setDatasourceId(String datasourceId) {
- this.datasourceId = datasourceId;
- }
-
- public String getDatasourceName() {
- return datasourceName;
- }
-
- public void setDatasourceName(String datasourceName) {
- this.datasourceName = datasourceName;
- }
-
- public String getNsPrefix() {
- return nsPrefix;
- }
-
- public void setNsPrefix(String nsPrefix) {
- this.nsPrefix = nsPrefix;
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
index 17482c019..aea046203 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
@@ -18,6 +18,9 @@ public class AuthorMerger {
private static final Double THRESHOLD = 0.95;
+ /** Private constructor: the class cannot be instantiated from outside. */
+ private AuthorMerger() {
+ }
+
public static List merge(List> authors) {
authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2)));
@@ -32,44 +35,54 @@ public class AuthorMerger {
}
- public static List mergeAuthor(final List a, final List b) {
+ /**
+ * Chooses one of the two author lists as the base, passes it together with the other list to
+ * enrichPidFromList and returns the base.
+ * <p>
+ * When authorsSize reports the same value for both lists, the list with the higher countAuthorsPids wins (b on
+ * ties); otherwise the list with the higher authorsSize wins.
+ *
+ * @param a the first author list
+ * @param b the second author list
+ * @param threshold the threshold forwarded to enrichPidFromList; presumably must not be null since it appears
+ * to be unboxed there - TODO confirm
+ * @return the list selected as base
+ */
+ public static List mergeAuthor(final List a, final List b, Double threshold) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
- List base, enrich;
+ List base;
+ List enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
- if (pa == pb) {
- base = sa > sb ? a : b;
- enrich = sa > sb ? b : a;
- } else {
+ // same size: the pid count decides which list is the base
+ if (sa == sb) {
base = pa > pb ? a : b;
enrich = pa > pb ? b : a;
+ // different size: the larger list is the base, regardless of the pid count
+ } else {
+ base = sa > sb ? a : b;
+ enrich = sa > sb ? b : a;
}
- enrichPidFromList(base, enrich);
+ enrichPidFromList(base, enrich, threshold);
return base;
}
- private static void enrichPidFromList(List base, List enrich) {
+ /**
+ * Merges the two author lists using the class default THRESHOLD.
+ *
+ * @param a the first author list
+ * @param b the second author list
+ * @return the result of mergeAuthor(a, b, THRESHOLD)
+ */
+ public static List mergeAuthor(final List a, final List b) {
+ return mergeAuthor(a, b, THRESHOLD);
+ }
+
+ private static void enrichPidFromList(List base, List enrich, Double threshold) {
if (base == null || enrich == null)
return;
+
+ // (an Author with more than one pid is mapped once per pid, so it can appear several times among the map values)
final Map basePidAuthorMap = base
.stream()
- .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.flatMap(
a -> a
.getPid()
.stream()
+ .filter(Objects::nonNull)
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
+ // (pids of the enrich authors that are missing from the base list, each paired with its author)
final List> pidToEnrich = enrich
.stream()
- .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.flatMap(
a -> a
.getPid()
.stream()
+ .filter(Objects::nonNull)
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
@@ -83,10 +96,10 @@ public class AuthorMerger {
.max(Comparator.comparing(Tuple2::_1));
if (simAuthor.isPresent()) {
- double th = THRESHOLD;
+ double th = threshold;
// increase the threshold if the surname is too short
if (simAuthor.get()._2().getSurname() != null
- && simAuthor.get()._2().getSurname().length() <= 3)
+ && simAuthor.get()._2().getSurname().length() <= 3 && threshold > 0.0)
th = 0.99;
if (simAuthor.get()._1() > th) {
@@ -107,9 +120,9 @@ public class AuthorMerger {
}
+ /**
+ * Builds the string used to compare pids: the lower-cased qualifier classid followed by the lower-cased value.
+ * A null qualifier, classid or value contributes an empty string.
+ *
+ * @param pid the pid to convert
+ * @return the concatenation of the lower-cased classid and value
+ */
public static String pidToComparableString(StructuredProperty pid) {
- return (pid.getQualifier() != null
- ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
- : "")
+ // the qualifier must be null-checked BEFORE it is dereferenced, otherwise a pid without qualifier raises a
+ // NullPointerException instead of contributing an empty classid
+ final String classid = pid.getQualifier() != null && pid.getQualifier().getClassid() != null
+ ? pid.getQualifier().getClassid().toLowerCase()
+ : "";
+ return classid
+ (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
}
@@ -142,7 +155,7 @@ public class AuthorMerger {
}
+ /**
+ * Tells whether the author carries at least one usable pid.
+ *
+ * @param a the author to inspect, may be null
+ * @return true when the pid list contains at least one non-null element with a non-blank value; false for a null
+ * author or a null/empty pid list
+ */
private static boolean hasPid(Author a) {
- if (a == null || a.getPid() == null || a.getPid().size() == 0)
+ if (a == null || a.getPid() == null || a.getPid().isEmpty())
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
@@ -151,12 +164,15 @@ public class AuthorMerger {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
- return new Person(author.getFullname(), false);
+ if (StringUtils.isNotBlank(author.getFullname()))
+ return new Person(author.getFullname(), false);
+ else
+ return new Person("", false);
}
}
private static String normalize(final String s) {
- return nfd(s)
+ String[] normalized = nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
// in case
@@ -166,7 +182,12 @@ public class AuthorMerger {
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
- .trim();
+ .trim()
+ .split(" ");
+
+ Arrays.sort(normalized);
+
+ return String.join(" ", normalized);
}
private static String nfd(final String s) {
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
new file mode 100644
index 000000000..3f65d754f
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
@@ -0,0 +1,97 @@
+
+package eu.dnetlib.dhp.oa.merge;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+
+/**
+ * Spark job that extracts the entities of one class from a text input whose lines have the form
+ * "class name|json": lines whose prefix equals the name of the requested class are deserialized into that class and
+ * written to the output path as gzip-compressed JSON.
+ * <p>
+ * NOTE(review): the generic type arguments in this listing look stripped (e.g. "Class extends OafEntity>") - confirm
+ * against the actual source file.
+ */
+public class DispatchEntitiesSparkJob {
+
+ private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * Entry point: reads the arguments described by dispatch_entities_parameters.json, loads the class named by
+ * graphTableClassName and runs the dispatch inside a Spark session.
+ *
+ * @param args the command line arguments handed to ArgumentApplicationParser
+ * @throws Exception if the arguments cannot be parsed, the class cannot be loaded or the job fails
+ */
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ Objects
+ .requireNonNull(
+ DispatchEntitiesSparkJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json")));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ // defaults to TRUE when the argument is not provided
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputPath = parser.get("inputPath");
+ log.info("inputPath: {}", inputPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ String graphTableClassName = parser.get("graphTableClassName");
+ log.info("graphTableClassName: {}", graphTableClassName);
+
+ // the cast is unchecked: nothing verifies here that the named class really is an OafEntity
+ @SuppressWarnings("unchecked")
+ Class extends OafEntity> entityClazz = (Class extends OafEntity>) Class.forName(graphTableClassName);
+
+ SparkConf conf = new SparkConf();
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ // HdfsSupport.remove is invoked on the output path before anything is written to it
+ HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+ dispatchEntities(spark, inputPath, entityClazz, outputPath);
+ });
+ }
+
+ /**
+ * Reads the text lines under inputPath, keeps those tagged with the name of clazz, parses the part after the
+ * first "|" as JSON into clazz and writes the result as gzip-compressed JSON, overwriting outputPath.
+ *
+ * @param spark the Spark session
+ * @param inputPath the path of the text input
+ * @param clazz the entity class to extract
+ * @param outputPath the path the JSON output is written to
+ */
+ private static void dispatchEntities(
+ SparkSession spark,
+ String inputPath,
+ Class clazz,
+ String outputPath) {
+
+ spark
+ .read()
+ .textFile(inputPath)
+ .filter((FilterFunction) s -> isEntityType(s, clazz))
+ .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
+ .map(
+ (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz),
+ Encoders.bean(clazz))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath);
+ }
+
+ /**
+ * Tells whether the part of the line preceding the first "|" equals the fully qualified name of clazz.
+ *
+ * @param s the input line
+ * @param clazz the class to match
+ * @return true when the line prefix equals clazz.getName()
+ */
+ private static boolean isEntityType(final String s, final Class clazz) {
+ return StringUtils.substringBefore(s, "|").equals(clazz.getName());
+ }
+
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
new file mode 100644
index 000000000..e652bd5b6
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
@@ -0,0 +1,203 @@
+
+package eu.dnetlib.dhp.oa.merge;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Aggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import scala.Tuple2;
+
+/**
+ * Groups the graph content by entity identifier to ensure ID uniqueness.
+ * <p>
+ * The input files (excluding those whose path contains "relation") are parsed into the OafEntity subclass implied by
+ * the entity id prefix, entities sharing the same id are merged, and each resulting entity is written as a text line
+ * of the form "class name|json".
+ * <p>
+ * NOTE(review): the generic type arguments in this listing look stripped (e.g. "extends Aggregator {") - confirm
+ * against the actual source file.
+ */
+public class GroupEntitiesSparkJob {
+
+ private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
+
+ private static final String ID_JPATH = "$.id";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ /**
+ * Entry point: reads the arguments described by group_graph_entities_parameters.json and runs the grouping
+ * inside a Spark session configured with the Kryo serializer.
+ *
+ * @param args the command line arguments handed to ArgumentApplicationParser
+ * @throws Exception if the arguments cannot be parsed or the job fails
+ */
+ public static void main(String[] args) throws Exception {
+
+ // fail explicitly on a missing resource, consistently with DispatchEntitiesSparkJob
+ String jsonConfiguration = IOUtils
+ .toString(
+ Objects
+ .requireNonNull(
+ GroupEntitiesSparkJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json")));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ // defaults to TRUE when the argument is not provided
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String graphInputPath = parser.get("graphInputPath");
+ log.info("graphInputPath: {}", graphInputPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ // HdfsSupport.remove is invoked on the output path before anything is written to it
+ HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+ groupEntities(spark, graphInputPath, outputPath);
+ });
+ }
+
+ /**
+ * Parses every entity found under inputPath, drops those with a blank id, merges the entities sharing the same
+ * id and writes one gzip-compressed text line per id in the form "class name|json".
+ *
+ * @param spark the Spark session
+ * @param inputPath the path whose files are listed and read
+ * @param outputPath the path the text output is written to
+ */
+ private static void groupEntities(
+ SparkSession spark,
+ String inputPath,
+ String outputPath) {
+
+ final TypedColumn aggregator = new GroupingAggregator().toColumn();
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ spark
+ .read()
+ .textFile(toSeq(listEntityPaths(inputPath, sc)))
+ .map((MapFunction) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class))
+ .filter((FilterFunction) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
+ .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
+ .agg(aggregator)
+ .map(
+ (MapFunction, String>) t -> t._2().getClass().getName() +
+ "|" + OBJECT_MAPPER.writeValueAsString(t._2()),
+ Encoders.STRING())
+ .write()
+ .option("compression", "gzip")
+ .mode(SaveMode.Overwrite)
+ .text(outputPath);
+ }
+
+ /**
+ * Aggregator merging all the entities of a group into one: the buffer starts as null and each step merges the
+ * buffer with the next entity through OafMapperUtils.mergeEntities.
+ */
+ public static class GroupingAggregator extends Aggregator {
+
+ @Override
+ public OafEntity zero() {
+ return null;
+ }
+
+ @Override
+ public OafEntity reduce(OafEntity b, OafEntity a) {
+ return mergeAndGet(b, a);
+ }
+
+ /**
+ * Merges the two entities when both are non-null, otherwise returns whichever is not null (null when both
+ * are null).
+ */
+ private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
+ if (Objects.nonNull(a) && Objects.nonNull(b)) {
+ return OafMapperUtils.mergeEntities(b, a);
+ }
+ return Objects.isNull(a) ? b : a;
+ }
+
+ @Override
+ public OafEntity merge(OafEntity b, OafEntity a) {
+ return mergeAndGet(b, a);
+ }
+
+ @Override
+ public OafEntity finish(OafEntity j) {
+ return j;
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.kryo(OafEntity.class);
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.kryo(OafEntity.class);
+ }
+
+ }
+
+ /**
+ * Parses a JSON record into the entity class implied by its id: the part of the id preceding the first "|"
+ * selects Datasource ("10"), Organization ("20"), Project ("40") or a result ("50"), whose concrete class is
+ * then selected by "$.resulttype.classid".
+ *
+ * @param s the JSON record
+ * @return the parsed entity
+ * @throws IllegalArgumentException if the id is blank, the prefix is not supported, the result type is missing
+ * or not supported, or the JSON cannot be deserialized
+ */
+ private static OafEntity parseOaf(String s) {
+
+ DocumentContext dc = JsonPath
+ .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
+ final String id = dc.read(ID_JPATH);
+ if (StringUtils.isNotBlank(id)) {
+
+ String prefix = StringUtils.substringBefore(id, "|");
+ switch (prefix) {
+ case "10":
+ return parse(s, Datasource.class);
+ case "20":
+ return parse(s, Organization.class);
+ case "40":
+ return parse(s, Project.class);
+ case "50":
+ String resultType = dc.read("$.resulttype.classid");
+ // with SUPPRESS_EXCEPTIONS a missing path is read as null, and switching on a null String would
+ // raise a NullPointerException: report it like any other invalid result type instead
+ if (resultType == null) {
+ throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
+ }
+ switch (resultType) {
+ case "publication":
+ return parse(s, Publication.class);
+ case "dataset":
+ return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ case "software":
+ return parse(s, Software.class);
+ case "other":
+ return parse(s, OtherResearchProduct.class);
+ default:
+ throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
+ }
+ default:
+ throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
+ }
+ }
+
+ /**
+ * Deserializes the JSON record into the given class, wrapping any IOException in an IllegalArgumentException.
+ */
+ private static OafEntity parse(String s, Class clazz) {
+ try {
+ return OBJECT_MAPPER.readValue(s, clazz);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Lists the files under inputPath, excluding those whose lower-cased path contains "relation".
+ */
+ private static List listEntityPaths(String inputPath, JavaSparkContext sc) {
+ return HdfsSupport
+ .listFiles(inputPath, sc.hadoopConfiguration())
+ .stream()
+ .filter(f -> !f.toLowerCase().contains("relation"))
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java
index 9ac0a0bf7..fd4c0191a 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java
@@ -12,6 +12,9 @@ import com.ximpleware.VTDNav;
/** Created by sandro on 9/29/16. */
public class VtdUtilityParser {
+ /** Private constructor: the class cannot be instantiated from outside. */
+ private VtdUtilityParser() {
+ }
+
public static List getTextValuesWithAttributes(
final AutoPilot ap, final VTDNav vn, final String xpath, final List attributes)
throws VtdException {
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java
deleted file mode 100644
index 12fbcc490..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java
+++ /dev/null
@@ -1,238 +0,0 @@
-
-package eu.dnetlib.dhp.schema.oaf;
-
-import java.util.LinkedHashMap;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-
-import com.clearspring.analytics.util.Lists;
-
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-
-public class CleaningFunctions {
-
- public static final String DOI_URL_PREFIX_REGEX = "(^http(s?):\\/\\/)(((dx\\.)?doi\\.org)|(handle\\.test\\.datacite\\.org))\\/";
- public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/";
- public static final String NONE = "none";
-
- public static T fixVocabularyNames(T value) {
- if (value instanceof Datasource) {
- // nothing to clean here
- } else if (value instanceof Project) {
- // nothing to clean here
- } else if (value instanceof Organization) {
- Organization o = (Organization) value;
- if (Objects.nonNull(o.getCountry())) {
- fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE);
- }
- } else if (value instanceof Relation) {
- // nothing to clean here
- } else if (value instanceof Result) {
-
- Result r = (Result) value;
-
- fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES);
- fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE);
- fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES);
-
- if (Objects.nonNull(r.getSubject())) {
- r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES));
- }
- if (Objects.nonNull(r.getInstance())) {
- for (Instance i : r.getInstance()) {
- fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES);
- fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS);
- }
- }
- if (Objects.nonNull(r.getAuthor())) {
- r.getAuthor().forEach(a -> {
- if (Objects.nonNull(a.getPid())) {
- a.getPid().forEach(p -> {
- fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES);
- });
- }
- });
- }
- if (value instanceof Publication) {
-
- } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
-
- } else if (value instanceof OtherResearchProduct) {
-
- } else if (value instanceof Software) {
-
- }
- }
-
- return value;
- }
-
- public static T fixDefaults(T value) {
- if (value instanceof Datasource) {
- // nothing to clean here
- } else if (value instanceof Project) {
- // nothing to clean here
- } else if (value instanceof Organization) {
- Organization o = (Organization) value;
- if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
- o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
- }
- } else if (value instanceof Relation) {
- // nothing to clean here
- } else if (value instanceof Result) {
-
- Result r = (Result) value;
- if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) {
- r.setPublisher(null);
- }
- if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
- r
- .setLanguage(
- qualifier("und", "Undetermined", ModelConstants.DNET_LANGUAGES));
- }
- if (Objects.nonNull(r.getSubject())) {
- r
- .setSubject(
- r
- .getSubject()
- .stream()
- .filter(Objects::nonNull)
- .filter(sp -> StringUtils.isNotBlank(sp.getValue()))
- .filter(sp -> Objects.nonNull(sp.getQualifier()))
- .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
- .collect(Collectors.toList()));
- }
- if (Objects.nonNull(r.getPid())) {
- r
- .setPid(
- r
- .getPid()
- .stream()
- .filter(Objects::nonNull)
- .filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
- .filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
- .filter(sp -> Objects.nonNull(sp.getQualifier()))
- .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
- .map(CleaningFunctions::normalizePidValue)
- .collect(Collectors.toList()));
- }
- if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
- r
- .setResourcetype(
- qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
- }
- if (Objects.nonNull(r.getInstance())) {
- for (Instance i : r.getInstance()) {
- if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
- i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
- }
- if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
- i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
- }
- if (Objects.isNull(i.getRefereed())) {
- i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
- }
- }
- }
- if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
- Qualifier bestaccessrights = OafMapperUtils.createBestAccessRights(r.getInstance());
- if (Objects.isNull(bestaccessrights)) {
- r
- .setBestaccessright(
- qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
- } else {
- r.setBestaccessright(bestaccessrights);
- }
- }
- if (Objects.nonNull(r.getAuthor())) {
- boolean nullRank = r
- .getAuthor()
- .stream()
- .anyMatch(a -> Objects.isNull(a.getRank()));
- if (nullRank) {
- int i = 1;
- for (Author author : r.getAuthor()) {
- author.setRank(i++);
- }
- }
- for (Author a : r.getAuthor()) {
- if (Objects.isNull(a.getPid())) {
- a.setPid(Lists.newArrayList());
- } else {
- a
- .setPid(
- a
- .getPid()
- .stream()
- .filter(p -> Objects.nonNull(p.getQualifier()))
- .filter(p -> StringUtils.isNotBlank(p.getValue()))
- .map(p -> {
- p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, ""));
- return p;
- })
- .collect(
- Collectors
- .toMap(
- StructuredProperty::getValue, Function.identity(), (p1, p2) -> p1,
- LinkedHashMap::new))
- .values()
- .stream()
- .collect(Collectors.toList()));
- }
- }
-
- }
- if (value instanceof Publication) {
-
- } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
-
- } else if (value instanceof OtherResearchProduct) {
-
- } else if (value instanceof Software) {
-
- }
- }
-
- return value;
- }
-
- // HELPERS
-
- private static void fixVocabName(Qualifier q, String vocabularyName) {
- if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) {
- q.setSchemeid(vocabularyName);
- q.setSchemename(vocabularyName);
- }
- }
-
- private static Qualifier qualifier(String classid, String classname, String scheme) {
- return OafMapperUtils
- .qualifier(
- classid, classname, scheme, scheme);
- }
-
- /**
- * Utility method that normalises PID values on a per-type basis.
- * @param pid the PID whose value will be normalised.
- * @return the PID containing the normalised value.
- */
- public static StructuredProperty normalizePidValue(StructuredProperty pid) {
- String value = Optional
- .ofNullable(pid.getValue())
- .map(String::trim)
- .orElseThrow(() -> new IllegalArgumentException("PID value cannot be empty"));
- switch (pid.getQualifier().getClassid()) {
-
- // TODO add cleaning for more PID types as needed
- case "doi":
- pid.setValue(value.toLowerCase().replaceAll(DOI_URL_PREFIX_REGEX, ""));
- break;
- }
- return pid;
- }
-
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java
deleted file mode 100644
index 16fdc3760..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java
+++ /dev/null
@@ -1,14 +0,0 @@
-
-package eu.dnetlib.dhp.schema.oaf;
-
-public class ModelHardLimits {
-
- public static final int MAX_EXTERNAL_ENTITIES = 50;
- public static final int MAX_AUTHORS = 200;
- public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
- public static final int MAX_TITLE_LENGTH = 5000;
- public static final int MAX_TITLES = 10;
- public static final int MAX_ABSTRACT_LENGTH = 150000;
- public static final int MAX_INSTANCES = 10;
-
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
deleted file mode 100644
index f079c55af..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
+++ /dev/null
@@ -1,296 +0,0 @@
-
-package eu.dnetlib.dhp.schema.oaf;
-
-import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-
-import eu.dnetlib.dhp.schema.common.LicenseComparator;
-import eu.dnetlib.dhp.utils.DHPUtils;
-
-public class OafMapperUtils {
-
- public static KeyValue keyValue(final String k, final String v) {
- final KeyValue kv = new KeyValue();
- kv.setKey(k);
- kv.setValue(v);
- return kv;
- }
-
- public static List listKeyValues(final String... s) {
- if (s.length % 2 > 0) {
- throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)");
- }
-
- final List list = new ArrayList<>();
- for (int i = 0; i < s.length; i += 2) {
- list.add(keyValue(s[i], s[i + 1]));
- }
- return list;
- }
-
- public static Field field(final T value, final DataInfo info) {
- if (value == null || StringUtils.isBlank(value.toString())) {
- return null;
- }
-
- final Field field = new Field<>();
- field.setValue(value);
- field.setDataInfo(info);
- return field;
- }
-
- public static List> listFields(final DataInfo info, final String... values) {
- return Arrays
- .stream(values)
- .map(v -> field(v, info))
- .filter(Objects::nonNull)
- .filter(distinctByKey(f -> f.getValue()))
- .collect(Collectors.toList());
- }
-
- public static List> listFields(final DataInfo info, final List values) {
- return values
- .stream()
- .map(v -> field(v, info))
- .filter(Objects::nonNull)
- .filter(distinctByKey(f -> f.getValue()))
- .collect(Collectors.toList());
- }
-
- public static Qualifier unknown(final String schemeid, final String schemename) {
- return qualifier("UNKNOWN", "Unknown", schemeid, schemename);
- }
-
- public static Qualifier qualifier(
- final String classid,
- final String classname,
- final String schemeid,
- final String schemename) {
- final Qualifier q = new Qualifier();
- q.setClassid(classid);
- q.setClassname(classname);
- q.setSchemeid(schemeid);
- q.setSchemename(schemename);
- return q;
- }
-
- public static StructuredProperty structuredProperty(
- final String value,
- final String classid,
- final String classname,
- final String schemeid,
- final String schemename,
- final DataInfo dataInfo) {
-
- return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
- }
-
- public static StructuredProperty structuredProperty(
- final String value,
- final Qualifier qualifier,
- final DataInfo dataInfo) {
- if (value == null) {
- return null;
- }
- final StructuredProperty sp = new StructuredProperty();
- sp.setValue(value);
- sp.setQualifier(qualifier);
- sp.setDataInfo(dataInfo);
- return sp;
- }
-
- public static ExtraInfo extraInfo(
- final String name,
- final String value,
- final String typology,
- final String provenance,
- final String trust) {
- final ExtraInfo info = new ExtraInfo();
- info.setName(name);
- info.setValue(value);
- info.setTypology(typology);
- info.setProvenance(provenance);
- info.setTrust(trust);
- return info;
- }
-
- public static OAIProvenance oaiIProvenance(
- final String identifier,
- final String baseURL,
- final String metadataNamespace,
- final Boolean altered,
- final String datestamp,
- final String harvestDate) {
-
- final OriginDescription desc = new OriginDescription();
- desc.setIdentifier(identifier);
- desc.setBaseURL(baseURL);
- desc.setMetadataNamespace(metadataNamespace);
- desc.setAltered(altered);
- desc.setDatestamp(datestamp);
- desc.setHarvestDate(harvestDate);
-
- final OAIProvenance p = new OAIProvenance();
- p.setOriginDescription(desc);
-
- return p;
- }
-
- public static Journal journal(
- final String name,
- final String issnPrinted,
- final String issnOnline,
- final String issnLinking,
- final DataInfo dataInfo) {
- return journal(
- name,
- issnPrinted,
- issnOnline,
- issnLinking,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- dataInfo);
- }
-
- public static Journal journal(
- final String name,
- final String issnPrinted,
- final String issnOnline,
- final String issnLinking,
- final String ep,
- final String iss,
- final String sp,
- final String vol,
- final String edition,
- final String conferenceplace,
- final String conferencedate,
- final DataInfo dataInfo) {
-
- if (StringUtils.isNotBlank(name)
- || StringUtils.isNotBlank(issnPrinted)
- || StringUtils.isNotBlank(issnOnline)
- || StringUtils.isNotBlank(issnLinking)) {
- final Journal j = new Journal();
- j.setName(name);
- j.setIssnPrinted(issnPrinted);
- j.setIssnOnline(issnOnline);
- j.setIssnLinking(issnLinking);
- j.setEp(ep);
- j.setIss(iss);
- j.setSp(sp);
- j.setVol(vol);
- j.setEdition(edition);
- j.setConferenceplace(conferenceplace);
- j.setConferencedate(conferencedate);
- j.setDataInfo(dataInfo);
- return j;
- } else {
- return null;
- }
- }
-
- public static DataInfo dataInfo(
- final Boolean deletedbyinference,
- final String inferenceprovenance,
- final Boolean inferred,
- final Boolean invisible,
- final Qualifier provenanceaction,
- final String trust) {
- final DataInfo d = new DataInfo();
- d.setDeletedbyinference(deletedbyinference);
- d.setInferenceprovenance(inferenceprovenance);
- d.setInferred(inferred);
- d.setInvisible(invisible);
- d.setProvenanceaction(provenanceaction);
- d.setTrust(trust);
- return d;
- }
-
- public static String createOpenaireId(
- final int prefix,
- final String originalId,
- final boolean to_md5) {
- if (StringUtils.isBlank(originalId)) {
- return null;
- } else if (to_md5) {
- final String nsPrefix = StringUtils.substringBefore(originalId, "::");
- final String rest = StringUtils.substringAfter(originalId, "::");
- return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
- } else {
- return String.format("%s|%s", prefix, originalId);
- }
- }
-
- public static String createOpenaireId(
- final String type,
- final String originalId,
- final boolean to_md5) {
- switch (type) {
- case "datasource":
- return createOpenaireId(10, originalId, to_md5);
- case "organization":
- return createOpenaireId(20, originalId, to_md5);
- case "person":
- return createOpenaireId(30, originalId, to_md5);
- case "project":
- return createOpenaireId(40, originalId, to_md5);
- default:
- return createOpenaireId(50, originalId, to_md5);
- }
- }
-
- public static String asString(final Object o) {
- return o == null ? "" : o.toString();
- }
-
- public static Predicate distinctByKey(
- final Function super T, ?> keyExtractor) {
- final Map