added first implementation of dnet-workflows

This commit is contained in:
Sandro La Bruzzo 2019-03-18 10:44:35 +01:00
parent 37b84b6afa
commit e67d9ee1a9
23 changed files with 278 additions and 8 deletions

View File

@ -3,23 +3,42 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp-workflows</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-spark-jobs</artifactId> <artifactId>dhp-aggregations</artifactId>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId> <artifactId>spark-core_2.11</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId> <artifactId>spark-sql_2.11</artifactId>
</dependency> </dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
<version>1.1.6</version>
</dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,105 @@
package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import java.io.ByteArrayInputStream;
import java.util.Objects;
public class GenerateNativeStoreSparkJob {
public static MetadataRecord parseRecord (final String input, final String xpath, final String encoding, final Provenance provenance, final Long dateOfCollection, final LongAccumulator totalItems, final LongAccumulator invalidRecords) {
if(totalItems != null)
totalItems.add(1);
try {
SAXReader reader = new SAXReader();
Document document = reader.read(new ByteArrayInputStream(input.getBytes("UTF-8")));
Node node = document.selectSingleNode(xpath);
final String originalIdentifier = node.getText();
if (StringUtils.isBlank(originalIdentifier)) {
if (invalidRecords!= null)
invalidRecords.add(1);
return null;
}
return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
} catch (Throwable e) {
if (invalidRecords!= null)
invalidRecords.add(1);
e.printStackTrace();
return null;
}
}
public static void main(String[] args) throws Exception {
if (args == null || args.length != 6)
//TODO Create a DHPWFException
throw new Exception("unexpected number of parameters ");
final String encoding = args[0];
final long dateOfCollection = Long.valueOf(args[1]);
final String jsonProvenance = args[2];
final ObjectMapper jsonMapper = new ObjectMapper();
final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
final String xpath = args[3];
final String inputPath = args[4];
final String outputPath = args[5];
final SparkSession spark = SparkSession
.builder()
.appName("GenerateNativeStoreSparkJob")
.master("yarn")
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class);
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
final JavaRDD<MetadataRecord> mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
.filter(Objects::nonNull).distinct();
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
System.out.println("totalItems.value() = " + totalItems.value());
System.out.println("invalidRecords = " + invalidRecords.value());
System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value());
mdstore.write().format("parquet").save(outputPath);
}
}

View File

@ -0,0 +1,53 @@
<workflow-app name="Oozie_Java_Wf" xmlns="uri:oozie:workflow:0.5">
<start to="DeleteMDStoresNative"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteMDStoresNative">
<fs>
<delete path='/user/sandro.labruzzo/mdstores/oai_1'/>
</fs>
<ok to="CollectionWorker"/>
<error to="Kill"/>
</action>
<action name="CollectionWorker">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>lib/dhp-collector-worker-1.0.0.jar</exec>
<argument>/user/sandro.labruzzo/mdstores/oai_1</argument>
<argument>{&quot;id&quot;:&quot;oai&quot;,&quot;baseUrl&quot;:&quot;http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai&quot;,&quot;protocol&quot;:&quot;oai&quot;,&quot;params&quot;:{&quot;format&quot;:&quot;oai_dc&quot;}}</argument>
<argument>${nameNode}</argument>
<capture-output/>
</shell>
<ok to="mdBuilder"/>
<error to="Kill"/>
</action>
<action name="mdBuilder">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>MDBuilder</name>
<class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>XML</arg>
<arg>1000</arg>
<arg>{"datasourceId":"pippo","datasourceName":"puppa","nsPrefix":"ns_prefix"}</arg>
<arg>./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']</arg>
<arg>/user/sandro.labruzzo/mdstores/oai_1</arg>
<arg>/user/sandro.labruzzo/mdstores/mdstore_1</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,34 @@
<record xmlns="http://www.openarchives.org/OAI/2.0/">
<header>
<identifier>oai:vocesdEducacion:article/100</identifier>
<datestamp>2018-09-02T03:21:17Z</datestamp>
<setSpec>voces:art</setSpec>
</header>
<metadata>
<oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd">
<dc:title xml:lang="es-ES">¿Cómo son los jóvenes estudiantes? Una propuesta analítica desde la investigación educativa</dc:title>
<dc:title xml:lang="en-US">How are the young students? An analytical proposal from educational research</dc:title>
<dc:creator>Meneses Reyes, Marcela</dc:creator>
<dc:creator>Pogliaghi, Leticia</dc:creator>
<dc:description xml:lang="es-ES">El objetivo de este artículo consiste en una revisión de cómo se ha estudiado a los jóvenes estudiantes desde la investigación educativa, en particular a los del nivel medio superior en México, para la elaboración de una propuesta analítica propia para su abordaje en su doble condición de estudiantes y de jóvenes.
Palabras clave: Estudiantes, jóvenes, educación media superior, investigación educativa</dc:description>
<dc:description xml:lang="en-US">The objective of this article is to make a review of how young students have been studied from the educational research, particularlly those of high school level in Mexico, for the elaboration of an own analytical proposal for the study of these subjects in their double condition as students and as young people.
Keywords: Students, young people, high school, educational research</dc:description>
<dc:publisher xml:lang="es-ES">Voces de la educación</dc:publisher>
<dc:date>2018-06-15</dc:date>
<dc:type>info:eu-repo/semantics/article</dc:type>
<dc:type>info:eu-repo/semantics/publishedVersion</dc:type>
<dc:format>application/pdf</dc:format>
<dc:identifier>http://www.revista.vocesdelaeducacion.com.mx/index.php/voces/article/view/100</dc:identifier>
<dc:source xml:lang="es-AR">Voces de la educación; ##issue.vol## 3 ##issue.no## 5 (2018); 170-178</dc:source>
<dc:source xml:lang="es-ES">Voces de la educación; Vol. 3 Núm. 5 (2018); 170-178</dc:source>
<dc:source xml:lang="en-US">Voices of education; Vol 3 No 5 (2018); 170-178</dc:source>
<dc:source>2448-6248</dc:source>
<dc:source>1665-1596</dc:source>
<dc:language>spa</dc:language>
<dc:relation>http://www.revista.vocesdelaeducacion.com.mx/index.php/voces/article/view/100/84</dc:relation>
<dc:rights xml:lang="en-US">Copyright (c) 2018 Marcela Meneses Reyes, Leticia Pogliaghi</dc:rights>
<dc:rights xml:lang="en-US">http://creativecommons.org/licenses/by-nc-sa/4.0</dc:rights>
</oai_dc:dc>
</metadata>
</record>

View File

@ -13,6 +13,21 @@
<artifactId>dhp-collector-worker</artifactId> <artifactId>dhp-collector-worker</artifactId>
<version>1.0.0</version> <version>1.0.0</version>
<repositories>
<repository>
<id>cloudera</id>
<name>Cloudera Repository</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
@ -21,7 +36,7 @@
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId> <artifactId>hadoop-client</artifactId>
<version>${dhp.hadoop.version}</version> <version>2.6.0-cdh5.9.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
@ -48,6 +63,7 @@
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
@ -63,4 +79,14 @@
</build> </build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
<dhp.hadoop.version>2.6.0-${dhp.cdh.version}</dhp.hadoop.version>
<dhp.spark.version>2.2.0</dhp.spark.version>
<scala.version>2.11.8</scala.version>
</properties>
</project> </project>

View File

@ -56,7 +56,12 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
@Override @Override
public void run(final String... args) throws Exception { public void run(final String... args) throws Exception {
if (args.length == 0) { return; } if (args.length == 0) { return; }
if (args.length != 2) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); } if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); }
//TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html
final String hdfsPath = args[0]; final String hdfsPath = args[0];
@ -64,13 +69,16 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
final String json = args[1]; final String json = args[1];
final String nameNode = args[2];
log.info("json = "+json); log.info("json = "+json);
final ObjectMapper jsonMapper = new ObjectMapper(); final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(json, ApiDescriptor.class); final ApiDescriptor api = jsonMapper.readValue(json, ApiDescriptor.class);
final CollectorPlugin plugin = collectorPluginEnumerator.getPluginByProtocol(api.getProtocol()); final CollectorPlugin plugin = collectorPluginEnumerator.getPluginByProtocol(api.getProtocol());
final String hdfsuri ="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020"; final String hdfsuri =nameNode;
// ====== Init HDFS File System Object // ====== Init HDFS File System Object
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -79,6 +87,8 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
// Because of Maven // Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("HADOOP_USER_NAME", "sandro.labruzzo");
System.setProperty("hadoop.home.dir", "/"); System.setProperty("hadoop.home.dir", "/");
//Get the filesystem - HDFS //Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf); FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);

View File

@ -3,6 +3,8 @@ package eu.dnetlib.collector.worker.model;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
//TODO sholud be moved on dhp-common
public class ApiDescriptor { public class ApiDescriptor {
private String id; private String id;

View File

@ -27,7 +27,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private static final String FORMAT_PARAM = "format"; private static final String FORMAT_PARAM = "format";
private static final String OAI_SET_PARAM = "set"; private static final String OAI_SET_PARAM = "set";
private static final Object OAI_FROM_DATE_PARAM = "fromDate"; private static final Object OAI_FROM_DATE_PARAM = "fromDate";
private static final Object AI_UNTIL_DATE_PARAM = "untilDate"; private static final Object OAI_UNTIL_DATE_PARAM = "untilDate";
@Autowired @Autowired
private OaiIteratorFactory oaiIteratorFactory; private OaiIteratorFactory oaiIteratorFactory;
@ -38,7 +38,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
final String mdFormat = api.getParams().get(FORMAT_PARAM); final String mdFormat = api.getParams().get(FORMAT_PARAM);
final String setParam = api.getParams().get(OAI_SET_PARAM); final String setParam = api.getParams().get(OAI_SET_PARAM);
final String fromDate = api.getParams().get(OAI_FROM_DATE_PARAM); final String fromDate = api.getParams().get(OAI_FROM_DATE_PARAM);
final String untilDate = api.getParams().get(AI_UNTIL_DATE_PARAM); final String untilDate = api.getParams().get(OAI_UNTIL_DATE_PARAM);
final List<String> sets = new ArrayList<>(); final List<String> sets = new ArrayList<>();
if (setParam != null) { if (setParam != null) {

View File

@ -0,0 +1,4 @@
#Created by Apache Maven 3.5.2
version=1.0.0
groupId=eu.dnetlib
artifactId=dhp-collector-worker

16
dhp-workflows/pom.xml Normal file
View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>dhp-workflows</artifactId>
<packaging>pom</packaging>
<modules>
<module>dhp-collector-worker</module>
<module>dhp-aggregation</module>
</modules>
</project>