diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index b295bc1f1..a8607a9b3 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -53,11 +53,6 @@
com.fasterxml.jackson.core
jackson-databind
-
-
- com.rabbitmq
- amqp-client
-
net.sf.saxon
Saxon-HE
@@ -98,6 +93,12 @@
dnet-pace-core
+
+ org.apache.httpcomponents
+ httpclient
+
+
+
eu.dnetlib.dhp
dhp-schemas
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
new file mode 100644
index 000000000..531c13af3
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
@@ -0,0 +1,21 @@
+
+package eu.dnetlib.dhp.application;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+
+public class ApplicationUtils {
+
+ public static void populateOOZIEEnv(final String paramName, String value) throws Exception {
+ File file = new File(System.getProperty("oozie.action.output.properties"));
+ Properties props = new Properties();
+
+ props.setProperty(paramName, value);
+ OutputStream os = new FileOutputStream(file);
+ props.store(os, "");
+ os.close();
+ }
+
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java b/dhp-common/src/main/java/eu/dnetlib/dhp/collector/worker/model/ApiDescriptor.java
similarity index 93%
rename from dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
rename to dhp-common/src/main/java/eu/dnetlib/dhp/collector/worker/model/ApiDescriptor.java
index bfd70e8c6..8ba30faeb 100644
--- a/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/collector/worker/model/ApiDescriptor.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.collector.worker.model;
+package eu.dnetlib.dhp.collector.worker.model;
import java.util.HashMap;
import java.util.Map;
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
new file mode 100644
index 000000000..014f18606
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.dhp.common.rest;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class DNetRestClient {
+
+ private static ObjectMapper mapper = new ObjectMapper();
+
+ public static T doGET(final String url, Class clazz) throws Exception {
+ final HttpGet httpGet = new HttpGet(url);
+ return doHTTPRequest(httpGet, clazz);
+ }
+
+ public static String doGET(final String url) throws Exception {
+ final HttpGet httpGet = new HttpGet(url);
+ return doHTTPRequest(httpGet);
+ }
+
+ public static String doPOST(final String url, V objParam) throws Exception {
+ final HttpPost httpPost = new HttpPost(url);
+
+ if (objParam != null) {
+ final StringEntity entity = new StringEntity(mapper.writeValueAsString(objParam));
+ httpPost.setEntity(entity);
+ httpPost.setHeader("Accept", "application/json");
+ httpPost.setHeader("Content-type", "application/json");
+ }
+ return doHTTPRequest(httpPost);
+ }
+
+ public static T doPOST(final String url, V objParam, Class clazz) throws Exception {
+ return mapper.readValue(doPOST(url, objParam), clazz);
+ }
+
+ private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
+ CloseableHttpClient client = HttpClients.createDefault();
+ CloseableHttpResponse response = client.execute(r);
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+
+ private static T doHTTPRequest(final HttpUriRequest r, Class clazz) throws Exception {
+ return mapper.readValue(doHTTPRequest(r), clazz);
+ }
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java
index fac55189b..f81181e53 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java
@@ -67,6 +67,10 @@ public class VocabularyGroup implements Serializable {
private final Map vocs = new HashMap<>();
+ public Set vocabularyNames() {
+ return vocs.keySet();
+ }
+
public void addVocabulary(final String id, final String name) {
vocs.put(id.toLowerCase(), new Vocabulary(id, name));
}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
index ce65e710f..0b59dcce0 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java
@@ -26,13 +26,13 @@ public class MetadataRecord implements Serializable {
private String body;
/** the date when the record has been stored */
- private long dateOfCollection;
+ private Long dateOfCollection;
/** the date when the record has been stored */
- private long dateOfTransformation;
+ private Long dateOfTransformation;
public MetadataRecord() {
- this.dateOfCollection = System.currentTimeMillis();
+
}
public MetadataRecord(
@@ -40,7 +40,7 @@ public class MetadataRecord implements Serializable {
String encoding,
Provenance provenance,
String body,
- long dateOfCollection) {
+ Long dateOfCollection) {
this.originalId = originalId;
this.encoding = encoding;
@@ -90,19 +90,19 @@ public class MetadataRecord implements Serializable {
this.body = body;
}
- public long getDateOfCollection() {
+ public Long getDateOfCollection() {
return dateOfCollection;
}
- public void setDateOfCollection(long dateOfCollection) {
+ public void setDateOfCollection(Long dateOfCollection) {
this.dateOfCollection = dateOfCollection;
}
- public long getDateOfTransformation() {
+ public Long getDateOfTransformation() {
return dateOfTransformation;
}
- public void setDateOfTransformation(long dateOfTransformation) {
+ public void setDateOfTransformation(Long dateOfTransformation) {
this.dateOfTransformation = dateOfTransformation;
}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/message/Message.java
deleted file mode 100644
index fc1c38291..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/Message.java
+++ /dev/null
@@ -1,76 +0,0 @@
-
-package eu.dnetlib.message;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class Message {
-
- private String workflowId;
-
- private String jobName;
-
- private MessageType type;
-
- private Map body;
-
- public static Message fromJson(final String json) throws IOException {
- final ObjectMapper jsonMapper = new ObjectMapper();
- return jsonMapper.readValue(json, Message.class);
- }
-
- public Message() {
- }
-
- public Message(String workflowId, String jobName, MessageType type, Map body) {
- this.workflowId = workflowId;
- this.jobName = jobName;
- this.type = type;
- this.body = body;
- }
-
- public String getWorkflowId() {
- return workflowId;
- }
-
- public void setWorkflowId(String workflowId) {
- this.workflowId = workflowId;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public MessageType getType() {
- return type;
- }
-
- public void setType(MessageType type) {
- this.type = type;
- }
-
- public Map getBody() {
- return body;
- }
-
- public void setBody(Map body) {
- this.body = body;
- }
-
- @Override
- public String toString() {
- final ObjectMapper jsonMapper = new ObjectMapper();
- try {
- return jsonMapper.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- return null;
- }
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
deleted file mode 100644
index fb3f0bd95..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-
-package eu.dnetlib.message;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-
-public class MessageConsumer extends DefaultConsumer {
-
- final LinkedBlockingQueue queueMessages;
-
- /**
- * Constructs a new instance and records its association to the passed-in channel.
- *
- * @param channel the channel to which this consumer is attached
- * @param queueMessages
- */
- public MessageConsumer(Channel channel, LinkedBlockingQueue queueMessages) {
- super(channel);
- this.queueMessages = queueMessages;
- }
-
- @Override
- public void handleDelivery(
- String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
- throws IOException {
- final String json = new String(body, StandardCharsets.UTF_8);
- Message message = Message.fromJson(json);
- try {
- this.queueMessages.put(message);
- System.out.println("Receiving Message " + message);
- } catch (InterruptedException e) {
- if (message.getType() == MessageType.REPORT)
- throw new RuntimeException("Error on sending message");
- else {
- // TODO LOGGING EXCEPTION
- }
- } finally {
- getChannel().basicAck(envelope.getDeliveryTag(), false);
- }
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
deleted file mode 100644
index 5ca79f3cc..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-
-package eu.dnetlib.message;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
-public class MessageManager {
-
- private final String messageHost;
-
- private final String username;
-
- private final String password;
-
- private Connection connection;
-
- private final Map channels = new HashMap<>();
-
- private boolean durable;
-
- private boolean autodelete;
-
- private final LinkedBlockingQueue queueMessages;
-
- public MessageManager(
- String messageHost,
- String username,
- String password,
- final LinkedBlockingQueue queueMessages) {
- this.queueMessages = queueMessages;
- this.messageHost = messageHost;
- this.username = username;
- this.password = password;
- }
-
- public MessageManager(
- String messageHost,
- String username,
- String password,
- boolean durable,
- boolean autodelete,
- final LinkedBlockingQueue queueMessages) {
- this.queueMessages = queueMessages;
- this.messageHost = messageHost;
- this.username = username;
- this.password = password;
-
- this.durable = durable;
- this.autodelete = autodelete;
- }
-
- private Connection createConnection() throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(this.messageHost);
- factory.setUsername(this.username);
- factory.setPassword(this.password);
- return factory.newConnection();
- }
-
- private Channel createChannel(
- final Connection connection,
- final String queueName,
- final boolean durable,
- final boolean autodelete)
- throws Exception {
- Map args = new HashMap<>();
- args.put("x-message-ttl", 10000);
- Channel channel = connection.createChannel();
- channel.queueDeclare(queueName, durable, false, this.autodelete, args);
- return channel;
- }
-
- private Channel getOrCreateChannel(final String queueName, boolean durable, boolean autodelete)
- throws Exception {
- if (channels.containsKey(queueName)) {
- return channels.get(queueName);
- }
-
- if (this.connection == null) {
- this.connection = createConnection();
- }
- channels.put(queueName, createChannel(this.connection, queueName, durable, autodelete));
- return channels.get(queueName);
- }
-
- public void close() throws IOException {
- channels
- .values()
- .forEach(
- ch -> {
- try {
- ch.close();
- } catch (Exception e) {
- // TODO LOG
- }
- });
-
- this.connection.close();
- }
-
- public boolean sendMessage(final Message message, String queueName) throws Exception {
- try {
- Channel channel = getOrCreateChannel(queueName, this.durable, this.autodelete);
- channel.basicPublish("", queueName, null, message.toString().getBytes());
- return true;
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- }
-
- public boolean sendMessage(
- final Message message, String queueName, boolean durable_var, boolean autodelete_var)
- throws Exception {
- try {
- Channel channel = getOrCreateChannel(queueName, durable_var, autodelete_var);
- channel.basicPublish("", queueName, null, message.toString().getBytes());
- return true;
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- }
-
- public void startConsumingMessage(
- final String queueName, final boolean durable, final boolean autodelete) throws Exception {
-
- Channel channel = createChannel(createConnection(), queueName, durable, autodelete);
- channel.basicConsume(queueName, false, new MessageConsumer(channel, queueMessages));
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java
deleted file mode 100644
index 72cbda252..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java
+++ /dev/null
@@ -1,6 +0,0 @@
-
-package eu.dnetlib.message;
-
-public enum MessageType {
- ONGOING, REPORT
-}
diff --git a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
deleted file mode 100644
index 442f7b5c2..000000000
--- a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-
-package eu.dnetlib.message;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.jupiter.api.Test;
-
-public class MessageTest {
-
- @Test
- public void fromJsonTest() throws IOException {
- Message m = new Message();
- m.setWorkflowId("wId");
- m.setType(MessageType.ONGOING);
- m.setJobName("Collection");
- Map body = new HashMap<>();
- body.put("parsedItem", "300");
- body.put("ExecutionTime", "30s");
-
- m.setBody(body);
- System.out.println("m = " + m);
- Message m1 = Message.fromJson(m.toString());
- assertEquals(m1.getWorkflowId(), m.getWorkflowId());
- assertEquals(m1.getType(), m.getType());
- assertEquals(m1.getJobName(), m.getJobName());
-
- assertNotNull(m1.getBody());
- m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it)));
- assertEquals(m1.getJobName(), m.getJobName());
- }
-
- @Test
- public void toStringTest() {
- final String expectedJson = "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
- Message m = new Message();
- m.setWorkflowId("wId");
- m.setType(MessageType.ONGOING);
- m.setJobName("Collection");
- Map body = new HashMap<>();
- body.put("parsedItem", "300");
- body.put("ExecutionTime", "30s");
-
- m.setBody(body);
-
- assertEquals(expectedJson, m.toString());
- }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
index 6837e94b2..f04f92c63 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
@@ -27,7 +27,7 @@ object GenerateDataciteDatasetSpark {
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
-
+ log.info(s"vocabulary size is ${vocabularies.getTerms("dnet:languages").size()}")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
.master(master)
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
new file mode 100644
index 000000000..7c5ad354d
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
@@ -0,0 +1,14 @@
+
+package eu.dnetlib.dhp.aggregation.common;
+
+public class AggregationConstants {
+
+ public static final String SEQUENCE_FILE_NAME = "/sequence_file";
+ public static final String MDSTORE_DATA_PATH = "/store";
+ public static final String MDSTORE_SIZE_PATH = "/size";
+
+ public static final String CONTENT_TOTALITEMS = "TotalItems";
+ public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
+ public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
new file mode 100644
index 000000000..7332ac071
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
@@ -0,0 +1,49 @@
+
+package eu.dnetlib.dhp.aggregation.common;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+
+public class AggregationUtility {
+
+ private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class);
+
+ public static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path)
+ throws IOException {
+
+ log.info("writing size ({}) info file {}", total, path);
+ try (FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+ BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
+ os.write(total.toString().getBytes(StandardCharsets.UTF_8));
+ os.flush();
+ }
+
+ }
+
+ public static void saveDataset(final Dataset mdstore, final String targetPath) {
+ log.info("saving dataset in: {}", targetPath);
+ mdstore
+ .write()
+ .mode(SaveMode.Overwrite)
+ .format("parquet")
+ .save(targetPath);
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
new file mode 100644
index 000000000..3e471cfc8
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
@@ -0,0 +1,148 @@
+
+package eu.dnetlib.dhp.aggregation.mdstore;
+
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
+import static eu.dnetlib.dhp.application.ApplicationUtils.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.rest.DNetRestClient;
+
+public class MDStoreActionNode {
+ private static final Logger log = LoggerFactory.getLogger(MDStoreActionNode.class);
+
+ enum MDAction {
+ NEW_VERSION, ROLLBACK, COMMIT, READ_LOCK, READ_UNLOCK
+ }
+
+ public static String NEW_VERSION_URI = "%s/mdstore/%s/newVersion";
+
+ public static final String COMMIT_VERSION_URL = "%s/version/%s/commit/%s";
+ public static final String ROLLBACK_VERSION_URL = "%s/version/%s/abort";
+
+ public static final String READ_LOCK_URL = "%s/mdstore/%s/startReading";
+ public static final String READ_UNLOCK_URL = "%s/version/%s/endReading";
+
+ private static final String MDSTOREVERSIONPARAM = "mdStoreVersion";
+ private static final String MDSTOREREADLOCKPARAM = "mdStoreReadLockVersion";
+
+ public static void main(String[] args) throws Exception {
+ final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ MDStoreActionNode.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/collection/mdstore_action_parameters.json")));
+ argumentParser.parseArgument(args);
+
+ final MDAction action = MDAction.valueOf(argumentParser.get("action"));
+ log.info("Current action is {}", action);
+
+ final String mdStoreManagerURI = argumentParser.get("mdStoreManagerURI");
+ log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
+
+ switch (action) {
+ case NEW_VERSION: {
+ final String mdStoreID = argumentParser.get("mdStoreID");
+ if (StringUtils.isBlank(mdStoreID)) {
+ throw new IllegalArgumentException("missing or empty argument mdStoreId");
+ }
+ final MDStoreVersion currentVersion = DNetRestClient
+ .doGET(String.format(NEW_VERSION_URI, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
+ populateOOZIEEnv(MDSTOREVERSIONPARAM, MAPPER.writeValueAsString(currentVersion));
+ break;
+ }
+ case COMMIT: {
+
+ final String hdfsuri = argumentParser.get("namenode");
+ if (StringUtils.isBlank(hdfsuri)) {
+ throw new IllegalArgumentException("missing or empty argument namenode");
+ }
+ final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
+ final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class);
+
+ if (StringUtils.isBlank(mdStoreVersion.getId())) {
+ throw new IllegalArgumentException(
+ "invalid MDStoreVersion value current is " + mdStoreVersion_params);
+ }
+
+ Configuration conf = new Configuration();
+ // Set FileSystem URI
+ conf.set("fs.defaultFS", hdfsuri);
+ // Because of Maven
+ conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+ System.setProperty("hadoop.home.dir", "/");
+ // Get the filesystem - HDFS
+ FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
+
+ Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + "/size");
+
+ FSDataInputStream inputStream = fs.open(hdfstoreSizepath);
+
+ final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
+
+ inputStream.close();
+ fs.create(hdfstoreSizepath);
+
+ DNetRestClient
+ .doGET(String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
+ break;
+ }
+ case ROLLBACK: {
+ final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
+ final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class);
+
+ if (StringUtils.isBlank(mdStoreVersion.getId())) {
+ throw new IllegalArgumentException(
+ "invalid MDStoreVersion value current is " + mdStoreVersion_params);
+ }
+ DNetRestClient.doGET(String.format(ROLLBACK_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId()));
+ break;
+ }
+
+ case READ_LOCK: {
+ final String mdStoreID = argumentParser.get("mdStoreID");
+ if (StringUtils.isBlank(mdStoreID)) {
+ throw new IllegalArgumentException("missing or empty argument mdStoreId");
+ }
+ final MDStoreVersion currentVersion = DNetRestClient
+ .doGET(String.format(READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
+ populateOOZIEEnv(MDSTOREREADLOCKPARAM, MAPPER.writeValueAsString(currentVersion));
+ break;
+ }
+ case READ_UNLOCK: {
+ final String mdStoreVersion_params = argumentParser.get("readMDStoreId");
+ final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class);
+
+ if (StringUtils.isBlank(mdStoreVersion.getId())) {
+ throw new IllegalArgumentException(
+ "invalid MDStoreVersion value current is " + mdStoreVersion_params);
+ }
+ DNetRestClient.doGET(String.format(READ_UNLOCK_URL, mdStoreManagerURI, mdStoreVersion.getId()));
+ break;
+ }
+
+ default:
+ throw new IllegalArgumentException("invalid action");
+ }
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index c9c29b4ea..fdf3965d6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -1,9 +1,12 @@
package eu.dnetlib.dhp.collection;
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
@@ -13,12 +16,11 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
@@ -28,10 +30,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
-import eu.dnetlib.message.MessageManager;
+import scala.Tuple2;
public class GenerateNativeStoreSparkJob {
@@ -46,15 +49,31 @@ public class GenerateNativeStoreSparkJob {
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
parser.parseArgument(args);
- final ObjectMapper jsonMapper = new ObjectMapper();
+
final String provenanceArgument = parser.get("provenance");
log.info("Provenance is {}", provenanceArgument);
- final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
+ final Provenance provenance = MAPPER.readValue(provenanceArgument, Provenance.class);
+
final String dateOfCollectionArgs = parser.get("dateOfCollection");
log.info("dateOfCollection is {}", dateOfCollectionArgs);
- final long dateOfCollection = new Long(dateOfCollectionArgs);
- final String sequenceFileInputPath = parser.get("input");
- log.info("sequenceFileInputPath is {}", dateOfCollectionArgs);
+ final Long dateOfCollection = new Long(dateOfCollectionArgs);
+
+ String mdStoreVersion = parser.get("mdStoreVersion");
+ log.info("mdStoreVersion is {}", mdStoreVersion);
+
+ final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
+
+ String readMdStoreVersionParam = parser.get("readMdStoreVersion");
+ log.info("readMdStoreVersion is {}", readMdStoreVersionParam);
+
+ final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null
+ : MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class);
+
+ final String xpath = parser.get("xpath");
+ log.info("xpath is {}", xpath);
+
+ final String encoding = parser.get("encoding");
+ log.info("encoding is {}", encoding);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
@@ -66,36 +85,112 @@ public class GenerateNativeStoreSparkJob {
runWithSparkSession(
conf,
isSparkSessionManaged,
- spark -> {
- final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ spark -> createNativeMDStore(
+ spark, provenance, dateOfCollection, xpath, encoding, currentVersion, readMdStoreVersion));
+ }
- final JavaPairRDD inputRDD = sc
- .sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);
+ private static void createNativeMDStore(SparkSession spark,
+ Provenance provenance,
+ Long dateOfCollection,
+ String xpath,
+ String encoding,
+ MDStoreVersion currentVersion,
+ MDStoreVersion readVersion) throws IOException {
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
- final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
- final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+ final LongAccumulator totalItems = sc.sc().longAccumulator(CONTENT_TOTALITEMS);
+ final LongAccumulator invalidRecords = sc.sc().longAccumulator(CONTENT_INVALIDRECORDS);
- final JavaRDD nativeStore = inputRDD
- .map(
- item -> parseRecord(
- item._2().toString(),
- parser.get("xpath"),
- parser.get("encoding"),
- provenance,
- dateOfCollection,
- totalItems,
- invalidRecords))
- .filter(Objects::nonNull)
- .distinct();
+ final String seqFilePath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
+ final JavaRDD nativeStore = sc
+ .sequenceFile(seqFilePath, IntWritable.class, Text.class)
+ .map(
+ item -> parseRecord(
+ item._2().toString(),
+ xpath,
+ encoding,
+ provenance,
+ dateOfCollection,
+ totalItems,
+ invalidRecords))
+ .filter(Objects::nonNull)
+ .distinct();
- final Encoder encoder = Encoders.bean(MetadataRecord.class);
- final Dataset mdstore = spark.createDataset(nativeStore.rdd(), encoder);
- final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
- mdStoreRecords.add(mdstore.count());
+ final Encoder encoder = Encoders.bean(MetadataRecord.class);
+ final Dataset mdstore = spark.createDataset(nativeStore.rdd(), encoder);
- mdstore.write().format("parquet").save(parser.get("output"));
+ final String targetPath = currentVersion.getHdfsPath() + MDSTORE_DATA_PATH;
- });
+ if (readVersion != null) { // INCREMENTAL MODE
+ log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
+ Dataset currentMdStoreVersion = spark
+ .read()
+ .load(readVersion.getHdfsPath() + MDSTORE_DATA_PATH)
+ .as(encoder);
+ TypedColumn aggregator = new MDStoreAggregator().toColumn();
+
+ final Dataset map = currentMdStoreVersion
+ .union(mdstore)
+ .groupByKey(
+ (MapFunction) MetadataRecord::getId,
+ Encoders.STRING())
+ .agg(aggregator)
+ .map((MapFunction, MetadataRecord>) Tuple2::_2, encoder);
+
+ map.select("id").takeAsList(100).forEach(s -> log.info(s.toString()));
+
+ saveDataset(map, targetPath);
+
+ } else {
+ saveDataset(mdstore, targetPath);
+ }
+
+ final Long total = spark.read().load(targetPath).count();
+ log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());
+
+ writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
+ }
+
+ public static class MDStoreAggregator extends Aggregator {
+
+ @Override
+ public MetadataRecord zero() {
+ return null;
+ }
+
+ @Override
+ public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) {
+ return getLatestRecord(b, a);
+ }
+
+ @Override
+ public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
+ return getLatestRecord(b, a);
+ }
+
+ private MetadataRecord getLatestRecord(MetadataRecord b, MetadataRecord a) {
+ if (b == null)
+ return a;
+
+ if (a == null)
+ return b;
+ return (a.getDateOfCollection() > b.getDateOfCollection()) ? a : b;
+ }
+
+ @Override
+ public MetadataRecord finish(MetadataRecord r) {
+ return r;
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.bean(MetadataRecord.class);
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.bean(MetadataRecord.class);
+ }
}
@@ -120,7 +215,7 @@ public class GenerateNativeStoreSparkJob {
invalidRecords.add(1);
return null;
}
- return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
+ return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
} catch (Throwable e) {
invalidRecords.add(1);
return null;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
index 7146e610e..a0c546858 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
@@ -3,10 +3,13 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.CollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public interface CollectorPlugin {
Stream collect(ApiDescriptor api) throws CollectorException;
+
+ CollectorPluginErrorLogList getCollectionErrors();
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
index c4c52271a..ea74919c5 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
@@ -9,13 +9,16 @@ import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import org.jetbrains.annotations.NotNull;
+
import com.google.common.base.Splitter;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class OaiCollectorPlugin implements CollectorPlugin {
@@ -26,8 +29,19 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private OaiIteratorFactory oaiIteratorFactory;
+ private final CollectorPluginErrorLogList errorLogList = new CollectorPluginErrorLogList();
+
@Override
public Stream collect(final ApiDescriptor api) throws CollectorException {
+ try {
+ return doCollect(api);
+ } catch (CollectorException e) {
+ errorLogList.add(e.getMessage());
+ throw e;
+ }
+ }
+
+ private Stream doCollect(ApiDescriptor api) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String mdFormat = api.getParams().get(FORMAT_PARAM);
final String setParam = api.getParams().get(OAI_SET_PARAM);
@@ -65,7 +79,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
.stream()
.map(
set -> getOaiIteratorFactory()
- .newIterator(baseUrl, mdFormat, set, fromDate, untilDate))
+ .newIterator(baseUrl, mdFormat, set, fromDate, untilDate, errorLogList))
.iterator();
return StreamSupport
@@ -79,4 +93,9 @@ public class OaiCollectorPlugin implements CollectorPlugin {
}
return oaiIteratorFactory;
}
+
+ @Override
+ public CollectorPluginErrorLogList getCollectionErrors() {
+ return errorLogList;
+ }
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
index e54bae67d..2392dee6a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
@@ -15,15 +15,17 @@ import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
public class OaiIterator implements Iterator {
- private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on
- // 11/24/08 5:02 PM
+ private static final Logger log = LoggerFactory.getLogger(OaiIterator.class);
private final Queue queue = new PriorityBlockingQueue<>();
private final SAXReader reader = new SAXReader();
@@ -36,6 +38,7 @@ public class OaiIterator implements Iterator {
private String token;
private boolean started;
private final HttpConnector httpConnector;
+ private CollectorPluginErrorLogList errorLogList;
public OaiIterator(
final String baseUrl,
@@ -43,7 +46,8 @@ public class OaiIterator implements Iterator {
final String set,
final String fromDate,
final String untilDate,
- final HttpConnector httpConnector) {
+ final HttpConnector httpConnector,
+ final CollectorPluginErrorLogList errorLogList) {
this.baseUrl = baseUrl;
this.mdFormat = mdFormat;
this.set = set;
@@ -51,6 +55,7 @@ public class OaiIterator implements Iterator {
this.untilDate = untilDate;
this.started = false;
this.httpConnector = httpConnector;
+ this.errorLogList = errorLogList;
}
private void verifyStarted() {
@@ -139,7 +144,7 @@ public class OaiIterator implements Iterator {
private String downloadPage(final String url) throws CollectorException {
- final String xml = httpConnector.getInputSource(url);
+ final String xml = httpConnector.getInputSource(url, errorLogList);
Document doc;
try {
doc = reader.read(new StringReader(xml));
@@ -174,4 +179,8 @@ public class OaiIterator implements Iterator {
return doc.valueOf("//*[local-name()='resumptionToken']");
}
+
+ public CollectorPluginErrorLogList getErrorLogList() {
+ return errorLogList;
+ }
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
index 4a6ea7f67..eafd265d4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.Iterator;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
public class OaiIteratorFactory {
@@ -14,8 +15,9 @@ public class OaiIteratorFactory {
final String mdFormat,
final String set,
final String fromDate,
- final String untilDate) {
- return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector());
+ final String untilDate,
+ final CollectorPluginErrorLogList errorLogList) {
+ return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(), errorLogList);
}
private HttpConnector getHttpConnector() {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
index 380db641a..7033cfd8e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
@@ -14,80 +14,74 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class CollectorWorker {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
- private final CollectorPluginFactory collectorPluginFactory;
-
private final ApiDescriptor api;
private final String hdfsuri;
private final String hdfsPath;
+ private CollectorPlugin plugin;
+
public CollectorWorker(
- final CollectorPluginFactory collectorPluginFactory,
final ApiDescriptor api,
final String hdfsuri,
- final String hdfsPath) {
- this.collectorPluginFactory = collectorPluginFactory;
+ final String hdfsPath) throws CollectorException {
this.api = api;
this.hdfsuri = hdfsuri;
this.hdfsPath = hdfsPath;
-
+ this.plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
}
- public void collect() throws CollectorException {
- try {
- final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
+ public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
- // ====== Init HDFS File System Object
- Configuration conf = new Configuration();
- // Set FileSystem URI
- conf.set("fs.defaultFS", hdfsuri);
- // Because of Maven
- conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
- conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+ // ====== Init HDFS File System Object
+ Configuration conf = new Configuration();
+ // Set FileSystem URI
+ conf.set("fs.defaultFS", hdfsuri);
+ // Because of Maven
+ conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
- System.setProperty("hadoop.home.dir", "/");
- // Get the filesystem - HDFS
- FileSystem.get(URI.create(hdfsuri), conf);
- Path hdfswritepath = new Path(hdfsPath);
+ System.setProperty("hadoop.home.dir", "/");
+ // Get the filesystem - HDFS
- log.info("Created path " + hdfswritepath.toString());
+ FileSystem.get(URI.create(hdfsuri), conf);
+ Path hdfswritepath = new Path(hdfsPath);
- final AtomicInteger counter = new AtomicInteger(0);
- try (SequenceFile.Writer writer = SequenceFile
- .createWriter(
- conf,
- SequenceFile.Writer.file(hdfswritepath),
- SequenceFile.Writer.keyClass(IntWritable.class),
- SequenceFile.Writer.valueClass(Text.class))) {
- final IntWritable key = new IntWritable(counter.get());
- final Text value = new Text();
- plugin
- .collect(api)
- .forEach(
- content -> {
- key.set(counter.getAndIncrement());
- value.set(content);
- try {
- writer.append(key, value);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
- } catch (Throwable e) {
- throw new CollectorException("Error on collecting ", e);
+ log.info("Created path " + hdfswritepath.toString());
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ try (SequenceFile.Writer writer = SequenceFile
+ .createWriter(
+ conf,
+ SequenceFile.Writer.file(hdfswritepath),
+ SequenceFile.Writer.keyClass(IntWritable.class),
+ SequenceFile.Writer.valueClass(Text.class))) {
+ final IntWritable key = new IntWritable(counter.get());
+ final Text value = new Text();
+ plugin
+ .collect(api)
+ .forEach(
+ content -> {
+ key.set(counter.getAndIncrement());
+ value.set(content);
+ try {
+ writer.append(key, value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ return plugin.getCollectionErrors();
}
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
index 5e8d0f9c2..1d99689db 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
@@ -1,15 +1,22 @@
package eu.dnetlib.dhp.collection.worker;
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
+import static eu.dnetlib.dhp.application.ApplicationUtils.*;
+
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
@@ -22,8 +29,6 @@ public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
- private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
-
/**
* @param args
*/
@@ -38,18 +43,25 @@ public class CollectorWorkerApplication {
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
-
log.info("hdfsURI is {}", hdfsuri);
- final String hdfsPath = argumentParser.get("hdfsPath");
- log.info("hdfsPath is {}" + hdfsPath);
+
final String apiDescriptor = argumentParser.get("apidescriptor");
- log.info("apiDescriptor is {}" + apiDescriptor);
+ log.info("apiDescriptor is {}", apiDescriptor);
- final ObjectMapper jsonMapper = new ObjectMapper();
+ final String mdStoreVersion = argumentParser.get("mdStoreVersion");
+ log.info("mdStoreVersion is {}", mdStoreVersion);
- final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
+ final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
+ final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
+ log.info("hdfs path is {}", hdfsPath);
+
+ final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
+
+ final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath);
+ CollectorPluginErrorLogList errors = worker.collect();
+
+ populateOOZIEEnv("collectorErrors", errors.toString());
- final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
- worker.collect();
}
+
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
index 6b070b191..7cbcd9b5c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
@@ -7,7 +7,7 @@ import eu.dnetlib.dhp.collection.worker.CollectorException;
public class CollectorPluginFactory {
- public CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
+ public static CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
if (protocol == null)
throw new CollectorException("protocol cannot be null");
switch (protocol.toLowerCase().trim()) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
index ff3c18aba..fc45b4814 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
@@ -16,14 +16,14 @@ import javax.net.ssl.X509TrustManager;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.math.NumberUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class HttpConnector {
- private static final Log log = LogFactory.getLog(HttpConnector.class);
+ private static final Logger log = LoggerFactory.getLogger(HttpConnector.class);
private int maxNumberOfRetry = 6;
private int defaultDelay = 120; // seconds
@@ -45,7 +45,20 @@ public class HttpConnector {
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl) throws CollectorException {
- return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
+ return attemptDownloadAsString(requestUrl, 1, new CollectorPluginErrorLogList());
+ }
+
+ /**
+ * Given the URL returns the content via HTTP GET
+ *
+ * @param requestUrl the URL
+ * @param errorLogList the list of errors
+ * @return the content of the downloaded resource
+ * @throws CollectorException when retrying more than maxNumberOfRetry times
+ */
+ public String getInputSource(final String requestUrl, CollectorPluginErrorLogList errorLogList)
+ throws CollectorException {
+ return attemptDownloadAsString(requestUrl, 1, errorLogList);
}
/**
@@ -59,18 +72,20 @@ public class HttpConnector {
return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
}
- private String attemptDownlaodAsString(
+ private String attemptDownloadAsString(
final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
throws CollectorException {
+
+ log.info("requesting URL [{}]", requestUrl);
try {
final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
try {
return IOUtils.toString(s);
} catch (final IOException e) {
- log.error("error while retrieving from http-connection occured: " + requestUrl, e);
+ log.error("error while retrieving from http-connection occurred: {}", requestUrl, e);
Thread.sleep(defaultDelay * 1000);
errorList.add(e.getMessage());
- return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
+ return attemptDownloadAsString(requestUrl, retryNumber + 1, errorList);
} finally {
IOUtils.closeQuietly(s);
}
@@ -87,7 +102,7 @@ public class HttpConnector {
throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList);
}
- log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
+ log.debug("requesting URL [{}], try {}", requestUrl, retryNumber);
try {
InputStream input = null;
@@ -103,7 +118,7 @@ public class HttpConnector {
final int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
- log.warn("waiting and repeating request after " + retryAfter + " sec.");
+ log.warn("waiting and repeating request after {} sec.", retryAfter);
Thread.sleep(retryAfter * 1000);
errorList.add("503 Service Unavailable");
urlConn.disconnect();
@@ -111,7 +126,7 @@ public class HttpConnector {
} else if (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM
|| urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP) {
final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
- log.debug("The requested url has been moved to " + newUrl);
+ log.debug("The requested url has been moved to {}", newUrl);
errorList
.add(
String
@@ -121,15 +136,11 @@ public class HttpConnector {
urlConn.disconnect();
return attemptDownload(newUrl, retryNumber + 1, errorList);
} else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
- log
- .error(
- String
- .format(
- "HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
+ final String msg = String
+ .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage());
+ log.error(msg);
Thread.sleep(defaultDelay * 1000);
- errorList
- .add(
- String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
+ errorList.add(msg);
urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, errorList);
} else {
@@ -138,7 +149,7 @@ public class HttpConnector {
return input;
}
} catch (final IOException e) {
- log.error("error while retrieving from http-connection occured: " + requestUrl, e);
+ log.error("error while retrieving from http-connection occurred: {}", requestUrl, e);
Thread.sleep(defaultDelay * 1000);
errorList.add(e.getMessage());
return attemptDownload(requestUrl, retryNumber + 1, errorList);
@@ -149,12 +160,12 @@ public class HttpConnector {
}
private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
- log.debug("StatusCode: " + urlConn.getResponseMessage());
+ log.debug("StatusCode: {}", urlConn.getResponseMessage());
for (final Map.Entry> e : urlConn.getHeaderFields().entrySet()) {
if (e.getKey() != null) {
for (final String v : e.getValue()) {
- log.debug(" key: " + e.getKey() + " - value: " + v);
+ log.debug(" key: {} value: {}", e.getKey(), v);
}
}
}
@@ -183,37 +194,6 @@ public class HttpConnector {
"The requested url has been MOVED, but 'location' param is MISSING");
}
- /**
- * register for https scheme; this is a workaround and not intended for the use in trusted environments
- */
- public void initTrustManager() {
- final X509TrustManager tm = new X509TrustManager() {
-
- @Override
- public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
- }
-
- @Override
- public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
- }
-
- @Override
- public X509Certificate[] getAcceptedIssuers() {
- return null;
- }
- };
- try {
- final SSLContext ctx = SSLContext.getInstance("TLS");
- ctx.init(null, new TrustManager[] {
- tm
- }, null);
- HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
- } catch (final GeneralSecurityException e) {
- log.fatal(e);
- throw new IllegalStateException(e);
- }
- }
-
public int getMaxNumberOfRetry() {
return maxNumberOfRetry;
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index c6ed5a1e3..e1b1b849c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -1,42 +1,31 @@
package eu.dnetlib.dhp.transformation;
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import java.io.ByteArrayInputStream;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
-import org.dom4j.Document;
-import org.dom4j.DocumentException;
-import org.dom4j.Node;
-import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
-import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
-import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.message.Message;
-import eu.dnetlib.message.MessageManager;
-import eu.dnetlib.message.MessageType;
public class TransformSparkJobNode {
@@ -59,38 +48,64 @@ public class TransformSparkJobNode {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
- final String inputPath = parser.get("mdstoreInputPath");
- final String outputPath = parser.get("mdstoreOutputPath");
- // TODO this variable will be used after implementing Messaging with DNet Aggregator
+ final String mdstoreInputVersion = parser.get("mdstoreInputVersion");
+ final String mdstoreOutputVersion = parser.get("mdstoreOutputVersion");
+
+ final MDStoreVersion nativeMdStoreVersion = MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class);
+ final String inputPath = nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH;
+ log.info("inputPath: {}", inputPath);
+
+ final MDStoreVersion cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, MDStoreVersion.class);
+ final String outputBasePath = cleanedMdStoreVersion.getHdfsPath();
+ log.info("outputBasePath: {}", outputBasePath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info(String.format("isLookupUrl: %s", isLookupUrl));
+ final String dateOfTransformation = parser.get("dateOfTransformation");
+ log.info(String.format("dateOfTransformation: %s", dateOfTransformation));
+
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+ final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
+
+ log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
+
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
- spark -> transformRecords(parser.getObjectMap(), isLookupService, spark, inputPath, outputPath));
+ spark -> {
+ transformRecords(
+ parser.getObjectMap(), isLookupService, spark, inputPath, outputBasePath);
+ });
}
public static void transformRecords(final Map args, final ISLookUpService isLookUpService,
- final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException {
+ final SparkSession spark, final String inputPath, final String outputBasePath)
+ throws DnetTransformationException, IOException {
- final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
- final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
- final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
+ final LongAccumulator totalItems = spark.sparkContext().longAccumulator(CONTENT_TOTALITEMS);
+ final LongAccumulator errorItems = spark.sparkContext().longAccumulator(CONTENT_INVALIDRECORDS);
+ final LongAccumulator transformedItems = spark.sparkContext().longAccumulator(CONTENT_TRANSFORMEDRECORDS);
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
final Encoder encoder = Encoders.bean(MetadataRecord.class);
- final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
- final MapFunction XSLTTransformationFunction = TransformationFactory
- .getTransformationPlugin(args, ct, isLookUpService);
- mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath);
+
+ final Dataset mdstore = spark
+ .read()
+ .format("parquet")
+ .load(inputPath)
+ .as(encoder)
+ .map(
+ TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
+ encoder);
+ saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH);
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count());
+
+ writeTotalSizeOnHDFS(spark, mdstore.count(), outputBasePath + MDSTORE_SIZE_PATH);
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
index 58292139a..45ba2981f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
@@ -18,7 +18,7 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class TransformationFactory {
private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class);
- public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()";
+ public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value = \"%s\" return $x//CODE/*[local-name() =\"stylesheet\"]";
public static MapFunction getTransformationPlugin(
final Map jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
@@ -30,13 +30,13 @@ public class TransformationFactory {
log.info("Transformation plugin required " + transformationPlugin);
switch (transformationPlugin) {
case "XSLT_TRANSFORM": {
- final String transformationRuleName = jobArgument.get("transformationRuleTitle");
- if (StringUtils.isBlank(transformationRuleName))
+ final String transformationRuleId = jobArgument.get("transformationRuleId");
+ if (StringUtils.isBlank(transformationRuleId))
throw new DnetTransformationException("Missing Parameter transformationRule");
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
final String transformationRule = queryTransformationRuleFromIS(
- transformationRuleName, isLookupService);
+ transformationRuleId, isLookupService);
final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation"));
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
@@ -54,15 +54,15 @@ public class TransformationFactory {
}
}
- private static String queryTransformationRuleFromIS(final String transformationRuleName,
+ private static String queryTransformationRuleFromIS(final String transformationRuleId,
final ISLookUpService isLookUpService) throws Exception {
- final String query = String.format(TRULE_XQUERY, transformationRuleName);
- log.info("asking query to IS: " + query);
+ final String query = String.format(TRULE_XQUERY, transformationRuleId);
+ System.out.println("asking query to IS: " + query);
List result = isLookUpService.quickSearchProfile(query);
if (result == null || result.isEmpty())
throw new DnetTransformationException(
- "Unable to find transformation rule with name: " + transformationRuleName);
+ "Unable to find transformation rule with name: " + transformationRuleId);
return result.get(0);
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml
index 2e0ed9aee..dd3c32c62 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml
@@ -15,4 +15,9 @@
oozie.action.sharelib.for.spark
spark2
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
index 7f5113930..987f004bb 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
@@ -30,16 +30,16 @@
"paramRequired": true
},
{
- "paramName": "i",
- "paramLongName": "input",
- "paramDescription": "the path of the sequencial file to read",
+ "paramName": "mv",
+ "paramLongName": "mdStoreVersion",
+ "paramDescription": "the Metadata Store Version Info",
"paramRequired": true
},
{
- "paramName": "o",
- "paramLongName": "output",
- "paramDescription": "the path of the result DataFrame on HDFS",
- "paramRequired": true
+ "paramName": "rmv",
+ "paramLongName": "readMdStoreVersion",
+ "paramDescription": "the Read Lock Metadata Store Version bean",
+ "paramRequired": false
},
{
"paramName": "w",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
index 901664e0d..60e9762ff 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
@@ -1,6 +1,26 @@
[
- {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
- {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
- {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
- {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false}
+ {
+ "paramName": "a",
+ "paramLongName": "apidescriptor",
+ "paramDescription": "the JSON encoding of the API Descriptor",
+ "paramRequired": true
+ },
+ {
+ "paramName": "n",
+ "paramLongName": "namenode",
+ "paramDescription": "the Name Node URI",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mv",
+ "paramLongName": "mdStoreVersion",
+ "paramDescription": "the MDStore Version bean",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "workflowId",
+ "paramDescription": "the identifier of the dnet Workflow",
+ "paramRequired": false
+ }
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json
new file mode 100644
index 000000000..57a218a34
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json
@@ -0,0 +1,45 @@
+[
+ {
+ "paramName": "a",
+ "paramLongName": "action",
+ "paramDescription": "the JSON encoding of the API Descriptor",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mu",
+ "paramLongName": "mdStoreManagerURI",
+ "paramDescription": "the MDStore Manager URI",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mi",
+ "paramLongName": "mdStoreID",
+ "paramDescription": "the Metadata Store ID",
+ "paramRequired": false
+ },
+ {
+ "paramName": "ms",
+ "paramLongName": "mdStoreSize",
+ "paramDescription": "the Metadata Store Size",
+ "paramRequired": false
+ },
+ {
+ "paramName": "mv",
+ "paramLongName": "mdStoreVersion",
+ "paramDescription": "the Metadata Version Bean",
+ "paramRequired": false
+ },
+ {
+ "paramName": "n",
+ "paramLongName": "namenode",
+ "paramDescription": "the Name Node URI",
+ "paramRequired": false
+ },
+ {
+ "paramName": "rm",
+ "paramLongName": "readMDStoreId",
+ "paramDescription": "the ID Locked to Read",
+ "paramRequired": false
+ }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml
index 2e0ed9aee..e77dd09c9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml
@@ -15,4 +15,8 @@
oozie.action.sharelib.for.spark
spark2
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
index 38cd83da7..2b2cf9dce 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
@@ -1,58 +1,95 @@
-
- mdStorePath
- the path of the native mdstore
-
-
apiDescription
A json encoding of the API Description class
-
dataSourceInfo
A json encoding of the Datasource Info
identifierPath
- An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier
+ An xpath to retrieve the metadata identifier for the generation of DNet Identifier
-
metadataEncoding
The type of the metadata XML/JSON
-
timestamp
The timestamp of the collection date
-
workflowId
The identifier of the workflow
+
+ mdStoreID
+ The identifier of the mdStore
+
+
+ mdStoreManagerURI
+ The URI of the MDStore Manager
+
+
+ collectionMode
+ Should be REFRESH or INCREMENTAL
+
+
${jobTracker}
${nameNode}
-
+
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+ ${wf:conf('collectionMode') eq 'REFRESH'}
+ ${wf:conf('collectionMode') eq 'INCREMENTAL'}
+
+
+
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_LOCK
+ --mdStoreID${mdStoreID}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionNEW_VERSION
+ --mdStoreID${mdStoreID}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication
- --hdfsPath${workingDir}/sequenceFile_${mdstoreVersion}
--apidescriptor${apiDescription}
--namenode${nameNode}
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
-
+
@@ -75,13 +112,76 @@
--dateOfCollection${timestamp}
--provenance${dataSourceInfo}
--xpath${identifierPath}
- --input${workingDir}/sequenceFile
- --output${mdStorePath}
- -w${workflowId}
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --readMdStoreVersion${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
+
+
+
+
+
+
+ ${wf:conf('collectionMode') eq 'REFRESH'}
+ ${wf:conf('collectionMode') eq 'INCREMENTAL'}
+
+
+
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_UNLOCK
+ --mdStoreManagerURI${mdStoreManagerURI}
+ --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionCOMMIT
+ --namenode${nameNode}
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+ ${wf:conf('collectionMode') eq 'REFRESH'}
+ ${wf:conf('collectionMode') eq 'INCREMENTAL'}
+
+
+
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_UNLOCK
+ --mdStoreManagerURI${mdStoreManagerURI}
+ --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionROLLBACK
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml
index 2e0ed9aee..bdd48b0ab 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml
@@ -15,4 +15,5 @@
oozie.action.sharelib.for.spark
spark2
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml
index b36bc3766..9e01936d4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml
@@ -1,45 +1,85 @@
- mdstoreInputPath
- the path of the native MDStore
+ mdStoreInputId
+ the identifier of the native MDStore
-
- mdstoreOutputPath
+ mdStoreOutputId
+ the identifier of the cleaned MDStore
+
+
+ mdStoreManagerURI
the path of the cleaned mdstore
-
- transformationRuleTitle
+ transformationRuleId
The transformation Rule to apply
-
transformationPlugin
+ XSLT_TRANSFORM
The transformation Plugin
-
dateOfTransformation
The timestamp of the transformation date
-
-
+
+ isLookupUrl
+ The IS lookUp service endopoint
+
-
+
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_LOCK
+ --mdStoreID${mdStoreInputId}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionNEW_VERSION
+ --mdStoreID${mdStoreOutputId}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
yarn
cluster
Transform MetadataStore
eu.dnetlib.dhp.transformation.TransformSparkJobNode
- dhp-aggregations-${projectVersion}.jar
+ dhp-aggregation-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
@@ -49,19 +89,89 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --mdstoreInputPath${mdstoreInputPath}
- --mdstoreOutputPath${mdstoreOutputPath}
+ --mdstoreOutputVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdstoreInputVersion${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
--dateOfTransformation${dateOfTransformation}
--transformationPlugin${transformationPlugin}
- --transformationRuleTitle${transformationRuleTitle}
-
-
+ --transformationRuleId${transformationRuleId}
+ --isLookupUrl${isLookupUrl}
+
+
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_UNLOCK
+ --mdStoreManagerURI${mdStoreManagerURI}
+ --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
+
+
+
+
+
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionCOMMIT
+ --namenode${nameNode}
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionREAD_UNLOCK
+ --mdStoreManagerURI${mdStoreManagerURI}
+ --readMDStoreId${wf:actionData('BeginRead')['mdStoreReadLockVersion']}
+
+
+
+
+
-
+
+
+
+
+ oozie.launcher.mapreduce.user.classpath.first
+ true
+
+
+ eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode
+ --actionROLLBACK
+ --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']}
+ --mdStoreManagerURI${mdStoreManagerURI}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
index cbd2f25ab..d92698de5 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
@@ -13,19 +13,19 @@
},
{
"paramName": "i",
- "paramLongName": "mdstoreInputPath",
- "paramDescription": "the path of the sequencial file to read",
+ "paramLongName": "mdstoreInputVersion",
+ "paramDescription": "the mdStore Version bean of the Input",
"paramRequired": true
},
{
"paramName": "o",
- "paramLongName": "mdstoreOutputPath",
- "paramDescription": "the path of the result DataFrame on HDFS",
+ "paramLongName": "mdstoreOutputVersion",
+ "paramDescription": "the mdStore Version bean of the Output",
"paramRequired": true
},
{
"paramName": "tr",
- "paramLongName": "transformationRuleTitle",
+ "paramLongName": "transformationRuleId",
"paramDescription": "the transformation Rule to apply to the input MDStore",
"paramRequired": true
},
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
new file mode 100644
index 000000000..d5ecc9cb0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
@@ -0,0 +1,199 @@
+
+package eu.dnetlib.dhp.aggregation;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class AggregationJobTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static SparkSession spark;
+
+ private static Path workingDir;
+
+ private static Encoder encoder;
+
+ private static final String encoding = "XML";
+ private static final String dateOfCollection = System.currentTimeMillis() + "";
+ private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
+ private static String provenance;
+
+ private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ provenance = IOUtils
+ .toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+ workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
+ log.info("using work dir {}", workingDir);
+
+ SparkConf conf = new SparkConf();
+
+ conf.setAppName(AggregationJobTest.class.getSimpleName());
+
+ conf.setMaster("local[*]");
+ conf.set("spark.driver.host", "localhost");
+ conf.set("hive.metastore.local", "true");
+ conf.set("spark.ui.enabled", "false");
+ conf.set("spark.sql.warehouse.dir", workingDir.toString());
+ conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+ encoder = Encoders.bean(MetadataRecord.class);
+ spark = SparkSession
+ .builder()
+ .appName(AggregationJobTest.class.getSimpleName())
+ .config(conf)
+ .getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() throws IOException {
+ FileUtils.deleteDirectory(workingDir.toFile());
+ spark.stop();
+ }
+
+ @Test
+ @Order(1)
+ public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
+
+ MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
+ FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
+
+ IOUtils
+ .copy(
+ getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
+ new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
+
+ GenerateNativeStoreSparkJob
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-encoding", encoding,
+ "-dateOfCollection", dateOfCollection,
+ "-provenance", provenance,
+ "-xpath", xpath,
+ "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+ "-readMdStoreVersion", "",
+ "-workflowId", "abc"
+ });
+
+ verify(mdStoreV1);
+ }
+
+ @Test
+ @Order(2)
+ public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
+
+ MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
+ FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
+
+ IOUtils
+ .copy(
+ getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
+ new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
+
+ MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
+
+ GenerateNativeStoreSparkJob
+ .main(
+ new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-encoding", encoding,
+ "-dateOfCollection", dateOfCollection,
+ "-provenance", provenance,
+ "-xpath", xpath,
+ "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+ "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+ "-workflowId", "abc"
+ });
+
+ verify(mdStoreV2);
+ }
+
+ @Test
+ @Order(3)
+ public void testTransformSparkJob() throws Exception {
+
+ MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
+ MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
+
+ TransformSparkJobNode.main(new String[] {
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-dateOfTransformation", dateOfCollection,
+ "-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+ "-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
+ "-transformationPlugin", "XSLT_TRANSFORM",
+ "-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
+ "-transformationRuleId",
+ "183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="
+ });
+
+ }
+
+ protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
+ Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
+
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ long seqFileSize = sc
+ .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
+ .count();
+
+ final Dataset mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
+ long mdStoreSize = mdstore.count();
+
+ long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
+
+ Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
+ Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
+
+ long uniqueIds = mdstore
+ .map((MapFunction) MetadataRecord::getId, Encoders.STRING())
+ .distinct()
+ .count();
+
+ Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
+ }
+
+ private MDStoreVersion prepareVersion(String filename) throws IOException {
+ MDStoreVersion mdstore = OBJECT_MAPPER
+ .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
+ mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
+ return mdstore;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
index c3b05f5c9..6f7bb2bc2 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
@@ -16,6 +16,8 @@ import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreCurrentVersion;
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -37,6 +39,17 @@ public class CollectionJobTest {
spark.stop();
}
+ @Test
+ public void testJSONSerialization() throws Exception {
+ final String s = IOUtils.toString(getClass().getResourceAsStream("input.json"));
+ System.out.println("s = " + s);
+ final ObjectMapper mapper = new ObjectMapper();
+ MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
+
+ assertNotNull(mi);
+
+ }
+
@Test
public void tesCollection(@TempDir Path testDir) throws Exception {
final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
index fc19f2064..10964096c 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
@@ -2,25 +2,18 @@
package eu.dnetlib.dhp.collector.worker;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.*;
-import java.io.File;
import java.nio.file.Path;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
-import eu.dnetlib.message.Message;
-import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
@Disabled
public class DnetCollectorWorkerApplicationTests {
@@ -47,7 +40,7 @@ public class DnetCollectorWorkerApplicationTests {
public void testFeeding(@TempDir Path testDir) throws Exception {
System.out.println(testDir.toString());
- CollectorWorker worker = new CollectorWorker(new CollectorPluginFactory(), getApi(),
+ CollectorWorker worker = new CollectorWorker(getApi(),
"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
worker.collect();
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 6a80e01e2..d03c3acef 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -38,6 +38,7 @@ import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json
new file mode 100644
index 000000000..a5adc8fda
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json
@@ -0,0 +1,9 @@
+{
+ "id":"md-cleaned",
+ "mdstore":"md-cleaned",
+ "writing":false,
+ "readCount":1,
+ "lastUpdate":1612187563099,
+ "size":71,
+ "hdfsPath":"%s/mdstore/md-cleaned"
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_1.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_1.json
new file mode 100644
index 000000000..8945c3d88
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_1.json
@@ -0,0 +1,9 @@
+{
+ "id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187678801",
+ "mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42",
+ "writing":true,
+ "readCount":0,
+ "lastUpdate":null,
+ "size":0,
+ "hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v1"
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_2.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_2.json
new file mode 100644
index 000000000..c3d4617cb
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_2.json
@@ -0,0 +1,9 @@
+{
+ "id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187459108",
+ "mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42",
+ "writing":false,
+ "readCount":1,
+ "lastUpdate":1612187563099,
+ "size":71,
+ "hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v2"
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/provenance.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/provenance.json
new file mode 100644
index 000000000..2cf0dab70
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/provenance.json
@@ -0,0 +1,5 @@
+{
+ "datasourceId":"74912366-d6df-49c1-a1fd-8a52fa98ce5f_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU\u003d",
+ "datasourceName":"PSNC Institutional Repository",
+ "nsPrefix":"psnc______pl"
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/sequence_file b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/sequence_file
new file mode 100644
index 000000000..309645a5f
Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/sequence_file differ
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
index 9e5f84c11..becd3a05e 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
@@ -9,7 +9,9 @@
-
+
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml
index 8efb3c487..ebe8e919b 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml
@@ -1,68 +1,32 @@
-
-
-
- od______2294::00029b7f0a2a7e090e55b625a9079d83
- oai:pub.uni-bielefeld.de:2578942
- 2018-11-23T15:15:33.974+01:00
- od______2294
- oai:pub.uni-bielefeld.de:2578942
- 2018-07-24T13:01:16Z
- conference
- ddc:000
- conferenceFtxt
- driver
- open_access
-
-
-
- Mobile recommendation agents making online use of visual attention information at the point of sale
- Pfeiffer, Thies
- Pfeiffer, Jella
- Meißner, Martin
- Davis, Fred
- Riedl, René
- Jan, vom Brocke
- Léger, Pierre-Majorique
- Randolph, Adriane
- Mobile Cognitive Assistance Systems
- Information Systems
- ddc:000
- We aim to utilize online information about visual attention for developing mobile recommendation agents (RAs) for use at the point of sale. Up to now, most RAs are focussed exclusively at personalization in an e-commerce setting. Very little is known, however, about mobile RAs that offer information and assistance at the point of sale based on individual-level feature based preference models (Murray and Häubl 2009). Current attempts provide information about products at the point of sale by manually scanning barcodes or using RFID (Kowatsch et al. 2011, Heijden 2005), e.g. using specific apps for smartphones. We argue that an online access to the current visual attention of the user offers a much larger potential. Integrating mobile eye tracking into ordinary glasses would yield a direct benefit of applying neuroscience methods in the user’s everyday life. First, learning from consumers’ attentional processes over time and adapting recommendations based on this learning allows us to provide very accurate and relevant recommendations, potentially increasing the perceived usefulness. Second, our proposed system needs little explicit user input (no scanning or navigation on screen) making it easy to use. Thus, instead of learning from click behaviour and past customer ratings, as it is the case in the e-commerce setting, the mobile RA learns from eye movements by participating online in every day decision processes. We argue that mobile RAs should be built based on current research in human judgment and decision making (Murray et al. 2010). In our project, we therefore follow a two-step approach: In the empirical basic research stream, we aim to understand the user’s interaction with the product shelf: the actions and patterns of user’s behaviour (eye movements, gestures, approaching a product closer) and their correspondence to the user’s informational needs. In the empirical system development stream, we create prototypes of mobile RAs and test experimentally the factors that influence the user’s adoption. For example, we suggest that a user’s involvement in the process, such as a need for exact nutritional information or for assistance (e.g., reading support for elderly) will influence the user’s intention to use such as system. The experiments are conducted both in our immersive virtual reality supermarket presented in a CAVE, where we can also easily display information to the user and track the eye movement in great accuracy, as well as in real-world supermarkets (see Figure 1), so that the findings can be better generalized to natural decision situations (Gidlöf et al. 2013). In a first pilot study with five randomly chosen participants in a supermarket, we evaluated which sort of mobile RAs consumers favour in order to get a first impression of the user’s acceptance of the technology. Figure 1 shows an excerpt of one consumer’s eye movements during a decision process. First results show long eye cascades and short fixations on many products in situations where users are uncertain and in need for support. Furthermore, we find a surprising acceptance of the technology itself throughout all ages (23 – 61 years). At the same time, consumers express serious fear of being manipulated by such a technology. For that reason, they strongly prefer the information to be provided by trusted third party or shared with family members and friends (see also Murray and Häubl 2009). Our pilot will be followed by a larger field experiment in March in order to learn more about factors that influence the user’s acceptance as well as the eye movement patterns that reflect typical phases of decision processes and indicate the need for support by a RA.
- 2013
- info:eu-repo/semantics/conferenceObject
- doc-type:conferenceObject
- text
- https://pub.uni-bielefeld.de/record/2578942
- https://pub.uni-bielefeld.de/download/2578942/2602478
- Pfeiffer T, Pfeiffer J, Meißner M. Mobile recommendation agents making online use of visual attention information at the point of sale. In: Davis F, Riedl R, Jan vom B, Léger P-M, Randolph A, eds. Proceedings of the Gmunden Retreat on NeuroIS 2013. 2013: 3-3.
- eng
- info:eu-repo/semantics/openAccess
+
+
+ oai:lib.psnc.pl:278
+ 2011-08-25T15:17:13Z
+ PSNCRepository:PSNCExternalRepository:exhibitions
+ PSNCRepository:PSNCExternalRepository:Departments
+ PSNCRepository:PSNCExternalRepository:Departments:NetworkServices
+ PSNCRepository:PSNCExternalRepository
+ PSNCRepository:PSNCExternalRepository:publications
+ PSNCRepository
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
- http://pub.uni-bielefeld.de/oai
- oai:pub.uni-bielefeld.de:2578942
- 2018-07-24T13:01:16Z
- http://www.openarchives.org/OAI/2.0/oai_dc/
-
-
-
- false
- false
- 0.9
-
-
-
-
-
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 3e0626aed..cfe1edfbd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -374,11 +374,6 @@
provided
-
- com.rabbitmq
- amqp-client
- 5.6.0
-
com.jayway.jsonpath
json-path