Removed old messaging system not quite used from collection and Transformation workflow

code refactor
This commit is contained in:
Sandro La Bruzzo 2021-01-28 09:51:17 +01:00
parent 184e7b3856
commit 98b9498b57
35 changed files with 597 additions and 720 deletions

View File

@ -17,8 +17,8 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
/**
* Applies the parsing of a csv file and writes the Serialization of it in hdfs

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -15,8 +14,8 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
/**
* Applies the parsing of an excel file and writes the Serialization of it in hdfs

View File

@ -1,9 +1,9 @@
package eu.dnetlib.dhp.aggregation.common;
import org.apache.spark.util.LongAccumulator;
package eu.dnetlib.dhp.aggregation.common;
import java.io.Serializable;
import org.apache.spark.util.LongAccumulator;
public class AggregationCounter implements Serializable {
private LongAccumulator totalItems;

View File

@ -5,12 +5,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
@ -22,7 +19,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
@ -35,9 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
public class GenerateNativeStoreSparkJob {
@ -53,8 +47,14 @@ public class GenerateNativeStoreSparkJob {
"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
parser.parseArgument(args);
final ObjectMapper jsonMapper = new ObjectMapper();
final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
final long dateOfCollection = new Long(parser.get("dateOfCollection"));
final String provenanceArgument = parser.get("provenance");
log.info("Provenance is {}", provenanceArgument);
final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
final String dateOfCollectionArgs = parser.get("dateOfCollection");
log.info("dateOfCollection is {}", dateOfCollectionArgs);
final long dateOfCollection = new Long(dateOfCollectionArgs);
final String sequenceFileInputPath = parser.get("input");
log.info("sequenceFileInputPath is {}", dateOfCollectionArgs);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
@ -62,11 +62,6 @@ public class GenerateNativeStoreSparkJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>();
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
@ -75,20 +70,12 @@ public class GenerateNativeStoreSparkJob {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc
.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
.sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
final MessageManager manager = new MessageManager(
parser.get("rabbitHost"),
parser.get("rabbitUser"),
parser.get("rabbitPassword"),
false,
false,
null);
final JavaRDD<MetadataRecord> mappeRDD = inputRDD
final JavaRDD<MetadataRecord> nativeStore = inputRDD
.map(
item -> parseRecord(
item._2().toString(),
@ -101,44 +88,13 @@ public class GenerateNativeStoreSparkJob {
.filter(Objects::nonNull)
.distinct();
ongoingMap.put("ongoing", "0");
if (!test) {
manager
.sendMessage(
new Message(
parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
parser.get("rabbitOngoingQueue"),
true,
false);
}
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
ongoingMap.put("ongoing", "" + totalItems.value());
if (!test) {
manager
.sendMessage(
new Message(
parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
parser.get("rabbitOngoingQueue"),
true,
false);
}
mdstore.write().format("parquet").save(parser.get("output"));
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + invalidRecords.value());
reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
if (!test) {
manager
.sendMessage(
new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
parser.get("rabbitReportQueue"),
true,
false);
manager.close();
}
});
}
@ -166,12 +122,9 @@ public class GenerateNativeStoreSparkJob {
}
return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
} catch (Throwable e) {
if (invalidRecords != null)
invalidRecords.add(1);
e.printStackTrace();
return null;
}
}
}

View File

@ -4,9 +4,9 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public interface CollectorPlugin {
Stream<String> collect(ApiDescriptor api) throws DnetCollectorException;
Stream<String> collect(ApiDescriptor api) throws CollectorException;
}

View File

@ -15,7 +15,7 @@ import com.google.common.collect.Lists;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class OaiCollectorPlugin implements CollectorPlugin {
@ -27,7 +27,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private OaiIteratorFactory oaiIteratorFactory;
@Override
public Stream<String> collect(final ApiDescriptor api) throws DnetCollectorException {
public Stream<String> collect(final ApiDescriptor api) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String mdFormat = api.getParams().get(FORMAT_PARAM);
final String setParam = api.getParams().get(OAI_SET_PARAM);
@ -46,19 +46,19 @@ public class OaiCollectorPlugin implements CollectorPlugin {
}
if (baseUrl == null || baseUrl.isEmpty()) {
throw new DnetCollectorException("Param 'baseurl' is null or empty");
throw new CollectorException("Param 'baseurl' is null or empty");
}
if (mdFormat == null || mdFormat.isEmpty()) {
throw new DnetCollectorException("Param 'mdFormat' is null or empty");
throw new CollectorException("Param 'mdFormat' is null or empty");
}
if (fromDate != null && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) {
throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + fromDate);
throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate);
}
if (untilDate != null && !untilDate.matches("\\d{4}-\\d{2}-\\d{2}")) {
throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + untilDate);
throw new CollectorException("Invalid date (YYYY-MM-DD): " + untilDate);
}
final Iterator<Iterator<String>> iters = sets

View File

@ -16,7 +16,7 @@ import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
@ -58,7 +58,7 @@ public class OaiIterator implements Iterator<String> {
this.started = true;
try {
this.token = firstPage();
} catch (final DnetCollectorException e) {
} catch (final CollectorException e) {
throw new RuntimeException(e);
}
}
@ -80,7 +80,7 @@ public class OaiIterator implements Iterator<String> {
while (queue.isEmpty() && token != null && !token.isEmpty()) {
try {
token = otherPages(token);
} catch (final DnetCollectorException e) {
} catch (final CollectorException e) {
throw new RuntimeException(e);
}
}
@ -92,7 +92,7 @@ public class OaiIterator implements Iterator<String> {
public void remove() {
}
private String firstPage() throws DnetCollectorException {
private String firstPage() throws CollectorException {
try {
String url = baseUrl + "?verb=ListRecords&metadataPrefix=" + URLEncoder.encode(mdFormat, "UTF-8");
if (set != null && !set.isEmpty()) {
@ -108,7 +108,7 @@ public class OaiIterator implements Iterator<String> {
return downloadPage(url);
} catch (final UnsupportedEncodingException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
@ -126,18 +126,18 @@ public class OaiIterator implements Iterator<String> {
return result.trim();
}
private String otherPages(final String resumptionToken) throws DnetCollectorException {
private String otherPages(final String resumptionToken) throws CollectorException {
try {
return downloadPage(
baseUrl
+ "?verb=ListRecords&resumptionToken="
+ URLEncoder.encode(resumptionToken, "UTF-8"));
} catch (final UnsupportedEncodingException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
private String downloadPage(final String url) throws DnetCollectorException {
private String downloadPage(final String url) throws CollectorException {
final String xml = httpConnector.getInputSource(url);
Document doc;
@ -151,7 +151,7 @@ public class OaiIterator implements Iterator<String> {
} catch (final DocumentException e1) {
final String resumptionToken = extractResumptionToken(xml);
if (resumptionToken == null) {
throw new DnetCollectorException("Error parsing cleaned document:" + cleaned, e1);
throw new CollectorException("Error parsing cleaned document:" + cleaned, e1);
}
return resumptionToken;
}
@ -164,7 +164,7 @@ public class OaiIterator implements Iterator<String> {
log.warn("noRecordsMatch for oai call: " + url);
return null;
} else {
throw new DnetCollectorException(code + " - " + errorNode.getText());
throw new CollectorException(code + " - " + errorNode.getText());
}
}

View File

@ -1,16 +1,16 @@
package eu.dnetlib.dhp.collection.worker;
public class DnetCollectorException extends Exception {
public class CollectorException extends Exception {
/** */
private static final long serialVersionUID = -290723075076039757L;
public DnetCollectorException() {
public CollectorException() {
super();
}
public DnetCollectorException(
public CollectorException(
final String message,
final Throwable cause,
final boolean enableSuppression,
@ -18,15 +18,15 @@ public class DnetCollectorException extends Exception {
super(message, cause, enableSuppression, writableStackTrace);
}
public DnetCollectorException(final String message, final Throwable cause) {
public CollectorException(final String message, final Throwable cause) {
super(message, cause);
}
public DnetCollectorException(final String message) {
public CollectorException(final String message) {
super(message);
}
public DnetCollectorException(final Throwable cause) {
public CollectorException(final Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.collection.worker;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
public class CollectorWorker {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
private final CollectorPluginFactory collectorPluginFactory;
private final ApiDescriptor api;
private final String hdfsuri;
private final String hdfsPath;
public CollectorWorker(
final CollectorPluginFactory collectorPluginFactory,
final ApiDescriptor api,
final String hdfsuri,
final String hdfsPath) {
this.collectorPluginFactory = collectorPluginFactory;
this.api = api;
this.hdfsuri = hdfsuri;
this.hdfsPath = hdfsPath;
}
public void collect() throws CollectorException {
try {
final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(hdfsPath);
log.info("Created path " + hdfswritepath.toString());
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(hdfswritepath),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
.collect(api)
.forEach(
content -> {
key.set(counter.getAndIncrement());
value.set(content);
try {
writer.append(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
} catch (Throwable e) {
throw new CollectorException("Error on collecting ", e);
}
}
}

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.collection.worker;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
* will be executed on the hadoop cluster and taking in input some parameters that tells it which is the right collector
* plugin to use and where store the data into HDFS path
*
* @author Sandro La Bruzzo
*/
public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
/**
* @param args
*/
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
CollectorWorker.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collector_parameter.json")));
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI is {}", hdfsuri);
final String hdfsPath = argumentParser.get("hdfsPath");
log.info("hdfsPath is {}" + hdfsPath);
final String apiDescriptor = argumentParser.get("apidescriptor");
log.info("apiDescriptor is {}" + apiDescriptor);
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
worker.collect();
}
}

View File

@ -1,139 +0,0 @@
package eu.dnetlib.dhp.collection.worker;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
public class DnetCollectorWorker {
private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorker.class);
private final CollectorPluginFactory collectorPluginFactory;
private final ArgumentApplicationParser argumentParser;
private final MessageManager manager;
public DnetCollectorWorker(
final CollectorPluginFactory collectorPluginFactory,
final ArgumentApplicationParser argumentParser,
final MessageManager manager)
throws DnetCollectorException {
this.collectorPluginFactory = collectorPluginFactory;
this.argumentParser = argumentParser;
this.manager = manager;
}
public void collect() throws DnetCollectorException {
try {
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(argumentParser.get("apidescriptor"), ApiDescriptor.class);
final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
final String hdfsuri = argumentParser.get("namenode");
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS"));
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(argumentParser.get("hdfsPath"));
log.info("Created path " + hdfswritepath.toString());
final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>();
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(hdfswritepath),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
.collect(api)
.forEach(
content -> {
key.set(counter.getAndIncrement());
value.set(content);
if (counter.get() % 10 == 0) {
try {
ongoingMap.put("ongoing", "" + counter.get());
log
.debug(
"Sending message: "
+ manager
.sendMessage(
new Message(
argumentParser.get("workflowId"),
"Collection",
MessageType.ONGOING,
ongoingMap),
argumentParser.get("rabbitOngoingQueue"),
true,
false));
} catch (Exception e) {
log.error("Error on sending message ", e);
}
}
try {
writer.append(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
ongoingMap.put("ongoing", "" + counter.get());
manager
.sendMessage(
new Message(
argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap),
argumentParser.get("rabbitOngoingQueue"),
true,
false);
reportMap.put("collected", "" + counter.get());
manager
.sendMessage(
new Message(
argumentParser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
argumentParser.get("rabbitOngoingQueue"),
true,
false);
manager.close();
} catch (Throwable e) {
throw new DnetCollectorException("Error on collecting ", e);
}
}
}

View File

@ -1,49 +0,0 @@
package eu.dnetlib.dhp.collection.worker;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.MessageManager;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
* will be executed on the hadoop cluster and taking in input some parameters that tells it which is the right collector
* plugin to use and where store the data into HDFS path
*
* @author Sandro La Bruzzo
*/
public class DnetCollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorkerApplication.class);
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
private static ArgumentApplicationParser argumentParser;
/** @param args */
public static void main(final String[] args) throws Exception {
argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
DnetCollectorWorker.class
.getResourceAsStream(
"/eu/dnetlib/collector/worker/collector_parameter.json")));
argumentParser.parseArgument(args);
log.info("hdfsPath =" + argumentParser.get("hdfsPath"));
log.info("json = " + argumentParser.get("apidescriptor"));
final MessageManager manager = new MessageManager(
argumentParser.get("rabbitHost"),
argumentParser.get("rabbitUser"),
argumentParser.get("rabbitPassword"),
false,
false,
null);
final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager);
worker.collect();
}
}

View File

@ -3,18 +3,18 @@ package eu.dnetlib.dhp.collection.worker.utils;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class CollectorPluginFactory {
public CollectorPlugin getPluginByProtocol(final String protocol) throws DnetCollectorException {
public CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
if (protocol == null)
throw new DnetCollectorException("protocol cannot be null");
throw new CollectorException("protocol cannot be null");
switch (protocol.toLowerCase().trim()) {
case "oai":
return new OaiCollectorPlugin();
default:
throw new DnetCollectorException("UNknown protocol");
throw new CollectorException("UNknown protocol");
}
}
}

View File

@ -19,7 +19,7 @@ import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class HttpConnector {
@ -42,9 +42,9 @@ public class HttpConnector {
*
* @param requestUrl the URL
* @return the content of the downloaded resource
* @throws DnetCollectorException when retrying more than maxNumberOfRetry times
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl) throws DnetCollectorException {
public String getInputSource(final String requestUrl) throws CollectorException {
return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
}
@ -53,15 +53,15 @@ public class HttpConnector {
*
* @param requestUrl the URL
* @return the content of the downloaded resource as InputStream
* @throws DnetCollectorException when retrying more than maxNumberOfRetry times
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public InputStream getInputSourceAsStream(final String requestUrl) throws DnetCollectorException {
public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException {
return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
}
private String attemptDownlaodAsString(
final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
throws DnetCollectorException {
throws CollectorException {
try {
final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
try {
@ -75,16 +75,16 @@ public class HttpConnector {
IOUtils.closeQuietly(s);
}
} catch (final InterruptedException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
private InputStream attemptDownload(
final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
throws DnetCollectorException {
throws CollectorException {
if (retryNumber > maxNumberOfRetry) {
throw new DnetCollectorException("Max number of retries exceeded. Cause: \n " + errorList);
throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList);
}
log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
@ -144,7 +144,7 @@ public class HttpConnector {
return attemptDownload(requestUrl, retryNumber + 1, errorList);
}
} catch (final InterruptedException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
@ -173,13 +173,13 @@ public class HttpConnector {
}
private String obtainNewLocation(final Map<String, List<String>> headerMap)
throws DnetCollectorException {
throws CollectorException {
for (final String key : headerMap.keySet()) {
if (key != null && key.toLowerCase().equals("location") && headerMap.get(key).size() > 0) {
return headerMap.get(key).get(0);
}
}
throw new DnetCollectorException(
throw new CollectorException(
"The requested url has been MOVED, but 'location' param is MISSING");
}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.transformation;
public class DnetTransformationException extends Exception {

View File

@ -9,11 +9,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -30,10 +25,15 @@ import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
@ -59,10 +59,9 @@ public class TransformSparkJobNode {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("input");
final String outputPath = parser.get("output");
final String inputPath = parser.get("mdstoreInputPath");
final String outputPath = parser.get("mdstoreOutputPath");
// TODO this variable will be used after implementing Messaging with DNet Aggregator
final String workflowId = parser.get("workflowId");
final String isLookupUrl = parser.get("isLookupUrl");
log.info(String.format("isLookupUrl: %s", isLookupUrl));
@ -76,24 +75,22 @@ public class TransformSparkJobNode {
spark -> transformRecords(parser.getObjectMap(), isLookupService, spark, inputPath, outputPath));
}
public static void transformRecords(final Map<String,String>args, final ISLookUpService isLookUpService, final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException {
public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,
final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException {
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems,transformedItems );
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final MapFunction<MetadataRecord, MetadataRecord> XSLTTransformationFunction = TransformationFactory.getTransformationPlugin(args,ct, isLookUpService);
final MapFunction<MetadataRecord, MetadataRecord> XSLTTransformationFunction = TransformationFactory
.getTransformationPlugin(args, ct, isLookUpService);
mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath);
log.info("Transformed item "+ ct.getProcessedItems().count());
log.info("Total item "+ ct.getTotalItems().count());
log.info("Transformation Error item "+ ct.getErrorItems().count());
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count());
}
}

View File

@ -1,45 +1,51 @@
package eu.dnetlib.dhp.transformation;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
public class TransformationFactory {
private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class);
public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()";
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(final Map<String,String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService) throws DnetTransformationException {
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
throws DnetTransformationException {
try {
final String transformationPlugin = jobArgument.get("transformationPlugin");
log.info("Transformation plugin required "+transformationPlugin);
log.info("Transformation plugin required " + transformationPlugin);
switch (transformationPlugin) {
case "XSLT_TRANSFORM": {
final String transformationRuleName = jobArgument.get("transformationRule");
final String transformationRuleName = jobArgument.get("transformationRuleTitle");
if (StringUtils.isBlank(transformationRuleName))
throw new DnetTransformationException("Missing Parameter transformationRule");
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
final String transformationRule = queryTransformationRuleFromIS(transformationRuleName, isLookupService);
final String transformationRule = queryTransformationRuleFromIS(
transformationRuleName, isLookupService);
final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation"));
return new XSLTTransformationFunction(counters,transformationRule,dateOfTransformation,vocabularies);
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
vocabularies);
}
default:
throw new DnetTransformationException("transformation plugin does not exists for " + transformationPlugin);
throw new DnetTransformationException(
"transformation plugin does not exists for " + transformationPlugin);
}
@ -48,15 +54,16 @@ public class TransformationFactory {
}
}
private static String queryTransformationRuleFromIS(final String transformationRuleName, final ISLookUpService isLookUpService) throws Exception {
private static String queryTransformationRuleFromIS(final String transformationRuleName,
final ISLookUpService isLookUpService) throws Exception {
final String query = String.format(TRULE_XQUERY, transformationRuleName);
log.info("asking query to IS: "+ query);
log.info("asking query to IS: " + query);
List<String> result = isLookUpService.quickSearchProfile(query);
if (result==null || result.isEmpty())
throw new DnetTransformationException("Unable to find transformation rule with name: "+ transformationRuleName);
if (result == null || result.isEmpty())
throw new DnetTransformationException(
"Unable to find transformation rule with name: " + transformationRuleName);
return result.get(0);
}
}

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dhp.transformation.xslt;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import net.sf.saxon.s9api.*;

View File

@ -1,15 +1,17 @@
package eu.dnetlib.dhp.transformation.xslt;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import javax.xml.transform.stream.StreamSource;
import org.apache.spark.api.java.function.MapFunction;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import net.sf.saxon.s9api.*;
import org.apache.spark.api.java.function.MapFunction;
import javax.xml.transform.stream.StreamSource;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {

View File

@ -41,46 +41,10 @@
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "ru",
"paramLongName": "rabbitUser",
"paramDescription": "the user to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "rabbitPassword",
"paramDescription": "the password to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rh",
"paramLongName": "rabbitHost",
"paramDescription": "the host of the RabbitMq server",
"paramRequired": true
},
{
"paramName": "ro",
"paramLongName": "rabbitOngoingQueue",
"paramDescription": "the name of the ongoing queue",
"paramRequired": true
},
{
"paramName": "rr",
"paramLongName": "rabbitReportQueue",
"paramDescription": "the name of the report queue",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "isTest",
"paramDescription": "the name of the report queue",
"paramRequired": false
}
]

View File

@ -0,0 +1,6 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,10 +1,5 @@
<workflow-app name="CollectionWorkflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sequenceFilePath</name>
<description>the path to store the sequence file of the native metadata collected</description>
</property>
<property>
<name>mdStorePath</name>
<description>the path of the native mdstore</description>
@ -39,72 +34,52 @@
<description>The identifier of the workflow</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</global>
<start to="DeleteMDStoresNative"/>
<start to="CollectionWorker"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteMDStoresNative">
<fs>
<mkdir path='${sequenceFilePath}'/>
<mkdir path='${mdStorePath}'/>
<delete path='${sequenceFilePath}'/>
<delete path='${mdStorePath}'/>
</fs>
<ok to="CollectionWorker"/>
<error to="Kill"/>
</action>
<action name="CollectionWorker">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.collection.worker.DnetCollectorWorker</main-class>
<java-opts>-p</java-opts><java-opts>${sequenceFilePath}</java-opts>
<java-opts>-a</java-opts><java-opts>${apiDescription}</java-opts>
<java-opts>-n</java-opts><java-opts>${nameNode}</java-opts>
<java-opts>-rh</java-opts><java-opts>${rmq_host}</java-opts>
<java-opts>-ru</java-opts><java-opts>${rmq_user}</java-opts>
<java-opts>-rp</java-opts><java-opts>${rmq_pwd}</java-opts>
<java-opts>-rr</java-opts><java-opts>${rmq_report}</java-opts>
<java-opts>-ro</java-opts><java-opts>${rmq_ongoing}</java-opts>
<java-opts>-u</java-opts><java-opts>sandro.labruzzo</java-opts>
<java-opts>-w</java-opts><java-opts>${workflowId}</java-opts>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/sequenceFile_${mdstoreVersion}</arg>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
</java>
<ok to="GenerateNativeStoreSparkJob"/>
<error to="Kill"/>
</action>
<action name="GenerateNativeStoreSparkJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateNativeStoreSparkJob</name>
<name>Generate Native MetadataStore</name>
<class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>--encoding</arg> <arg>${metadataEncoding}</arg>
<arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
<arg>--provenance</arg> <arg> ${dataSourceInfo}</arg>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--encoding</arg><arg>${metadataEncoding}</arg>
<arg>--dateOfCollection</arg><arg>${timestamp}</arg>
<arg>--provenance</arg><arg>${dataSourceInfo}</arg>
<arg>--xpath</arg><arg>${identifierPath}</arg>
<arg>--input</arg><arg>${sequenceFilePath}</arg>
<arg>--input</arg><arg>${workingDir}/sequenceFile</arg>
<arg>--output</arg><arg>${mdStorePath}</arg>
<arg>-rh</arg><arg>${rmq_host}</arg>
<arg>-ru</arg><arg>${rmq_user}</arg>
<arg>-rp</arg><arg>${rmq_pwd}</arg>
<arg>-rr</arg><arg>${rmq_report}</arg>
<arg>-ro</arg><arg>${rmq_ongoing}</arg>
<arg>-w</arg><arg>${workflowId}</arg>
</spark>
<ok to="End"/>
<error to="DropInvalidStore"/>
</action>
<action name="DropInvalidStore">
<fs>
<delete path='${mdStorePath}/../'/>
</fs>
<ok to="Kill"/>
<error to="Kill"/>
</action>

View File

@ -1,12 +0,0 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -2,7 +2,7 @@
<parameters>
<property>
<name>mdstoreInputPath</name>
<description>the path of the input MDStore</description>
<description>the path of the native MDStore</description>
</property>
<property>
@ -11,66 +11,57 @@
</property>
<property>
<name>transformationRule</name>
<name>transformationRuleTitle</name>
<description>The transformation Rule to apply</description>
</property>
<property>
<name>timestamp</name>
<description>The timestamp of the collection date</description>
<name>transformationPlugin</name>
<description>The transformation Plugin</description>
</property>
<property>
<name>workflowId</name>
<description>The identifier of the workflow</description>
<name>dateOfTransformation</name>
<description>The timestamp of the transformation date</description>
</property>
</parameters>
<start to="DeletePathIfExists"/>
<start to="TransformJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeletePathIfExists">
<fs>
<mkdir path='${mdstoreOutputPath}'/>
<delete path='${mdstoreOutputPath}'/>
</fs>
<ok to="TransformJob"/>
<error to="Kill"/>
</action>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>MDBuilder</name>
<name>Transform MetadataStore</name>
<class>eu.dnetlib.dhp.transformation.TransformSparkJobNode</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
<arg>-mt</arg> <arg>yarn</arg>
<arg>--input</arg><arg>${mdstoreInputPath}</arg>
<arg>--output</arg><arg>${mdstoreOutputPath}</arg>
<arg>-w</arg><arg>${workflowId}</arg>
<arg>-tr</arg><arg>${transformationRule}</arg>
<arg>-ru</arg><arg>${rmq_user}</arg>
<arg>-rp</arg><arg>${rmq_pwd}</arg>
<arg>-rh</arg><arg>${rmq_host}</arg>
<arg>-ro</arg><arg>${rmq_ongoing}</arg>
<arg>-rr</arg><arg>${rmq_report}</arg>
<jar>dhp-aggregations-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--mdstoreInputPath</arg><arg>${mdstoreInputPath}</arg>
<arg>--mdstoreOutputPath</arg><arg>${mdstoreOutputPath}</arg>
<arg>--dateOfTransformation</arg><arg>${dateOfTransformation}</arg>
<arg>--transformationPlugin</arg><arg>${transformationPlugin}</arg>
<arg>--transformationRuleTitle</arg><arg>${transformationRuleTitle}</arg>
</spark>
<ok to="End"/>
<error to="DropInvalidStore"/>
</action>
<action name="DropInvalidStore">
<fs>
<delete path='${mdstoreOutputPath}/../'/>
</fs>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -13,28 +13,32 @@
},
{
"paramName": "i",
"paramLongName": "input",
"paramLongName": "mdstoreInputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramLongName": "mdstoreOutputPath",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "tr",
"paramLongName": "transformationRule",
"paramLongName": "transformationRuleTitle",
"paramDescription": "the transformation Rule to apply to the input MDStore",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "isLookupUrl",
"paramDescription": "the Information System Service LookUp URL",
"paramRequired": true
},
{
"paramName": "tp",
"paramLongName": "transformationPlugin",

View File

@ -6,16 +6,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
@Disabled
public class EXCELParserTest {
@ -31,7 +30,7 @@ public class EXCELParserTest {
}
@Test
public void test1() throws DnetCollectorException, IOException, InvalidFormatException, ClassNotFoundException,
public void test1() throws CollectorException, IOException, InvalidFormatException, ClassNotFoundException,
IllegalAccessException, InstantiationException {
EXCELParser excelParser = new EXCELParser();

View File

@ -1,8 +1,6 @@
package eu.dnetlib.dhp.actionmanager.project.httpconnector;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
@ -11,6 +9,9 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
@Disabled
public class HttpConnectorTest {
@ -31,12 +32,12 @@ public class HttpConnectorTest {
@Test
public void testGetInputSource() throws DnetCollectorException {
public void testGetInputSource() throws CollectorException {
System.out.println(connector.getInputSource(URL));
}
@Test
public void testGoodServers() throws DnetCollectorException {
public void testGoodServers() throws CollectorException {
System.out.println(connector.getInputSource(URL_GOODSNI_SERVER));
}

View File

@ -5,17 +5,19 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.*;
import java.io.File;
import java.nio.file.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.DnetCollectorWorker;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
@ -23,43 +25,6 @@ import eu.dnetlib.message.MessageManager;
@Disabled
public class DnetCollectorWorkerApplicationTests {
private final ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class);
private final MessageManager messageManager = mock(MessageManager.class);
private DnetCollectorWorker worker;
@BeforeEach
public void setup() throws Exception {
ObjectMapper mapper = new ObjectMapper();
final String apiJson = mapper.writeValueAsString(getApi());
when(argumentParser.get("apidescriptor")).thenReturn(apiJson);
when(argumentParser.get("namenode")).thenReturn("file://tmp/test.seq");
when(argumentParser.get("hdfsPath")).thenReturn("/tmp/file.seq");
when(argumentParser.get("userHDFS")).thenReturn("sandro");
when(argumentParser.get("workflowId")).thenReturn("sandro");
when(argumentParser.get("rabbitOngoingQueue")).thenReturn("sandro");
when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(), anyBoolean()))
.thenAnswer(
a -> {
System.out.println("sent message: " + a.getArguments()[0]);
return true;
});
when(messageManager.sendMessage(any(Message.class), anyString()))
.thenAnswer(
a -> {
System.out.println("Called");
return true;
});
worker = new DnetCollectorWorker(new CollectorPluginFactory(), argumentParser, messageManager);
}
@AfterEach
public void dropDown() {
File f = new File("/tmp/file.seq");
f.delete();
}
@Test
public void testFindPlugin() throws Exception {
final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
@ -79,8 +44,14 @@ public class DnetCollectorWorkerApplicationTests {
}
@Test
public void testFeeding() throws Exception {
public void testFeeding(@TempDir Path testDir) throws Exception {
System.out.println(testDir.toString());
CollectorWorker worker = new CollectorWorker(new CollectorPluginFactory(), getApi(),
"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
worker.collect();
// TODO create ASSERT HERE
}
private ApiDescriptor getApi() {

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.transformation;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.lenient;
@ -14,13 +15,13 @@ import java.util.stream.Stream;
import javax.xml.transform.stream.StreamSource;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
@ -31,8 +32,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class TransformationJobTest {
@ -67,7 +74,6 @@ public class TransformationJobTest {
spark.stop();
}
@Test
@DisplayName("Test Transform Single XML using XSLTTransformator")
public void testTransformSaxonHE() throws Exception {
@ -76,19 +82,15 @@ public class TransformationJobTest {
final MetadataRecord mr = new MetadataRecord();
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
// We Load the XSLT trasformation Rule from the classpath
// We Load the XSLT transformation Rule from the classpath
XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/ext_simple.xsl");
//Print the record
// Print the record
System.out.println(tr.call(mr).getBody());
//TODO Create significant Assert
// TODO Create significant Assert
}
@DisplayName("Test TransformSparkJobNode.main")
@Test
public void transformTest(@TempDir Path testDir) throws Exception {
@ -96,24 +98,44 @@ public class TransformationJobTest {
final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
final String mdstore_output = testDir.toString() + "/version";
mockupTrasformationRule("simpleTRule","/eu/dnetlib/dhp/transform/ext_simple.xsl");
mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
// final String arguments = "-issm true -i %s -o %s -d 1 -w 1 -tp XSLT_TRANSFORM -tr simpleTRule";
final Map<String,String > parameters = Stream.of(new String[][] {
{ "dateOfTransformation", "1234" },
{ "transformationPlugin", "XSLT_TRANSFORM" },
{ "transformationRule", "simpleTRule" },
final Map<String, String> parameters = Stream.of(new String[][] {
{
"dateOfTransformation", "1234"
},
{
"transformationPlugin", "XSLT_TRANSFORM"
},
{
"transformationRuleTitle", "simpleTRule"
},
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
TransformSparkJobNode.transformRecords(parameters,isLookUpService,spark,mdstore_input, mdstore_output);
TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output);
// TODO introduce useful assertions
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdstore_output).as(encoder);
final Long total = mOutput.count();
final long recordTs = mOutput
.filter((FilterFunction<MetadataRecord>) p -> p.getDateOfTransformation() == 1234)
.count();
final long recordNotEmpty = mOutput
.filter((FilterFunction<MetadataRecord>) p -> !StringUtils.isBlank(p.getBody()))
.count();
assertEquals(total, recordTs);
assertEquals(total, recordNotEmpty);
}
@Test
@ -128,18 +150,18 @@ public class TransformationJobTest {
Files.deleteIfExists(tempDirWithPrefix);
}
private void mockupTrasformationRule(final String trule, final String path)throws Exception {
private void mockupTrasformationRule(final String trule, final String path) throws Exception {
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
lenient().when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY,trule)))
lenient()
.when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule)))
.thenReturn(Collections.singletonList(trValue));
}
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
final LongAccumulator la = new LongAccumulator();
return new XSLTTransformationFunction(new AggregationCounter(la,la,la),trValue, 0,vocabularies);
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
}
private List<String> vocs() throws IOException {

View File

@ -59,12 +59,14 @@ public class CleaningFunctions {
}
}
if (Objects.nonNull(r.getAuthor())) {
r.getAuthor()
r
.getAuthor()
.stream()
.filter(Objects::nonNull)
.forEach(a -> {
if (Objects.nonNull(a.getPid())) {
a.getPid()
a
.getPid()
.stream()
.filter(Objects::nonNull)
.forEach(p -> fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES));

View File

@ -55,9 +55,9 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@ -15,8 +15,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;

View File

@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;