dhp-collection-worker integrated in dhp-workflows

This commit is contained in:
Sandro La Bruzzo 2019-10-24 11:36:59 +02:00
parent c8e3e4d7c3
commit 5a8a323f2a
39 changed files with 375 additions and 236 deletions

View File

@ -1,115 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Inherit defaults from the dhp-applications parent POM -->
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-applications</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>eu.dnetlib</groupId>
<artifactId>dhp-collector-worker</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<detectLinks>true</detectLinks>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself. -->
</plugins>
</build>
<dependencies>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,2 +0,0 @@
spring.main.banner-mode=off
logging.level.root=OFF

View File

@ -1,14 +0,0 @@
### Root Level ###
log4j.rootLogger=WARN, CONSOLE
### Configuration for the CONSOLE appender ###
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n
org.apache.cxf.Logger=org.apache.cxf.common.logging.Log4jLogger
### Application Level ###
log4j.logger.eu.dnetlib=INFO
log4j.logger.eu.dnetlib.collector.worker.DnetCollectorWorker=DEBUG

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dhp.model.mdstore; package eu.dnetlib.dhp.model.mdstore;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable; import java.io.Serializable;
@ -44,6 +43,11 @@ public class MetadataRecord implements Serializable {
*/ */
private long dateOfCollection; private long dateOfCollection;
/**
* the date when the record has been transformed
*/
private long dateOfTransformation;
public MetadataRecord() { public MetadataRecord() {
this.dateOfCollection = System.currentTimeMillis(); this.dateOfCollection = System.currentTimeMillis();
@ -109,6 +113,14 @@ public class MetadataRecord implements Serializable {
this.dateOfCollection = dateOfCollection; this.dateOfCollection = dateOfCollection;
} }
public long getDateOfTransformation() {
return dateOfTransformation;
}
public void setDateOfTransformation(long dateOfTransformation) {
this.dateOfTransformation = dateOfTransformation;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (!(o instanceof MetadataRecord)) { if (!(o instanceof MetadataRecord)) {

View File

@ -68,12 +68,14 @@ public class GenerateNativeStoreSparkJob {
final SparkSession spark = SparkSession final SparkSession spark = SparkSession
.builder() .builder()
.appName("GenerateNativeStoreSparkJob") .appName("GenerateNativeStoreSparkJob")
.master("yarn") .master(parser.get("master"))
.getOrCreate(); .getOrCreate();
final Map<String, String> ongoingMap = new HashMap<>(); final Map<String, String> ongoingMap = new HashMap<>();
final Map<String, String> reportMap = new HashMap<>(); final Map<String, String> reportMap = new HashMap<>();
final boolean test = parser.get("isTest") == null?false: Boolean.valueOf(parser.get("isTest"));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc.sequenceFile(parser.get("input"), IntWritable.class, Text.class); final JavaPairRDD<IntWritable, Text> inputRDD = sc.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
@ -88,19 +90,28 @@ public class GenerateNativeStoreSparkJob {
.filter(Objects::nonNull).distinct(); .filter(Objects::nonNull).distinct();
ongoingMap.put("ongoing", "0"); ongoingMap.put("ongoing", "0");
manager.sendMessage(new Message(parser.get("workflowId"),"DataFrameCreation", MessageType.ONGOING, ongoingMap ), parser.get("rabbitOngoingQueue"), true, false); if (!test) {
manager.sendMessage(new Message(parser.get("workflowId"),"DataFrameCreation", MessageType.ONGOING, ongoingMap ), parser.get("rabbitOngoingQueue"), true, false);
}
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder); final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count()); mdStoreRecords.add(mdstore.count());
ongoingMap.put("ongoing", ""+ totalItems.value()); ongoingMap.put("ongoing", ""+ totalItems.value());
manager.sendMessage(new Message(parser.get("workflowId"),"DataFrameCreation", MessageType.ONGOING, ongoingMap ), parser.get("rabbitOngoingQueue"), true, false); if (!test) {
manager.sendMessage(new Message(parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), parser.get("rabbitOngoingQueue"), true, false);
}
mdstore.write().format("parquet").save(parser.get("output")); mdstore.write().format("parquet").save(parser.get("output"));
reportMap.put("inputItem" , ""+ totalItems.value()); reportMap.put("inputItem" , ""+ totalItems.value());
reportMap.put("invalidRecords", "" + invalidRecords.value()); reportMap.put("invalidRecords", "" + invalidRecords.value());
reportMap.put("mdStoreSize", "" + mdStoreRecords.value()); reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
manager.sendMessage(new Message(parser.get("workflowId"),"Collection", MessageType.REPORT, reportMap ), parser.get("rabbitReportQueue"), true, false); if (!test) {
manager.sendMessage(new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), parser.get("rabbitReportQueue"), true, false);
manager.close();
}
} }
} }

View File

@ -1,10 +1,10 @@
package eu.dnetlib.collector.worker.plugins; package eu.dnetlib.dhp.collection.plugin;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import java.util.stream.Stream; import java.util.stream.Stream;
import eu.dnetlib.collector.worker.DnetCollectorException;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
public interface CollectorPlugin { public interface CollectorPlugin {
Stream<String> collect(ApiDescriptor api) throws DnetCollectorException; Stream<String> collect(ApiDescriptor api) throws DnetCollectorException;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.collector.worker.plugins.oai; package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
@ -11,9 +11,10 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import eu.dnetlib.collector.worker.DnetCollectorException;
import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.plugins.CollectorPlugin; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
public class OaiCollectorPlugin implements CollectorPlugin { public class OaiCollectorPlugin implements CollectorPlugin {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.collector.worker.plugins.oai; package eu.dnetlib.dhp.collection.plugin.oai;
import java.io.StringReader; import java.io.StringReader;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -7,6 +7,9 @@ import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -15,9 +18,6 @@ import org.dom4j.DocumentException;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import eu.dnetlib.collector.worker.DnetCollectorException;
import eu.dnetlib.collector.worker.utils.HttpConnector;
import eu.dnetlib.collector.worker.utils.XmlCleaner;
public class OaiIterator implements Iterator<String> { public class OaiIterator implements Iterator<String> {

View File

@ -1,7 +1,8 @@
package eu.dnetlib.collector.worker.plugins.oai; package eu.dnetlib.dhp.collection.plugin.oai;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import java.util.Iterator; import java.util.Iterator;
import eu.dnetlib.collector.worker.utils.HttpConnector;
public class OaiIteratorFactory { public class OaiIteratorFactory {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.collector.worker; package eu.dnetlib.dhp.collection.worker;
public class DnetCollectorException extends Exception { public class DnetCollectorException extends Exception {

View File

@ -1,10 +1,11 @@
package eu.dnetlib.collector.worker; package eu.dnetlib.dhp.collection.worker;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message; import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType; import eu.dnetlib.message.MessageType;

View File

@ -1,7 +1,8 @@
package eu.dnetlib.collector.worker; package eu.dnetlib.dhp.collection.worker;
import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.collector.worker.utils; package eu.dnetlib.dhp.collection.worker.utils;
import java.util.LinkedList; import java.util.LinkedList;

View File

@ -1,8 +1,10 @@
package eu.dnetlib.collector.worker.utils; package eu.dnetlib.dhp.collection.worker.utils;
import eu.dnetlib.collector.worker.DnetCollectorException; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.collector.worker.plugins.CollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.collector.worker.plugins.oai.OaiCollectorPlugin; import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
;
public class CollectorPluginFactory { public class CollectorPluginFactory {

View File

@ -1,29 +1,22 @@
package eu.dnetlib.collector.worker.utils; package eu.dnetlib.dhp.collection.worker.utils;
import java.io.IOException;
import java.io.InputStream;
import java.net.CookieHandler;
import java.net.CookieManager;
import java.net.CookiePolicy;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.math.NumberUtils; import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.net.ssl.HttpsURLConnection;
import eu.dnetlib.collector.worker.DnetCollectorException; import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.io.InputStream;
import java.net.*;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
public class HttpConnector { public class HttpConnector {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.collector.worker.utils; package eu.dnetlib.dhp.collection.worker.utils;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;

View File

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner; import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import net.sf.saxon.s9api.*; import net.sf.saxon.s9api.*;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
@ -9,6 +10,7 @@ import org.apache.spark.util.LongAccumulator;
import javax.xml.transform.stream.StreamSource; import javax.xml.transform.stream.StreamSource;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Map;
public class TransformFunction implements MapFunction<MetadataRecord, MetadataRecord> { public class TransformFunction implements MapFunction<MetadataRecord, MetadataRecord> {
@ -16,29 +18,30 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
private final LongAccumulator totalItems; private final LongAccumulator totalItems;
private final LongAccumulator errorItems; private final LongAccumulator errorItems;
private final LongAccumulator transformedItems; private final LongAccumulator transformedItems;
private final String trasformationRule; private final String transformationRule;
private final Cleaner cleanFunction;
private final long dateOfTransformation; private final long dateOfTransformation;
public TransformFunction(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator transformedItems, final String trasformationRule, long dateOfTransformation) throws Exception { public TransformFunction(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator transformedItems, final String transformationRule, long dateOfTransformation, final Map<String, Vocabulary> vocabularies) throws Exception {
this.totalItems= totalItems; this.totalItems= totalItems;
this.errorItems = errorItems; this.errorItems = errorItems;
this.transformedItems = transformedItems; this.transformedItems = transformedItems;
this.trasformationRule = trasformationRule; this.transformationRule = transformationRule;
this.dateOfTransformation = dateOfTransformation; this.dateOfTransformation = dateOfTransformation;
cleanFunction = new Cleaner(vocabularies);
} }
@Override @Override
public MetadataRecord call(MetadataRecord value) { public MetadataRecord call(MetadataRecord value) {
totalItems.add(1); totalItems.add(1);
try { try {
final Cleaner cleanFunction = new Cleaner();
Processor processor = new Processor(false); Processor processor = new Processor(false);
processor.registerExtensionFunction(cleanFunction); processor.registerExtensionFunction(cleanFunction);
final XsltCompiler comp = processor.newXsltCompiler(); final XsltCompiler comp = processor.newXsltCompiler();
XsltExecutable xslt = comp.compile(new StreamSource(new ByteArrayInputStream(trasformationRule.getBytes()))); XsltExecutable xslt = comp.compile(new StreamSource(new ByteArrayInputStream(transformationRule.getBytes())));
XdmNode source = processor.newDocumentBuilder().build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes()))); XdmNode source = processor.newDocumentBuilder().build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())));
XsltTransformer trans = xslt.load(); XsltTransformer trans = xslt.load();
trans.setInitialContextNode(source); trans.setInitialContextNode(source);
@ -50,7 +53,7 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
trans.transform(); trans.transform();
final String xml = output.toString(); final String xml = output.toString();
value.setBody(xml); value.setBody(xml);
value.setDateOfCollection(dateOfTransformation); value.setDateOfTransformation(dateOfTransformation);
transformedItems.add(1); transformedItems.add(1);
return value; return value;
}catch (Throwable e) { }catch (Throwable e) {

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob; import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.message.Message; import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
@ -53,12 +55,18 @@ public class TransformSparkJobNode {
.master(master) .master(master)
.getOrCreate(); .getOrCreate();
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder); final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection) ; final Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection, vocabularies) ;
mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath); mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
if (rabbitHost != null) { if (rabbitHost != null) {
System.out.println("SEND FINAL REPORT"); System.out.println("SEND FINAL REPORT");

View File

@ -1,8 +1,22 @@
package eu.dnetlib.dhp.transformation.functions; package eu.dnetlib.dhp.transformation.functions;
import eu.dnetlib.dhp.transformation.vocabulary.Term;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import net.sf.saxon.s9api.*; import net.sf.saxon.s9api.*;
import scala.Serializable;
public class Cleaner implements ExtensionFunction { import java.util.Map;
import java.util.Optional;
public class Cleaner implements ExtensionFunction, Serializable {
private final Map<String, Vocabulary> vocabularies;
public Cleaner(Map<String, Vocabulary> vocabularies) {
this.vocabularies = vocabularies;
}
@Override @Override
public QName getName() { public QName getName() {
@ -11,20 +25,25 @@ public class Cleaner implements ExtensionFunction {
@Override @Override
public SequenceType getResultType() { public SequenceType getResultType() {
return SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE); return SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE_OR_MORE);
} }
@Override @Override
public SequenceType[] getArgumentTypes() { public SequenceType[] getArgumentTypes() {
return new SequenceType[] return new SequenceType[]
{ {
SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE),
SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE) SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE)
}; };
} }
@Override @Override
public XdmValue call(XdmValue[] xdmValues) throws SaxonApiException { public XdmValue call(XdmValue[] xdmValues) throws SaxonApiException {
final String currentValue = xdmValues[0].itemAt(0).getStringValue(); final String currentValue = xdmValues[0].itemAt(0).getStringValue();
return new XdmAtomicValue("cleaned"+currentValue); final String vocabularyName =xdmValues[1].itemAt(0).getStringValue();
Optional<Term> cleanedValue = vocabularies.get(vocabularyName).getTerms().stream().filter(it -> it.getNativeName().equalsIgnoreCase(currentValue)).findAny();
return new XdmAtomicValue(cleanedValue.isPresent()?cleanedValue.get().getCode():currentValue);
} }
} }

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.transformation.vocabulary;
import java.io.Serializable;
/**
 * Serializable bean holding the attributes of a single vocabulary term.
 *
 * <p>All attributes are plain strings exposed through getter/setter pairs;
 * every attribute is {@code null} until its setter is called.
 */
public class Term implements Serializable {

    private String englishName;

    private String nativeName;

    private String encoding;

    private String code;

    private String synonyms;

    /** @return the english name of the term, or {@code null} if not set */
    public String getEnglishName() {
        return this.englishName;
    }

    /** @param englishName the english name to store */
    public void setEnglishName(final String englishName) {
        this.englishName = englishName;
    }

    /** @return the native name of the term, or {@code null} if not set */
    public String getNativeName() {
        return this.nativeName;
    }

    /** @param nativeName the native name to store */
    public void setNativeName(final String nativeName) {
        this.nativeName = nativeName;
    }

    /** @return the encoding attribute of the term, or {@code null} if not set */
    public String getEncoding() {
        return this.encoding;
    }

    /** @param encoding the encoding attribute to store */
    public void setEncoding(final String encoding) {
        this.encoding = encoding;
    }

    /** @return the code of the term, or {@code null} if not set */
    public String getCode() {
        return this.code;
    }

    /** @param code the code to store */
    public void setCode(final String code) {
        this.code = code;
    }

    /** @return the synonyms attribute of the term, or {@code null} if not set */
    public String getSynonyms() {
        return this.synonyms;
    }

    /** @param synonyms the synonyms attribute to store */
    public void setSynonyms(final String synonyms) {
        this.synonyms = synonyms;
    }
}

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.transformation.vocabulary;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
 * Serializable bean describing a vocabulary: a few descriptive string
 * attributes plus the list of {@link Term}s it contains.
 *
 * <p>Every attribute is {@code null} until its setter is called; the term
 * list is stored and returned by reference, without copying.
 */
public class Vocabulary implements Serializable {

    private String id;

    private String name;

    private String description;

    private String code;

    private List<Term> terms;

    /** @return the vocabulary identifier, or {@code null} if not set */
    public String getId() {
        return this.id;
    }

    /** @param id the identifier to store */
    public void setId(final String id) {
        this.id = id;
    }

    /** @return the vocabulary name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(final String name) {
        this.name = name;
    }

    /** @return the vocabulary description, or {@code null} if not set */
    public String getDescription() {
        return this.description;
    }

    /** @param description the description to store */
    public void setDescription(final String description) {
        this.description = description;
    }

    /** @return the vocabulary code, or {@code null} if not set */
    public String getCode() {
        return this.code;
    }

    /** @param code the code to store */
    public void setCode(final String code) {
        this.code = code;
    }

    /** @return the terms of this vocabulary (same list instance that was set), or {@code null} if not set */
    public List<Term> getTerms() {
        return this.terms;
    }

    /** @param terms the list of terms to store; kept by reference */
    public void setTerms(final List<Term> terms) {
        this.terms = terms;
    }
}

View File

@ -0,0 +1,23 @@
package eu.dnetlib.dhp.transformation.vocabulary;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import java.io.Serializable;
import java.net.URL;
import java.nio.charset.Charset;
/**
 * Helper that downloads a vocabulary from the OpenAIRE vocabulary API and
 * maps the JSON response onto a {@link Vocabulary} bean.
 */
public class VocabularyHelper implements Serializable {

    /** URL template of the vocabulary endpoint; {@code %s} is replaced by the vocabulary name. */
    private final static String OPENAIRE_URL ="http://api.openaire.eu/vocabularies/%s.json";

    /**
     * Charset used to decode the HTTP response. It is fixed to UTF-8 instead of the
     * platform default, so that parsing does not depend on the JVM's file.encoding
     * (which would corrupt non-ASCII term names on non-UTF-8 hosts).
     */
    private static final Charset RESPONSE_CHARSET = Charset.forName("UTF-8");

    /** Shared JSON mapper; built once instead of on every call. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /**
     * Fetches the vocabulary with the given name from the remote API.
     *
     * @param vocabularyName the name of the vocabulary, substituted into the endpoint URL
     * @return the vocabulary parsed from the JSON response
     * @throws IllegalArgumentException if {@code vocabularyName} is null or blank
     * @throws Exception if the URL cannot be built, the download fails, or the
     *                   response cannot be mapped onto {@link Vocabulary}
     */
    public static Vocabulary getVocabularyFromAPI(final String vocabularyName) throws Exception {
        // Fail fast: a null/blank name would otherwise silently request "null.json" or ".json".
        if (vocabularyName == null || vocabularyName.trim().isEmpty()) {
            throw new IllegalArgumentException("vocabularyName must not be null or blank");
        }
        final URL url = new URL(String.format(OPENAIRE_URL, vocabularyName));
        final String response = IOUtils.toString(url, RESPONSE_CHARSET);
        return JSON_MAPPER.readValue(response, Vocabulary.class);
    }
}

View File

@ -1,4 +1,5 @@
[ [
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"e", "paramLongName":"encoding", "paramDescription": "the encoding of the input record should be JSON or XML", "paramRequired": true}, {"paramName":"e", "paramLongName":"encoding", "paramDescription": "the encoding of the input record should be JSON or XML", "paramRequired": true},
{"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true}, {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true},
{"paramName":"p", "paramLongName":"provenance", "paramDescription": "the infos about the provenance of the collected records", "paramRequired": true}, {"paramName":"p", "paramLongName":"provenance", "paramDescription": "the infos about the provenance of the collected records", "paramRequired": true},
@ -10,5 +11,6 @@
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true} {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
{"paramName":"t", "paramLongName":"isTest", "paramDescription": "if true the job runs in test mode and no message is sent to RabbitMQ", "paramRequired": false}
] ]

View File

@ -55,22 +55,21 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="CollectionWorker"> <action name="CollectionWorker">
<shell xmlns="uri:oozie:shell-action:0.1"> <java>
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<exec>lib/dhp-collector-worker-1.0.0.jar</exec> <main-class>eu.dnetlib.dhp.collection.worker.DnetCollectorWorker</main-class>
<argument>-p</argument><argument>${sequenceFilePath}</argument> <java-opts>-p</java-opts><java-opts>${sequenceFilePath}</java-opts>
<argument>-a</argument><argument>${apiDescription}</argument> <java-opts>-a</java-opts><java-opts>${apiDescription}</java-opts>
<argument>-n</argument><argument>${nameNode}</argument> <java-opts>-n</java-opts><java-opts>${nameNode}</java-opts>
<argument>-rh</argument><argument>${rmq_host}</argument> <java-opts>-rh</java-opts><java-opts>${rmq_host}</java-opts>
<argument>-ru</argument><argument>${rmq_user}</argument> <java-opts>-ru</java-opts><java-opts>${rmq_user}</java-opts>
<argument>-rp</argument><argument>${rmq_pwd}</argument> <java-opts>-rp</java-opts><java-opts>${rmq_pwd}</java-opts>
<argument>-rr</argument><argument>${rmq_report}</argument> <java-opts>-rr</java-opts><java-opts>${rmq_report}</java-opts>
<argument>-ro</argument><argument>${rmq_ongoing}</argument> <java-opts>-ro</java-opts><java-opts>${rmq_ongoing}</java-opts>
<argument>-u</argument><argument>sandro.labruzzo</argument> <java-opts>-u</java-opts><java-opts>sandro.labruzzo</java-opts>
<argument>-w</argument><argument>${workflowId}</argument> <java-opts>-w</java-opts><java-opts>${workflowId}</java-opts>
<capture-output/> </java>
</shell>
<ok to="GenerateNativeStoreSparkJob"/> <ok to="GenerateNativeStoreSparkJob"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -3,21 +3,48 @@ package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance; import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.junit.Assert; import org.junit.*;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class CollectionJobTest { public class CollectionJobTest {
private Path testDir;
@Before
public void setup() throws IOException {
testDir = Files.createTempDirectory("dhp-collection");
}
@After
public void teadDown() throws IOException {
FileUtils.deleteDirectory(testDir.toFile());
}
@Test @Test
@Ignore public void tesCollection () throws Exception {
public void test () throws Exception {
Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
GenerateNativeStoreSparkJob.main(new String[] {"-e", "XML","-d", ""+System.currentTimeMillis(),"-p", new ObjectMapper().writeValueAsString(provenance), "-x","./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']","-i","/home/sandro/Downloads/oai_1","-o","/home/sandro/Downloads/mdstore_result"}); GenerateNativeStoreSparkJob.main(new String[] {
"-mt", "local",
"-w", "wid",
"-e", "XML",
"-d", ""+System.currentTimeMillis(),
"-p", new ObjectMapper().writeValueAsString(provenance),
"-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
"-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
"-o", testDir.toString()+"/store",
"-t", "true",
"-ru", "",
"-rp", "",
"-rh", "",
"-ro", "",
"-rr", ""});
System.out.println(new ObjectMapper().writeValueAsString(provenance)); System.out.println(new ObjectMapper().writeValueAsString(provenance));
} }

View File

@ -1,9 +1,10 @@
package eu.dnetlib.collector.worker; package eu.dnetlib.dhp.collector.worker;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.DnetCollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message; import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import org.junit.After; import org.junit.After;
@ -48,7 +49,7 @@ public class DnetCollectorWorkerApplicationTests {
@After @After
public void dropDown(){ public void dropDown(){
File f = new File("/tmp/test.seq"); File f = new File("/tmp/file.seq");
f.delete(); f.delete();
} }

View File

@ -2,26 +2,30 @@ package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner; import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import net.sf.saxon.s9api.*; import net.sf.saxon.s9api.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.junit.Assert; import org.junit.*;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule; import org.mockito.junit.MockitoRule;
import javax.xml.transform.stream.StreamSource; import javax.xml.transform.stream.StreamSource;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
public class TransformationJobTest { public class TransformationJobTest {
@ -31,9 +35,26 @@ public class TransformationJobTest {
@Rule @Rule
public MockitoRule mockitoRule = MockitoJUnit.rule(); public MockitoRule mockitoRule = MockitoJUnit.rule();
private Path testDir;
/**
 * Creates a fresh temporary directory before each test and keeps it in {@code testDir}.
 *
 * @throws IOException if the temporary directory cannot be created
 */
@Before
public void setup() throws IOException {
    final Path freshDir = Files.createTempDirectory("dhp-collection");
    this.testDir = freshDir;
}
/**
 * Deletes the temporary directory referenced by {@code testDir} after each test.
 *
 * @throws IOException if the directory cannot be deleted
 */
@After
public void teadDown() throws IOException {
    // JUnit 4 runs @After methods even when a @Before method threw, so testDir
    // may never have been assigned; skip cleanup instead of raising a second,
    // misleading NullPointerException.
    if (testDir != null) {
        FileUtils.deleteDirectory(testDir.toFile());
    }
}
@Test @Test
public void testTransformSaxonHE() throws Exception { public void testTransformSaxonHE() throws Exception {
Cleaner cleanFunction = new Cleaner();
Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
Cleaner cleanFunction = new Cleaner(vocabularies);
Processor proc = new Processor(false); Processor proc = new Processor(false);
proc.registerExtensionFunction(cleanFunction); proc.registerExtensionFunction(cleanFunction);
final XsltCompiler comp = proc.newXsltCompiler(); final XsltCompiler comp = proc.newXsltCompiler();
@ -53,26 +74,30 @@ public class TransformationJobTest {
@Test @Test
public void transformTest() throws Exception { public void transformTest() throws Exception {
final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile(); final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
Path tempDirWithPrefix = Files.createTempDirectory("mdstore_output"); final String mdstore_output = testDir.toString()+"/version";
final String mdstore_output = tempDirWithPrefix.toFile().getAbsolutePath()+"/version";
final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"))); final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")));
TransformSparkJobNode.main(new String[]{
"-mt", "local",
"-i", mdstore_input,
"-o", mdstore_output,
"-d", "1",
"-w", "1",
"-tr", xslt,
"-t", "true",
"-ru", "",
"-rp", "",
"-rh", "",
"-ro", "",
"-rr", ""});
System.out.println(xslt);
TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt, "-t", "true", "-ru","", "-rp","", "-rh","", "-ro","", "-rr",""});
Files.walk(tempDirWithPrefix)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} }
@Test @Test
public void tryLoadFolderOnCP() throws Exception { public void tryLoadFolderOnCP() throws Exception {
final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile(); final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
System.out.println("path = " + path); System.out.println("path = " + path);
Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output"); Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output");
@ -90,8 +115,10 @@ public class TransformationJobTest {
Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")); Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']"); Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
final String xslt = node.asXML(); final String xslt = node.asXML();
Map<String, Vocabulary> vocabularies = new HashMap<>();
vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages"));
TransformFunction tf = new TransformFunction(accumulator, accumulator, accumulator, xslt, 1); TransformFunction tf = new TransformFunction(accumulator, accumulator, accumulator, xslt, 1, vocabularies);
MetadataRecord record = new MetadataRecord(); MetadataRecord record = new MetadataRecord();
record.setBody(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml"))); record.setBody(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));

View File

@ -0,0 +1,17 @@
package eu.dnetlib.dhp.transformation.vocabulary;
import org.junit.Test;
import static org.junit.Assert.*;
/**
 * Checks that a vocabulary obtained through {@link VocabularyHelper} reports the
 * name it was requested with.
 */
public class VocabularyTest {

    /** Name of the vocabulary requested and expected back. */
    private static final String LANGUAGES_VOCABULARY = "dnet:languages";

    /**
     * Loads the languages vocabulary and verifies its name.
     * NOTE(review): getVocabularyFromAPI presumably contacts a remote service, which
     * would make this test depend on network availability — confirm.
     */
    @Test
    public void testLoadVocabulary() throws Exception {
        final Vocabulary loaded = VocabularyHelper.getVocabularyFromAPI(LANGUAGES_VOCABULARY);
        assertEquals(LANGUAGES_VOCABULARY, loaded.getName());
    }
}

View File

@ -9,7 +9,7 @@
<xsl:copy-of select="//oai:header"/> <xsl:copy-of select="//oai:header"/>
<metadata> <metadata>
<xsl:for-each select="//*[local-name()='subject']"> <xsl:for-each select="//*[local-name()='subject']">
<subject><xsl:value-of select="eg:clean(.)"/></subject> <subject><xsl:value-of select="eg:clean(.,'dnet:languages')"/></subject>
</xsl:for-each> </xsl:for-each>
</metadata> </metadata>
<oaf:about> <oaf:about>

View File

@ -20,8 +20,8 @@
<dc:creator>Lombardi, Floriana</dc:creator> <dc:creator>Lombardi, Floriana</dc:creator>
<dc:creator>Tafuri, F.</dc:creator> <dc:creator>Tafuri, F.</dc:creator>
<dc:creator>Tagliacozzo, A.</dc:creator> <dc:creator>Tagliacozzo, A.</dc:creator>
<dc:subject>Materials Chemistry</dc:subject> <dc:subject>Acoli</dc:subject>
<dc:subject>Geochemistry</dc:subject> <dc:subject>Abkhazian</dc:subject>
<dc:subject>Condensed Matter Physics</dc:subject> <dc:subject>Condensed Matter Physics</dc:subject>
<dc:description>Superconducting hybrid junctions are revealing a variety of effects. Some of them are due to the special layout of these devices, which often use a coplanar configuration with relatively large barrier channels and the possibility of hosting Pearl vortices. A Josephson junction with a quasi-ideal two-dimensional barrier has been realized by growing graphene on SiC with Al electrodes. Chemical vapor deposition offers centimeter size monolayer areas where it is possible to realize a comparative analysis of different devices with nominally the same barrier. In samples with a graphene gap below 400 nm, we have found evidence of Josephson coherence in the presence of an incipient Berezinskii-Kosterlitz-Thouless transition. When the magnetic field is cycled, a remarkable hysteretic collapse and revival of the Josephson supercurrent occurs. Similar hysteresis are found in granular systems and are usually justified within the Bean critical state model (CSM). We show that the CSM, with appropriate account for the low-dimensional geometry, can partly explain the odd features measured in these junctions.</dc:description> <dc:description>Superconducting hybrid junctions are revealing a variety of effects. Some of them are due to the special layout of these devices, which often use a coplanar configuration with relatively large barrier channels and the possibility of hosting Pearl vortices. A Josephson junction with a quasi-ideal two-dimensional barrier has been realized by growing graphene on SiC with Al electrodes. Chemical vapor deposition offers centimeter size monolayer areas where it is possible to realize a comparative analysis of different devices with nominally the same barrier. In samples with a graphene gap below 400 nm, we have found evidence of Josephson coherence in the presence of an incipient Berezinskii-Kosterlitz-Thouless transition. 
When the magnetic field is cycled, a remarkable hysteretic collapse and revival of the Josephson supercurrent occurs. Similar hysteresis are found in granular systems and are usually justified within the Bean critical state model (CSM). We show that the CSM, with appropriate account for the low-dimensional geometry, can partly explain the odd features measured in these junctions.</dc:description>
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/604391//Graphene-Based Revolutions in ICT And Beyond (Graphene Flagship)/</dc:relation> <dc:relation>info:eu-repo/grantAgreement/EC/FP7/604391//Graphene-Based Revolutions in ICT And Beyond (Graphene Flagship)/</dc:relation>

View File

@ -24,7 +24,7 @@
<xsl:copy-of select="//oai:header"/> <xsl:copy-of select="//oai:header"/>
<metadata> <metadata>
<xsl:for-each select="//*[local-name()='subject']"> <xsl:for-each select="//*[local-name()='subject']">
<subject><xsl:value-of select="eg:clean(.)"/></subject> <subject><xsl:value-of select="eg:clean(.,'dnet:languages')"/></subject>
</xsl:for-each> </xsl:for-each>
</metadata> </metadata>
<oaf:about> <oaf:about>

12
pom.xml
View File

@ -242,6 +242,18 @@
<version>${google.protobuf.version}</version> <version>${google.protobuf.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>
<version>3.9.4</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>