Merge pull request 'aggregation_on_hadoop' (#91) from sandro.labruzzo/dnet-hadoop:aggregation_on_hadoop into hadoop_aggregator

ok
This commit is contained in:
Sandro La Bruzzo 2021-01-28 10:06:49 +01:00
commit 2da8bf7429
38 changed files with 3045 additions and 767 deletions

View File

@ -17,8 +17,8 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
/**
* Applies the parsing of a csv file and writes the Serialization of it in hdfs

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -15,8 +14,8 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
/**
* Applies the parsing of an excel file and writes the Serialization of it in hdfs

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.aggregation.common;
import java.io.Serializable;
import org.apache.spark.util.LongAccumulator;
public class AggregationCounter implements Serializable {
private LongAccumulator totalItems;
private LongAccumulator errorItems;
private LongAccumulator processedItems;
public AggregationCounter() {
}
public AggregationCounter(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator processedItems) {
this.totalItems = totalItems;
this.errorItems = errorItems;
this.processedItems = processedItems;
}
public LongAccumulator getTotalItems() {
return totalItems;
}
public void setTotalItems(LongAccumulator totalItems) {
this.totalItems = totalItems;
}
public LongAccumulator getErrorItems() {
return errorItems;
}
public void setErrorItems(LongAccumulator errorItems) {
this.errorItems = errorItems;
}
public LongAccumulator getProcessedItems() {
return processedItems;
}
public void setProcessedItems(LongAccumulator processedItems) {
this.processedItems = processedItems;
}
}

View File

@ -5,12 +5,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
@ -22,7 +19,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
@ -35,9 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
public class GenerateNativeStoreSparkJob {
@ -46,100 +40,62 @@ public class GenerateNativeStoreSparkJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateNativeStoreSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
IOUtils
.toString(
GenerateNativeStoreSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
parser.parseArgument(args);
final ObjectMapper jsonMapper = new ObjectMapper();
final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
final long dateOfCollection = new Long(parser.get("dateOfCollection"));
final String provenanceArgument = parser.get("provenance");
log.info("Provenance is {}", provenanceArgument);
final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
final String dateOfCollectionArgs = parser.get("dateOfCollection");
log.info("dateOfCollection is {}", dateOfCollectionArgs);
final long dateOfCollection = new Long(dateOfCollectionArgs);
final String sequenceFileInputPath = parser.get("input");
log.info("sequenceFileInputPath is {}", dateOfCollectionArgs);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>();
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
conf,
isSparkSessionManaged,
spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc
.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
final JavaPairRDD<IntWritable, Text> inputRDD = sc
.sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
final MessageManager manager = new MessageManager(
parser.get("rabbitHost"),
parser.get("rabbitUser"),
parser.get("rabbitPassword"),
false,
false,
null);
final JavaRDD<MetadataRecord> nativeStore = inputRDD
.map(
item -> parseRecord(
item._2().toString(),
parser.get("xpath"),
parser.get("encoding"),
provenance,
dateOfCollection,
totalItems,
invalidRecords))
.filter(Objects::nonNull)
.distinct();
final JavaRDD<MetadataRecord> mappeRDD = inputRDD
.map(
item -> parseRecord(
item._2().toString(),
parser.get("xpath"),
parser.get("encoding"),
provenance,
dateOfCollection,
totalItems,
invalidRecords))
.filter(Objects::nonNull)
.distinct();
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
ongoingMap.put("ongoing", "0");
if (!test) {
manager
.sendMessage(
new Message(
parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
parser.get("rabbitOngoingQueue"),
true,
false);
}
mdstore.write().format("parquet").save(parser.get("output"));
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
ongoingMap.put("ongoing", "" + totalItems.value());
if (!test) {
manager
.sendMessage(
new Message(
parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
parser.get("rabbitOngoingQueue"),
true,
false);
}
mdstore.write().format("parquet").save(parser.get("output"));
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + invalidRecords.value());
reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
if (!test) {
manager
.sendMessage(
new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
parser.get("rabbitReportQueue"),
true,
false);
manager.close();
}
});
});
}
@ -166,12 +122,9 @@ public class GenerateNativeStoreSparkJob {
}
return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
} catch (Throwable e) {
if (invalidRecords != null)
invalidRecords.add(1);
e.printStackTrace();
invalidRecords.add(1);
return null;
}
}
}

View File

@ -4,9 +4,9 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public interface CollectorPlugin {
Stream<String> collect(ApiDescriptor api) throws DnetCollectorException;
Stream<String> collect(ApiDescriptor api) throws CollectorException;
}

View File

@ -15,7 +15,7 @@ import com.google.common.collect.Lists;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class OaiCollectorPlugin implements CollectorPlugin {
@ -27,7 +27,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private OaiIteratorFactory oaiIteratorFactory;
@Override
public Stream<String> collect(final ApiDescriptor api) throws DnetCollectorException {
public Stream<String> collect(final ApiDescriptor api) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String mdFormat = api.getParams().get(FORMAT_PARAM);
final String setParam = api.getParams().get(OAI_SET_PARAM);
@ -46,19 +46,19 @@ public class OaiCollectorPlugin implements CollectorPlugin {
}
if (baseUrl == null || baseUrl.isEmpty()) {
throw new DnetCollectorException("Param 'baseurl' is null or empty");
throw new CollectorException("Param 'baseurl' is null or empty");
}
if (mdFormat == null || mdFormat.isEmpty()) {
throw new DnetCollectorException("Param 'mdFormat' is null or empty");
throw new CollectorException("Param 'mdFormat' is null or empty");
}
if (fromDate != null && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) {
throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + fromDate);
throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate);
}
if (untilDate != null && !untilDate.matches("\\d{4}-\\d{2}-\\d{2}")) {
throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + untilDate);
throw new CollectorException("Invalid date (YYYY-MM-DD): " + untilDate);
}
final Iterator<Iterator<String>> iters = sets

View File

@ -16,7 +16,7 @@ import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
@ -58,7 +58,7 @@ public class OaiIterator implements Iterator<String> {
this.started = true;
try {
this.token = firstPage();
} catch (final DnetCollectorException e) {
} catch (final CollectorException e) {
throw new RuntimeException(e);
}
}
@ -80,7 +80,7 @@ public class OaiIterator implements Iterator<String> {
while (queue.isEmpty() && token != null && !token.isEmpty()) {
try {
token = otherPages(token);
} catch (final DnetCollectorException e) {
} catch (final CollectorException e) {
throw new RuntimeException(e);
}
}
@ -92,7 +92,7 @@ public class OaiIterator implements Iterator<String> {
public void remove() {
}
private String firstPage() throws DnetCollectorException {
private String firstPage() throws CollectorException {
try {
String url = baseUrl + "?verb=ListRecords&metadataPrefix=" + URLEncoder.encode(mdFormat, "UTF-8");
if (set != null && !set.isEmpty()) {
@ -108,7 +108,7 @@ public class OaiIterator implements Iterator<String> {
return downloadPage(url);
} catch (final UnsupportedEncodingException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
@ -126,18 +126,18 @@ public class OaiIterator implements Iterator<String> {
return result.trim();
}
private String otherPages(final String resumptionToken) throws DnetCollectorException {
private String otherPages(final String resumptionToken) throws CollectorException {
try {
return downloadPage(
baseUrl
+ "?verb=ListRecords&resumptionToken="
+ URLEncoder.encode(resumptionToken, "UTF-8"));
} catch (final UnsupportedEncodingException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
private String downloadPage(final String url) throws DnetCollectorException {
private String downloadPage(final String url) throws CollectorException {
final String xml = httpConnector.getInputSource(url);
Document doc;
@ -151,7 +151,7 @@ public class OaiIterator implements Iterator<String> {
} catch (final DocumentException e1) {
final String resumptionToken = extractResumptionToken(xml);
if (resumptionToken == null) {
throw new DnetCollectorException("Error parsing cleaned document:" + cleaned, e1);
throw new CollectorException("Error parsing cleaned document:" + cleaned, e1);
}
return resumptionToken;
}
@ -164,7 +164,7 @@ public class OaiIterator implements Iterator<String> {
log.warn("noRecordsMatch for oai call: " + url);
return null;
} else {
throw new DnetCollectorException(code + " - " + errorNode.getText());
throw new CollectorException(code + " - " + errorNode.getText());
}
}

View File

@ -1,16 +1,16 @@
package eu.dnetlib.dhp.collection.worker;
public class DnetCollectorException extends Exception {
public class CollectorException extends Exception {
/** */
private static final long serialVersionUID = -290723075076039757L;
public DnetCollectorException() {
public CollectorException() {
super();
}
public DnetCollectorException(
public CollectorException(
final String message,
final Throwable cause,
final boolean enableSuppression,
@ -18,15 +18,15 @@ public class DnetCollectorException extends Exception {
super(message, cause, enableSuppression, writableStackTrace);
}
public DnetCollectorException(final String message, final Throwable cause) {
public CollectorException(final String message, final Throwable cause) {
super(message, cause);
}
public DnetCollectorException(final String message) {
public CollectorException(final String message) {
super(message);
}
public DnetCollectorException(final Throwable cause) {
public CollectorException(final Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.collection.worker;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
public class CollectorWorker {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
private final CollectorPluginFactory collectorPluginFactory;
private final ApiDescriptor api;
private final String hdfsuri;
private final String hdfsPath;
public CollectorWorker(
final CollectorPluginFactory collectorPluginFactory,
final ApiDescriptor api,
final String hdfsuri,
final String hdfsPath) {
this.collectorPluginFactory = collectorPluginFactory;
this.api = api;
this.hdfsuri = hdfsuri;
this.hdfsPath = hdfsPath;
}
public void collect() throws CollectorException {
try {
final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(hdfsPath);
log.info("Created path " + hdfswritepath.toString());
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(hdfswritepath),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
.collect(api)
.forEach(
content -> {
key.set(counter.getAndIncrement());
value.set(content);
try {
writer.append(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
} catch (Throwable e) {
throw new CollectorException("Error on collecting ", e);
}
}
}

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.collection.worker;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
* will be executed on the hadoop cluster and taking in input some parameters that tells it which is the right collector
* plugin to use and where store the data into HDFS path
*
* @author Sandro La Bruzzo
*/
public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
/**
* @param args
*/
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
CollectorWorker.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collector_parameter.json")));
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI is {}", hdfsuri);
final String hdfsPath = argumentParser.get("hdfsPath");
log.info("hdfsPath is {}" + hdfsPath);
final String apiDescriptor = argumentParser.get("apidescriptor");
log.info("apiDescriptor is {}" + apiDescriptor);
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
worker.collect();
}
}

View File

@ -1,139 +0,0 @@
package eu.dnetlib.dhp.collection.worker;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
public class DnetCollectorWorker {
private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorker.class);
private final CollectorPluginFactory collectorPluginFactory;
private final ArgumentApplicationParser argumentParser;
private final MessageManager manager;
public DnetCollectorWorker(
final CollectorPluginFactory collectorPluginFactory,
final ArgumentApplicationParser argumentParser,
final MessageManager manager)
throws DnetCollectorException {
this.collectorPluginFactory = collectorPluginFactory;
this.argumentParser = argumentParser;
this.manager = manager;
}
public void collect() throws DnetCollectorException {
try {
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(argumentParser.get("apidescriptor"), ApiDescriptor.class);
final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
final String hdfsuri = argumentParser.get("namenode");
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS"));
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(argumentParser.get("hdfsPath"));
log.info("Created path " + hdfswritepath.toString());
final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>();
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(hdfswritepath),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
.collect(api)
.forEach(
content -> {
key.set(counter.getAndIncrement());
value.set(content);
if (counter.get() % 10 == 0) {
try {
ongoingMap.put("ongoing", "" + counter.get());
log
.debug(
"Sending message: "
+ manager
.sendMessage(
new Message(
argumentParser.get("workflowId"),
"Collection",
MessageType.ONGOING,
ongoingMap),
argumentParser.get("rabbitOngoingQueue"),
true,
false));
} catch (Exception e) {
log.error("Error on sending message ", e);
}
}
try {
writer.append(key, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
ongoingMap.put("ongoing", "" + counter.get());
manager
.sendMessage(
new Message(
argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap),
argumentParser.get("rabbitOngoingQueue"),
true,
false);
reportMap.put("collected", "" + counter.get());
manager
.sendMessage(
new Message(
argumentParser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
argumentParser.get("rabbitOngoingQueue"),
true,
false);
manager.close();
} catch (Throwable e) {
throw new DnetCollectorException("Error on collecting ", e);
}
}
}

View File

@ -1,49 +0,0 @@
package eu.dnetlib.dhp.collection.worker;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.MessageManager;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
* will be executed on the hadoop cluster and taking in input some parameters that tells it which is the right collector
* plugin to use and where store the data into HDFS path
*
* @author Sandro La Bruzzo
*/
public class DnetCollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorkerApplication.class);
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
private static ArgumentApplicationParser argumentParser;
/** @param args */
public static void main(final String[] args) throws Exception {
argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
DnetCollectorWorker.class
.getResourceAsStream(
"/eu/dnetlib/collector/worker/collector_parameter.json")));
argumentParser.parseArgument(args);
log.info("hdfsPath =" + argumentParser.get("hdfsPath"));
log.info("json = " + argumentParser.get("apidescriptor"));
final MessageManager manager = new MessageManager(
argumentParser.get("rabbitHost"),
argumentParser.get("rabbitUser"),
argumentParser.get("rabbitPassword"),
false,
false,
null);
final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager);
worker.collect();
}
}

View File

@ -3,18 +3,18 @@ package eu.dnetlib.dhp.collection.worker.utils;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class CollectorPluginFactory {
public CollectorPlugin getPluginByProtocol(final String protocol) throws DnetCollectorException {
public CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
if (protocol == null)
throw new DnetCollectorException("protocol cannot be null");
throw new CollectorException("protocol cannot be null");
switch (protocol.toLowerCase().trim()) {
case "oai":
return new OaiCollectorPlugin();
default:
throw new DnetCollectorException("UNknown protocol");
throw new CollectorException("UNknown protocol");
}
}
}

View File

@ -19,7 +19,7 @@ import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class HttpConnector {
@ -42,9 +42,9 @@ public class HttpConnector {
*
* @param requestUrl the URL
* @return the content of the downloaded resource
* @throws DnetCollectorException when retrying more than maxNumberOfRetry times
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl) throws DnetCollectorException {
public String getInputSource(final String requestUrl) throws CollectorException {
return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
}
@ -53,15 +53,15 @@ public class HttpConnector {
*
* @param requestUrl the URL
* @return the content of the downloaded resource as InputStream
* @throws DnetCollectorException when retrying more than maxNumberOfRetry times
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public InputStream getInputSourceAsStream(final String requestUrl) throws DnetCollectorException {
public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException {
return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
}
private String attemptDownlaodAsString(
final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
throws DnetCollectorException {
throws CollectorException {
try {
final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
try {
@ -75,16 +75,16 @@ public class HttpConnector {
IOUtils.closeQuietly(s);
}
} catch (final InterruptedException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
private InputStream attemptDownload(
final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
throws DnetCollectorException {
throws CollectorException {
if (retryNumber > maxNumberOfRetry) {
throw new DnetCollectorException("Max number of retries exceeded. Cause: \n " + errorList);
throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList);
}
log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
@ -144,7 +144,7 @@ public class HttpConnector {
return attemptDownload(requestUrl, retryNumber + 1, errorList);
}
} catch (final InterruptedException e) {
throw new DnetCollectorException(e);
throw new CollectorException(e);
}
}
@ -173,13 +173,13 @@ public class HttpConnector {
}
private String obtainNewLocation(final Map<String, List<String>> headerMap)
throws DnetCollectorException {
throws CollectorException {
for (final String key : headerMap.keySet()) {
if (key != null && key.toLowerCase().equals("location") && headerMap.get(key).size() > 0) {
return headerMap.get(key).get(0);
}
}
throw new DnetCollectorException(
throw new CollectorException(
"The requested url has been MOVED, but 'location' param is MISSING");
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.transformation;
public class DnetTransformationException extends Exception {
public DnetTransformationException() {
super();
}
public DnetTransformationException(
final String message,
final Throwable cause,
final boolean enableSuppression,
final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public DnetTransformationException(final String message, final Throwable cause) {
super(message, cause);
}
public DnetTransformationException(final String message) {
super(message);
}
public DnetTransformationException(final Throwable cause) {
super(cause);
}
}

View File

@ -9,9 +9,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
@ -24,12 +25,15 @@ import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
@ -55,67 +59,38 @@ public class TransformSparkJobNode {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("input");
final String outputPath = parser.get("output");
final String workflowId = parser.get("workflowId");
final String trasformationRule = extractXSLTFromTR(
Objects.requireNonNull(DHPUtils.decompressString(parser.get("transformationRule"))));
final String inputPath = parser.get("mdstoreInputPath");
final String outputPath = parser.get("mdstoreOutputPath");
// TODO this variable will be used after implementing Messaging with DNet Aggregator
final String rabbitUser = parser.get("rabbitUser");
final String rabbitPassword = parser.get("rabbitPassword");
final String rabbitHost = parser.get("rabbitHost");
final String rabbitReportQueue = parser.get("rabbitReportQueue");
final long dateOfCollection = new Long(parser.get("dateOfCollection"));
final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
final String isLookupUrl = parser.get("isLookupUrl");
log.info(String.format("isLookupUrl: %s", isLookupUrl));
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
final TransformFunction transformFunction = new TransformFunction(
totalItems,
errorItems,
transformedItems,
trasformationRule,
dateOfCollection,
vocabularies);
mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
if (rabbitHost != null) {
System.out.println("SEND FINAL REPORT");
final Map<String, String> reportMap = new HashMap<>();
reportMap.put("inputItem", "" + totalItems.value());
reportMap.put("invalidRecords", "" + errorItems.value());
reportMap.put("mdStoreSize", "" + transformedItems.value());
System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
if (!test) {
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false,
false,
null);
manager
.sendMessage(
new Message(workflowId, "Transform", MessageType.REPORT, reportMap),
rabbitReportQueue,
true,
false);
manager.close();
}
}
});
spark -> transformRecords(parser.getObjectMap(), isLookupService, spark, inputPath, outputPath));
}
private static String extractXSLTFromTR(final String tr) throws DocumentException {
SAXReader reader = new SAXReader();
Document document = reader.read(new ByteArrayInputStream(tr.getBytes()));
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
return node.asXML();
public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,
final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException {
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final MapFunction<MetadataRecord, MetadataRecord> XSLTTransformationFunction = TransformationFactory
.getTransformationPlugin(args, ct, isLookUpService);
mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath);
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count());
}
}

View File

@ -0,0 +1,69 @@
package eu.dnetlib.dhp.transformation;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class TransformationFactory {
private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class);
public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()";
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
throws DnetTransformationException {
try {
final String transformationPlugin = jobArgument.get("transformationPlugin");
log.info("Transformation plugin required " + transformationPlugin);
switch (transformationPlugin) {
case "XSLT_TRANSFORM": {
final String transformationRuleName = jobArgument.get("transformationRuleTitle");
if (StringUtils.isBlank(transformationRuleName))
throw new DnetTransformationException("Missing Parameter transformationRule");
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
final String transformationRule = queryTransformationRuleFromIS(
transformationRuleName, isLookupService);
final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation"));
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
vocabularies);
}
default:
throw new DnetTransformationException(
"transformation plugin does not exists for " + transformationPlugin);
}
} catch (Throwable e) {
throw new DnetTransformationException(e);
}
}
private static String queryTransformationRuleFromIS(final String transformationRuleName,
final ISLookUpService isLookUpService) throws Exception {
final String query = String.format(TRULE_XQUERY, transformationRuleName);
log.info("asking query to IS: " + query);
List<String> result = isLookUpService.quickSearchProfile(query);
if (result == null || result.isEmpty())
throw new DnetTransformationException(
"Unable to find transformation rule with name: " + transformationRuleName);
return result.get(0);
}
}

View File

@ -1,19 +1,16 @@
package eu.dnetlib.dhp.transformation.functions;
package eu.dnetlib.dhp.transformation.xslt;
import java.util.Map;
import java.util.Optional;
import eu.dnetlib.dhp.transformation.vocabulary.Term;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import net.sf.saxon.s9api.*;
import scala.Serializable;
public class Cleaner implements ExtensionFunction, Serializable {
private final Map<String, Vocabulary> vocabularies;
private final VocabularyGroup vocabularies;
public Cleaner(Map<String, Vocabulary> vocabularies) {
public Cleaner(final VocabularyGroup vocabularies) {
this.vocabularies = vocabularies;
}
@ -39,14 +36,9 @@ public class Cleaner implements ExtensionFunction, Serializable {
public XdmValue call(XdmValue[] xdmValues) throws SaxonApiException {
final String currentValue = xdmValues[0].itemAt(0).getStringValue();
final String vocabularyName = xdmValues[1].itemAt(0).getStringValue();
Optional<Term> cleanedValue = vocabularies
.get(vocabularyName)
.getTerms()
.stream()
.filter(it -> it.getNativeName().equalsIgnoreCase(currentValue))
.findAny();
Qualifier cleanedValue = vocabularies.getSynonymAsQualifier(vocabularyName, currentValue);
return new XdmAtomicValue(
cleanedValue.isPresent() ? cleanedValue.get().getCode() : currentValue);
cleanedValue != null ? cleanedValue.getClassid() : currentValue);
}
}

View File

@ -1,41 +1,35 @@
package eu.dnetlib.dhp.transformation;
package eu.dnetlib.dhp.transformation.xslt;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.util.Map;
import javax.xml.transform.stream.StreamSource;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import net.sf.saxon.s9api.*;
public class TransformFunction implements MapFunction<MetadataRecord, MetadataRecord> {
public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
private final AggregationCounter aggregationCounter;
private final LongAccumulator totalItems;
private final LongAccumulator errorItems;
private final LongAccumulator transformedItems;
private final String transformationRule;
private final Cleaner cleanFunction;
private final long dateOfTransformation;
public TransformFunction(
LongAccumulator totalItems,
LongAccumulator errorItems,
LongAccumulator transformedItems,
public XSLTTransformationFunction(
final AggregationCounter aggregationCounter,
final String transformationRule,
long dateOfTransformation,
final Map<String, Vocabulary> vocabularies)
final VocabularyGroup vocabularies)
throws Exception {
this.totalItems = totalItems;
this.errorItems = errorItems;
this.transformedItems = transformedItems;
this.aggregationCounter = aggregationCounter;
this.transformationRule = transformationRule;
this.dateOfTransformation = dateOfTransformation;
cleanFunction = new Cleaner(vocabularies);
@ -43,7 +37,7 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
@Override
public MetadataRecord call(MetadataRecord value) {
totalItems.add(1);
aggregationCounter.getTotalItems().add(1);
try {
Processor processor = new Processor(false);
processor.registerExtensionFunction(cleanFunction);
@ -64,10 +58,10 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
final String xml = output.toString();
value.setBody(xml);
value.setDateOfTransformation(dateOfTransformation);
transformedItems.add(1);
aggregationCounter.getProcessedItems().add(1);
return value;
} catch (Throwable e) {
errorItems.add(1);
aggregationCounter.getErrorItems().add(1);
return null;
}
}

View File

@ -41,46 +41,10 @@
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "ru",
"paramLongName": "rabbitUser",
"paramDescription": "the user to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "rabbitPassword",
"paramDescription": "the password to connect with RabbitMq for messaging",
"paramRequired": true
},
{
"paramName": "rh",
"paramLongName": "rabbitHost",
"paramDescription": "the host of the RabbitMq server",
"paramRequired": true
},
{
"paramName": "ro",
"paramLongName": "rabbitOngoingQueue",
"paramDescription": "the name of the ongoing queue",
"paramRequired": true
},
{
"paramName": "rr",
"paramLongName": "rabbitReportQueue",
"paramDescription": "the name of the report queue",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "isTest",
"paramDescription": "the name of the report queue",
"paramRequired": false
}
]

View File

@ -0,0 +1,6 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,10 +1,5 @@
<workflow-app name="CollectionWorkflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sequenceFilePath</name>
<description>the path to store the sequence file of the native metadata collected</description>
</property>
<property>
<name>mdStorePath</name>
<description>the path of the native mdstore</description>
@ -39,72 +34,52 @@
<description>The identifier of the workflow</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</global>
<start to="DeleteMDStoresNative"/>
<start to="CollectionWorker"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteMDStoresNative">
<fs>
<mkdir path='${sequenceFilePath}'/>
<mkdir path='${mdStorePath}'/>
<delete path='${sequenceFilePath}'/>
<delete path='${mdStorePath}'/>
</fs>
<ok to="CollectionWorker"/>
<error to="Kill"/>
</action>
<action name="CollectionWorker">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.collection.worker.DnetCollectorWorker</main-class>
<java-opts>-p</java-opts><java-opts>${sequenceFilePath}</java-opts>
<java-opts>-a</java-opts><java-opts>${apiDescription}</java-opts>
<java-opts>-n</java-opts><java-opts>${nameNode}</java-opts>
<java-opts>-rh</java-opts><java-opts>${rmq_host}</java-opts>
<java-opts>-ru</java-opts><java-opts>${rmq_user}</java-opts>
<java-opts>-rp</java-opts><java-opts>${rmq_pwd}</java-opts>
<java-opts>-rr</java-opts><java-opts>${rmq_report}</java-opts>
<java-opts>-ro</java-opts><java-opts>${rmq_ongoing}</java-opts>
<java-opts>-u</java-opts><java-opts>sandro.labruzzo</java-opts>
<java-opts>-w</java-opts><java-opts>${workflowId}</java-opts>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/sequenceFile_${mdstoreVersion}</arg>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
</java>
<ok to="GenerateNativeStoreSparkJob"/>
<error to="Kill"/>
</action>
<action name="GenerateNativeStoreSparkJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateNativeStoreSparkJob</name>
<name>Generate Native MetadataStore</name>
<class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>--encoding</arg> <arg>${metadataEncoding}</arg>
<arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
<arg>--provenance</arg> <arg> ${dataSourceInfo}</arg>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--encoding</arg><arg>${metadataEncoding}</arg>
<arg>--dateOfCollection</arg><arg>${timestamp}</arg>
<arg>--provenance</arg><arg>${dataSourceInfo}</arg>
<arg>--xpath</arg><arg>${identifierPath}</arg>
<arg>--input</arg><arg>${sequenceFilePath}</arg>
<arg>--input</arg><arg>${workingDir}/sequenceFile</arg>
<arg>--output</arg><arg>${mdStorePath}</arg>
<arg>-rh</arg><arg>${rmq_host}</arg>
<arg>-ru</arg><arg>${rmq_user}</arg>
<arg>-rp</arg><arg>${rmq_pwd}</arg>
<arg>-rr</arg><arg>${rmq_report}</arg>
<arg>-ro</arg><arg>${rmq_ongoing}</arg>
<arg>-w</arg><arg>${workflowId}</arg>
</spark>
<ok to="End"/>
<error to="DropInvalidStore"/>
</action>
<action name="DropInvalidStore">
<fs>
<delete path='${mdStorePath}/../'/>
</fs>
<ok to="Kill"/>
<error to="Kill"/>
</action>

View File

@ -1,12 +0,0 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}
]

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -2,7 +2,7 @@
<parameters>
<property>
<name>mdstoreInputPath</name>
<description>the path of the input MDStore</description>
<description>the path of the native MDStore</description>
</property>
<property>
@ -11,66 +11,57 @@
</property>
<property>
<name>transformationRule</name>
<name>transformationRuleTitle</name>
<description>The transformation Rule to apply</description>
</property>
<property>
<name>timestamp</name>
<description>The timestamp of the collection date</description>
<name>transformationPlugin</name>
<description>The transformation Plugin</description>
</property>
<property>
<name>workflowId</name>
<description>The identifier of the workflow</description>
<name>dateOfTransformation</name>
<description>The timestamp of the transformation date</description>
</property>
</parameters>
<start to="DeletePathIfExists"/>
<start to="TransformJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeletePathIfExists">
<fs>
<mkdir path='${mdstoreOutputPath}'/>
<delete path='${mdstoreOutputPath}'/>
</fs>
<ok to="TransformJob"/>
<error to="Kill"/>
</action>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>cluster</mode>
<name>MDBuilder</name>
<name>Transform MetadataStore</name>
<class>eu.dnetlib.dhp.transformation.TransformSparkJobNode</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
<arg>-mt</arg> <arg>yarn</arg>
<arg>--input</arg><arg>${mdstoreInputPath}</arg>
<arg>--output</arg><arg>${mdstoreOutputPath}</arg>
<arg>-w</arg><arg>${workflowId}</arg>
<arg>-tr</arg><arg>${transformationRule}</arg>
<arg>-ru</arg><arg>${rmq_user}</arg>
<arg>-rp</arg><arg>${rmq_pwd}</arg>
<arg>-rh</arg><arg>${rmq_host}</arg>
<arg>-ro</arg><arg>${rmq_ongoing}</arg>
<arg>-rr</arg><arg>${rmq_report}</arg>
<jar>dhp-aggregations-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--mdstoreInputPath</arg><arg>${mdstoreInputPath}</arg>
<arg>--mdstoreOutputPath</arg><arg>${mdstoreOutputPath}</arg>
<arg>--dateOfTransformation</arg><arg>${dateOfTransformation}</arg>
<arg>--transformationPlugin</arg><arg>${transformationPlugin}</arg>
<arg>--transformationRuleTitle</arg><arg>${transformationRuleTitle}</arg>
</spark>
<ok to="End"/>
<error to="DropInvalidStore"/>
</action>
<action name="DropInvalidStore">
<fs>
<delete path='${mdstoreOutputPath}/../'/>
</fs>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -7,68 +7,42 @@
},
{
"paramName": "d",
"paramLongName": "dateOfCollection",
"paramLongName": "dateOfTransformation",
"paramDescription": "the date when the record has been stored",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "input",
"paramLongName": "mdstoreInputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramLongName": "mdstoreOutputPath",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "tr",
"paramLongName": "transformationRule",
"paramLongName": "transformationRuleTitle",
"paramDescription": "the transformation Rule to apply to the input MDStore",
"paramRequired": true
},
{
"paramName": "ru",
"paramLongName": "rabbitUser",
"paramDescription": "the user to connect with RabbitMq for messaging",
"paramName": "i",
"paramLongName": "isLookupUrl",
"paramDescription": "the Information System Service LookUp URL",
"paramRequired": true
},
{
"paramName": "rp",
"paramLongName": "rabbitPassword",
"paramDescription": "the password to connect with RabbitMq for messaging",
"paramName": "tp",
"paramLongName": "transformationPlugin",
"paramDescription": "the transformation plugin to apply",
"paramRequired": true
},
{
"paramName": "rh",
"paramLongName": "rabbitHost",
"paramDescription": "the host of the RabbitMq server",
"paramRequired": true
},
{
"paramName": "ro",
"paramLongName": "rabbitOngoingQueue",
"paramDescription": "the name of the ongoing queue",
"paramRequired": true
},
{
"paramName": "rr",
"paramLongName": "rabbitReportQueue",
"paramDescription": "the name of the report queue",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "isTest",
"paramDescription": "the name of the report queue",
"paramRequired": false
}
]

View File

@ -6,16 +6,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
@Disabled
public class EXCELParserTest {
@ -31,7 +30,7 @@ public class EXCELParserTest {
}
@Test
public void test1() throws DnetCollectorException, IOException, InvalidFormatException, ClassNotFoundException,
public void test1() throws CollectorException, IOException, InvalidFormatException, ClassNotFoundException,
IllegalAccessException, InstantiationException {
EXCELParser excelParser = new EXCELParser();

View File

@ -1,8 +1,6 @@
package eu.dnetlib.dhp.actionmanager.project.httpconnector;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
@ -11,6 +9,9 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
@Disabled
public class HttpConnectorTest {
@ -31,12 +32,12 @@ public class HttpConnectorTest {
@Test
public void testGetInputSource() throws DnetCollectorException {
public void testGetInputSource() throws CollectorException {
System.out.println(connector.getInputSource(URL));
}
@Test
public void testGoodServers() throws DnetCollectorException {
public void testGoodServers() throws CollectorException {
System.out.println(connector.getInputSource(URL_GOODSNI_SERVER));
}

View File

@ -5,17 +5,19 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.*;
import java.io.File;
import java.nio.file.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.DnetCollectorWorker;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
@ -23,43 +25,6 @@ import eu.dnetlib.message.MessageManager;
@Disabled
public class DnetCollectorWorkerApplicationTests {
private final ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class);
private final MessageManager messageManager = mock(MessageManager.class);
private DnetCollectorWorker worker;
@BeforeEach
public void setup() throws Exception {
ObjectMapper mapper = new ObjectMapper();
final String apiJson = mapper.writeValueAsString(getApi());
when(argumentParser.get("apidescriptor")).thenReturn(apiJson);
when(argumentParser.get("namenode")).thenReturn("file://tmp/test.seq");
when(argumentParser.get("hdfsPath")).thenReturn("/tmp/file.seq");
when(argumentParser.get("userHDFS")).thenReturn("sandro");
when(argumentParser.get("workflowId")).thenReturn("sandro");
when(argumentParser.get("rabbitOngoingQueue")).thenReturn("sandro");
when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(), anyBoolean()))
.thenAnswer(
a -> {
System.out.println("sent message: " + a.getArguments()[0]);
return true;
});
when(messageManager.sendMessage(any(Message.class), anyString()))
.thenAnswer(
a -> {
System.out.println("Called");
return true;
});
worker = new DnetCollectorWorker(new CollectorPluginFactory(), argumentParser, messageManager);
}
@AfterEach
public void dropDown() {
File f = new File("/tmp/file.seq");
f.delete();
}
@Test
public void testFindPlugin() throws Exception {
final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
@ -79,8 +44,14 @@ public class DnetCollectorWorkerApplicationTests {
}
@Test
public void testFeeding() throws Exception {
public void testFeeding(@TempDir Path testDir) throws Exception {
System.out.println(testDir.toString());
CollectorWorker worker = new CollectorWorker(new CollectorPluginFactory(), getApi(),
"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
worker.collect();
// TODO create ASSERT HERE
}
private ApiDescriptor getApi() {

View File

@ -1,45 +1,66 @@
package eu.dnetlib.dhp.transformation;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.sf.saxon.s9api.*;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class TransformationJobTest {
private static SparkSession spark;
@Mock
private ISLookUpService isLookUpService;
private VocabularyGroup vocabularies;
@BeforeEach
public void setUp() throws ISLookUpException, IOException {
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
lenient()
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
.thenReturn(synonyms());
vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
}
@BeforeAll
public static void beforeAll() {
SparkConf conf = new SparkConf();
@ -53,66 +74,68 @@ public class TransformationJobTest {
spark.stop();
}
@Mock
private LongAccumulator accumulator;
@Test
@DisplayName("Test Transform Single XML using XSLTTransformator")
public void testTransformSaxonHE() throws Exception {
Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
Cleaner cleanFunction = new Cleaner(vocabularies);
Processor proc = new Processor(false);
proc.registerExtensionFunction(cleanFunction);
final XsltCompiler comp = proc.newXsltCompiler();
XsltExecutable exp = comp
.compile(
new StreamSource(
this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/ext_simple.xsl")));
XdmNode source = proc
.newDocumentBuilder()
.build(
new StreamSource(
this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
XsltTransformer trans = exp.load();
trans.setInitialContextNode(source);
final StringWriter output = new StringWriter();
Serializer out = proc.newSerializer(output);
out.setOutputProperty(Serializer.Property.METHOD, "xml");
out.setOutputProperty(Serializer.Property.INDENT, "yes");
trans.setDestination(out);
trans.transform();
System.out.println(output.toString());
// We Set the input Record getting the XML from the classpath
final MetadataRecord mr = new MetadataRecord();
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
// We Load the XSLT transformation Rule from the classpath
XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/ext_simple.xsl");
// Print the record
System.out.println(tr.call(mr).getBody());
// TODO Create significant Assert
}
@DisplayName("Test TransformSparkJobNode.main")
@Test
public void transformTest(@TempDir Path testDir) throws Exception {
final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
final String mdstore_output = testDir.toString() + "/version";
final String xslt = DHPUtils
.compressString(
IOUtils
.toString(
this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")));
TransformSparkJobNode
.main(
new String[] {
"-issm", "true",
"-i", mdstore_input,
"-o", mdstore_output,
"-d", "1",
"-w", "1",
"-tr", xslt,
"-t", "true",
"-ru", "",
"-rp", "",
"-rh", "",
"-ro", "",
"-rr", ""
});
mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
// final String arguments = "-issm true -i %s -o %s -d 1 -w 1 -tp XSLT_TRANSFORM -tr simpleTRule";
final Map<String, String> parameters = Stream.of(new String[][] {
{
"dateOfTransformation", "1234"
},
{
"transformationPlugin", "XSLT_TRANSFORM"
},
{
"transformationRuleTitle", "simpleTRule"
},
}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output);
// TODO introduce useful assertions
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdstore_output).as(encoder);
final Long total = mOutput.count();
final long recordTs = mOutput
.filter((FilterFunction<MetadataRecord>) p -> p.getDateOfTransformation() == 1234)
.count();
final long recordNotEmpty = mOutput
.filter((FilterFunction<MetadataRecord>) p -> !StringUtils.isBlank(p.getBody()))
.count();
assertEquals(total, recordTs);
assertEquals(total, recordNotEmpty);
}
@Test
@ -127,39 +150,27 @@ public class TransformationJobTest {
Files.deleteIfExists(tempDirWithPrefix);
}
@Test
public void testTransformFunction() throws Exception {
SAXReader reader = new SAXReader();
Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
final String xslt = node.asXML();
Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
private void mockupTrasformationRule(final String trule, final String path) throws Exception {
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
TransformFunction tf = new TransformFunction(accumulator, accumulator, accumulator, xslt, 1, vocabularies);
MetadataRecord record = new MetadataRecord();
record
.setBody(
IOUtils
.toString(
this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
final MetadataRecord result = tf.call(record);
assertNotNull(result.getBody());
System.out.println(result.getBody());
lenient()
.when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule)))
.thenReturn(Collections.singletonList(trValue));
}
@Test
public void extractTr() throws Exception {
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
final LongAccumulator la = new LongAccumulator();
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
}
final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
private List<String> vocs() throws IOException {
return IOUtils
.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt"));
}
SAXReader reader = new SAXReader();
Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
System.out.println(node.asXML());
private List<String> synonyms() throws IOException {
return IOUtils
.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt"));
}
}

View File

@ -1,15 +1,16 @@
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:dnetFunction="http://eu/dnetlib/trasform/extension"
xmlns:vocabulary="http://eu/dnetlib/trasform/extension"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
version="2.0"
exclude-result-prefixes="xsl">
exclude-result-prefixes="xsl vocabulary">
<xsl:template match="/">
<oai:record>
<xsl:copy-of select="//oai:header"/>
<metadata>
<xsl:for-each select="//*[local-name()='subject']">
<subject><xsl:value-of select="dnetFunction:clean(.,'dnet:languages')"/></subject>
<xsl:for-each select="//oai:set">
<dr:CobjCategory><xsl:value-of select="vocabulary:clean(.,'dnet:publication_resource')"/></dr:CobjCategory>
</xsl:for-each>
</metadata>
<oaf:about>

View File

@ -1,37 +1,68 @@
<record xmlns="http://www.openarchives.org/OAI/2.0/">
<header>
<identifier>oai:research.chalmers.se:243692</identifier>
<datestamp>2018-01-25T18:04:43Z</datestamp>
<setSpec>openaire</setSpec>
</header>
<metadata>
<oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd">
<dc:title>Incipient Berezinskii-Kosterlitz-Thouless transition in two-dimensional coplanar Josephson junctions</dc:title>
<dc:identifier>https://research.chalmers.se/en/publication/243692</dc:identifier>
<dc:date>2016</dc:date>
<dc:creator>Massarotti, D.</dc:creator>
<dc:creator>Jouault, B.</dc:creator>
<dc:creator>Rouco, V.</dc:creator>
<dc:creator>Charpentier, Sophie</dc:creator>
<dc:creator>Bauch, Thilo</dc:creator>
<dc:creator>Michon, A.</dc:creator>
<dc:creator>De Candia, A.</dc:creator>
<dc:creator>Lucignano, P.</dc:creator>
<dc:creator>Lombardi, Floriana</dc:creator>
<dc:creator>Tafuri, F.</dc:creator>
<dc:creator>Tagliacozzo, A.</dc:creator>
<dc:subject>Acoli</dc:subject>
<dc:subject>Abkhazian</dc:subject>
<dc:subject>Condensed Matter Physics</dc:subject>
<dc:description>Superconducting hybrid junctions are revealing a variety of effects. Some of them are due to the special layout of these devices, which often use a coplanar configuration with relatively large barrier channels and the possibility of hosting Pearl vortices. A Josephson junction with a quasi-ideal two-dimensional barrier has been realized by growing graphene on SiC with Al electrodes. Chemical vapor deposition offers centimeter size monolayer areas where it is possible to realize a comparative analysis of different devices with nominally the same barrier. In samples with a graphene gap below 400 nm, we have found evidence of Josephson coherence in the presence of an incipient Berezinskii-Kosterlitz-Thouless transition. When the magnetic field is cycled, a remarkable hysteretic collapse and revival of the Josephson supercurrent occurs. Similar hysteresis are found in granular systems and are usually justified within the Bean critical state model (CSM). We show that the CSM, with appropriate account for the low-dimensional geometry, can partly explain the odd features measured in these junctions.</dc:description>
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/604391//Graphene-Based Revolutions in ICT And Beyond (Graphene Flagship)/</dc:relation>
<dc:relation>info:eu-repo/semantics/altIdentifier/doi/10.1103/PhysRevB.94.054525</dc:relation>
<dc:type>info:eu-repo/semantics/article</dc:type>
<dc:source>Physical Review B vol.94(2016)</dc:source>
<dc:rights>info:eu-repo/semantics/openAccess</dc:rights>
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns="http://namespace.openaire.eu/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<oai:header>
<dri:objIdentifier>od______2294::00029b7f0a2a7e090e55b625a9079d83</dri:objIdentifier>
<dri:recordIdentifier>oai:pub.uni-bielefeld.de:2578942</dri:recordIdentifier>
<dri:dateOfCollection>2018-11-23T15:15:33.974+01:00</dri:dateOfCollection>
<oaf:datasourceprefix>od______2294</oaf:datasourceprefix>
<identifier xmlns="http://www.openarchives.org/OAI/2.0/">oai:pub.uni-bielefeld.de:2578942</identifier>
<datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2018-07-24T13:01:16Z</datestamp>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">conference</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">ddc:000</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">conferenceFtxt</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">driver</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">open_access</setSpec>
</oai:header>
<metadata xmlns="http://www.openarchives.org/OAI/2.0/">
<oai_dc:dc xmlns="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd">
<dc:title>Mobile recommendation agents making online use of visual attention information at the point of sale</dc:title>
<dc:creator>Pfeiffer, Thies</dc:creator>
<dc:creator>Pfeiffer, Jella</dc:creator>
<dc:creator>Meißner, Martin</dc:creator>
<dc:creator>Davis, Fred</dc:creator>
<dc:creator>Riedl, René</dc:creator>
<dc:creator>Jan, vom Brocke</dc:creator>
<dc:creator>Léger, Pierre-Majorique</dc:creator>
<dc:creator>Randolph, Adriane</dc:creator>
<dc:subject>Mobile Cognitive Assistance Systems
Information Systems</dc:subject>
<dc:subject>ddc:000</dc:subject>
<dc:description>We aim to utilize online information about visual attention for developing mobile recommendation agents (RAs) for use at the point of sale. Up to now, most RAs are focussed exclusively at personalization in an e-commerce setting. Very little is known, however, about mobile RAs that offer information and assistance at the point of sale based on individual-level feature based preference models (Murray and Häubl 2009). Current attempts provide information about products at the point of sale by manually scanning barcodes or using RFID (Kowatsch et al. 2011, Heijden 2005), e.g. using specific apps for smartphones. We argue that an online access to the current visual attention of the user offers a much larger potential. Integrating mobile eye tracking into ordinary glasses would yield a direct benefit of applying neuroscience methods in the users everyday life. First, learning from consumers attentional processes over time and adapting recommendations based on this learning allows us to provide very accurate and relevant recommendations, potentially increasing the perceived usefulness. Second, our proposed system needs little explicit user input (no scanning or navigation on screen) making it easy to use. Thus, instead of learning from click behaviour and past customer ratings, as it is the case in the e-commerce setting, the mobile RA learns from eye movements by participating online in every day decision processes. We argue that mobile RAs should be built based on current research in human judgment and decision making (Murray et al. 2010). In our project, we therefore follow a two-step approach: In the empirical basic research stream, we aim to understand the users interaction with the product shelf: the actions and patterns of users behaviour (eye movements, gestures, approaching a product closer) and their correspondence to the users informational needs. In the empirical system development stream, we create prototypes of mobile RAs and test experimentally the factors that influence the users adoption. For example, we suggest that a users involvement in the process, such as a need for exact nutritional information or for assistance (e.g., reading support for elderly) will influence the users intention to use such as system. The experiments are conducted both in our immersive virtual reality supermarket presented in a CAVE, where we can also easily display information to the user and track the eye movement in great accuracy, as well as in real-world supermarkets (see Figure 1), so that the findings can be better generalized to natural decision situations (Gidlöf et al. 2013). In a first pilot study with five randomly chosen participants in a supermarket, we evaluated which sort of mobile RAs consumers favour in order to get a first impression of the users acceptance of the technology. Figure 1 shows an excerpt of one consumers eye movements during a decision process. First results show long eye cascades and short fixations on many products in situations where users are uncertain and in need for support. Furthermore, we find a surprising acceptance of the technology itself throughout all ages (23 61 years). At the same time, consumers express serious fear of being manipulated by such a technology. For that reason, they strongly prefer the information to be provided by trusted third party or shared with family members and friends (see also Murray and Häubl 2009). Our pilot will be followed by a larger field experiment in March in order to learn more about factors that influence the users acceptance as well as the eye movement patterns that reflect typical phases of decision processes and indicate the need for support by a RA.</dc:description>
<dc:date>2013</dc:date>
<dc:type>info:eu-repo/semantics/conferenceObject</dc:type>
<dc:type>doc-type:conferenceObject</dc:type>
<dc:type>text</dc:type>
<dc:identifier>https://pub.uni-bielefeld.de/record/2578942</dc:identifier>
<dc:identifier>https://pub.uni-bielefeld.de/download/2578942/2602478</dc:identifier>
<dc:source>Pfeiffer T, Pfeiffer J, Meißner M. Mobile recommendation agents making online use of visual attention information at the point of sale. In: Davis F, Riedl R, Jan vom B, Léger P-M, Randolph A, eds. <em>Proceedings of the Gmunden Retreat on NeuroIS 2013</em>. 2013: 3-3.</dc:source>
<dc:language>eng</dc:language>
<dc:audience>Researchers</dc:audience>
<dc:format>application/pdf</dc:format>
<dc:rights>info:eu-repo/semantics/openAccess</dc:rights>
</oai_dc:dc>
</metadata>
</record>
<about xmlns="">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2018-11-23T15:15:33.974+01:00">
<baseURL>http://pub.uni-bielefeld.de/oai</baseURL>
<identifier>oai:pub.uni-bielefeld.de:2578942</identifier>
<datestamp>2018-07-24T13:01:16Z</datestamp>
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk:repository"
classname="sysimport:crosswalk:repository"
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -55,9 +55,9 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@ -15,8 +15,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;

View File

@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;