diff --git a/dhp-applications/dhp-collector-worker/pom.xml b/dhp-applications/dhp-collector-worker/pom.xml
index 8dec778e6..fd567aa37 100644
--- a/dhp-applications/dhp-collector-worker/pom.xml
+++ b/dhp-applications/dhp-collector-worker/pom.xml
@@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- eu.dnetlib.dhp
- dhp-applications
- 1.0.0-SNAPSHOT
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.1.3.RELEASE
../
@@ -18,7 +18,34 @@
+
+
+
+ dnet45-releases
+ D-Net 45 releases
+ http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
+ default
+
+ false
+
+
+ true
+
+
+
+ dnet45-bootstrap-release
+ dnet45 bootstrap release
+ http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-bootstrap-release
+ default
+
+ false
+
+
+ true
+
+
+
cloudera
Cloudera Repository
@@ -32,12 +59,89 @@
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.0
+
+
+ 1.8
+ ${project.build.sourceEncoding}
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.0.2
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+ verify
+
+ jar-no-fork
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19.1
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.4
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.0.0
+
+
+
+
+
+
+
eu.dnetlib.dhp
dhp-common
1.0.0-SNAPSHOT
+
+
+ commons-cli
+ commons-cli
+ 1.4
+
org.springframework.boot
spring-boot-starter
@@ -67,13 +171,6 @@
jaxen
jaxen
-
-
- com.rabbitmq
- amqp-client
- 5.6.0
-
-
org.springframework.boot
spring-boot-starter-test
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
index d654a30e8..c853207d3 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
@@ -4,6 +4,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
import eu.dnetlib.collector.worker.utils.CollectorPluginEnumerator;
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.message.MessageType;
+import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -19,6 +23,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,20 +61,114 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
*/
@Override
public void run(final String... args) throws Exception {
- if (args.length == 0) { return; }
- if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); }
- //TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html
+ Options options = new Options();
+ options.addOption(Option.builder("p")
+ .longOpt("hdfsPath")
+ .required(true)
+ .desc("the path where storing the sequential file")
+ .hasArg() // This option has an argument.
+ .build());
+ options.addOption(Option.builder("a")
+ .longOpt("apidescriptor")
+ .required(true)
+ .desc("the Json encoding of the API Descriptor")
+ .hasArg() // This option has an argument.
+ .build());
- final String hdfsPath = args[0];
+ options.addOption(Option.builder("n")
+ .longOpt("namenode")
+ .required(true)
+ .desc("the Name Node URI")
+ .hasArg() // This option has an argument.
+ .build());
+ options.addOption(Option.builder("u")
+ .longOpt("userHDFS")
+ .required(true)
+ .desc("the user which creates the hdfs seq file")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("ru")
+ .longOpt("rabbitUser")
+ .required(true)
+ .desc("the user to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rp")
+ .longOpt("rabbitPassWord")
+ .required(true)
+ .desc("the password to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rh")
+ .longOpt("rabbitHost")
+ .required(true)
+ .desc("the host of the RabbitMq server")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("ro")
+ .longOpt("rabbitOngoingQueue")
+ .required(true)
+ .desc("the name of the ongoing queue")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rr")
+ .longOpt("rabbitReportQueue")
+ .required(true)
+ .desc("the name of the report queue")
+ .hasArg() // This option has an argument.
+ .build());
+
+
+ options.addOption(Option.builder("w")
+ .longOpt("workflowId")
+ .required(true)
+ .desc("the identifier of the dnet Workflow")
+ .hasArg() // This option has an argument.
+ .build());
+
+ CommandLineParser parser = new DefaultParser();
+ String hdfsPath ;
+ String json;
+ String nameNode;
+ String user;
+ String rabbitUser;
+ String rabbitPassword;
+ String rabbitHost;
+ String rabbitOngoingQueue;
+ String rabbitReportQueue;
+ String workflowId;
+
+ try {
+ CommandLine cmd = parser.parse(options, args);
+ hdfsPath = cmd.getOptionValue("p");
+ json = cmd.getOptionValue("a");
+ nameNode = cmd.getOptionValue("n");
+ user = cmd.getOptionValue("u");
+ rabbitUser = cmd.getOptionValue("ru");
+ rabbitPassword = cmd.getOptionValue("rp");
+ rabbitHost = cmd.getOptionValue("rh");
+ rabbitOngoingQueue = cmd.getOptionValue("ro");
+ rabbitReportQueue = cmd.getOptionValue("rr");
+ workflowId = cmd.getOptionValue("w");
+ } catch (ParseException e) {
+ System.out.println("Error on executing collector worker, missing parameter:");
+ e.printStackTrace();
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("dhp-collector-worker", options);
+ return;
+ }
log.info("hdfsPath ="+hdfsPath);
-
- final String json = args[1];
-
-
- final String nameNode = args[2];
-
log.info("json = "+json);
+
+ final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
+
+
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(json, ApiDescriptor.class);
@@ -84,7 +184,7 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
- System.setProperty("HADOOP_USER_NAME", "sandro.labruzzo");
+ System.setProperty("HADOOP_USER_NAME", user);
System.setProperty("hadoop.home.dir", "/");
//Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
@@ -100,10 +200,22 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
+ final Map ongoingMap = new HashMap<>();
+ final Map reportMap = new HashMap<>();
+
plugin.collect(api).forEach(content -> {
- key.set(counter.getAndIncrement());
- value.set(content);
+ key.set(counter.getAndIncrement());
+ value.set(content);
+ if (counter.get() % 10 ==0) {
+ try {
+ ongoingMap.put("ongoing", ""+counter.get());
+ manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
+ } catch (Exception e) {
+ log.error("Error on sending message ", e);
+ }
+ }
+
try {
writer.append(key, value);
} catch (IOException e) {
@@ -111,6 +223,11 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
}
});
+ ongoingMap.put("ongoing", ""+counter.get());
+ manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
+ reportMap.put("collected", ""+counter.get());
+ manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false);
+
}
}
diff --git a/dhp-applications/dhp-collector-worker/src/main/resources/application.properties b/dhp-applications/dhp-collector-worker/src/main/resources/application.properties
index 8b1378917..2a867fa5f 100644
--- a/dhp-applications/dhp-collector-worker/src/main/resources/application.properties
+++ b/dhp-applications/dhp-collector-worker/src/main/resources/application.properties
@@ -1 +1,2 @@
-
+spring.main.banner-mode=off
+logging.level.root=OFF
\ No newline at end of file
diff --git a/dhp-applications/dhp-mdstore-manager-app/pom.xml b/dhp-applications/dhp-mdstore-manager-app/pom.xml
index db3d702ce..645535c0a 100644
--- a/dhp-applications/dhp-mdstore-manager-app/pom.xml
+++ b/dhp-applications/dhp-mdstore-manager-app/pom.xml
@@ -8,9 +8,9 @@
- eu.dnetlib.dhp
- dhp-applications
- 1.0.0-SNAPSHOT
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.1.3.RELEASE
../
@@ -28,6 +28,7 @@
+
diff --git a/dhp-applications/pom.xml b/dhp-applications/pom.xml
index 7e52fc76d..15a241e6e 100644
--- a/dhp-applications/pom.xml
+++ b/dhp-applications/pom.xml
@@ -1,5 +1,6 @@
-
+
4.0.0
eu.dnetlib.dhp
@@ -14,37 +15,17 @@
dhp-collector-worker
+
+
-
- org.springframework.boot
- spring-boot-dependencies
- 2.1.3.RELEASE
- pom
- import
+ eu.dnetlib.dhp
+ dhp-common
+ 1.0.0-SNAPSHOT
-
-
- eu.dnetlib.dhp
- dhp-common
- 1.0.0-SNAPSHOT
-
-
-
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
- true
-
-
-
-
diff --git a/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java b/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
index 6874d97f4..255104eda 100644
--- a/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
+++ b/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
@@ -4,7 +4,6 @@ import java.util.HashMap;
import java.util.Map;
-//TODO sholud be moved on dhp-common
public class ApiDescriptor {
private String id;
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index d37272d21..846ece5ed 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -1,9 +1,16 @@
package eu.dnetlib.dhp.utils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
public class DHPUtils {
@@ -22,4 +29,31 @@ public class DHPUtils {
return String.format("%s::%s",nsPrefix, DHPUtils.md5(originalId));
}
+ public static String compressString(final String input ) {
+ try ( ByteArrayOutputStream out = new ByteArrayOutputStream(); Base64OutputStream b64os = new Base64OutputStream(out)) {
+ GZIPOutputStream gzip = new GZIPOutputStream(b64os);
+ gzip.write(input.getBytes(StandardCharsets.UTF_8));
+ gzip.close();
+ return out.toString();
+ } catch (Throwable e ) {
+ return null;
+ }
+ }
+
+
+ public static String decompressString(final String input) {
+ byte[] byteArray = Base64.decodeBase64(input.getBytes());
+ int len;
+ try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream((byteArray))); ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length)) {
+ byte[] buffer = new byte[1024];
+ while((len = gis.read(buffer)) != -1){
+ bos.write(buffer, 0, len);
+ }
+ return bos.toString();
+ } catch (Exception e) {
+ return null;
+ }
+
+ }
+
}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
index ee58f11e8..af5339034 100644
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
+++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
@@ -39,6 +39,8 @@ public class MessageConsumer extends DefaultConsumer {
else {
//TODO LOGGING EXCEPTION
}
+ } finally {
+ getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}
}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
index 8db9f3ce1..2b5e90838 100644
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
+++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
@@ -5,9 +5,11 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import sun.rmi.runtime.Log;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
public class MessageManager {
@@ -41,42 +43,45 @@ public class MessageManager {
this.durable = durable;
this.autodelete = autodelete;
}
- private Channel createChannel(final String queueName, final boolean durable, final boolean autodelete ) throws Exception {
+
+ private Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.messageHost);
factory.setUsername(this.username);
factory.setPassword(this.password);
- Connection connection = factory.newConnection();
- Map args = new HashMap();
+ return factory.newConnection();
+ }
+
+ private Channel createChannel(final Connection connection, final String queueName, final boolean durable, final boolean autodelete ) throws Exception {
+ Map args = new HashMap<>();
args.put("x-message-ttl", 10000);
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, durable, false, this.autodelete, args);
return channel;
}
public boolean sendMessage(final Message message, String queueName) throws Exception {
- try (Channel channel = createChannel(queueName, this.durable, this.autodelete)) {
+ try (Connection connection = createConnection(); Channel channel = createChannel(connection, queueName, this.durable, this.autodelete)) {
channel.basicPublish("", queueName,null, message.toString().getBytes());
return true;
} catch (Throwable e) {
- e.printStackTrace();
- return false;
+ throw new RuntimeException(e);
}
}
public boolean sendMessage(final Message message, String queueName, boolean durable_var, boolean autodelete_var) throws Exception {
- try (Channel channel = createChannel(queueName, durable_var, autodelete_var)) {
+ try (Connection connection = createConnection(); Channel channel = createChannel(connection, queueName, durable_var, autodelete_var)) {
channel.basicPublish("", queueName,null, message.toString().getBytes());
return true;
} catch (Throwable e) {
- e.printStackTrace();
- return false;
+ throw new RuntimeException(e);
}
}
public void startConsumingMessage(final String queueName, final boolean durable, final boolean autodelete) throws Exception{
- Channel channel = createChannel(queueName, durable, autodelete);
+
+ Channel channel = createChannel(createConnection(), queueName, durable, autodelete);
channel.basicConsume(queueName, false, new MessageConsumer(channel,queueMessages));
}
}
diff --git a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
index baed36304..fbc9dc251 100644
--- a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
@@ -51,33 +51,5 @@ public class MessageTest {
}
- @Test
- public void sendMessageTest() throws Exception {
- final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
- Message m = new Message();
- m.setWorkflowId("wf_20190405_105048_275");
- m.setType(MessageType.ONGOING);
- m.setJobName("Collection");
- Map body= new HashMap<>();
- body.put("progressCount", "100");
- body.put("ExecutionTime", "30s");
-
- m.setBody(body);
-
- MessageManager mm = new MessageManager("broker1-dev-dnet.d4science.org","r_admin", "9g8fed7gpohef9y84th98h", false,false, null);
-
-
-
-
-
- mm.sendMessage(m, "dev_ongoing");
-
- m.setType(MessageType.REPORT);
-
- body.put("mdStoreSize", "368");
-
-
- mm.sendMessage(m, "dev_report", true, false);
- }
}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 0534e23b8..21ddaea99 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -20,12 +20,15 @@
org.apache.spark
spark-sql_2.11
+
eu.dnetlib.dhp
dhp-common
1.0.0-SNAPSHOT
+
+
dom4j
dom4j
@@ -37,7 +40,12 @@
jaxen
1.1.6
-
+
+ org.mockito
+ mockito-core
+ 2.25.0
+ test
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index 719e1f134..5973153cd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -3,6 +3,9 @@ package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.message.MessageType;
import org.apache.commons.cli.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
@@ -21,6 +24,8 @@ import org.dom4j.io.SAXReader;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
public class GenerateNativeStoreSparkJob {
@@ -52,6 +57,67 @@ public class GenerateNativeStoreSparkJob {
public static void main(String[] args) throws Exception {
+ Options options = generateApplicationArguments();
+
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse( options, args);
+
+ final String encoding = cmd.getOptionValue("e");
+ final long dateOfCollection = new Long(cmd.getOptionValue("d"));
+ final String jsonProvenance = cmd.getOptionValue("p");
+ final ObjectMapper jsonMapper = new ObjectMapper();
+ final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
+ final String xpath = cmd.getOptionValue("x");
+ final String inputPath = cmd.getOptionValue("i");
+ final String outputPath = cmd.getOptionValue("o");
+ final String rabbitUser = cmd.getOptionValue("ru");
+ final String rabbitPassword = cmd.getOptionValue("rp");
+ final String rabbitHost = cmd.getOptionValue("rh");
+ final String rabbitOngoingQueue = cmd.getOptionValue("ro");
+ final String rabbitReportQueue = cmd.getOptionValue("rr");
+ final String workflowId = cmd.getOptionValue("w");
+
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName("GenerateNativeStoreSparkJob")
+ .master("yarn")
+ .getOrCreate();
+
+ final Map ongoingMap = new HashMap<>();
+ final Map reportMap = new HashMap<>();
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+ final JavaPairRDD inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class);
+
+ final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+
+ final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+
+ final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
+
+ final JavaRDD mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
+ .filter(Objects::nonNull).distinct();
+
+ ongoingMap.put("ongoing", "0");
+ manager.sendMessage(new Message(workflowId,"DataFrameCreation", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
+
+ final Encoder encoder = Encoders.bean(MetadataRecord.class);
+ final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
+ final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
+ mdStoreRecords.add(mdstore.count());
+ ongoingMap.put("ongoing", ""+ totalItems.value());
+ manager.sendMessage(new Message(workflowId,"DataFrameCreation", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
+
+ mdstore.write().format("parquet").save(outputPath);
+ reportMap.put("inputItem" , ""+ totalItems.value());
+ reportMap.put("invalidRecords", "" + invalidRecords.value());
+ reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
+ manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false);
+ }
+
+ private static Options generateApplicationArguments() {
Options options = new Options();
options.addOption(Option.builder("e")
.longOpt("encoding")
@@ -93,43 +159,48 @@ public class GenerateNativeStoreSparkJob {
.hasArg()
.build());
+ options.addOption(Option.builder("ru")
+ .longOpt("rabbitUser")
+ .required(true)
+ .desc("the user to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
- CommandLineParser parser = new DefaultParser();
- CommandLine cmd = parser.parse( options, args);
+ options.addOption(Option.builder("rp")
+ .longOpt("rabbitPassWord")
+ .required(true)
+ .desc("the password to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
- final String encoding = cmd.getOptionValue("e");
- final long dateOfCollection = new Long(cmd.getOptionValue("d"));
- final String jsonProvenance = cmd.getOptionValue("p");
- final ObjectMapper jsonMapper = new ObjectMapper();
- final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
- final String xpath = cmd.getOptionValue("x");
- final String inputPath = cmd.getOptionValue("i");
- final String outputPath = cmd.getOptionValue("o");
+ options.addOption(Option.builder("rh")
+ .longOpt("rabbitHost")
+ .required(true)
+ .desc("the host of the RabbitMq server")
+ .hasArg() // This option has an argument.
+ .build());
- final SparkSession spark = SparkSession
- .builder()
- .appName("GenerateNativeStoreSparkJob")
- .master("local")
- .getOrCreate();
+ options.addOption(Option.builder("ro")
+ .longOpt("rabbitOngoingQueue")
+ .required(true)
+ .desc("the name of the ongoing queue")
+ .hasArg() // This option has an argument.
+ .build());
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ options.addOption(Option.builder("rr")
+ .longOpt("rabbitReportQueue")
+ .required(true)
+ .desc("the name of the report queue")
+ .hasArg() // This option has an argument.
+ .build());
- final JavaPairRDD inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class);
- final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
-
- final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
-
- final JavaRDD mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
- .filter(Objects::nonNull).distinct();
-
- final Encoder encoder = Encoders.bean(MetadataRecord.class);
- final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
- final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
- mdStoreRecords.add(mdstore.count());
- System.out.println("totalItems.value() = " + totalItems.value());
- System.out.println("invalidRecords = " + invalidRecords.value());
- System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value());
- mdstore.write().format("parquet").save(outputPath);
+ options.addOption(Option.builder("w")
+ .longOpt("workflowId")
+ .required(true)
+ .desc("the identifier of the dnet Workflow")
+ .hasArg() // This option has an argument.
+ .build());
+ return options;
}
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/TransformSparkJobNode.java
deleted file mode 100644
index 926b13bba..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/TransformSparkJobNode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package eu.dnetlib.dhp.collection;
-
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-import eu.dnetlib.dhp.transformation.TransformFunction;
-import org.apache.commons.cli.*;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.util.LongAccumulator;
-
-public class TransformSparkJobNode {
-
- public static void main(String[] args) throws ParseException {
- Options options = new Options();
-
- options.addOption(Option.builder("i")
- .longOpt("input")
- .required(true)
- .desc("input path of the sequence file")
- .hasArg() // This option has an argument.
- .build());
- options.addOption(Option.builder("o")
- .longOpt("output")
- .required(true)
- .desc("output path of the mdstore")
- .hasArg()
- .build());
-
- final CommandLineParser parser = new DefaultParser();
- final CommandLine cmd = parser.parse( options, args);
-
- final String inputPath = cmd.getOptionValue("i");
- final String outputPath = cmd.getOptionValue("o");
-
- final SparkSession spark = SparkSession
- .builder()
- .appName("GenerateNativeStoreSparkJob")
- .master("local")
- .getOrCreate();
-
- final Encoder encoder = Encoders.bean(MetadataRecord.class);
- final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
- final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
-
- final TransformFunction mfunc = new TransformFunction(totalItems);
- mdstoreInput.map(mfunc, encoder).write().format("parquet").save(outputPath);
- System.out.println("totalItems = " + totalItems.value());
-
- }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java
index 2380c673a..b6d247e13 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java
@@ -4,18 +4,52 @@ import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.util.LongAccumulator;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
public class TransformFunction implements MapFunction {
private final LongAccumulator totalItems;
+ private final LongAccumulator errorItems;
+ private final LongAccumulator transformedItems;
+ private final String trasformationRule;
- public TransformFunction(LongAccumulator totalItems) {
+ private final long dateOfTransformation;
+
+
+ public TransformFunction(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator transformedItems, final String trasformationRule, long dateOfTransformation) {
this.totalItems= totalItems;
+ this.errorItems = errorItems;
+ this.transformedItems = transformedItems;
+ this.trasformationRule = trasformationRule;
+ this.dateOfTransformation = dateOfTransformation;
}
@Override
- public MetadataRecord call(MetadataRecord value) throws Exception {
+ public MetadataRecord call(MetadataRecord value) {
totalItems.add(1);
- return value;
+ try {
+ final TransformerFactory factory = TransformerFactory.newInstance();
+ factory.newTransformer();
+ final StreamSource xsltSource = new StreamSource(new ByteArrayInputStream(trasformationRule.getBytes()));
+ final Transformer transformer = factory.newTransformer(xsltSource);
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ final StringWriter output = new StringWriter();
+ transformer.transform(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())), new StreamResult(output));
+ final String xml = output.toString();
+ value.setBody(xml);
+ value.setDateOfCollection(dateOfTransformation);
+ transformedItems.add(1);
+ return value;
+ }catch (Throwable e) {
+ errorItems.add(1);
+ return null;
+ }
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
new file mode 100644
index 000000000..ebca61b56
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -0,0 +1,160 @@
+package eu.dnetlib.dhp.transformation;
+
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.message.MessageType;
+import org.apache.commons.cli.*;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.Node;
+import org.dom4j.io.SAXReader;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TransformSparkJobNode {
+
+
+
+ public static void main(String[] args) throws Exception {
+
+ for (int i = 0; i < args.length; i++) {
+ System.out.println(args[i]);
+ }
+
+ Options options = new Options();
+
+ options.addOption(Option.builder("mt")
+ .longOpt("master")
+ .required(true)
+ .desc("should be local or yarn")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("d")
+ .longOpt("dateOfCollection")
+ .required(true)
+ .desc("the date of collection")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("i")
+ .longOpt("input")
+ .required(true)
+ .desc("input path of the sequence file")
+ .hasArg() // This option has an argument.
+ .build());
+ options.addOption(Option.builder("o")
+ .longOpt("output")
+ .required(true)
+ .desc("output path of the mdstore")
+ .hasArg()
+ .build());
+ options.addOption(Option.builder("w")
+ .longOpt("workflowId")
+ .required(true)
+ .desc("the identifier of the dnet Workflow")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("tr")
+ .longOpt("transformationRule")
+ .required(true)
+ .desc("the transformation Rule to apply to the input MDStore")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("ru")
+ .longOpt("rabbitUser")
+ .required(false)
+ .desc("the user to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rp")
+ .longOpt("rabbitPassWord")
+ .required(false)
+ .desc("the password to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rh")
+ .longOpt("rabbitHost")
+ .required(false)
+ .desc("the host of the RabbitMq server")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("ro")
+ .longOpt("rabbitOngoingQueue")
+ .required(false)
+ .desc("the name of the ongoing queue")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rr")
+ .longOpt("rabbitReportQueue")
+ .required(false)
+ .desc("the name of the report queue")
+ .hasArg() // This option has an argument.
+ .build());
+
+
+ final CommandLineParser parser = new DefaultParser();
+ final CommandLine cmd = parser.parse( options, args);
+
+ final String inputPath = cmd.getOptionValue("i");
+ final String outputPath = cmd.getOptionValue("o");
+ final String workflowId = cmd.getOptionValue("w");
+ final String trasformationRule = extractXSLTFromTR(DHPUtils.decompressString(cmd.getOptionValue("tr")));
+ final String master = cmd.getOptionValue("mt");
+ final String rabbitUser = cmd.getOptionValue("ru");
+ final String rabbitPassword = cmd.getOptionValue("rp");
+ final String rabbitHost = cmd.getOptionValue("rh");
+ final String rabbitReportQueue = cmd.getOptionValue("rr");
+ final long dateOfCollection = new Long(cmd.getOptionValue("d"));
+
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName("TransformStoreSparkJob")
+ .master(master)
+ .getOrCreate();
+
+ final Encoder encoder = Encoders.bean(MetadataRecord.class);
+ final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
+
+ final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
+ final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
+ final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
+
+ final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection) ;
+ mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
+
+
+ if (rabbitHost != null) {
+ final Map reportMap = new HashMap<>();
+ reportMap.put("inputItem" , ""+ totalItems.value());
+ reportMap.put("invalidRecords", "" + errorItems.value());
+ reportMap.put("mdStoreSize", "" + transformedItems.value());
+ final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
+ manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false);
+ }
+
+ }
+
+
+ private static String extractXSLTFromTR(final String tr) throws DocumentException {
+ SAXReader reader = new SAXReader();
+ Document document = reader.read(new ByteArrayInputStream(tr.getBytes()));
+ Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
+ return node.asXML();
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml
index c6279e7c5..1602519e0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml
@@ -1,4 +1,4 @@
-
+
sequenceFilePath
@@ -20,7 +20,7 @@
A json encoding of the Datasource Info
- identifierXPath
+ identifierPath
 An xpath to retrieve the metadata identifier for the generation of DNet Identifier
@@ -33,6 +33,11 @@
timestamp
The timestamp of the collection date
+
+
+ workflowId
+ The identifier of the workflow
+
@@ -41,33 +46,41 @@
+
+
-
${jobTracker}
${nameNode}
lib/dhp-collector-worker-1.0.0.jar
- ${sequenceFilePath}
- ${apiDescription}
- ${nameNode}
+ -p${sequenceFilePath}
+ -a${apiDescription}
+ -n${nameNode}
+ -rh${rmq_host}
+ -ru${rmq_user}
+ -rp${rmq_pwd}
+ -rr${rmq_report}
+ -ro${rmq_ongoing}
+ -usandro.labruzzo
+ -w${workflowId}
-
+
-
+
${jobTracker}
${nameNode}
yarn
cluster
- MDBuilder
+ GenerateNativeStoreSparkJob
eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob
dhp-aggregations-1.0.0-SNAPSHOT.jar
--num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
@@ -77,9 +90,24 @@
--xpath${identifierPath}
--input${sequenceFilePath}
--output${mdStorePath}
+ -rh${rmq_host}
+ -ru${rmq_user}
+ -rp${rmq_pwd}
+ -rr${rmq_report}
+ -ro${rmq_ongoing}
+ -w${workflowId}
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_transform_workflows.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_transform_workflows.xml
new file mode 100644
index 000000000..4b1e3d84b
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_transform_workflows.xml
@@ -0,0 +1,76 @@
+
+
+
+ mdstoreInputPath
+ the path of the input MDStore
+
+
+
+ mdstoreOutputPath
+ the path of the cleaned mdstore
+
+
+
+ transformationRule
+ The transformation Rule to apply
+
+
+
+ timestamp
+ The timestamp of the collection date
+
+
+
+ workflowId
+ The identifier of the workflow
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn
+ cluster
+ TransformSparkJobNode
+ eu.dnetlib.dhp.transformation.TransformSparkJobNode
+ dhp-aggregations-1.0.0-SNAPSHOT.jar
+ --num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+ --dateOfCollection ${timestamp}
+ -mt yarn
+ --input${mdstoreInputPath}
+ --output${mdstoreOutputPath}
+ -w${workflowId}
+ -tr${transformationRule}
+ -ru${rmq_user}
+ -rp${rmq_pwd}
+ -rh${rmq_host}
+ -ro${rmq_ongoing}
+ -rr${rmq_report}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
index 3bed60586..8b49b9df7 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
@@ -5,6 +5,7 @@ import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -13,6 +14,7 @@ public class CollectionJobTest {
@Test
+ @Ignore
public void test () throws Exception {
Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
GenerateNativeStoreSparkJob.main(new String[] {"-e", "XML","-d", ""+System.currentTimeMillis(),"-p", new ObjectMapper().writeValueAsString(provenance), "-x","./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']","-i","/home/sandro/Downloads/oai_1","-o","/home/sandro/Downloads/mdstore_result"});
@@ -20,17 +22,6 @@ public class CollectionJobTest {
}
- @Test
- public void transformTest () throws Exception {
-
- TransformSparkJobNode.main(new String[]{"-o","/home/sandro/Downloads/mdstore_cleande","-i","/home/sandro/Downloads/mdstore_result"});
-
-
-
-
- }
-
-
@Test
public void testGenerationMetadataRecord() throws Exception {
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
new file mode 100644
index 000000000..547b53187
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -0,0 +1,101 @@
+package eu.dnetlib.dhp.transformation;
+
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.util.LongAccumulator;
+import org.dom4j.Document;
+import org.dom4j.Node;
+import org.dom4j.io.SAXReader;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+
+public class TransformationJobTest {
+
+
+ @Mock
+ LongAccumulator accumulator;
+
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Test
+ public void transformTest() throws Exception {
+ final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile();
+ Path tempDirWithPrefix = Files.createTempDirectory("mdstore_output");
+
+ final String mdstore_output = tempDirWithPrefix.toFile().getAbsolutePath()+"/version";
+
+ final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")));
+
+ System.out.println(xslt);
+ TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt});
+
+ Files.walk(tempDirWithPrefix)
+ .sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+
+
+ @Test
+ public void tryLoadFolderOnCP() throws Exception {
+ final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile();
+ System.out.println("path = " + path);
+
+ Path tempDirWithPrefix = Files.createTempDirectory("mdstore_output");
+
+
+ System.out.println(tempDirWithPrefix.toFile().getAbsolutePath());
+
+ Files.deleteIfExists(tempDirWithPrefix);
+ }
+
+
+ @Test
+ public void testTransformFunction() throws Exception {
+
+ final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
+
+ SAXReader reader = new SAXReader();
+ Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
+ Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
+ final String xslt = node.asXML();
+
+ TransformFunction tf = new TransformFunction(accumulator, accumulator, accumulator, xslt, 1);
+
+ MetadataRecord record = new MetadataRecord();
+ record.setBody(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
+
+ final MetadataRecord result = tf.call(record);
+ Assert.assertNotNull(result.getBody());
+ }
+
+
+ @Test
+ public void extractTr() throws Exception {
+
+ final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
+
+ SAXReader reader = new SAXReader();
+ Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
+ Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
+
+ System.out.println(node.asXML());
+
+
+
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml
new file mode 100644
index 000000000..8a2297947
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/input.xml
@@ -0,0 +1,37 @@
+
+
+ oai:research.chalmers.se:243692
+ 2018-01-25T18:04:43Z
+ openaire
+
+
+
+ Incipient Berezinskii-Kosterlitz-Thouless transition in two-dimensional coplanar Josephson junctions
+ https://research.chalmers.se/en/publication/243692
+ 2016
+ Massarotti, D.
+ Jouault, B.
+ Rouco, V.
+ Charpentier, Sophie
+ Bauch, Thilo
+ Michon, A.
+ De Candia, A.
+ Lucignano, P.
+ Lombardi, Floriana
+ Tafuri, F.
+ Tagliacozzo, A.
+ Materials Chemistry
+ Geochemistry
+ Condensed Matter Physics
+ Superconducting hybrid junctions are revealing a variety of effects. Some of them are due to the special layout of these devices, which often use a coplanar configuration with relatively large barrier channels and the possibility of hosting Pearl vortices. A Josephson junction with a quasi-ideal two-dimensional barrier has been realized by growing graphene on SiC with Al electrodes. Chemical vapor deposition offers centimeter size monolayer areas where it is possible to realize a comparative analysis of different devices with nominally the same barrier. In samples with a graphene gap below 400 nm, we have found evidence of Josephson coherence in the presence of an incipient Berezinskii-Kosterlitz-Thouless transition. When the magnetic field is cycled, a remarkable hysteretic collapse and revival of the Josephson supercurrent occurs. Similar hysteresis are found in granular systems and are usually justified within the Bean critical state model (CSM). We show that the CSM, with appropriate account for the low-dimensional geometry, can partly explain the odd features measured in these junctions.
+ info:eu-repo/grantAgreement/EC/FP7/604391//Graphene-Based Revolutions in ICT And Beyond (Graphene Flagship)/
+ info:eu-repo/semantics/altIdentifier/doi/10.1103/PhysRevB.94.054525
+ info:eu-repo/semantics/article
+ Physical Review B vol.94(2016)
+ info:eu-repo/semantics/openAccess
+ eng
+ Researchers
+ application/pdf
+
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/_SUCCESS
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/part-00000-ccf4ed18-46a4-45d2-ab85-2878a9521dde-c000.snappy.parquet b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/part-00000-ccf4ed18-46a4-45d2-ab85-2878a9521dde-c000.snappy.parquet
new file mode 100644
index 000000000..ef3537dcd
Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/part-00000-ccf4ed18-46a4-45d2-ab85-2878a9521dde-c000.snappy.parquet differ
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/part-00001-ccf4ed18-46a4-45d2-ab85-2878a9521dde-c000.snappy.parquet b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/part-00001-ccf4ed18-46a4-45d2-ab85-2878a9521dde-c000.snappy.parquet
new file mode 100644
index 000000000..986ee6f67
Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/mdstore/part-00001-ccf4ed18-46a4-45d2-ab85-2878a9521dde-c000.snappy.parquet differ
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
new file mode 100644
index 000000000..9e9c23b78
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
@@ -0,0 +1,39 @@
+
+
+
+
+
+
+
+
+
+ SECURITY_PARAMETERS
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 11544cc60..46386caf9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,13 @@
test
+
+ org.mockito
+ mockito-core
+ 2.7.22
+ test
+
+