report) {
+ sendMessage(new Message(MessageType.REPORT, workflowId, report));
+ }
+
+ private Message createOngoingMessage(final Long current, final Long total) {
+ final Message m = new Message(MessageType.ONGOING, workflowId);
+ m.getBody().put(Message.CURRENT_PARAM, current.toString());
+ if (total != null) {
+ m.getBody().put(Message.TOTAL_PARAM, total.toString());
+ }
+ return m;
+ }
+
+ private void _sendMessage(final Message message) {
+ try {
+ final String json = objectMapper.writeValueAsString(message);
+
+ final HttpPut req = new HttpPut(dnetMessageEndpoint);
+ req.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
+
+ final RequestConfig requestConfig = RequestConfig
+ .custom()
+ .setConnectTimeout(CONNTECTION_TIMEOUT_MS)
+ .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
+ .setSocketTimeout(SOCKET_TIMEOUT_MS)
+ .build();
+
+ try (final CloseableHttpClient client = HttpClients
+ .custom()
+ .setDefaultRequestConfig(requestConfig)
+ .build();
+ final CloseableHttpResponse response = client.execute(req)) {
+ log.debug("Sent Message to " + dnetMessageEndpoint);
+ log.debug("MESSAGE:" + message);
+ } catch (final Throwable e) {
+ log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
+ }
+ } catch (final JsonProcessingException e) {
+ log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
+ }
+ }
+
+}
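
Note that the sender above is deliberately fire-and-forget: serialization and HTTP transport failures are both logged and swallowed, so status messaging can never fail the workflow that emits it. A minimal usage sketch follows; the constructor and the public ONGOING wrapper are assumed from the fields and helpers visible in this hunk (the report method's name is truncated at the top of the section), so treat the names as illustrative:

    // hypothetical wiring; endpoint and workflowId are example values
    final MessageSender sender = new MessageSender("http://dnet.example.org/messages", "wf-123");
    sender.sendMessage(10L, 100L); // assumed wrapper around createOngoingMessage: current=10, total=100
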
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java
new file mode 100644
index 000000000..75ffb8ef5
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java
@@ -0,0 +1,21 @@
+
+package eu.dnetlib.dhp.message;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+
+public enum MessageType implements Serializable {
+
+ ONGOING, REPORT;
+
+ public static MessageType from(String value) {
+ return Optional
+ .ofNullable(value)
+ .map(StringUtils::upperCase)
+ .map(MessageType::valueOf)
+ .orElseThrow(() -> new IllegalArgumentException("unknown message type: " + value));
+ }
+
+}
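
Note that `from` reaches the custom `orElseThrow` message only when `value` is null: for a non-null but unknown name, `MessageType::valueOf` itself throws an `IllegalArgumentException` with the standard "No enum constant" message. A quick behavioral sketch, assuming the static variant above:

    MessageType.from("ongoing"); // -> ONGOING (case-insensitive)
    MessageType.from("REPORT");  // -> REPORT
    MessageType.from(null);      // -> IllegalArgumentException: unknown message type: null
    MessageType.from("foo");     // -> IllegalArgumentException from valueOf: No enum constant ...
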
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
deleted file mode 100644
index ce65e710f..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
+++ /dev/null
@@ -1,121 +0,0 @@
-
-package eu.dnetlib.dhp.model.mdstore;
-
-import java.io.Serializable;
-
-import eu.dnetlib.dhp.utils.DHPUtils;
-
-/** This class models a record inside the new Metadata store collection on HDFS * */
-public class MetadataRecord implements Serializable {
-
- /** The D-Net Identifier associated to the record */
- private String id;
-
- /** The original Identifier of the record */
- private String originalId;
-
- /** The encoding of the record, should be JSON or XML */
- private String encoding;
-
- /**
- * The information about the provenance of the record see @{@link Provenance} for the model of this information
- */
- private Provenance provenance;
-
- /** The content of the metadata */
- private String body;
-
- /** the date when the record has been stored */
- private long dateOfCollection;
-
- /** the date when the record has been stored */
- private long dateOfTransformation;
-
- public MetadataRecord() {
- this.dateOfCollection = System.currentTimeMillis();
- }
-
- public MetadataRecord(
- String originalId,
- String encoding,
- Provenance provenance,
- String body,
- long dateOfCollection) {
-
- this.originalId = originalId;
- this.encoding = encoding;
- this.provenance = provenance;
- this.body = body;
- this.dateOfCollection = dateOfCollection;
- this.id = DHPUtils.generateIdentifier(originalId, this.provenance.getNsPrefix());
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getOriginalId() {
- return originalId;
- }
-
- public void setOriginalId(String originalId) {
- this.originalId = originalId;
- }
-
- public String getEncoding() {
- return encoding;
- }
-
- public void setEncoding(String encoding) {
- this.encoding = encoding;
- }
-
- public Provenance getProvenance() {
- return provenance;
- }
-
- public void setProvenance(Provenance provenance) {
- this.provenance = provenance;
- }
-
- public String getBody() {
- return body;
- }
-
- public void setBody(String body) {
- this.body = body;
- }
-
- public long getDateOfCollection() {
- return dateOfCollection;
- }
-
- public void setDateOfCollection(long dateOfCollection) {
- this.dateOfCollection = dateOfCollection;
- }
-
- public long getDateOfTransformation() {
- return dateOfTransformation;
- }
-
- public void setDateOfTransformation(long dateOfTransformation) {
- this.dateOfTransformation = dateOfTransformation;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof MetadataRecord)) {
- return false;
- }
- return ((MetadataRecord) o).getId().equalsIgnoreCase(id);
- }
-
- @Override
- public int hashCode() {
- return id.hashCode();
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/Provenance.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/Provenance.java
deleted file mode 100644
index 556535022..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/Provenance.java
+++ /dev/null
@@ -1,52 +0,0 @@
-
-package eu.dnetlib.dhp.model.mdstore;
-
-import java.io.Serializable;
-
-/**
- * @author Sandro La Bruzzo
- *
- * Provenace class models the provenance of the record in the metadataStore It contains the identifier and the
- * name of the datasource that gives the record
- */
-public class Provenance implements Serializable {
-
- private String datasourceId;
-
- private String datasourceName;
-
- private String nsPrefix;
-
- public Provenance() {
- }
-
- public Provenance(String datasourceId, String datasourceName, String nsPrefix) {
- this.datasourceId = datasourceId;
- this.datasourceName = datasourceName;
- this.nsPrefix = nsPrefix;
- }
-
- public String getDatasourceId() {
- return datasourceId;
- }
-
- public void setDatasourceId(String datasourceId) {
- this.datasourceId = datasourceId;
- }
-
- public String getDatasourceName() {
- return datasourceName;
- }
-
- public void setDatasourceName(String datasourceName) {
- this.datasourceName = datasourceName;
- }
-
- public String getNsPrefix() {
- return nsPrefix;
- }
-
- public void setNsPrefix(String nsPrefix) {
- this.nsPrefix = nsPrefix;
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
index 17482c019..0461c9353 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
@@ -18,6 +18,9 @@ public class AuthorMerger {
private static final Double THRESHOLD = 0.95;
+ private AuthorMerger() {
+ }
+
public static List<Author> merge(List<List<Author>> authors) {
authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2)));
@@ -32,44 +35,54 @@ public class AuthorMerger {
}
- public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
+ public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b, Double threshold) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
- List<Author> base, enrich;
+ List<Author> base;
+ List<Author> enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
- if (pa == pb) {
- base = sa > sb ? a : b;
- enrich = sa > sb ? b : a;
- } else {
+ if (sa == sb) {
base = pa > pb ? a : b;
enrich = pa > pb ? b : a;
+ } else {
+ base = sa > sb ? a : b;
+ enrich = sa > sb ? b : a;
}
- enrichPidFromList(base, enrich);
+ enrichPidFromList(base, enrich, threshold);
return base;
}
- private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
+ public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
+ return mergeAuthor(a, b, THRESHOLD);
+ }
+
+ private static void enrichPidFromList(List<Author> base, List<Author> enrich, Double threshold) {
if (base == null || enrich == null)
return;
+
+ // (if an Author has more than one pid, it appears multiple times in the list)
final Map<String, Author> basePidAuthorMap = base
.stream()
- .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.flatMap(
a -> a
.getPid()
.stream()
+ .filter(Objects::nonNull)
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
+ // (list of pids missing from the base list)
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
- .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.flatMap(
a -> a
.getPid()
.stream()
+ .filter(Objects::nonNull)
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
@@ -83,10 +96,10 @@ public class AuthorMerger {
.max(Comparator.comparing(Tuple2::_1));
if (simAuthor.isPresent()) {
- double th = THRESHOLD;
+ double th = threshold;
// increase the threshold if the surname is too short
if (simAuthor.get()._2().getSurname() != null
- && simAuthor.get()._2().getSurname().length() <= 3)
+ && simAuthor.get()._2().getSurname().length() <= 3 && threshold > 0.0)
th = 0.99;
if (simAuthor.get()._1() > th) {
@@ -106,10 +119,135 @@ public class AuthorMerger {
});
}
+ public static String normalizeFullName(final String fullname) {
+ return nfd(fullname)
+ .toLowerCase()
+ // do not compact the regexes into a single expression: it would cause a
+ // StackOverflowError on large input strings
+ .replaceAll("(\\W)+", " ")
+ .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
+ .replaceAll("(\\p{Punct})+", " ")
+ .replaceAll("(\\d)+", " ")
+ .replaceAll("(\\n)+", " ")
+ .trim();
+ }
+
+ private static String authorFieldToBeCompared(Author author) {
+ if (StringUtils.isNotBlank(author.getSurname())) {
+ return author.getSurname();
+ }
+ if (StringUtils.isNotBlank(author.getFullname())) {
+ return author.getFullname();
+ }
+ return null;
+ }
+
+ /**
+ * This method tries to figure out whether two authors are the same in the context
+ * of ORCID enrichment.
+ *
+ * @param left Author in the OAF entity
+ * @param right Author from ORCID
+ * @return true if, based on a heuristic on the authors' names, they are the same.
+ */
+ public static boolean checkORCIDSimilarity(final Author left, final Author right) {
+ final Person pl = parse(left);
+ final Person pr = parse(right);
+
+ // If one of them lacks a surname, we check whether both have a non-empty fullname
+ // and whether the normalized versions are equal
+ if (!(pl.getSurname() != null && pl.getSurname().stream().anyMatch(StringUtils::isNotBlank) &&
+ pr.getSurname() != null && pr.getSurname().stream().anyMatch(StringUtils::isNotBlank))) {
+
+ if (pl.getFullname() != null && !pl.getFullname().isEmpty() && pr.getFullname() != null
+ && !pr.getFullname().isEmpty()) {
+ return pl
+ .getFullname()
+ .stream()
+ .anyMatch(
+ fl -> pr.getFullname().stream().anyMatch(fr -> normalize(fl).equalsIgnoreCase(normalize(fr))));
+ } else {
+ return false;
+ }
+ }
+ // The Authors have one surname in common
+ if (pl.getSurname().stream().anyMatch(sl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(sl)))) {
+
+ // If one of them has only a surname and it matches, we can say they are the same author
+ if ((pl.getName() == null || pl.getName().stream().allMatch(StringUtils::isBlank)) ||
+ (pr.getName() == null || pr.getName().stream().allMatch(StringUtils::isBlank)))
+ return true;
+ // The authors have at least one name token in common
+ if (pl
+ .getName()
+ .stream()
+ .anyMatch(
+ nl -> pr
+ .getName()
+ .stream()
+ .anyMatch(nr -> nr.equalsIgnoreCase(nl))))
+ return true;
+ }
+
+ // Sometimes publications list the author in inverted order (Surname, Name);
+ // we verify whether name and surname match exactly when swapped
+ return pl.getSurname().stream().anyMatch(sl -> pr.getName().stream().anyMatch(nr -> nr.equalsIgnoreCase(sl))) &&
+ pl.getName().stream().anyMatch(nl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(nl)));
+ }
+
+ /**
+ * Method to enrich ORCID information in one list of authors based on another list
+ *
+ * @param baseAuthor the Author List in the OAF Entity
+ * @param orcidAuthor The list of ORCID Author intersected
+ * @return The Author List of the OAF Entity enriched with the orcid Author
+ */
+ public static List<Author> enrichOrcid(List<Author> baseAuthor, List<Author> orcidAuthor) {
+
+ if (baseAuthor == null || baseAuthor.isEmpty())
+ return orcidAuthor;
+
+ if (orcidAuthor == null || orcidAuthor.isEmpty())
+ return baseAuthor;
+
+ // heuristic guard: a single author matched against a long ORCID list is likely spurious
+ if (baseAuthor.size() == 1 && orcidAuthor.size() > 10)
+ return baseAuthor;
+
+ final List<Author> oAuthor = new ArrayList<>(orcidAuthor);
+
+ baseAuthor.forEach(ba -> {
+ Optional<Author> aMatch = oAuthor.stream().filter(oa -> checkORCIDSimilarity(ba, oa)).findFirst();
+ if (aMatch.isPresent()) {
+ final Author sameAuthor = aMatch.get();
+ addPid(ba, sameAuthor.getPid());
+ oAuthor.remove(sameAuthor);
+ }
+ });
+ return baseAuthor;
+ }
+
+ private static void addPid(final Author a, final List<StructuredProperty> pids) {
+ if (a.getPid() == null) {
+ a.setPid(new ArrayList<>());
+ }
+ a.getPid().addAll(pids);
+ }
+
public static String pidToComparableString(StructuredProperty pid) {
- return (pid.getQualifier() != null
- ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
- : "")
+ final String classid = pid.getQualifier() != null && pid.getQualifier().getClassid() != null
+ ? pid.getQualifier().getClassid().toLowerCase()
+ : "";
+ return classid
+ (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
}
@@ -142,7 +280,7 @@ public class AuthorMerger {
}
private static boolean hasPid(Author a) {
- if (a == null || a.getPid() == null || a.getPid().size() == 0)
+ if (a == null || a.getPid() == null || a.getPid().isEmpty())
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
@@ -151,12 +289,15 @@ public class AuthorMerger {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
- return new Person(author.getFullname(), false);
+ if (StringUtils.isNotBlank(author.getFullname()))
+ return new Person(author.getFullname(), false);
+ else
+ return new Person("", false);
}
}
- private static String normalize(final String s) {
- return nfd(s)
+ public static String normalize(final String s) {
+ String[] normalized = nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
// in case
@@ -166,7 +307,12 @@ public class AuthorMerger {
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
- .trim();
+ .trim()
+ .split(" ");
+
+ Arrays.sort(normalized);
+
+ return String.join(" ", normalized);
}
private static String nfd(final String s) {
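
Since `normalize` now splits the cleaned string into tokens and sorts them, comparisons become order-insensitive, which is what the inverted-order (Surname, Name) heuristic in `checkORCIDSimilarity` relies on. An illustrative sketch (input values are examples, not taken from the source):

    AuthorMerger.normalize("La Bruzzo, Sandro"); // -> "bruzzo la sandro"
    AuthorMerger.normalize("Sandro La Bruzzo");  // -> "bruzzo la sandro" (same tokens once sorted)
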
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
new file mode 100644
index 000000000..0225a5063
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
@@ -0,0 +1,194 @@
+
+package eu.dnetlib.dhp.oa.merge;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.when;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import scala.Tuple2;
+
+/**
+ * Groups the graph content by entity identifier to ensure ID uniqueness
+ */
+public class GroupEntitiesSparkJob {
+ private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
+
+ private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
+
+ private final ArgumentApplicationParser parser;
+
+ public GroupEntitiesSparkJob(ArgumentApplicationParser parser) {
+ this.parser = parser;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ GroupEntitiesSparkJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+
+ new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService);
+ }
+
+ public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService)
+ throws ISLookUpException {
+
+ String graphInputPath = parser.get("graphInputPath");
+ log.info("graphInputPath: {}", graphInputPath);
+
+ String checkpointPath = parser.get("checkpointPath");
+ log.info("checkpointPath: {}", checkpointPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
+ log.info("filterInvisible: {}", filterInvisible);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
+ groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs);
+ });
+ }
+
+ private static void groupEntities(
+ SparkSession spark,
+ String inputPath,
+ String checkpointPath,
+ String outputPath,
+ boolean filterInvisible, VocabularyGroup vocs) {
+
+ Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
+
+ for (Map.Entry<EntityType, Class<? extends OafEntity>> e : ModelSupport.entityTypes.entrySet()) {
+ String entity = e.getKey().name();
+ Class<? extends OafEntity> entityClass = e.getValue();
+ String entityInputPath = inputPath + "/" + entity;
+
+ if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
+ continue;
+ }
+
+ allEntities = allEntities
+ .union(
+ ((Dataset<OafEntity>) spark
+ .read()
+ .schema(Encoders.bean(entityClass).schema())
+ .json(entityInputPath)
+ .filter("length(id) > 0")
+ .as(Encoders.bean(entityClass)))
+ .map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
+ }
+
+ Dataset<Tuple2<String, OafEntity>> groupedEntities = allEntities
+ .map(
+ (MapFunction<OafEntity, OafEntity>) entity -> GraphCleaningFunctions
+ .applyCoarVocabularies(entity, vocs),
+ OAFENTITY_KRYO_ENC)
+ .groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
+ .reduceGroups((ReduceFunction<OafEntity>) OafMapperUtils::mergeEntities)
+ .map(
+ (MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
+ t._2().getClass().getName(), t._2()),
+ Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
+
+ // pivot on "_1" (classname of the entity)
+ // created columns containing only entities of the same class
+ for (Map.Entry<EntityType, Class<? extends OafEntity>> e : ModelSupport.entityTypes.entrySet()) {
+ String entity = e.getKey().name();
+ Class<? extends OafEntity> entityClass = e.getValue();
+
+ groupedEntities = groupedEntities
+ .withColumn(
+ entity,
+ when(col("_1").equalTo(entityClass.getName()), col("_2")));
+ }
+
+ groupedEntities
+ .drop("_1", "_2")
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .save(checkpointPath);
+
+ ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
+
+ ModelSupport.entityTypes
+ .entrySet()
+ .stream()
+ .map(e -> parPool.submit(() -> {
+ String entity = e.getKey().name();
+ Class<? extends OafEntity> entityClass = e.getValue();
+
+ spark
+ .read()
+ .load(checkpointPath)
+ .select(col(entity).as("value"))
+ .filter("value IS NOT NULL")
+ .as(OAFENTITY_KRYO_ENC)
+ .map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
+ .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath + "/" + entity);
+ }))
+ .collect(Collectors.toList())
+ .forEach(t -> {
+ try {
+ t.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+}
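
The `withColumn`/`when` loop in `groupEntities` is a manual pivot: after it, each row holds the merged entity in exactly one non-null column named after its entity type, so every type can be read back and written out independently (here in parallel, one ForkJoinPool task per type). A self-contained sketch of the same pattern on a toy dataset; column and class names are illustrative:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.when;

    // df has columns _1 (class name) and _2 (payload), like groupedEntities above
    for (String cls : Arrays.asList("publication", "datasource")) {
        df = df.withColumn(cls, when(col("_1").equalTo(cls), col("_2")));
    }
    df = df.drop("_1", "_2"); // one column per class, null where the row belongs to another class
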
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java
new file mode 100644
index 000000000..027bf0735
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oozie/RunSQLSparkJob.java
@@ -0,0 +1,77 @@
+
+package eu.dnetlib.dhp.oozie;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Resources;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class RunSQLSparkJob {
+ private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
+
+ private final ArgumentApplicationParser parser;
+
+ public RunSQLSparkJob(ArgumentApplicationParser parser) {
+ this.parser = parser;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ Map<String, String> params = new HashMap<>();
+ for (int i = 0; i < args.length - 1; i++) {
+ if (args[i].startsWith("--")) {
+ params.put(args[i].substring(2), args[++i]);
+ }
+ }
+
+ /*
+ * String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class
+ * .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser
+ * parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args);
+ */
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(params.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ URL url = com.google.common.io.Resources.getResource(params.get("sql"));
+ String rawSql = Resources.toString(url, StandardCharsets.UTF_8);
+
+ String sql = StringSubstitutor.replace(rawSql, params);
+ log.info("sql: {}", sql);
+
+ SparkConf conf = new SparkConf();
+ conf.set("hive.metastore.uris", params.get("hiveMetastoreUris"));
+
+ runWithSparkHiveSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
+ log.info("executing: {}", statement);
+ long startTime = System.currentTimeMillis();
+ spark.sql(statement).show();
+ log
+ .info(
+ "executed in {}",
+ DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S"));
+ }
+ });
+ }
+
+}
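
`RunSQLSparkJob` splits the substituted script on a `;` followed by an `/* EOS */` comment, so a multi-statement Hive script can ship as a single resource while still executing one statement at a time. A minimal sketch of the splitting behavior (the SQL text is illustrative):

    // trailing empty strings are dropped by String::split, so a final /* EOS */ is harmless
    String sql = "DROP TABLE IF EXISTS tmp; /* EOS */ CREATE TABLE tmp AS SELECT 1; /* EOS */";
    for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
        System.out.println(statement.trim()); // "DROP TABLE IF EXISTS tmp", then "CREATE TABLE tmp AS SELECT 1"
    }
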
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java
index 9ac0a0bf7..fd4c0191a 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/parser/utility/VtdUtilityParser.java
@@ -12,6 +12,9 @@ import com.ximpleware.VTDNav;
/** Created by sandro on 9/29/16. */
public class VtdUtilityParser {
+ private VtdUtilityParser() {
+ }
+
public static List<Node> getTextValuesWithAttributes(
final AutoPilot ap, final VTDNav vn, final String xpath, final List<String> attributes)
throws VtdException {
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
deleted file mode 100644
index 4a66f91dc..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java
+++ /dev/null
@@ -1,297 +0,0 @@
-
-package eu.dnetlib.dhp.schema.oaf;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.utils.DHPUtils;
-
-public class OafMapperUtils {
-
- public static Oaf merge(final Oaf o1, final Oaf o2) {
- if (ModelSupport.isSubClass(o1, OafEntity.class)) {
- if (ModelSupport.isSubClass(o1, Result.class)) {
-
- return mergeResults((Result) o1, (Result) o2);
- } else if (ModelSupport.isSubClass(o1, Datasource.class)) {
- ((Datasource) o1).mergeFrom((Datasource) o2);
- } else if (ModelSupport.isSubClass(o1, Organization.class)) {
- ((Organization) o1).mergeFrom((Organization) o2);
- } else if (ModelSupport.isSubClass(o1, Project.class)) {
- ((Project) o1).mergeFrom((Project) o2);
- } else {
- throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
- }
- } else if (ModelSupport.isSubClass(o1, Relation.class)) {
- ((Relation) o1).mergeFrom((Relation) o2);
- } else {
- throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
- }
- return o1;
- }
-
- public static Result mergeResults(Result r1, Result r2) {
- if (new ResultTypeComparator().compare(r1, r2) < 0) {
- r1.mergeFrom(r2);
- return r1;
- } else {
- r2.mergeFrom(r1);
- return r2;
- }
- }
-
- public static KeyValue keyValue(final String k, final String v) {
- final KeyValue kv = new KeyValue();
- kv.setKey(k);
- kv.setValue(v);
- return kv;
- }
-
- public static List<KeyValue> listKeyValues(final String... s) {
- if (s.length % 2 > 0) {
- throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)");
- }
-
- final List<KeyValue> list = new ArrayList<>();
- for (int i = 0; i < s.length; i += 2) {
- list.add(keyValue(s[i], s[i + 1]));
- }
- return list;
- }
-
- public static <T> Field<T> field(final T value, final DataInfo info) {
- if (value == null || StringUtils.isBlank(value.toString())) {
- return null;
- }
-
- final Field<T> field = new Field<>();
- field.setValue(value);
- field.setDataInfo(info);
- return field;
- }
-
- public static List<Field<String>> listFields(final DataInfo info, final String... values) {
- return Arrays
- .stream(values)
- .map(v -> field(v, info))
- .filter(Objects::nonNull)
- .filter(distinctByKey(f -> f.getValue()))
- .collect(Collectors.toList());
- }
-
- public static <T> List<Field<T>> listFields(final DataInfo info, final List<T> values) {
- return values
- .stream()
- .map(v -> field(v, info))
- .filter(Objects::nonNull)
- .filter(distinctByKey(f -> f.getValue()))
- .collect(Collectors.toList());
- }
-
- public static Qualifier unknown(final String schemeid, final String schemename) {
- return qualifier("UNKNOWN", "Unknown", schemeid, schemename);
- }
-
- public static Qualifier qualifier(
- final String classid,
- final String classname,
- final String schemeid,
- final String schemename) {
- final Qualifier q = new Qualifier();
- q.setClassid(classid);
- q.setClassname(classname);
- q.setSchemeid(schemeid);
- q.setSchemename(schemename);
- return q;
- }
-
- public static StructuredProperty structuredProperty(
- final String value,
- final String classid,
- final String classname,
- final String schemeid,
- final String schemename,
- final DataInfo dataInfo) {
-
- return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
- }
-
- public static StructuredProperty structuredProperty(
- final String value,
- final Qualifier qualifier,
- final DataInfo dataInfo) {
- if (value == null) {
- return null;
- }
- final StructuredProperty sp = new StructuredProperty();
- sp.setValue(value);
- sp.setQualifier(qualifier);
- sp.setDataInfo(dataInfo);
- return sp;
- }
-
- public static ExtraInfo extraInfo(
- final String name,
- final String value,
- final String typology,
- final String provenance,
- final String trust) {
- final ExtraInfo info = new ExtraInfo();
- info.setName(name);
- info.setValue(value);
- info.setTypology(typology);
- info.setProvenance(provenance);
- info.setTrust(trust);
- return info;
- }
-
- public static OAIProvenance oaiIProvenance(
- final String identifier,
- final String baseURL,
- final String metadataNamespace,
- final Boolean altered,
- final String datestamp,
- final String harvestDate) {
-
- final OriginDescription desc = new OriginDescription();
- desc.setIdentifier(identifier);
- desc.setBaseURL(baseURL);
- desc.setMetadataNamespace(metadataNamespace);
- desc.setAltered(altered);
- desc.setDatestamp(datestamp);
- desc.setHarvestDate(harvestDate);
-
- final OAIProvenance p = new OAIProvenance();
- p.setOriginDescription(desc);
-
- return p;
- }
-
- public static Journal journal(
- final String name,
- final String issnPrinted,
- final String issnOnline,
- final String issnLinking,
- final DataInfo dataInfo) {
- return journal(
- name,
- issnPrinted,
- issnOnline,
- issnLinking,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- dataInfo);
- }
-
- public static Journal journal(
- final String name,
- final String issnPrinted,
- final String issnOnline,
- final String issnLinking,
- final String ep,
- final String iss,
- final String sp,
- final String vol,
- final String edition,
- final String conferenceplace,
- final String conferencedate,
- final DataInfo dataInfo) {
-
- if (StringUtils.isNotBlank(name)
- || StringUtils.isNotBlank(issnPrinted)
- || StringUtils.isNotBlank(issnOnline)
- || StringUtils.isNotBlank(issnLinking)) {
- final Journal j = new Journal();
- j.setName(name);
- j.setIssnPrinted(issnPrinted);
- j.setIssnOnline(issnOnline);
- j.setIssnLinking(issnLinking);
- j.setEp(ep);
- j.setIss(iss);
- j.setSp(sp);
- j.setVol(vol);
- j.setEdition(edition);
- j.setConferenceplace(conferenceplace);
- j.setConferencedate(conferencedate);
- j.setDataInfo(dataInfo);
- return j;
- } else {
- return null;
- }
- }
-
- public static DataInfo dataInfo(
- final Boolean deletedbyinference,
- final String inferenceprovenance,
- final Boolean inferred,
- final Boolean invisible,
- final Qualifier provenanceaction,
- final String trust) {
- final DataInfo d = new DataInfo();
- d.setDeletedbyinference(deletedbyinference);
- d.setInferenceprovenance(inferenceprovenance);
- d.setInferred(inferred);
- d.setInvisible(invisible);
- d.setProvenanceaction(provenanceaction);
- d.setTrust(trust);
- return d;
- }
-
- public static String createOpenaireId(
- final int prefix,
- final String originalId,
- final boolean to_md5) {
- if (StringUtils.isBlank(originalId)) {
- return null;
- } else if (to_md5) {
- final String nsPrefix = StringUtils.substringBefore(originalId, "::");
- final String rest = StringUtils.substringAfter(originalId, "::");
- return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
- } else {
- return String.format("%s|%s", prefix, originalId);
- }
- }
-
- public static String createOpenaireId(
- final String type,
- final String originalId,
- final boolean to_md5) {
- switch (type) {
- case "datasource":
- return createOpenaireId(10, originalId, to_md5);
- case "organization":
- return createOpenaireId(20, originalId, to_md5);
- case "person":
- return createOpenaireId(30, originalId, to_md5);
- case "project":
- return createOpenaireId(40, originalId, to_md5);
- default:
- return createOpenaireId(50, originalId, to_md5);
- }
- }
-
- public static String asString(final Object o) {
- return o == null ? "" : o.toString();
- }
-
- public static <T> Predicate<T> distinctByKey(
- final Function<? super T, ?> keyExtractor) {
- final Map