Merge branch 'hadoop_aggregator' of code-repo.d4science.org:D-Net/dnet-hadoop into hadoop_aggregator

This commit is contained in:
Michele Artini 2021-02-03 16:08:21 +01:00
commit 33f4696d6e
47 changed files with 1242 additions and 654 deletions

View File

@ -53,11 +53,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
@ -98,6 +93,12 @@
<artifactId>dnet-pace-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>

View File

@ -0,0 +1,21 @@
package eu.dnetlib.dhp.application;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;
public class ApplicationUtils {
public static void populateOOZIEEnv(final String paramName, String value) throws Exception {
File file = new File(System.getProperty("oozie.action.output.properties"));
Properties props = new Properties();
props.setProperty(paramName, value);
OutputStream os = new FileOutputStream(file);
props.store(os, "");
os.close();
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.collector.worker.model;
package eu.dnetlib.dhp.collector.worker.model;
import java.util.HashMap;
import java.util.Map;

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.common.rest;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import com.fasterxml.jackson.databind.ObjectMapper;
public class DNetRestClient {
private static ObjectMapper mapper = new ObjectMapper();
public static <T> T doGET(final String url, Class<T> clazz) throws Exception {
final HttpGet httpGet = new HttpGet(url);
return doHTTPRequest(httpGet, clazz);
}
public static String doGET(final String url) throws Exception {
final HttpGet httpGet = new HttpGet(url);
return doHTTPRequest(httpGet);
}
public static <V> String doPOST(final String url, V objParam) throws Exception {
final HttpPost httpPost = new HttpPost(url);
if (objParam != null) {
final StringEntity entity = new StringEntity(mapper.writeValueAsString(objParam));
httpPost.setEntity(entity);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
}
return doHTTPRequest(httpPost);
}
public static <T, V> T doPOST(final String url, V objParam, Class<T> clazz) throws Exception {
return mapper.readValue(doPOST(url, objParam), clazz);
}
private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
CloseableHttpClient client = HttpClients.createDefault();
CloseableHttpResponse response = client.execute(r);
return IOUtils.toString(response.getEntity().getContent());
}
private static <T> T doHTTPRequest(final HttpUriRequest r, Class<T> clazz) throws Exception {
return mapper.readValue(doHTTPRequest(r), clazz);
}
}

View File

@ -67,6 +67,10 @@ public class VocabularyGroup implements Serializable {
private final Map<String, Vocabulary> vocs = new HashMap<>();
public Set<String> vocabularyNames() {
return vocs.keySet();
}
public void addVocabulary(final String id, final String name) {
vocs.put(id.toLowerCase(), new Vocabulary(id, name));
}

View File

@ -26,13 +26,13 @@ public class MetadataRecord implements Serializable {
private String body;
/** the date when the record has been stored */
private long dateOfCollection;
private Long dateOfCollection;
/** the date when the record has been stored */
private long dateOfTransformation;
private Long dateOfTransformation;
public MetadataRecord() {
this.dateOfCollection = System.currentTimeMillis();
}
public MetadataRecord(
@ -40,7 +40,7 @@ public class MetadataRecord implements Serializable {
String encoding,
Provenance provenance,
String body,
long dateOfCollection) {
Long dateOfCollection) {
this.originalId = originalId;
this.encoding = encoding;
@ -90,19 +90,19 @@ public class MetadataRecord implements Serializable {
this.body = body;
}
public long getDateOfCollection() {
public Long getDateOfCollection() {
return dateOfCollection;
}
public void setDateOfCollection(long dateOfCollection) {
public void setDateOfCollection(Long dateOfCollection) {
this.dateOfCollection = dateOfCollection;
}
public long getDateOfTransformation() {
public Long getDateOfTransformation() {
return dateOfTransformation;
}
public void setDateOfTransformation(long dateOfTransformation) {
public void setDateOfTransformation(Long dateOfTransformation) {
this.dateOfTransformation = dateOfTransformation;
}

View File

@ -1,76 +0,0 @@
package eu.dnetlib.message;
import java.io.IOException;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class Message {
private String workflowId;
private String jobName;
private MessageType type;
private Map<String, String> body;
public static Message fromJson(final String json) throws IOException {
final ObjectMapper jsonMapper = new ObjectMapper();
return jsonMapper.readValue(json, Message.class);
}
public Message() {
}
public Message(String workflowId, String jobName, MessageType type, Map<String, String> body) {
this.workflowId = workflowId;
this.jobName = jobName;
this.type = type;
this.body = body;
}
public String getWorkflowId() {
return workflowId;
}
public void setWorkflowId(String workflowId) {
this.workflowId = workflowId;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public MessageType getType() {
return type;
}
public void setType(MessageType type) {
this.type = type;
}
public Map<String, String> getBody() {
return body;
}
public void setBody(Map<String, String> body) {
this.body = body;
}
@Override
public String toString() {
final ObjectMapper jsonMapper = new ObjectMapper();
try {
return jsonMapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return null;
}
}
}

View File

@ -1,47 +0,0 @@
package eu.dnetlib.message;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.LinkedBlockingQueue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MessageConsumer extends DefaultConsumer {
final LinkedBlockingQueue<Message> queueMessages;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
* @param queueMessages
*/
public MessageConsumer(Channel channel, LinkedBlockingQueue<Message> queueMessages) {
super(channel);
this.queueMessages = queueMessages;
}
@Override
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
final String json = new String(body, StandardCharsets.UTF_8);
Message message = Message.fromJson(json);
try {
this.queueMessages.put(message);
System.out.println("Receiving Message " + message);
} catch (InterruptedException e) {
if (message.getType() == MessageType.REPORT)
throw new RuntimeException("Error on sending message");
else {
// TODO LOGGING EXCEPTION
}
} finally {
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}
}

View File

@ -1,136 +0,0 @@
package eu.dnetlib.message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MessageManager {
private final String messageHost;
private final String username;
private final String password;
private Connection connection;
private final Map<String, Channel> channels = new HashMap<>();
private boolean durable;
private boolean autodelete;
private final LinkedBlockingQueue<Message> queueMessages;
public MessageManager(
String messageHost,
String username,
String password,
final LinkedBlockingQueue<Message> queueMessages) {
this.queueMessages = queueMessages;
this.messageHost = messageHost;
this.username = username;
this.password = password;
}
public MessageManager(
String messageHost,
String username,
String password,
boolean durable,
boolean autodelete,
final LinkedBlockingQueue<Message> queueMessages) {
this.queueMessages = queueMessages;
this.messageHost = messageHost;
this.username = username;
this.password = password;
this.durable = durable;
this.autodelete = autodelete;
}
private Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.messageHost);
factory.setUsername(this.username);
factory.setPassword(this.password);
return factory.newConnection();
}
private Channel createChannel(
final Connection connection,
final String queueName,
final boolean durable,
final boolean autodelete)
throws Exception {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000);
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, durable, false, this.autodelete, args);
return channel;
}
private Channel getOrCreateChannel(final String queueName, boolean durable, boolean autodelete)
throws Exception {
if (channels.containsKey(queueName)) {
return channels.get(queueName);
}
if (this.connection == null) {
this.connection = createConnection();
}
channels.put(queueName, createChannel(this.connection, queueName, durable, autodelete));
return channels.get(queueName);
}
public void close() throws IOException {
channels
.values()
.forEach(
ch -> {
try {
ch.close();
} catch (Exception e) {
// TODO LOG
}
});
this.connection.close();
}
public boolean sendMessage(final Message message, String queueName) throws Exception {
try {
Channel channel = getOrCreateChannel(queueName, this.durable, this.autodelete);
channel.basicPublish("", queueName, null, message.toString().getBytes());
return true;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
public boolean sendMessage(
final Message message, String queueName, boolean durable_var, boolean autodelete_var)
throws Exception {
try {
Channel channel = getOrCreateChannel(queueName, durable_var, autodelete_var);
channel.basicPublish("", queueName, null, message.toString().getBytes());
return true;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
public void startConsumingMessage(
final String queueName, final boolean durable, final boolean autodelete) throws Exception {
Channel channel = createChannel(createConnection(), queueName, durable, autodelete);
channel.basicConsume(queueName, false, new MessageConsumer(channel, queueMessages));
}
}

View File

@ -1,6 +0,0 @@
package eu.dnetlib.message;
public enum MessageType {
ONGOING, REPORT
}

View File

@ -1,51 +0,0 @@
package eu.dnetlib.message;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
public class MessageTest {
@Test
public void fromJsonTest() throws IOException {
Message m = new Message();
m.setWorkflowId("wId");
m.setType(MessageType.ONGOING);
m.setJobName("Collection");
Map<String, String> body = new HashMap<>();
body.put("parsedItem", "300");
body.put("ExecutionTime", "30s");
m.setBody(body);
System.out.println("m = " + m);
Message m1 = Message.fromJson(m.toString());
assertEquals(m1.getWorkflowId(), m.getWorkflowId());
assertEquals(m1.getType(), m.getType());
assertEquals(m1.getJobName(), m.getJobName());
assertNotNull(m1.getBody());
m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it)));
assertEquals(m1.getJobName(), m.getJobName());
}
@Test
public void toStringTest() {
final String expectedJson = "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
Message m = new Message();
m.setWorkflowId("wId");
m.setType(MessageType.ONGOING);
m.setJobName("Collection");
Map<String, String> body = new HashMap<>();
body.put("parsedItem", "300");
body.put("ExecutionTime", "30s");
m.setBody(body);
assertEquals(expectedJson, m.toString());
}
}

View File

@ -27,7 +27,7 @@ object GenerateDataciteDatasetSpark {
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
log.info(s"vocabulary size is ${vocabularies.getTerms("dnet:languages").size()}")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
.master(master)

View File

@ -0,0 +1,14 @@
package eu.dnetlib.dhp.aggregation.common;
public class AggregationConstants {
public static final String SEQUENCE_FILE_NAME = "/sequence_file";
public static final String MDSTORE_DATA_PATH = "/store";
public static final String MDSTORE_SIZE_PATH = "/size";
public static final String CONTENT_TOTALITEMS = "TotalItems";
public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.aggregation.common;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
public class AggregationUtility {
private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class);
public static final ObjectMapper MAPPER = new ObjectMapper();
public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path)
throws IOException {
log.info("writing size ({}) info file {}", total, path);
try (FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
os.write(total.toString().getBytes(StandardCharsets.UTF_8));
os.flush();
}
}
public static <T> void saveDataset(final Dataset<T> mdstore, final String targetPath) {
log.info("saving dataset in: {}", targetPath);
mdstore
.write()
.mode(SaveMode.Overwrite)
.format("parquet")
.save(targetPath);
}
}

View File

@ -0,0 +1,148 @@
package eu.dnetlib.dhp.aggregation.mdstore;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
public class MDStoreActionNode {
private static final Logger log = LoggerFactory.getLogger(MDStoreActionNode.class);
enum MDAction {
NEW_VERSION, ROLLBACK, COMMIT, READ_LOCK, READ_UNLOCK
}
public static String NEW_VERSION_URI = "%s/mdstore/%s/newVersion";
public static final String COMMIT_VERSION_URL = "%s/version/%s/commit/%s";
public static final String ROLLBACK_VERSION_URL = "%s/version/%s/abort";
public static final String READ_LOCK_URL = "%s/mdstore/%s/startReading";
public static final String READ_UNLOCK_URL = "%s/version/%s/endReading";
private static final String MDSTOREVERSIONPARAM = "mdStoreVersion";
private static final String MDSTOREREADLOCKPARAM = "mdStoreReadLockVersion";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
MDStoreActionNode.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/mdstore_action_parameters.json")));
argumentParser.parseArgument(args);
final MDAction action = MDAction.valueOf(argumentParser.get("action"));
log.info("Current action is {}", action);
final String mdStoreManagerURI = argumentParser.get("mdStoreManagerURI");
log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
switch (action) {
case NEW_VERSION: {
final String mdStoreID = argumentParser.get("mdStoreID");
if (StringUtils.isBlank(mdStoreID)) {
throw new IllegalArgumentException("missing or empty argument mdStoreId");
}
final MDStoreVersion currentVersion = DNetRestClient
.doGET(String.format(NEW_VERSION_URI, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
populateOOZIEEnv(MDSTOREVERSIONPARAM, MAPPER.writeValueAsString(currentVersion));
break;
}
case COMMIT: {
final String hdfsuri = argumentParser.get("namenode");
if (StringUtils.isBlank(hdfsuri)) {
throw new IllegalArgumentException("missing or empty argument namenode");
}
final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class);
if (StringUtils.isBlank(mdStoreVersion.getId())) {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + "/size");
FSDataInputStream inputStream = fs.open(hdfstoreSizepath);
final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
inputStream.close();
fs.create(hdfstoreSizepath);
DNetRestClient
.doGET(String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
break;
}
case ROLLBACK: {
final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class);
if (StringUtils.isBlank(mdStoreVersion.getId())) {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
DNetRestClient.doGET(String.format(ROLLBACK_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId()));
break;
}
case READ_LOCK: {
final String mdStoreID = argumentParser.get("mdStoreID");
if (StringUtils.isBlank(mdStoreID)) {
throw new IllegalArgumentException("missing or empty argument mdStoreId");
}
final MDStoreVersion currentVersion = DNetRestClient
.doGET(String.format(READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
populateOOZIEEnv(MDSTOREREADLOCKPARAM, MAPPER.writeValueAsString(currentVersion));
break;
}
case READ_UNLOCK: {
final String mdStoreVersion_params = argumentParser.get("readMDStoreId");
final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class);
if (StringUtils.isBlank(mdStoreVersion.getId())) {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
DNetRestClient.doGET(String.format(READ_UNLOCK_URL, mdStoreManagerURI, mdStoreVersion.getId()));
break;
}
default:
throw new IllegalArgumentException("invalid action");
}
}
}

View File

@ -1,9 +1,12 @@
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
@ -13,12 +16,11 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
@ -28,10 +30,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.message.MessageManager;
import scala.Tuple2;
public class GenerateNativeStoreSparkJob {
@ -46,15 +49,31 @@ public class GenerateNativeStoreSparkJob {
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
parser.parseArgument(args);
final ObjectMapper jsonMapper = new ObjectMapper();
final String provenanceArgument = parser.get("provenance");
log.info("Provenance is {}", provenanceArgument);
final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
final Provenance provenance = MAPPER.readValue(provenanceArgument, Provenance.class);
final String dateOfCollectionArgs = parser.get("dateOfCollection");
log.info("dateOfCollection is {}", dateOfCollectionArgs);
final long dateOfCollection = new Long(dateOfCollectionArgs);
final String sequenceFileInputPath = parser.get("input");
log.info("sequenceFileInputPath is {}", dateOfCollectionArgs);
final Long dateOfCollection = new Long(dateOfCollectionArgs);
String mdStoreVersion = parser.get("mdStoreVersion");
log.info("mdStoreVersion is {}", mdStoreVersion);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
String readMdStoreVersionParam = parser.get("readMdStoreVersion");
log.info("readMdStoreVersion is {}", readMdStoreVersionParam);
final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null
: MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class);
final String xpath = parser.get("xpath");
log.info("xpath is {}", xpath);
final String encoding = parser.get("encoding");
log.info("encoding is {}", encoding);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
@ -66,21 +85,30 @@ public class GenerateNativeStoreSparkJob {
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
spark -> createNativeMDStore(
spark, provenance, dateOfCollection, xpath, encoding, currentVersion, readMdStoreVersion));
}
private static void createNativeMDStore(SparkSession spark,
Provenance provenance,
Long dateOfCollection,
String xpath,
String encoding,
MDStoreVersion currentVersion,
MDStoreVersion readVersion) throws IOException {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc
.sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);
final LongAccumulator totalItems = sc.sc().longAccumulator(CONTENT_TOTALITEMS);
final LongAccumulator invalidRecords = sc.sc().longAccumulator(CONTENT_INVALIDRECORDS);
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
final JavaRDD<MetadataRecord> nativeStore = inputRDD
final String seqFilePath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
final JavaRDD<MetadataRecord> nativeStore = sc
.sequenceFile(seqFilePath, IntWritable.class, Text.class)
.map(
item -> parseRecord(
item._2().toString(),
parser.get("xpath"),
parser.get("encoding"),
xpath,
encoding,
provenance,
dateOfCollection,
totalItems,
@ -90,12 +118,79 @@ public class GenerateNativeStoreSparkJob {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
mdstore.write().format("parquet").save(parser.get("output"));
final String targetPath = currentVersion.getHdfsPath() + MDSTORE_DATA_PATH;
});
if (readVersion != null) { // INCREMENTAL MODE
log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
Dataset<MetadataRecord> currentMdStoreVersion = spark
.read()
.load(readVersion.getHdfsPath() + MDSTORE_DATA_PATH)
.as(encoder);
TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();
final Dataset<MetadataRecord> map = currentMdStoreVersion
.union(mdstore)
.groupByKey(
(MapFunction<MetadataRecord, String>) MetadataRecord::getId,
Encoders.STRING())
.agg(aggregator)
.map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder);
map.select("id").takeAsList(100).forEach(s -> log.info(s.toString()));
saveDataset(map, targetPath);
} else {
saveDataset(mdstore, targetPath);
}
final Long total = spark.read().load(targetPath).count();
log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());
writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
}
public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {
@Override
public MetadataRecord zero() {
return null;
}
@Override
public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) {
return getLatestRecord(b, a);
}
@Override
public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
return getLatestRecord(b, a);
}
private MetadataRecord getLatestRecord(MetadataRecord b, MetadataRecord a) {
if (b == null)
return a;
if (a == null)
return b;
return (a.getDateOfCollection() > b.getDateOfCollection()) ? a : b;
}
@Override
public MetadataRecord finish(MetadataRecord r) {
return r;
}
@Override
public Encoder<MetadataRecord> bufferEncoder() {
return Encoders.bean(MetadataRecord.class);
}
@Override
public Encoder<MetadataRecord> outputEncoder() {
return Encoders.bean(MetadataRecord.class);
}
}
@ -120,7 +215,7 @@ public class GenerateNativeStoreSparkJob {
invalidRecords.add(1);
return null;
}
return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection);
return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
} catch (Throwable e) {
invalidRecords.add(1);
return null;

View File

@ -3,10 +3,13 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public interface CollectorPlugin {
Stream<String> collect(ApiDescriptor api) throws CollectorException;
CollectorPluginErrorLogList getCollectionErrors();
}

View File

@ -9,13 +9,16 @@ import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.jetbrains.annotations.NotNull;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class OaiCollectorPlugin implements CollectorPlugin {
@ -26,8 +29,19 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private OaiIteratorFactory oaiIteratorFactory;
private final CollectorPluginErrorLogList errorLogList = new CollectorPluginErrorLogList();
@Override
public Stream<String> collect(final ApiDescriptor api) throws CollectorException {
try {
return doCollect(api);
} catch (CollectorException e) {
errorLogList.add(e.getMessage());
throw e;
}
}
private Stream<String> doCollect(ApiDescriptor api) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String mdFormat = api.getParams().get(FORMAT_PARAM);
final String setParam = api.getParams().get(OAI_SET_PARAM);
@ -65,7 +79,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
.stream()
.map(
set -> getOaiIteratorFactory()
.newIterator(baseUrl, mdFormat, set, fromDate, untilDate))
.newIterator(baseUrl, mdFormat, set, fromDate, untilDate, errorLogList))
.iterator();
return StreamSupport
@ -79,4 +93,9 @@ public class OaiCollectorPlugin implements CollectorPlugin {
}
return oaiIteratorFactory;
}
@Override
public CollectorPluginErrorLogList getCollectionErrors() {
return errorLogList;
}
}

View File

@ -15,15 +15,17 @@ import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
public class OaiIterator implements Iterator<String> {
private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on
// 11/24/08 5:02 PM
private static final Logger log = LoggerFactory.getLogger(OaiIterator.class);
private final Queue<String> queue = new PriorityBlockingQueue<>();
private final SAXReader reader = new SAXReader();
@ -36,6 +38,7 @@ public class OaiIterator implements Iterator<String> {
private String token;
private boolean started;
private final HttpConnector httpConnector;
private CollectorPluginErrorLogList errorLogList;
public OaiIterator(
final String baseUrl,
@ -43,7 +46,8 @@ public class OaiIterator implements Iterator<String> {
final String set,
final String fromDate,
final String untilDate,
final HttpConnector httpConnector) {
final HttpConnector httpConnector,
final CollectorPluginErrorLogList errorLogList) {
this.baseUrl = baseUrl;
this.mdFormat = mdFormat;
this.set = set;
@ -51,6 +55,7 @@ public class OaiIterator implements Iterator<String> {
this.untilDate = untilDate;
this.started = false;
this.httpConnector = httpConnector;
this.errorLogList = errorLogList;
}
private void verifyStarted() {
@ -139,7 +144,7 @@ public class OaiIterator implements Iterator<String> {
private String downloadPage(final String url) throws CollectorException {
final String xml = httpConnector.getInputSource(url);
final String xml = httpConnector.getInputSource(url, errorLogList);
Document doc;
try {
doc = reader.read(new StringReader(xml));
@ -174,4 +179,8 @@ public class OaiIterator implements Iterator<String> {
return doc.valueOf("//*[local-name()='resumptionToken']");
}
public CollectorPluginErrorLogList getErrorLogList() {
return errorLogList;
}
}

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.Iterator;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
public class OaiIteratorFactory {
@ -14,8 +15,9 @@ public class OaiIteratorFactory {
final String mdFormat,
final String set,
final String fromDate,
final String untilDate) {
return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector());
final String untilDate,
final CollectorPluginErrorLogList errorLogList) {
return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(), errorLogList);
}
private HttpConnector getHttpConnector() {

View File

@ -14,40 +14,34 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class CollectorWorker {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
private final CollectorPluginFactory collectorPluginFactory;
private final ApiDescriptor api;
private final String hdfsuri;
private final String hdfsPath;
private CollectorPlugin plugin;
public CollectorWorker(
final CollectorPluginFactory collectorPluginFactory,
final ApiDescriptor api,
final String hdfsuri,
final String hdfsPath) {
this.collectorPluginFactory = collectorPluginFactory;
final String hdfsPath) throws CollectorException {
this.api = api;
this.hdfsuri = hdfsuri;
this.hdfsPath = hdfsPath;
this.plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
}
public void collect() throws CollectorException {
try {
final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
@ -59,6 +53,7 @@ public class CollectorWorker {
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(hdfsPath);
@ -85,9 +80,8 @@ public class CollectorWorker {
throw new RuntimeException(e);
}
});
}
} catch (Throwable e) {
throw new CollectorException("Error on collecting ", e);
} finally {
return plugin.getCollectionErrors();
}
}
}

View File

@ -1,15 +1,22 @@
package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
@ -22,8 +29,6 @@ public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
/**
* @param args
*/
@ -38,18 +43,25 @@ public class CollectorWorkerApplication {
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI is {}", hdfsuri);
final String hdfsPath = argumentParser.get("hdfsPath");
log.info("hdfsPath is {}" + hdfsPath);
final String apiDescriptor = argumentParser.get("apidescriptor");
log.info("apiDescriptor is {}" + apiDescriptor);
log.info("apiDescriptor is {}", apiDescriptor);
final ObjectMapper jsonMapper = new ObjectMapper();
final String mdStoreVersion = argumentParser.get("mdStoreVersion");
log.info("mdStoreVersion is {}", mdStoreVersion);
final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("hdfs path is {}", hdfsPath);
final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath);
CollectorPluginErrorLogList errors = worker.collect();
populateOOZIEEnv("collectorErrors", errors.toString());
final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
worker.collect();
}
}

View File

@ -7,7 +7,7 @@ import eu.dnetlib.dhp.collection.worker.CollectorException;
public class CollectorPluginFactory {
public CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
public static CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
if (protocol == null)
throw new CollectorException("protocol cannot be null");
switch (protocol.toLowerCase().trim()) {

View File

@ -16,14 +16,14 @@ import javax.net.ssl.X509TrustManager;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.worker.CollectorException;
public class HttpConnector {
private static final Log log = LogFactory.getLog(HttpConnector.class);
private static final Logger log = LoggerFactory.getLogger(HttpConnector.class);
private int maxNumberOfRetry = 6;
private int defaultDelay = 120; // seconds
@ -45,7 +45,20 @@ public class HttpConnector {
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl) throws CollectorException {
return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
return attemptDownloadAsString(requestUrl, 1, new CollectorPluginErrorLogList());
}
/**
* Given the URL returns the content via HTTP GET
*
* @param requestUrl the URL
* @param errorLogList the list of errors
* @return the content of the downloaded resource
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl, CollectorPluginErrorLogList errorLogList)
throws CollectorException {
return attemptDownloadAsString(requestUrl, 1, errorLogList);
}
/**
@ -59,18 +72,20 @@ public class HttpConnector {
return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
}
private String attemptDownlaodAsString(
private String attemptDownloadAsString(
final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
throws CollectorException {
log.info("requesting URL [{}]", requestUrl);
try {
final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
try {
return IOUtils.toString(s);
} catch (final IOException e) {
log.error("error while retrieving from http-connection occured: " + requestUrl, e);
log.error("error while retrieving from http-connection occurred: {}", requestUrl, e);
Thread.sleep(defaultDelay * 1000);
errorList.add(e.getMessage());
return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
return attemptDownloadAsString(requestUrl, retryNumber + 1, errorList);
} finally {
IOUtils.closeQuietly(s);
}
@ -87,7 +102,7 @@ public class HttpConnector {
throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList);
}
log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
log.debug("requesting URL [{}], try {}", requestUrl, retryNumber);
try {
InputStream input = null;
@ -103,7 +118,7 @@ public class HttpConnector {
final int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
log.warn("waiting and repeating request after " + retryAfter + " sec.");
log.warn("waiting and repeating request after {} sec.", retryAfter);
Thread.sleep(retryAfter * 1000);
errorList.add("503 Service Unavailable");
urlConn.disconnect();
@ -111,7 +126,7 @@ public class HttpConnector {
} else if (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM
|| urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP) {
final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
log.debug("The requested url has been moved to " + newUrl);
log.debug("The requested url has been moved to {}", newUrl);
errorList
.add(
String
@ -121,15 +136,11 @@ public class HttpConnector {
urlConn.disconnect();
return attemptDownload(newUrl, retryNumber + 1, errorList);
} else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
log
.error(
String
.format(
"HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
final String msg = String
.format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage());
log.error(msg);
Thread.sleep(defaultDelay * 1000);
errorList
.add(
String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
errorList.add(msg);
urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, errorList);
} else {
@ -138,7 +149,7 @@ public class HttpConnector {
return input;
}
} catch (final IOException e) {
log.error("error while retrieving from http-connection occured: " + requestUrl, e);
log.error("error while retrieving from http-connection occurred: {}", requestUrl, e);
Thread.sleep(defaultDelay * 1000);
errorList.add(e.getMessage());
return attemptDownload(requestUrl, retryNumber + 1, errorList);
@ -149,12 +160,12 @@ public class HttpConnector {
}
private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
log.debug("StatusCode: " + urlConn.getResponseMessage());
log.debug("StatusCode: {}", urlConn.getResponseMessage());
for (final Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
if (e.getKey() != null) {
for (final String v : e.getValue()) {
log.debug(" key: " + e.getKey() + " - value: " + v);
log.debug(" key: {} value: {}", e.getKey(), v);
}
}
}
@ -183,37 +194,6 @@ public class HttpConnector {
"The requested url has been MOVED, but 'location' param is MISSING");
}
/**
* register for https scheme; this is a workaround and not intended for the use in trusted environments
*/
public void initTrustManager() {
final X509TrustManager tm = new X509TrustManager() {
@Override
public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
}
@Override
public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
};
try {
final SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(null, new TrustManager[] {
tm
}, null);
HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
} catch (final GeneralSecurityException e) {
log.fatal(e);
throw new IllegalStateException(e);
}
}
public int getMaxNumberOfRetry() {
return maxNumberOfRetry;
}

View File

@ -1,42 +1,31 @@
package eu.dnetlib.dhp.transformation;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
public class TransformSparkJobNode {
@ -59,38 +48,64 @@ public class TransformSparkJobNode {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("mdstoreInputPath");
final String outputPath = parser.get("mdstoreOutputPath");
// TODO this variable will be used after implementing Messaging with DNet Aggregator
final String mdstoreInputVersion = parser.get("mdstoreInputVersion");
final String mdstoreOutputVersion = parser.get("mdstoreOutputVersion");
final MDStoreVersion nativeMdStoreVersion = MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class);
final String inputPath = nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH;
log.info("inputPath: {}", inputPath);
final MDStoreVersion cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, MDStoreVersion.class);
final String outputBasePath = cleanedMdStoreVersion.getHdfsPath();
log.info("outputBasePath: {}", outputBasePath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info(String.format("isLookupUrl: %s", isLookupUrl));
final String dateOfTransformation = parser.get("dateOfTransformation");
log.info(String.format("dateOfTransformation: %s", dateOfTransformation));
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> transformRecords(parser.getObjectMap(), isLookupService, spark, inputPath, outputPath));
spark -> {
transformRecords(
parser.getObjectMap(), isLookupService, spark, inputPath, outputBasePath);
});
}
public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,
final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException {
final SparkSession spark, final String inputPath, final String outputBasePath)
throws DnetTransformationException, IOException {
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final LongAccumulator totalItems = spark.sparkContext().longAccumulator(CONTENT_TOTALITEMS);
final LongAccumulator errorItems = spark.sparkContext().longAccumulator(CONTENT_INVALIDRECORDS);
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator(CONTENT_TRANSFORMEDRECORDS);
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final MapFunction<MetadataRecord, MetadataRecord> XSLTTransformationFunction = TransformationFactory
.getTransformationPlugin(args, ct, isLookUpService);
mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath);
final Dataset<MetadataRecord> mdstore = spark
.read()
.format("parquet")
.load(inputPath)
.as(encoder)
.map(
TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
encoder);
saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH);
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count());
writeTotalSizeOnHDFS(spark, mdstore.count(), outputBasePath + MDSTORE_SIZE_PATH);
}
}

View File

@ -18,7 +18,7 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class TransformationFactory {
private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class);
public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()";
public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value = \"%s\" return $x//CODE/*[local-name() =\"stylesheet\"]";
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
@ -30,13 +30,13 @@ public class TransformationFactory {
log.info("Transformation plugin required " + transformationPlugin);
switch (transformationPlugin) {
case "XSLT_TRANSFORM": {
final String transformationRuleName = jobArgument.get("transformationRuleTitle");
if (StringUtils.isBlank(transformationRuleName))
final String transformationRuleId = jobArgument.get("transformationRuleId");
if (StringUtils.isBlank(transformationRuleId))
throw new DnetTransformationException("Missing Parameter transformationRule");
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
final String transformationRule = queryTransformationRuleFromIS(
transformationRuleName, isLookupService);
transformationRuleId, isLookupService);
final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation"));
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
@ -54,15 +54,15 @@ public class TransformationFactory {
}
}
private static String queryTransformationRuleFromIS(final String transformationRuleName,
private static String queryTransformationRuleFromIS(final String transformationRuleId,
final ISLookUpService isLookUpService) throws Exception {
final String query = String.format(TRULE_XQUERY, transformationRuleName);
log.info("asking query to IS: " + query);
final String query = String.format(TRULE_XQUERY, transformationRuleId);
System.out.println("asking query to IS: " + query);
List<String> result = isLookUpService.quickSearchProfile(query);
if (result == null || result.isEmpty())
throw new DnetTransformationException(
"Unable to find transformation rule with name: " + transformationRuleName);
"Unable to find transformation rule with name: " + transformationRuleId);
return result.get(0);
}

View File

@ -15,4 +15,9 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -30,16 +30,16 @@
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "input",
"paramDescription": "the path of the sequencial file to read",
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the Metadata Store Version Info",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramRequired": true
"paramName": "rmv",
"paramLongName": "readMdStoreVersion",
"paramDescription": "the Read Lock Metadata Store Version bean",
"paramRequired": false
},
{
"paramName": "w",

View File

@ -1,6 +1,26 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false}
{
"paramName": "a",
"paramLongName": "apidescriptor",
"paramDescription": "the JSON encoding of the API Descriptor",
"paramRequired": true
},
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the MDStore Version bean",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": false
}
]

View File

@ -0,0 +1,45 @@
[
{
"paramName": "a",
"paramLongName": "action",
"paramDescription": "the JSON encoding of the API Descriptor",
"paramRequired": true
},
{
"paramName": "mu",
"paramLongName": "mdStoreManagerURI",
"paramDescription": "the MDStore Manager URI",
"paramRequired": true
},
{
"paramName": "mi",
"paramLongName": "mdStoreID",
"paramDescription": "the Metadata Store ID",
"paramRequired": false
},
{
"paramName": "ms",
"paramLongName": "mdStoreSize",
"paramDescription": "the Metadata Store Size",
"paramRequired": false
},
{
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the Metadata Version Bean",
"paramRequired": false
},
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": false
},
{
"paramName": "rm",
"paramLongName": "readMDStoreId",
"paramDescription": "the ID Locked to Read",
"paramRequired": false
}
]

View File

@ -15,4 +15,8 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,58 +1,95 @@
<workflow-app name="CollectionWorkflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdStorePath</name>
<description>the path of the native mdstore</description>
</property>
<property>
<name>apiDescription</name>
<description>A json encoding of the API Description class</description>
</property>
<property>
<name>dataSourceInfo</name>
<description>A json encoding of the Datasource Info</description>
</property>
<property>
<name>identifierPath</name>
<description>An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier </description>
<description>An xpath to retrieve the metadata identifier for the generation of DNet Identifier </description>
</property>
<property>
<name>metadataEncoding</name>
<description> The type of the metadata XML/JSON</description>
</property>
<property>
<name>timestamp</name>
<description>The timestamp of the collection date</description>
</property>
<property>
<name>workflowId</name>
<description>The identifier of the workflow</description>
</property>
<property>
<name>mdStoreID</name>
<description>The identifier of the mdStore</description>
</property>
<property>
<name>mdStoreManagerURI</name>
<description>The URI of the MDStore Manager</description>
</property>
<property>
<name>collectionMode</name>
<description>Should be REFRESH or INCREMENTAL</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</global>
<start to="CollectionWorker"/>
<start to="collection_mode"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="collection_mode">
<switch>
<case to="StartTransaction">${wf:conf('collectionMode') eq 'REFRESH'}</case>
<case to="BeginRead">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
<default to="StartTransaction"/>
</switch>
</decision>
<action name="BeginRead">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_LOCK</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="StartTransaction"/>
<error to="Kill"/>
</action>
<action name="StartTransaction">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="CollectionWorker"/>
<error to="Kill"/>
</action>
<action name="CollectionWorker">
<java>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/sequenceFile_${mdstoreVersion}</arg>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
</java>
<ok to="GenerateNativeStoreSparkJob"/>
<error to="Kill"/>
<error to="FailCollection"/>
</action>
<action name="GenerateNativeStoreSparkJob">
@ -75,13 +112,76 @@
<arg>--dateOfCollection</arg><arg>${timestamp}</arg>
<arg>--provenance</arg><arg>${dataSourceInfo}</arg>
<arg>--xpath</arg><arg>${identifierPath}</arg>
<arg>--input</arg><arg>${workingDir}/sequenceFile</arg>
<arg>--output</arg><arg>${mdStorePath}</arg>
<arg>-w</arg><arg>${workflowId}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--readMdStoreVersion</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
</spark>
<ok to="collection_mode_end"/>
<error to="FailCollection"/>
</action>
<decision name="collection_mode_end">
<switch>
<case to="CommitVersion">${wf:conf('collectionMode') eq 'REFRESH'}</case>
<case to="EndRead">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
<default to="CommitVersion"/>
</switch>
</decision>
<action name="EndRead">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="CommitVersion"/>
<error to="Kill"/>
</action>
<action name="CommitVersion">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<decision name="FailCollection">
<switch>
<case to="RollBack">${wf:conf('collectionMode') eq 'REFRESH'}</case>
<case to="EndReadRollBack">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
<default to="RollBack"/>
</switch>
</decision>
<action name="EndReadRollBack">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="RollBack"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>ROLLBACK</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -15,4 +15,5 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,45 +1,85 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdstoreInputPath</name>
<description>the path of the native MDStore</description>
<name>mdStoreInputId</name>
<description>the identifier of the native MDStore</description>
</property>
<property>
<name>mdstoreOutputPath</name>
<name>mdStoreOutputId</name>
<description>the identifier of the cleaned MDStore</description>
</property>
<property>
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property>
<property>
<name>transformationRuleTitle</name>
<name>transformationRuleId</name>
<description>The transformation Rule to apply</description>
</property>
<property>
<name>transformationPlugin</name>
<value>XSLT_TRANSFORM</value>
<description>The transformation Plugin</description>
</property>
<property>
<name>dateOfTransformation</name>
<description>The timestamp of the transformation date</description>
</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
</parameters>
<start to="TransformJob"/>
<start to="BeginRead"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="BeginRead">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_LOCK</arg>
<arg>--mdStoreID</arg><arg>${mdStoreInputId}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="StartTransaction"/>
<error to="Kill"/>
</action>
<action name="StartTransaction">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="TransformJob"/>
<error to="EndReadRollBack"/>
</action>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Transform MetadataStore</name>
<class>eu.dnetlib.dhp.transformation.TransformSparkJobNode</class>
<jar>dhp-aggregations-${projectVersion}.jar</jar>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
@ -49,19 +89,89 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--mdstoreInputPath</arg><arg>${mdstoreInputPath}</arg>
<arg>--mdstoreOutputPath</arg><arg>${mdstoreOutputPath}</arg>
<arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdstoreInputVersion</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<arg>--dateOfTransformation</arg><arg>${dateOfTransformation}</arg>
<arg>--transformationPlugin</arg><arg>${transformationPlugin}</arg>
<arg>--transformationRuleTitle</arg><arg>${transformationRuleTitle}</arg>
<arg>--transformationRuleId</arg><arg>${transformationRuleId}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="EndRead"/>
<error to="EndReadRollBack"/>
</action>
<action name="EndRead">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="CommitVersion"/>
<error to="Kill"/>
</action>
<action name="CommitVersion">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="EndReadRollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="RollBack"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>ROLLBACK</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -13,19 +13,19 @@
},
{
"paramName": "i",
"paramLongName": "mdstoreInputPath",
"paramDescription": "the path of the sequencial file to read",
"paramLongName": "mdstoreInputVersion",
"paramDescription": "the mdStore Version bean of the Input",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "mdstoreOutputPath",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramLongName": "mdstoreOutputVersion",
"paramDescription": "the mdStore Version bean of the Output",
"paramRequired": true
},
{
"paramName": "tr",
"paramLongName": "transformationRuleTitle",
"paramLongName": "transformationRuleId",
"paramDescription": "the transformation Rule to apply to the input MDStore",
"paramRequired": true
},

View File

@ -0,0 +1,199 @@
package eu.dnetlib.dhp.aggregation;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AggregationJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static Encoder<MetadataRecord> encoder;
private static final String encoding = "XML";
private static final String dateOfCollection = System.currentTimeMillis() + "";
private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
private static String provenance;
private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
provenance = IOUtils
.toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(AggregationJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
encoder = Encoders.bean(MetadataRecord.class);
spark = SparkSession
.builder()
.appName(AggregationJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
@Order(1)
public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
IOUtils
.copy(
getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
GenerateNativeStoreSparkJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-encoding", encoding,
"-dateOfCollection", dateOfCollection,
"-provenance", provenance,
"-xpath", xpath,
"-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
"-readMdStoreVersion", "",
"-workflowId", "abc"
});
verify(mdStoreV1);
}
@Test
@Order(2)
public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
IOUtils
.copy(
getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
GenerateNativeStoreSparkJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-encoding", encoding,
"-dateOfCollection", dateOfCollection,
"-provenance", provenance,
"-xpath", xpath,
"-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
"-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
"-workflowId", "abc"
});
verify(mdStoreV2);
}
@Test
@Order(3)
public void testTransformSparkJob() throws Exception {
MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
TransformSparkJobNode.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-dateOfTransformation", dateOfCollection,
"-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
"-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
"-transformationPlugin", "XSLT_TRANSFORM",
"-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
"-transformationRuleId",
"183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="
});
}
protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
long seqFileSize = sc
.sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
.count();
final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
long mdStoreSize = mdstore.count();
long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
long uniqueIds = mdstore
.map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
.distinct()
.count();
Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
}
private MDStoreVersion prepareVersion(String filename) throws IOException {
MDStoreVersion mdstore = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
return mdstore;
}
}

View File

@ -16,6 +16,8 @@ import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreCurrentVersion;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -37,6 +39,17 @@ public class CollectionJobTest {
spark.stop();
}
@Test
public void testJSONSerialization() throws Exception {
final String s = IOUtils.toString(getClass().getResourceAsStream("input.json"));
System.out.println("s = " + s);
final ObjectMapper mapper = new ObjectMapper();
MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
assertNotNull(mi);
}
@Test
public void tesCollection(@TempDir Path testDir) throws Exception {
final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");

View File

@ -2,25 +2,18 @@
package eu.dnetlib.dhp.collector.worker;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.*;
import java.io.File;
import java.nio.file.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
@Disabled
public class DnetCollectorWorkerApplicationTests {
@ -47,7 +40,7 @@ public class DnetCollectorWorkerApplicationTests {
public void testFeeding(@TempDir Path testDir) throws Exception {
System.out.println(testDir.toString());
CollectorWorker worker = new CollectorWorker(new CollectorPluginFactory(), getApi(),
CollectorWorker worker = new CollectorWorker(getApi(),
"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
worker.collect();

View File

@ -38,6 +38,7 @@ import eu.dnetlib.dhp.collection.CollectionJobTest;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

View File

@ -0,0 +1,9 @@
{
"id":"md-cleaned",
"mdstore":"md-cleaned",
"writing":false,
"readCount":1,
"lastUpdate":1612187563099,
"size":71,
"hdfsPath":"%s/mdstore/md-cleaned"
}

View File

@ -0,0 +1,9 @@
{
"id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187678801",
"mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42",
"writing":true,
"readCount":0,
"lastUpdate":null,
"size":0,
"hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v1"
}

View File

@ -0,0 +1,9 @@
{
"id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187459108",
"mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42",
"writing":false,
"readCount":1,
"lastUpdate":1612187563099,
"size":71,
"hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v2"
}

View File

@ -0,0 +1,5 @@
{
"datasourceId":"74912366-d6df-49c1-a1fd-8a52fa98ce5f_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU\u003d",
"datasourceName":"PSNC Institutional Repository",
"nsPrefix":"psnc______pl"
}

View File

@ -9,7 +9,9 @@
<oai:record>
<xsl:copy-of select="//oai:header"/>
<metadata>
<xsl:for-each select="//oai:set">
<xsl:copy-of select="//oai:metadata/*"/>
<xsl:for-each select="//oai:setSpec">
<dr:CobjCategory><xsl:value-of select="vocabulary:clean(.,'dnet:publication_resource')"/></dr:CobjCategory>
</xsl:for-each>
</metadata>

View File

@ -1,68 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns="http://namespace.openaire.eu/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<oai:header>
<dri:objIdentifier>od______2294::00029b7f0a2a7e090e55b625a9079d83</dri:objIdentifier>
<dri:recordIdentifier>oai:pub.uni-bielefeld.de:2578942</dri:recordIdentifier>
<dri:dateOfCollection>2018-11-23T15:15:33.974+01:00</dri:dateOfCollection>
<oaf:datasourceprefix>od______2294</oaf:datasourceprefix>
<identifier xmlns="http://www.openarchives.org/OAI/2.0/">oai:pub.uni-bielefeld.de:2578942</identifier>
<datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2018-07-24T13:01:16Z</datestamp>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">conference</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">ddc:000</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">conferenceFtxt</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">driver</setSpec>
<setSpec xmlns="http://www.openarchives.org/OAI/2.0/">open_access</setSpec>
</oai:header>
<metadata xmlns="http://www.openarchives.org/OAI/2.0/">
<oai_dc:dc xmlns="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd">
<dc:title>Mobile recommendation agents making online use of visual attention information at the point of sale</dc:title>
<dc:creator>Pfeiffer, Thies</dc:creator>
<dc:creator>Pfeiffer, Jella</dc:creator>
<dc:creator>Meißner, Martin</dc:creator>
<dc:creator>Davis, Fred</dc:creator>
<dc:creator>Riedl, René</dc:creator>
<dc:creator>Jan, vom Brocke</dc:creator>
<dc:creator>Léger, Pierre-Majorique</dc:creator>
<dc:creator>Randolph, Adriane</dc:creator>
<dc:subject>Mobile Cognitive Assistance Systems
Information Systems</dc:subject>
<dc:subject>ddc:000</dc:subject>
<dc:description>We aim to utilize online information about visual attention for developing mobile recommendation agents (RAs) for use at the point of sale. Up to now, most RAs are focussed exclusively at personalization in an e-commerce setting. Very little is known, however, about mobile RAs that offer information and assistance at the point of sale based on individual-level feature based preference models (Murray and Häubl 2009). Current attempts provide information about products at the point of sale by manually scanning barcodes or using RFID (Kowatsch et al. 2011, Heijden 2005), e.g. using specific apps for smartphones. We argue that an online access to the current visual attention of the user offers a much larger potential. Integrating mobile eye tracking into ordinary glasses would yield a direct benefit of applying neuroscience methods in the users everyday life. First, learning from consumers attentional processes over time and adapting recommendations based on this learning allows us to provide very accurate and relevant recommendations, potentially increasing the perceived usefulness. Second, our proposed system needs little explicit user input (no scanning or navigation on screen) making it easy to use. Thus, instead of learning from click behaviour and past customer ratings, as it is the case in the e-commerce setting, the mobile RA learns from eye movements by participating online in every day decision processes. We argue that mobile RAs should be built based on current research in human judgment and decision making (Murray et al. 2010). In our project, we therefore follow a two-step approach: In the empirical basic research stream, we aim to understand the users interaction with the product shelf: the actions and patterns of users behaviour (eye movements, gestures, approaching a product closer) and their correspondence to the users informational needs. In the empirical system development stream, we create prototypes of mobile RAs and test experimentally the factors that influence the users adoption. For example, we suggest that a users involvement in the process, such as a need for exact nutritional information or for assistance (e.g., reading support for elderly) will influence the users intention to use such as system. The experiments are conducted both in our immersive virtual reality supermarket presented in a CAVE, where we can also easily display information to the user and track the eye movement in great accuracy, as well as in real-world supermarkets (see Figure 1), so that the findings can be better generalized to natural decision situations (Gidlöf et al. 2013). In a first pilot study with five randomly chosen participants in a supermarket, we evaluated which sort of mobile RAs consumers favour in order to get a first impression of the users acceptance of the technology. Figure 1 shows an excerpt of one consumers eye movements during a decision process. First results show long eye cascades and short fixations on many products in situations where users are uncertain and in need for support. Furthermore, we find a surprising acceptance of the technology itself throughout all ages (23 61 years). At the same time, consumers express serious fear of being manipulated by such a technology. For that reason, they strongly prefer the information to be provided by trusted third party or shared with family members and friends (see also Murray and Häubl 2009). Our pilot will be followed by a larger field experiment in March in order to learn more about factors that influence the users acceptance as well as the eye movement patterns that reflect typical phases of decision processes and indicate the need for support by a RA.</dc:description>
<dc:date>2013</dc:date>
<dc:type>info:eu-repo/semantics/conferenceObject</dc:type>
<dc:type>doc-type:conferenceObject</dc:type>
<dc:type>text</dc:type>
<dc:identifier>https://pub.uni-bielefeld.de/record/2578942</dc:identifier>
<dc:identifier>https://pub.uni-bielefeld.de/download/2578942/2602478</dc:identifier>
<dc:source>Pfeiffer T, Pfeiffer J, Meißner M. Mobile recommendation agents making online use of visual attention information at the point of sale. In: Davis F, Riedl R, Jan vom B, Léger P-M, Randolph A, eds. <em>Proceedings of the Gmunden Retreat on NeuroIS 2013</em>. 2013: 3-3.</dc:source>
<dc:language>eng</dc:language>
<dc:rights>info:eu-repo/semantics/openAccess</dc:rights>
<record xmlns="http://www.openarchives.org/OAI/2.0/">
<header>
<identifier>oai:lib.psnc.pl:278</identifier>
<datestamp>2011-08-25T15:17:13Z</datestamp>
<setSpec>PSNCRepository:PSNCExternalRepository:exhibitions</setSpec>
<setSpec>PSNCRepository:PSNCExternalRepository:Departments</setSpec>
<setSpec>PSNCRepository:PSNCExternalRepository:Departments:NetworkServices</setSpec>
<setSpec>PSNCRepository:PSNCExternalRepository</setSpec>
<setSpec>PSNCRepository:PSNCExternalRepository:publications</setSpec>
<setSpec>PSNCRepository</setSpec>
</header>
<metadata>
<oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/"
xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd">
<dc:title xml:lang="pl">
<![CDATA[Distributed Search Mechanisms in dLibra Digital Library Framework]]></dc:title>
<dc:creator xml:lang="pl"><![CDATA[Mazurek, Cezary]]></dc:creator>
<dc:creator xml:lang="pl"><![CDATA[Werla, Marcin]]></dc:creator>
<dc:date xml:lang="pl"><![CDATA[2005.10]]></dc:date>
<dc:type xml:lang="pl"><![CDATA[plakat]]></dc:type>
<dc:format xml:lang="pl"><![CDATA[image/jpeg]]></dc:format>
<dc:identifier><![CDATA[https://lib.psnc.pl/dlibra/docmetadata?showContent=true&id=278]]></dc:identifier>
<dc:identifier><![CDATA[oai:lib.psnc.pl:278]]></dc:identifier>
<dc:source xml:lang="pl">
<![CDATA[CERN Workshop on Innovations in Scholarly Communication (OAI4)]]></dc:source>
<dc:language xml:lang="pl"><![CDATA[eng]]></dc:language>
<dc:relation><![CDATA[oai:lib.psnc.pl:publication:339]]></dc:relation>
</oai_dc:dc>
</metadata>
<about xmlns="">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2018-11-23T15:15:33.974+01:00">
<baseURL>http://pub.uni-bielefeld.de/oai</baseURL>
<identifier>oai:pub.uni-bielefeld.de:2578942</identifier>
<datestamp>2018-07-24T13:01:16Z</datestamp>
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk:repository"
classname="sysimport:crosswalk:repository"
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>
</record>

View File

@ -374,11 +374,6 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>