forked from D-Net/dnet-hadoop
Implemented message manager, Fixed bug on collection worker, implemented Collecion and Transform spark job
This commit is contained in:
parent
58c4e1f725
commit
403c13eebf
|
@ -4,9 +4,9 @@
|
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!-- Inherit defaults from Spring Boot -->
|
||||
<parent>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-applications</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
<relativePath>../</relativePath>
|
||||
</parent>
|
||||
|
||||
|
@ -18,7 +18,34 @@
|
|||
|
||||
|
||||
|
||||
|
||||
<repositories>
|
||||
|
||||
<repository>
|
||||
<id>dnet45-releases</id>
|
||||
<name>D-Net 45 releases</name>
|
||||
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
|
||||
<layout>default</layout>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>dnet45-bootstrap-release</id>
|
||||
<name>dnet45 bootstrap release</name>
|
||||
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-bootstrap-release</url>
|
||||
<layout>default</layout>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
</repository>
|
||||
|
||||
<repository>
|
||||
<id>cloudera</id>
|
||||
<name>Cloudera Repository</name>
|
||||
|
@ -32,12 +59,89 @@
|
|||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<executable>true</executable>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.6.0</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<encoding>${project.build.sourceEncoding}</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-sources</id>
|
||||
<phase>verify</phase>
|
||||
<goals>
|
||||
<goal>jar-no-fork</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.19.1</version>
|
||||
<configuration>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>2.10.4</version>
|
||||
<configuration>
|
||||
<detectLinks>true</detectLinks>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</plugin>
|
||||
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself. -->
|
||||
</plugins>
|
||||
|
||||
</build>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
<version>1.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
|
@ -67,13 +171,6 @@
|
|||
<groupId>jaxen</groupId>
|
||||
<artifactId>jaxen</artifactId>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
|
||||
<dependency>
|
||||
<groupId>com.rabbitmq</groupId>
|
||||
<artifactId>amqp-client</artifactId>
|
||||
<version>5.6.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
|
|
@ -4,6 +4,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import eu.dnetlib.collector.worker.model.ApiDescriptor;
|
||||
import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
|
||||
import eu.dnetlib.collector.worker.utils.CollectorPluginEnumerator;
|
||||
import eu.dnetlib.message.Message;
|
||||
import eu.dnetlib.message.MessageManager;
|
||||
import eu.dnetlib.message.MessageType;
|
||||
import org.apache.commons.cli.*;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -19,6 +23,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
|
@ -55,20 +61,114 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
|
|||
*/
|
||||
@Override
|
||||
public void run(final String... args) throws Exception {
|
||||
if (args.length == 0) { return; }
|
||||
if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); }
|
||||
//TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html
|
||||
Options options = new Options();
|
||||
options.addOption(Option.builder("p")
|
||||
.longOpt("hdfsPath")
|
||||
.required(true)
|
||||
.desc("the path where storing the sequential file")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
options.addOption(Option.builder("a")
|
||||
.longOpt("apidescriptor")
|
||||
.required(true)
|
||||
.desc("the Json enconding of the API Descriptor")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
final String hdfsPath = args[0];
|
||||
options.addOption(Option.builder("n")
|
||||
.longOpt("namenode")
|
||||
.required(true)
|
||||
.desc("the Name Node URI")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("u")
|
||||
.longOpt("userHDFS")
|
||||
.required(true)
|
||||
.desc("the user wich create the hdfs seq file")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("ru")
|
||||
.longOpt("rabbitUser")
|
||||
.required(true)
|
||||
.desc("the user to connect with RabbitMq for messaging")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("rp")
|
||||
.longOpt("rabbitPassWord")
|
||||
.required(true)
|
||||
.desc("the password to connect with RabbitMq for messaging")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("rh")
|
||||
.longOpt("rabbitHost")
|
||||
.required(true)
|
||||
.desc("the host of the RabbitMq server")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("ro")
|
||||
.longOpt("rabbitOngoingQueue")
|
||||
.required(true)
|
||||
.desc("the name of the ongoing queue")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("rr")
|
||||
.longOpt("rabbitReportQueue")
|
||||
.required(true)
|
||||
.desc("the name of the report queue")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
|
||||
options.addOption(Option.builder("w")
|
||||
.longOpt("workflowId")
|
||||
.required(true)
|
||||
.desc("the identifier of the dnet Workflow")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
CommandLineParser parser = new DefaultParser();
|
||||
String hdfsPath ;
|
||||
String json;
|
||||
String nameNode;
|
||||
String user;
|
||||
String rabbitUser;
|
||||
String rabbitPassword;
|
||||
String rabbitHost;
|
||||
String rabbitOngoingQueue;
|
||||
String rabbitReportQueue;
|
||||
String workflowId;
|
||||
|
||||
try {
|
||||
CommandLine cmd = parser.parse(options, args);
|
||||
hdfsPath = cmd.getOptionValue("p");
|
||||
json = cmd.getOptionValue("a");
|
||||
nameNode = cmd.getOptionValue("n");
|
||||
user = cmd.getOptionValue("u");
|
||||
rabbitUser = cmd.getOptionValue("ru");
|
||||
rabbitPassword = cmd.getOptionValue("rp");
|
||||
rabbitHost = cmd.getOptionValue("rh");
|
||||
rabbitOngoingQueue = cmd.getOptionValue("ro");
|
||||
rabbitReportQueue = cmd.getOptionValue("rr");
|
||||
workflowId = cmd.getOptionValue("w");
|
||||
} catch (ParseException e) {
|
||||
System.out.println("Error on executing collector worker, missing parameter:");
|
||||
e.printStackTrace();
|
||||
HelpFormatter formatter = new HelpFormatter();
|
||||
formatter.printHelp("dhp-collector-worker", options);
|
||||
return;
|
||||
}
|
||||
log.info("hdfsPath ="+hdfsPath);
|
||||
|
||||
final String json = args[1];
|
||||
|
||||
|
||||
final String nameNode = args[2];
|
||||
|
||||
log.info("json = "+json);
|
||||
|
||||
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
|
||||
|
||||
|
||||
final ObjectMapper jsonMapper = new ObjectMapper();
|
||||
final ApiDescriptor api = jsonMapper.readValue(json, ApiDescriptor.class);
|
||||
|
||||
|
@ -84,7 +184,7 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
|
|||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||
|
||||
System.setProperty("HADOOP_USER_NAME", "sandro.labruzzo");
|
||||
System.setProperty("HADOOP_USER_NAME", user);
|
||||
System.setProperty("hadoop.home.dir", "/");
|
||||
//Get the filesystem - HDFS
|
||||
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
|
||||
|
@ -100,10 +200,22 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
|
|||
final IntWritable key = new IntWritable(counter.get());
|
||||
final Text value = new Text();
|
||||
|
||||
final Map<String, String> ongoingMap = new HashMap<>();
|
||||
final Map<String, String> reportMap = new HashMap<>();
|
||||
|
||||
plugin.collect(api).forEach(content -> {
|
||||
|
||||
key.set(counter.getAndIncrement());
|
||||
value.set(content);
|
||||
key.set(counter.getAndIncrement());
|
||||
value.set(content);
|
||||
if (counter.get() % 10 ==0) {
|
||||
try {
|
||||
ongoingMap.put("ongoing", ""+counter.get());
|
||||
manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
|
||||
} catch (Exception e) {
|
||||
log.error("Error on sending message ", e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
writer.append(key, value);
|
||||
} catch (IOException e) {
|
||||
|
@ -111,6 +223,11 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
|
|||
}
|
||||
|
||||
});
|
||||
ongoingMap.put("ongoing", ""+counter.get());
|
||||
manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
|
||||
reportMap.put("collected", ""+counter.get());
|
||||
manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
|
||||
spring.main.banner-mode=off
|
||||
logging.level.root=OFF
|
|
@ -8,9 +8,9 @@
|
|||
|
||||
<!-- Inherit defaults from Spring Boot -->
|
||||
<parent>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-applications</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
<relativePath>../</relativePath>
|
||||
</parent>
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
|||
|
||||
|
||||
|
||||
|
||||
<!-- Add typical dependencies for a web application -->
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
|
@ -14,37 +15,17 @@
|
|||
<module>dhp-collector-worker</module>
|
||||
</modules>
|
||||
|
||||
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<!-- Import dependency management from Spring Boot -->
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<executable>true</executable>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -4,7 +4,6 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
|
||||
//TODO sholud be moved on dhp-common
|
||||
public class ApiDescriptor {
|
||||
|
||||
private String id;
|
||||
|
|
|
@ -1,9 +1,16 @@
|
|||
package eu.dnetlib.dhp.utils;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.codec.binary.Base64OutputStream;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
public class DHPUtils {
|
||||
|
||||
|
@ -22,4 +29,31 @@ public class DHPUtils {
|
|||
return String.format("%s::%s",nsPrefix, DHPUtils.md5(originalId));
|
||||
}
|
||||
|
||||
public static String compressString(final String input ) {
|
||||
try ( ByteArrayOutputStream out = new ByteArrayOutputStream(); Base64OutputStream b64os = new Base64OutputStream(out)) {
|
||||
GZIPOutputStream gzip = new GZIPOutputStream(b64os);
|
||||
gzip.write(input.getBytes(StandardCharsets.UTF_8));
|
||||
gzip.close();
|
||||
return out.toString();
|
||||
} catch (Throwable e ) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static String decompressString(final String input) {
|
||||
byte[] byteArray = Base64.decodeBase64(input.getBytes());
|
||||
int len;
|
||||
try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream((byteArray))); ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length)) {
|
||||
byte[] buffer = new byte[1024];
|
||||
while((len = gis.read(buffer)) != -1){
|
||||
bos.write(buffer, 0, len);
|
||||
}
|
||||
return bos.toString();
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -39,6 +39,8 @@ public class MessageConsumer extends DefaultConsumer {
|
|||
else {
|
||||
//TODO LOGGING EXCEPTION
|
||||
}
|
||||
} finally {
|
||||
getChannel().basicAck(envelope.getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@ import com.rabbitmq.client.Connection;
|
|||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import sun.rmi.runtime.Log;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class MessageManager {
|
||||
|
||||
|
@ -41,42 +43,45 @@ public class MessageManager {
|
|||
this.durable = durable;
|
||||
this.autodelete = autodelete;
|
||||
}
|
||||
private Channel createChannel(final String queueName, final boolean durable, final boolean autodelete ) throws Exception {
|
||||
|
||||
private Connection createConnection() throws IOException, TimeoutException {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost(this.messageHost);
|
||||
factory.setUsername(this.username);
|
||||
factory.setPassword(this.password);
|
||||
Connection connection = factory.newConnection();
|
||||
Map<String, Object> args = new HashMap<String, Object>();
|
||||
return factory.newConnection();
|
||||
}
|
||||
|
||||
private Channel createChannel(final Connection connection, final String queueName, final boolean durable, final boolean autodelete ) throws Exception {
|
||||
Map<String, Object> args = new HashMap<>();
|
||||
args.put("x-message-ttl", 10000);
|
||||
Channel channel = connection.createChannel();
|
||||
channel.queueDeclare(queueName, durable, false, this.autodelete, args);
|
||||
return channel;
|
||||
}
|
||||
public boolean sendMessage(final Message message, String queueName) throws Exception {
|
||||
try (Channel channel = createChannel(queueName, this.durable, this.autodelete)) {
|
||||
try (Connection connection = createConnection(); Channel channel = createChannel(connection, queueName, this.durable, this.autodelete)) {
|
||||
|
||||
channel.basicPublish("", queueName,null, message.toString().getBytes());
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean sendMessage(final Message message, String queueName, boolean durable_var, boolean autodelete_var) throws Exception {
|
||||
try (Channel channel = createChannel(queueName, durable_var, autodelete_var)) {
|
||||
try (Connection connection = createConnection(); Channel channel = createChannel(connection, queueName, durable_var, autodelete_var)) {
|
||||
|
||||
channel.basicPublish("", queueName,null, message.toString().getBytes());
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void startConsumingMessage(final String queueName, final boolean durable, final boolean autodelete) throws Exception{
|
||||
Channel channel = createChannel(queueName, durable, autodelete);
|
||||
|
||||
Channel channel = createChannel(createConnection(), queueName, durable, autodelete);
|
||||
channel.basicConsume(queueName, false, new MessageConsumer(channel,queueMessages));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,33 +51,5 @@ public class MessageTest {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void sendMessageTest() throws Exception {
|
||||
|
||||
final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
|
||||
Message m = new Message();
|
||||
m.setWorkflowId("wf_20190405_105048_275");
|
||||
m.setType(MessageType.ONGOING);
|
||||
m.setJobName("Collection");
|
||||
Map<String,String> body= new HashMap<>();
|
||||
body.put("progressCount", "100");
|
||||
body.put("ExecutionTime", "30s");
|
||||
|
||||
m.setBody(body);
|
||||
|
||||
MessageManager mm = new MessageManager("broker1-dev-dnet.d4science.org","r_admin", "9g8fed7gpohef9y84th98h", false,false, null);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
mm.sendMessage(m, "dev_ongoing");
|
||||
|
||||
m.setType(MessageType.REPORT);
|
||||
|
||||
body.put("mdStoreSize", "368");
|
||||
|
||||
|
||||
mm.sendMessage(m, "dev_report", true, false);
|
||||
}
|
||||
}
|
|
@ -20,12 +20,15 @@
|
|||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>dom4j</groupId>
|
||||
<artifactId>dom4j</artifactId>
|
||||
|
@ -37,7 +40,12 @@
|
|||
<artifactId>jaxen</artifactId>
|
||||
<version>1.1.6</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>2.25.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
|
|
@ -3,6 +3,9 @@ package eu.dnetlib.dhp.collection;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.model.mdstore.Provenance;
|
||||
import eu.dnetlib.message.Message;
|
||||
import eu.dnetlib.message.MessageManager;
|
||||
import eu.dnetlib.message.MessageType;
|
||||
import org.apache.commons.cli.*;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
|
@ -21,6 +24,8 @@ import org.dom4j.io.SAXReader;
|
|||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class GenerateNativeStoreSparkJob {
|
||||
|
@ -52,6 +57,67 @@ public class GenerateNativeStoreSparkJob {
|
|||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
Options options = generateApplicationArguments();
|
||||
|
||||
|
||||
CommandLineParser parser = new DefaultParser();
|
||||
CommandLine cmd = parser.parse( options, args);
|
||||
|
||||
final String encoding = cmd.getOptionValue("e");
|
||||
final long dateOfCollection = new Long(cmd.getOptionValue("d"));
|
||||
final String jsonProvenance = cmd.getOptionValue("p");
|
||||
final ObjectMapper jsonMapper = new ObjectMapper();
|
||||
final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
|
||||
final String xpath = cmd.getOptionValue("x");
|
||||
final String inputPath = cmd.getOptionValue("i");
|
||||
final String outputPath = cmd.getOptionValue("o");
|
||||
final String rabbitUser = cmd.getOptionValue("ru");
|
||||
final String rabbitPassword = cmd.getOptionValue("rp");
|
||||
final String rabbitHost = cmd.getOptionValue("rh");
|
||||
final String rabbitOngoingQueue = cmd.getOptionValue("ro");
|
||||
final String rabbitReportQueue = cmd.getOptionValue("rr");
|
||||
final String workflowId = cmd.getOptionValue("w");
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName("GenerateNativeStoreSparkJob")
|
||||
.master("yarn")
|
||||
.getOrCreate();
|
||||
|
||||
final Map<String, String> ongoingMap = new HashMap<>();
|
||||
final Map<String, String> reportMap = new HashMap<>();
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
final JavaPairRDD<IntWritable, Text> inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class);
|
||||
|
||||
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
|
||||
|
||||
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
|
||||
|
||||
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
|
||||
|
||||
final JavaRDD<MetadataRecord> mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
|
||||
.filter(Objects::nonNull).distinct();
|
||||
|
||||
ongoingMap.put("ongoing", "0");
|
||||
manager.sendMessage(new Message(workflowId,"DataFrameCreation", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
|
||||
|
||||
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
|
||||
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
|
||||
mdStoreRecords.add(mdstore.count());
|
||||
ongoingMap.put("ongoing", ""+ totalItems.value());
|
||||
manager.sendMessage(new Message(workflowId,"DataFrameCreation", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
|
||||
|
||||
mdstore.write().format("parquet").save(outputPath);
|
||||
reportMap.put("inputItem" , ""+ totalItems.value());
|
||||
reportMap.put("invalidRecords", "" + invalidRecords.value());
|
||||
reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
|
||||
manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false);
|
||||
}
|
||||
|
||||
private static Options generateApplicationArguments() {
|
||||
Options options = new Options();
|
||||
options.addOption(Option.builder("e")
|
||||
.longOpt("encoding")
|
||||
|
@ -93,43 +159,48 @@ public class GenerateNativeStoreSparkJob {
|
|||
.hasArg()
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("ru")
|
||||
.longOpt("rabbitUser")
|
||||
.required(true)
|
||||
.desc("the user to connect with RabbitMq for messaging")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
CommandLineParser parser = new DefaultParser();
|
||||
CommandLine cmd = parser.parse( options, args);
|
||||
options.addOption(Option.builder("rp")
|
||||
.longOpt("rabbitPassWord")
|
||||
.required(true)
|
||||
.desc("the password to connect with RabbitMq for messaging")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
final String encoding = cmd.getOptionValue("e");
|
||||
final long dateOfCollection = new Long(cmd.getOptionValue("d"));
|
||||
final String jsonProvenance = cmd.getOptionValue("p");
|
||||
final ObjectMapper jsonMapper = new ObjectMapper();
|
||||
final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
|
||||
final String xpath = cmd.getOptionValue("x");
|
||||
final String inputPath = cmd.getOptionValue("i");
|
||||
final String outputPath = cmd.getOptionValue("o");
|
||||
options.addOption(Option.builder("rh")
|
||||
.longOpt("rabbitHost")
|
||||
.required(true)
|
||||
.desc("the host of the RabbitMq server")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName("GenerateNativeStoreSparkJob")
|
||||
.master("local")
|
||||
.getOrCreate();
|
||||
options.addOption(Option.builder("ro")
|
||||
.longOpt("rabbitOngoingQueue")
|
||||
.required(true)
|
||||
.desc("the name of the ongoing queue")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
options.addOption(Option.builder("rr")
|
||||
.longOpt("rabbitReportQueue")
|
||||
.required(true)
|
||||
.desc("the name of the report queue")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
final JavaPairRDD<IntWritable, Text> inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class);
|
||||
|
||||
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
|
||||
|
||||
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
|
||||
|
||||
final JavaRDD<MetadataRecord> mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
|
||||
.filter(Objects::nonNull).distinct();
|
||||
|
||||
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
|
||||
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
|
||||
mdStoreRecords.add(mdstore.count());
|
||||
System.out.println("totalItems.value() = " + totalItems.value());
|
||||
System.out.println("invalidRecords = " + invalidRecords.value());
|
||||
System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value());
|
||||
mdstore.write().format("parquet").save(outputPath);
|
||||
options.addOption(Option.builder("w")
|
||||
.longOpt("workflowId")
|
||||
.required(true)
|
||||
.desc("the identifier of the dnet Workflow")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
return options;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,51 +0,0 @@
|
|||
package eu.dnetlib.dhp.collection;
|
||||
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.transformation.TransformFunction;
|
||||
import org.apache.commons.cli.*;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
|
||||
public class TransformSparkJobNode {
|
||||
|
||||
public static void main(String[] args) throws ParseException {
|
||||
Options options = new Options();
|
||||
|
||||
options.addOption(Option.builder("i")
|
||||
.longOpt("input")
|
||||
.required(true)
|
||||
.desc("input path of the sequence file")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
options.addOption(Option.builder("o")
|
||||
.longOpt("output")
|
||||
.required(true)
|
||||
.desc("output path of the mdstore")
|
||||
.hasArg()
|
||||
.build());
|
||||
|
||||
final CommandLineParser parser = new DefaultParser();
|
||||
final CommandLine cmd = parser.parse( options, args);
|
||||
|
||||
final String inputPath = cmd.getOptionValue("i");
|
||||
final String outputPath = cmd.getOptionValue("o");
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName("GenerateNativeStoreSparkJob")
|
||||
.master("local")
|
||||
.getOrCreate();
|
||||
|
||||
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
|
||||
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
|
||||
|
||||
final TransformFunction mfunc = new TransformFunction(totalItems);
|
||||
mdstoreInput.map(mfunc, encoder).write().format("parquet").save(outputPath);
|
||||
System.out.println("totalItems = " + totalItems.value());
|
||||
|
||||
}
|
||||
}
|
|
@ -4,18 +4,52 @@ import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
|||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
|
||||
import javax.xml.transform.OutputKeys;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerFactory;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.StringWriter;
|
||||
|
||||
public class TransformFunction implements MapFunction<MetadataRecord, MetadataRecord> {
|
||||
|
||||
|
||||
private final LongAccumulator totalItems;
|
||||
private final LongAccumulator errorItems;
|
||||
private final LongAccumulator transformedItems;
|
||||
private final String trasformationRule;
|
||||
|
||||
public TransformFunction(LongAccumulator totalItems) {
|
||||
private final long dateOfTransformation;
|
||||
|
||||
|
||||
public TransformFunction(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator transformedItems, final String trasformationRule, long dateOfTransformation) {
|
||||
this.totalItems= totalItems;
|
||||
this.errorItems = errorItems;
|
||||
this.transformedItems = transformedItems;
|
||||
this.trasformationRule = trasformationRule;
|
||||
this.dateOfTransformation = dateOfTransformation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetadataRecord call(MetadataRecord value) throws Exception {
|
||||
public MetadataRecord call(MetadataRecord value) {
|
||||
totalItems.add(1);
|
||||
return value;
|
||||
try {
|
||||
final TransformerFactory factory = TransformerFactory.newInstance();
|
||||
factory.newTransformer();
|
||||
final StreamSource xsltSource = new StreamSource(new ByteArrayInputStream(trasformationRule.getBytes()));
|
||||
final Transformer transformer = factory.newTransformer(xsltSource);
|
||||
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
|
||||
final StringWriter output = new StringWriter();
|
||||
transformer.transform(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())), new StreamResult(output));
|
||||
final String xml = output.toString();
|
||||
value.setBody(xml);
|
||||
value.setDateOfCollection(dateOfTransformation);
|
||||
transformedItems.add(1);
|
||||
return value;
|
||||
}catch (Throwable e) {
|
||||
errorItems.add(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
package eu.dnetlib.dhp.transformation;
|
||||
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import eu.dnetlib.message.Message;
|
||||
import eu.dnetlib.message.MessageManager;
|
||||
import eu.dnetlib.message.MessageType;
|
||||
import org.apache.commons.cli.*;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Node;
|
||||
import org.dom4j.io.SAXReader;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class TransformSparkJobNode {
|
||||
|
||||
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
System.out.println(args[i]);
|
||||
}
|
||||
|
||||
Options options = new Options();
|
||||
|
||||
options.addOption(Option.builder("mt")
|
||||
.longOpt("master")
|
||||
.required(true)
|
||||
.desc("should be local or yarn")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("d")
|
||||
.longOpt("dateOfCollection")
|
||||
.required(true)
|
||||
.desc("the date of collection")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("i")
|
||||
.longOpt("input")
|
||||
.required(true)
|
||||
.desc("input path of the sequence file")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
options.addOption(Option.builder("o")
|
||||
.longOpt("output")
|
||||
.required(true)
|
||||
.desc("output path of the mdstore")
|
||||
.hasArg()
|
||||
.build());
|
||||
options.addOption(Option.builder("w")
|
||||
.longOpt("workflowId")
|
||||
.required(true)
|
||||
.desc("the identifier of the dnet Workflow")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("tr")
|
||||
.longOpt("transformationRule")
|
||||
.required(true)
|
||||
.desc("the transformation Rule to apply to the input MDStore")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("ru")
|
||||
.longOpt("rabbitUser")
|
||||
.required(false)
|
||||
.desc("the user to connect with RabbitMq for messaging")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("rp")
|
||||
.longOpt("rabbitPassWord")
|
||||
.required(false)
|
||||
.desc("the password to connect with RabbitMq for messaging")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("rh")
|
||||
.longOpt("rabbitHost")
|
||||
.required(false)
|
||||
.desc("the host of the RabbitMq server")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("ro")
|
||||
.longOpt("rabbitOngoingQueue")
|
||||
.required(false)
|
||||
.desc("the name of the ongoing queue")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
options.addOption(Option.builder("rr")
|
||||
.longOpt("rabbitReportQueue")
|
||||
.required(false)
|
||||
.desc("the name of the report queue")
|
||||
.hasArg() // This option has an argument.
|
||||
.build());
|
||||
|
||||
|
||||
final CommandLineParser parser = new DefaultParser();
|
||||
final CommandLine cmd = parser.parse( options, args);
|
||||
|
||||
final String inputPath = cmd.getOptionValue("i");
|
||||
final String outputPath = cmd.getOptionValue("o");
|
||||
final String workflowId = cmd.getOptionValue("w");
|
||||
final String trasformationRule = extractXSLTFromTR(DHPUtils.decompressString(cmd.getOptionValue("tr")));
|
||||
final String master = cmd.getOptionValue("mt");
|
||||
final String rabbitUser = cmd.getOptionValue("ru");
|
||||
final String rabbitPassword = cmd.getOptionValue("rp");
|
||||
final String rabbitHost = cmd.getOptionValue("rh");
|
||||
final String rabbitReportQueue = cmd.getOptionValue("rr");
|
||||
final long dateOfCollection = new Long(cmd.getOptionValue("d"));
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName("TransformStoreSparkJob")
|
||||
.master(master)
|
||||
.getOrCreate();
|
||||
|
||||
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
|
||||
|
||||
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
|
||||
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
|
||||
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
|
||||
|
||||
final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection) ;
|
||||
mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
|
||||
|
||||
|
||||
if (rabbitHost != null) {
|
||||
final Map<String, String> reportMap = new HashMap<>();
|
||||
reportMap.put("inputItem" , ""+ totalItems.value());
|
||||
reportMap.put("invalidRecords", "" + errorItems.value());
|
||||
reportMap.put("mdStoreSize", "" + transformedItems.value());
|
||||
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
|
||||
manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static String extractXSLTFromTR(final String tr) throws DocumentException {
|
||||
SAXReader reader = new SAXReader();
|
||||
Document document = reader.read(new ByteArrayInputStream(tr.getBytes()));
|
||||
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
|
||||
return node.asXML();
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
<workflow-app name="Oozie_Java_Wf" xmlns="uri:oozie:workflow:0.5">
|
||||
<workflow-app name="CollectionWorkflow" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sequenceFilePath</name>
|
||||
|
@ -20,7 +20,7 @@
|
|||
<description>A json encoding of the Datasource Info</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>identifierXPath</name>
|
||||
<name>identifierPath</name>
|
||||
<description>An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier </description>
|
||||
</property>
|
||||
|
||||
|
@ -33,6 +33,11 @@
|
|||
<name>timestamp</name>
|
||||
<description>The timestamp of the collection date</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>workflowId</name>
|
||||
<description>The identifier of the workflow</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="DeleteMDStoresNative"/>
|
||||
|
@ -41,33 +46,41 @@
|
|||
</kill>
|
||||
<action name="DeleteMDStoresNative">
|
||||
<fs>
|
||||
<mkdir path='${sequenceFilePath}'/>
|
||||
<mkdir path='${mdStorePath}'/>
|
||||
<delete path='${sequenceFilePath}'/>
|
||||
<delete path='${mdStorePath}'/>
|
||||
</fs>
|
||||
<ok to="CollectionWorker"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CollectionWorker">
|
||||
<shell xmlns="uri:oozie:shell-action:0.1">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<exec>lib/dhp-collector-worker-1.0.0.jar</exec>
|
||||
<argument>${sequenceFilePath}</argument>
|
||||
<argument>${apiDescription}</argument>
|
||||
<argument>${nameNode}</argument>
|
||||
<argument>-p</argument><argument>${sequenceFilePath}</argument>
|
||||
<argument>-a</argument><argument>${apiDescription}</argument>
|
||||
<argument>-n</argument><argument>${nameNode}</argument>
|
||||
<argument>-rh</argument><argument>${rmq_host}</argument>
|
||||
<argument>-ru</argument><argument>${rmq_user}</argument>
|
||||
<argument>-rp</argument><argument>${rmq_pwd}</argument>
|
||||
<argument>-rr</argument><argument>${rmq_report}</argument>
|
||||
<argument>-ro</argument><argument>${rmq_ongoing}</argument>
|
||||
<argument>-u</argument><argument>sandro.labruzzo</argument>
|
||||
<argument>-w</argument><argument>${workflowId}</argument>
|
||||
<capture-output/>
|
||||
</shell>
|
||||
<ok to="mdBuilder"/>
|
||||
<ok to="GenerateNativeStoreSparkJob"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="mdBuilder">
|
||||
<action name="GenerateNativeStoreSparkJob">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>MDBuilder</name>
|
||||
<name>GenerateNativeStoreSparkJob</name>
|
||||
<class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class>
|
||||
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
|
||||
<spark-opts>--num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"</spark-opts>
|
||||
|
@ -77,9 +90,24 @@
|
|||
<arg>--xpath</arg><arg>${identifierPath}</arg>
|
||||
<arg>--input</arg><arg>${sequenceFilePath}</arg>
|
||||
<arg>--output</arg><arg>${mdStorePath}</arg>
|
||||
<arg>-rh</arg><arg>${rmq_host}</arg>
|
||||
<arg>-ru</arg><arg>${rmq_user}</arg>
|
||||
<arg>-rp</arg><arg>${rmq_pwd}</arg>
|
||||
<arg>-rr</arg><arg>${rmq_report}</arg>
|
||||
<arg>-ro</arg><arg>${rmq_ongoing}</arg>
|
||||
<arg>-w</arg><arg>${workflowId}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="DropInvalidStore"/>
|
||||
</action>
|
||||
|
||||
<action name="DropInvalidStore">
|
||||
<fs>
|
||||
<delete path='${mdStorePath}/../'/>
|
||||
</fs>
|
||||
<ok to="Kill"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,76 @@
|
|||
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>mdstoreInputPath</name>
|
||||
<description>the path of the input MDStore</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mdstoreOutputPath</name>
|
||||
<description>the path of the cleaned mdstore</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>transformationRule</name>
|
||||
<description>The transformation Rule to apply</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>timestamp</name>
|
||||
<description>The timestamp of the collection date</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>workflowId</name>
|
||||
<description>The identifier of the workflow</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="DeletePathIfExists"/>
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
<action name="DeletePathIfExists">
|
||||
<fs>
|
||||
<mkdir path='${mdstoreOutputPath}'/>
|
||||
<delete path='${mdstoreOutputPath}'/>
|
||||
</fs>
|
||||
<ok to="TransformJob"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="TransformJob">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>MDBuilder</name>
|
||||
<class>eu.dnetlib.dhp.transformation.TransformSparkJobNode</class>
|
||||
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
|
||||
<spark-opts>--num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"</spark-opts>
|
||||
<arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
|
||||
<arg>-mt</arg> <arg>yarn</arg>
|
||||
<arg>--input</arg><arg>${mdstoreInputPath}</arg>
|
||||
<arg>--output</arg><arg>${mdstoreOutputPath}</arg>
|
||||
<arg>-w</arg><arg>${workflowId}</arg>
|
||||
<arg>-tr</arg><arg>${transformationRule}</arg>
|
||||
<arg>-ru</arg><arg>${rmq_user}</arg>
|
||||
<arg>-rp</arg><arg>${rmq_pwd}</arg>
|
||||
<arg>-rh</arg><arg>${rmq_host}</arg>
|
||||
<arg>-ro</arg><arg>${rmq_ongoing}</arg>
|
||||
<arg>-rr</arg><arg>${rmq_report}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="DropInvalidStore"/>
|
||||
</action>
|
||||
|
||||
<action name="DropInvalidStore">
|
||||
<fs>
|
||||
<delete path='${mdstoreOutputPath}/../'/>
|
||||
</fs>
|
||||
<ok to="Kill"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -5,6 +5,7 @@ import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
|||
import eu.dnetlib.dhp.model.mdstore.Provenance;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -13,6 +14,7 @@ public class CollectionJobTest {
|
|||
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void test () throws Exception {
|
||||
Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
|
||||
GenerateNativeStoreSparkJob.main(new String[] {"-e", "XML","-d", ""+System.currentTimeMillis(),"-p", new ObjectMapper().writeValueAsString(provenance), "-x","./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']","-i","/home/sandro/Downloads/oai_1","-o","/home/sandro/Downloads/mdstore_result"});
|
||||
|
@ -20,17 +22,6 @@ public class CollectionJobTest {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void transformTest () throws Exception {
|
||||
|
||||
TransformSparkJobNode.main(new String[]{"-o","/home/sandro/Downloads/mdstore_cleande","-i","/home/sandro/Downloads/mdstore_result"});
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testGenerationMetadataRecord() throws Exception {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
package eu.dnetlib.dhp.transformation;
|
||||
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.Node;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnit;
|
||||
import org.mockito.junit.MockitoRule;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Comparator;
|
||||
|
||||
|
||||
public class TransformationJobTest {
|
||||
|
||||
|
||||
@Mock
|
||||
LongAccumulator accumulator;
|
||||
|
||||
@Rule
|
||||
public MockitoRule mockitoRule = MockitoJUnit.rule();
|
||||
|
||||
@Test
|
||||
public void transformTest() throws Exception {
|
||||
final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile();
|
||||
Path tempDirWithPrefix = Files.createTempDirectory("mdstore_output");
|
||||
|
||||
final String mdstore_output = tempDirWithPrefix.toFile().getAbsolutePath()+"/version";
|
||||
|
||||
final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")));
|
||||
|
||||
System.out.println(xslt);
|
||||
TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt});
|
||||
|
||||
Files.walk(tempDirWithPrefix)
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.map(Path::toFile)
|
||||
.forEach(File::delete);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void tryLoadFolderOnCP() throws Exception {
|
||||
final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile();
|
||||
System.out.println("path = " + path);
|
||||
|
||||
Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output");
|
||||
|
||||
|
||||
System.out.println(tempDirWithPrefix.toFile().getAbsolutePath());
|
||||
|
||||
Files.deleteIfExists(tempDirWithPrefix);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTransformFunction() throws Exception {
|
||||
|
||||
final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
|
||||
|
||||
SAXReader reader = new SAXReader();
|
||||
Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
|
||||
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
|
||||
final String xslt = node.asXML();
|
||||
|
||||
TransformFunction tf = new TransformFunction(accumulator, accumulator, accumulator, xslt, 1);
|
||||
|
||||
MetadataRecord record = new MetadataRecord();
|
||||
record.setBody(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
|
||||
|
||||
final MetadataRecord result = tf.call(record);
|
||||
Assert.assertNotNull(result.getBody());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void extractTr() throws Exception {
|
||||
|
||||
final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
|
||||
|
||||
SAXReader reader = new SAXReader();
|
||||
Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
|
||||
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
|
||||
|
||||
System.out.println(node.asXML());
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
<record xmlns="http://www.openarchives.org/OAI/2.0/">
|
||||
<header>
|
||||
<identifier>oai:research.chalmers.se:243692</identifier>
|
||||
<datestamp>2018-01-25T18:04:43Z</datestamp>
|
||||
<setSpec>openaire</setSpec>
|
||||
</header>
|
||||
<metadata>
|
||||
<oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd">
|
||||
<dc:title>Incipient Berezinskii-Kosterlitz-Thouless transition in two-dimensional coplanar Josephson junctions</dc:title>
|
||||
<dc:identifier>https://research.chalmers.se/en/publication/243692</dc:identifier>
|
||||
<dc:date>2016</dc:date>
|
||||
<dc:creator>Massarotti, D.</dc:creator>
|
||||
<dc:creator>Jouault, B.</dc:creator>
|
||||
<dc:creator>Rouco, V.</dc:creator>
|
||||
<dc:creator>Charpentier, Sophie</dc:creator>
|
||||
<dc:creator>Bauch, Thilo</dc:creator>
|
||||
<dc:creator>Michon, A.</dc:creator>
|
||||
<dc:creator>De Candia, A.</dc:creator>
|
||||
<dc:creator>Lucignano, P.</dc:creator>
|
||||
<dc:creator>Lombardi, Floriana</dc:creator>
|
||||
<dc:creator>Tafuri, F.</dc:creator>
|
||||
<dc:creator>Tagliacozzo, A.</dc:creator>
|
||||
<dc:subject>Materials Chemistry</dc:subject>
|
||||
<dc:subject>Geochemistry</dc:subject>
|
||||
<dc:subject>Condensed Matter Physics</dc:subject>
|
||||
<dc:description>Superconducting hybrid junctions are revealing a variety of effects. Some of them are due to the special layout of these devices, which often use a coplanar configuration with relatively large barrier channels and the possibility of hosting Pearl vortices. A Josephson junction with a quasi-ideal two-dimensional barrier has been realized by growing graphene on SiC with Al electrodes. Chemical vapor deposition offers centimeter size monolayer areas where it is possible to realize a comparative analysis of different devices with nominally the same barrier. In samples with a graphene gap below 400 nm, we have found evidence of Josephson coherence in the presence of an incipient Berezinskii-Kosterlitz-Thouless transition. When the magnetic field is cycled, a remarkable hysteretic collapse and revival of the Josephson supercurrent occurs. Similar hysteresis are found in granular systems and are usually justified within the Bean critical state model (CSM). We show that the CSM, with appropriate account for the low-dimensional geometry, can partly explain the odd features measured in these junctions.</dc:description>
|
||||
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/604391//Graphene-Based Revolutions in ICT And Beyond (Graphene Flagship)/</dc:relation>
|
||||
<dc:relation>info:eu-repo/semantics/altIdentifier/doi/10.1103/PhysRevB.94.054525</dc:relation>
|
||||
<dc:type>info:eu-repo/semantics/article</dc:type>
|
||||
<dc:source>Physical Review B vol.94(2016)</dc:source>
|
||||
<dc:rights>info:eu-repo/semantics/openAccess</dc:rights>
|
||||
<dc:language>eng</dc:language>
|
||||
<dc:audience>Researchers</dc:audience>
|
||||
<dc:format>application/pdf</dc:format>
|
||||
</oai_dc:dc>
|
||||
</metadata>
|
||||
</record>
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,39 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<RESOURCE_PROFILE>
|
||||
<HEADER>
|
||||
<RESOURCE_IDENTIFIER value="d6fa79f2-486e-482d-b37c-62129af2cd9a_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="/>
|
||||
<RESOURCE_TYPE value="TransformationRuleDSResourceType"/>
|
||||
<RESOURCE_KIND value="TransformationRuleDSResources"/>
|
||||
<RESOURCE_URI value=""/>
|
||||
<DATE_OF_CREATION value="2019-04-11T11:15:30+00:00"/>
|
||||
</HEADER>
|
||||
<BODY>
|
||||
<CONFIGURATION>
|
||||
<IMPORTED/>
|
||||
<SCRIPT>
|
||||
<TITLE>Dnet Hadoop TR </TITLE>
|
||||
<CODE>
|
||||
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
|
||||
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
|
||||
xmlns:oaf="http://namespace.openaire.eu/oaf" version="2.0"
|
||||
exclude-result-prefixes="xsl">
|
||||
<xsl:template match="/">
|
||||
<oai:record>
|
||||
<xsl:copy-of select="//oai:header"/>
|
||||
<xsl:copy-of select="//oai:metadata"/>
|
||||
<oaf:about>
|
||||
<oaf:datainfo>
|
||||
<oaf:TestValue>incomplete</oaf:TestValue>
|
||||
<oaf:provisionMode>collected</oaf:provisionMode>
|
||||
</oaf:datainfo>
|
||||
</oaf:about>
|
||||
</oai:record>
|
||||
</xsl:template>
|
||||
</xsl:stylesheet>
|
||||
</CODE>
|
||||
</SCRIPT>
|
||||
</CONFIGURATION>
|
||||
<STATUS/>
|
||||
<SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
|
||||
</BODY>
|
||||
</RESOURCE_PROFILE>
|
Loading…
Reference in New Issue