diff --git a/dhp-spark-jobs/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
similarity index 55%
rename from dhp-spark-jobs/pom.xml
rename to dhp-workflows/dhp-aggregation/pom.xml
index 323fa1d59..0534e23b8 100644
--- a/dhp-spark-jobs/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -3,23 +3,42 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
-
eu.dnetlib.dhp
- dhp
+ dhp-workflows
1.0.0-SNAPSHOT
- dhp-spark-jobs
+ dhp-aggregations
org.apache.spark
spark-core_2.11
+
org.apache.spark
spark-sql_2.11
+
+ eu.dnetlib.dhp
+ dhp-common
+ 1.0.0-SNAPSHOT
+
+
+
+ dom4j
+ dom4j
+ 1.6.1
+
+
+
+ jaxen
+ jaxen
+ 1.1.6
+
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
new file mode 100644
index 000000000..5edf89a05
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -0,0 +1,105 @@
+package eu.dnetlib.dhp.collection;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.model.mdstore.Provenance;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
+import org.dom4j.Document;
+import org.dom4j.Node;
+import org.dom4j.io.SAXReader;
+
+import java.io.ByteArrayInputStream;
+import java.util.Objects;
+
+/**
+ * Spark job that reads XML records from a Hadoop sequence file, wraps every valid one in a
+ * {@link MetadataRecord} and saves the distinct records to the output path in parquet format.
+ */
+public class GenerateNativeStoreSparkJob {
+
+    /**
+     * Parses {@code input} as XML and builds a {@link MetadataRecord} whose original identifier is
+     * the text of the node selected by {@code xpath}.
+     *
+     * @param input            the XML record to parse
+     * @param xpath            XPath expression selecting the node that holds the record identifier
+     * @param encoding         passed unchanged to the {@link MetadataRecord} constructor
+     * @param provenance       passed unchanged to the {@link MetadataRecord} constructor
+     * @param dateOfCollection passed unchanged to the {@link MetadataRecord} constructor
+     * @param totalItems       counter incremented on every call; may be {@code null}
+     * @param invalidRecords   counter incremented for every rejected record; may be {@code null}
+     * @return the new record, or {@code null} when the input cannot be parsed, the XPath selects
+     *         nothing, or the selected identifier is blank
+     */
+    public static MetadataRecord parseRecord (final String input, final String xpath, final String encoding, final Provenance provenance, final Long dateOfCollection, final LongAccumulator totalItems, final LongAccumulator invalidRecords) {
+        if (totalItems != null) {
+            totalItems.add(1);
+        }
+        try {
+            final SAXReader reader = new SAXReader();
+            // The XML is collected from remote sources: never resolve external entities (XXE hardening).
+            reader.setFeature("http://xml.org/sax/features/external-general-entities", false);
+            reader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+            final Document document = reader.read(new ByteArrayInputStream(input.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+            final Node node = document.selectSingleNode(xpath);
+            // No matching node is handled like a blank identifier instead of relying on a caught NPE.
+            final String originalIdentifier = node == null ? null : node.getText();
+            if (StringUtils.isBlank(originalIdentifier)) {
+                return reject(invalidRecords);
+            }
+            return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
+        } catch (Exception e) {
+            // Malformed XML, invalid XPath or null input: report and drop the record.
+            // JVM Errors (e.g. a missing XPath engine on the classpath) are deliberately not swallowed.
+            e.printStackTrace();
+            return reject(invalidRecords);
+        }
+    }
+
+    /** Counts one invalid record when a counter is supplied and returns the {@code null} marker. */
+    private static MetadataRecord reject(final LongAccumulator invalidRecords) {
+        if (invalidRecords != null) {
+            invalidRecords.add(1);
+        }
+        return null;
+    }
+
+    /**
+     * Job entry point.
+     *
+     * @param args exactly six values, in order: encoding, dateOfCollection (parsed as a long),
+     *             provenance (JSON mapped to {@link Provenance}), xpath, inputPath, outputPath
+     * @throws Exception if the arguments are invalid or the job fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args == null || args.length != 6) {
+            //TODO Create a DHPWFException
+            throw new IllegalArgumentException("unexpected number of parameters ");
+        }
+
+        final String encoding = args[0];
+        final long dateOfCollection = Long.parseLong(args[1]);
+        final String jsonProvenance = args[2];
+        final ObjectMapper jsonMapper = new ObjectMapper();
+        final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
+        final String xpath = args[3];
+        final String inputPath = args[4];
+        final String outputPath = args[5];
+
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName("GenerateNativeStoreSparkJob")
+                .master("yarn")
+                .getOrCreate();
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        final JavaPairRDD<IntWritable, Text> inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class);
+
+        final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+        final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+
+        // Only the value of each pair (the XML text) is used; rejected records come back as null and are dropped.
+        final JavaRDD<MetadataRecord> mappedRDD = inputRDD
+                .map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
+                .filter(Objects::nonNull)
+                .distinct();
+
+        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+        final Dataset<MetadataRecord> mdstore = spark.createDataset(mappedRDD.rdd(), encoder);
+
+        // count() is the action that runs the pipeline, so the accumulators are populated before printing.
+        final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
+        mdStoreRecords.add(mdstore.count());
+        System.out.println("totalItems.value() = " + totalItems.value());
+        System.out.println("invalidRecords = " + invalidRecords.value());
+        System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value());
+
+        mdstore.write().format("parquet").save(outputPath);
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml
new file mode 100644
index 000000000..c14e280bb
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml
@@ -0,0 +1,53 @@
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ lib/dhp-collector-worker-1.0.0.jar
+ /user/sandro.labruzzo/mdstores/oai_1
+ {"id":"oai","baseUrl":"http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai","protocol":"oai","params":{"format":"oai_dc"}}
+ ${nameNode}
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn
+ cluster
+ MDBuilder
+ eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob
+ dhp-aggregations-1.0.0-SNAPSHOT.jar
+ --num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+ XML
+ 1000
+ {"datasourceId":"pippo","datasourceName":"puppa","nsPrefix":"ns_prefix"}
+ ./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']
+ /user/sandro.labruzzo/mdstores/oai_1
+ /user/sandro.labruzzo/mdstores/mdstore_1
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/record.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/record.xml
new file mode 100644
index 000000000..3043a2d2e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/record.xml
@@ -0,0 +1,34 @@
+
+
+ oai:vocesdEducacion:article/100
+ 2018-09-02T03:21:17Z
+ voces:art
+
+
+
+ ¿Cómo son los jóvenes estudiantes? Una propuesta analítica desde la investigación educativa
+ How are the young students? An analytical proposal from educational research
+ Meneses Reyes, Marcela
+ Pogliaghi, Leticia
+ El objetivo de este artículo consiste en una revisión de cómo se ha estudiado a los jóvenes estudiantes desde la investigación educativa, en particular a los del nivel medio superior en México, para la elaboración de una propuesta analítica propia para su abordaje en su doble condición de estudiantes y de jóvenes.
+ Palabras clave: Estudiantes, jóvenes, educación media superior, investigación educativa
+ The objective of this article is to make a review of how young students have been studied from the educational research, particularlly those of high school level in Mexico, for the elaboration of an own analytical proposal for the study of these subjects in their double condition as students and as young people.
+ Keywords: Students, young people, high school, educational research
+ Voces de la educación
+ 2018-06-15
+ info:eu-repo/semantics/article
+ info:eu-repo/semantics/publishedVersion
+ application/pdf
+ http://www.revista.vocesdelaeducacion.com.mx/index.php/voces/article/view/100
+ Voces de la educación; ##issue.vol## 3 ##issue.no## 5 (2018); 170-178
+ Voces de la educación; Vol. 3 Núm. 5 (2018); 170-178
+ Voices of education; Vol 3 No 5 (2018); 170-178
+ 2448-6248
+ 1665-1596
+ spa
+ http://www.revista.vocesdelaeducacion.com.mx/index.php/voces/article/view/100/84
+ Copyright (c) 2018 Marcela Meneses Reyes, Leticia Pogliaghi
+ http://creativecommons.org/licenses/by-nc-sa/4.0
+
+
+
\ No newline at end of file
diff --git a/dhp-collector-worker/README.md b/dhp-workflows/dhp-collector-worker/README.md
similarity index 100%
rename from dhp-collector-worker/README.md
rename to dhp-workflows/dhp-collector-worker/README.md
diff --git a/dhp-collector-worker/pom.xml b/dhp-workflows/dhp-collector-worker/pom.xml
similarity index 70%
rename from dhp-collector-worker/pom.xml
rename to dhp-workflows/dhp-collector-worker/pom.xml
index e187d86f1..27fae6961 100644
--- a/dhp-collector-worker/pom.xml
+++ b/dhp-workflows/dhp-collector-worker/pom.xml
@@ -13,6 +13,21 @@
dhp-collector-worker
1.0.0
+
+
+
+ cloudera
+ Cloudera Repository
+ https://repository.cloudera.com/artifactory/cloudera-repos
+
+ true
+
+
+ false
+
+
+
+
org.springframework.boot
@@ -21,7 +36,7 @@
org.apache.hadoop
hadoop-client
- ${dhp.hadoop.version}
+ 2.6.0-cdh5.9.2
com.fasterxml.jackson.core
@@ -48,6 +63,7 @@
spring-boot-starter-test
test
+
@@ -63,4 +79,14 @@
+
+ UTF-8
+ UTF-8
+ cdh5.9.2
+ 2.6.0-${dhp.cdh.version}
+ 2.2.0
+ 2.11.8
+
+
+
\ No newline at end of file
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorException.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorException.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorException.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorException.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
similarity index 92%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
index a0fb1a4d4..c2854b8f2 100644
--- a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
+++ b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
@@ -56,7 +56,12 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
@Override
public void run(final String... args) throws Exception {
if (args.length == 0) { return; }
- if (args.length != 2) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); }
+ // Three positional arguments are required: args[0] = HDFS path, args[1] = JSON API description, args[2] = name node URI.
+ if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path, json_api_description and name_node"); }
+
+ //TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html
+
+
+
final String hdfsPath = args[0];
@@ -64,13 +69,16 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
final String json = args[1];
+
+ final String nameNode = args[2];
+
log.info("json = "+json);
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(json, ApiDescriptor.class);
final CollectorPlugin plugin = collectorPluginEnumerator.getPluginByProtocol(api.getProtocol());
- final String hdfsuri ="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020";
+ final String hdfsuri =nameNode;
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
@@ -79,6 +87,8 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+ System.setProperty("HADOOP_USER_NAME", "sandro.labruzzo");
System.setProperty("hadoop.home.dir", "/");
//Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
similarity index 95%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
index 28b7f75d4..6874d97f4 100644
--- a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
+++ b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
@@ -3,6 +3,8 @@ package eu.dnetlib.collector.worker.model;
import java.util.HashMap;
import java.util.Map;
+
+//TODO should be moved to dhp-common
public class ApiDescriptor {
private String id;
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/CollectorPlugin.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/CollectorPlugin.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/CollectorPlugin.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/CollectorPlugin.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
similarity index 94%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
index a536eef5c..7c4bec192 100644
--- a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
+++ b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
@@ -27,7 +27,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private static final String FORMAT_PARAM = "format";
private static final String OAI_SET_PARAM = "set";
private static final Object OAI_FROM_DATE_PARAM = "fromDate";
- private static final Object AI_UNTIL_DATE_PARAM = "untilDate";
+ private static final Object OAI_UNTIL_DATE_PARAM = "untilDate";
@Autowired
private OaiIteratorFactory oaiIteratorFactory;
@@ -38,7 +38,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
final String mdFormat = api.getParams().get(FORMAT_PARAM);
final String setParam = api.getParams().get(OAI_SET_PARAM);
final String fromDate = api.getParams().get(OAI_FROM_DATE_PARAM);
- final String untilDate = api.getParams().get(AI_UNTIL_DATE_PARAM);
+ final String untilDate = api.getParams().get(OAI_UNTIL_DATE_PARAM);
final List sets = new ArrayList<>();
if (setParam != null) {
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIterator.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIterator.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIterator.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIterator.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginErrorLogList.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginErrorLogList.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginErrorLogList.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java
diff --git a/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/XmlCleaner.java b/dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/XmlCleaner.java
similarity index 100%
rename from dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/XmlCleaner.java
rename to dhp-workflows/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/XmlCleaner.java
diff --git a/dhp-collector-worker/src/main/resources/application.properties b/dhp-workflows/dhp-collector-worker/src/main/resources/application.properties
similarity index 100%
rename from dhp-collector-worker/src/main/resources/application.properties
rename to dhp-workflows/dhp-collector-worker/src/main/resources/application.properties
diff --git a/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java
similarity index 100%
rename from dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java
rename to dhp-workflows/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java
diff --git a/dhp-workflows/dhp-collector-worker/target/classes/application.properties b/dhp-workflows/dhp-collector-worker/target/classes/application.properties
new file mode 100644
index 000000000..8b1378917
--- /dev/null
+++ b/dhp-workflows/dhp-collector-worker/target/classes/application.properties
@@ -0,0 +1 @@
+
diff --git a/dhp-workflows/dhp-collector-worker/target/maven-archiver/pom.properties b/dhp-workflows/dhp-collector-worker/target/maven-archiver/pom.properties
new file mode 100644
index 000000000..6b3be9ca3
--- /dev/null
+++ b/dhp-workflows/dhp-collector-worker/target/maven-archiver/pom.properties
@@ -0,0 +1,4 @@
+#Created by Apache Maven 3.5.2
+version=1.0.0
+groupId=eu.dnetlib
+artifactId=dhp-collector-worker
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
new file mode 100644
index 000000000..f892307aa
--- /dev/null
+++ b/dhp-workflows/pom.xml
@@ -0,0 +1,16 @@
+
+
+ 4.0.0
+
+ eu.dnetlib.dhp
+ dhp
+ 1.0.0-SNAPSHOT
+
+ dhp-workflows
+ pom
+
+ dhp-collector-worker
+ dhp-aggregation
+
+
+