[graph cleaning] update collectedfrom & hostedby references as consequence of the datasource deduplication #260
|
@ -0,0 +1,81 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.action;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.DbClient;
|
||||||
|
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
|
|
||||||
|
public class ReadDatasourceMasterDuplicateFromDB {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ReadDatasourceMasterDuplicateFromDB.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static final String QUERY = "SELECT distinct dd.id as masterId, d.officialname as masterName, dd.duplicate as duplicateId "
|
||||||
|
+
|
||||||
|
"FROM dsm_dedup_services dd join dsm_services d on (dd.id = d.id);";
|
||||||
|
|
||||||
|
public static int execute(String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode)
|
||||||
|
throws IOException {
|
||||||
|
int count = 0;
|
||||||
|
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
FSDataOutputStream fos = fileSystem.create(new Path(hdfsPath));
|
||||||
|
|
||||||
|
log.info("running query: {}", QUERY);
|
||||||
|
log.info("storing results in: {}", hdfsPath);
|
||||||
|
|
||||||
|
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
|
||||||
|
dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MasterDuplicate datasourceMasterMap(ResultSet rs) {
|
||||||
|
try {
|
||||||
|
final MasterDuplicate md = new MasterDuplicate();
|
||||||
|
|
||||||
|
final String duplicateId = rs.getString("duplicateId");
|
||||||
|
final String masterId = rs.getString("masterId");
|
||||||
|
final String masterName = rs.getString("masterName");
|
||||||
|
|
||||||
|
md.setDuplicateId(OafMapperUtils.createOpenaireId(10, duplicateId, true));
|
||||||
|
md.setMasterId(OafMapperUtils.createOpenaireId(10, masterId, true));
|
||||||
|
md.setMasterName(masterName);
|
||||||
|
|
||||||
|
return md;
|
||||||
|
} catch (final SQLException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeMap(final MasterDuplicate dm, final BufferedWriter writer) {
|
||||||
|
try {
|
||||||
|
writer.write(OBJECT_MAPPER.writeValueAsString(dm));
|
||||||
|
writer.newLine();
|
||||||
|
} catch (final IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.action.model;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 21/07/22
|
||||||
|
*/
|
||||||
|
public class MasterDuplicate implements Serializable {
|
||||||
|
private String duplicateId;
|
||||||
|
private String masterId;
|
||||||
|
private String masterName;
|
||||||
|
|
||||||
|
public String getDuplicateId() {
|
||||||
|
return duplicateId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDuplicateId(String duplicateId) {
|
||||||
|
this.duplicateId = duplicateId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMasterId() {
|
||||||
|
return masterId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterId(String masterId) {
|
||||||
|
this.masterId = masterId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMasterName() {
|
||||||
|
return masterName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterName(String masterName) {
|
||||||
|
this.masterName = masterName;
|
||||||
|
}
|
||||||
|
}
|
|
@ -47,8 +47,8 @@ public class CleanContextSparkJob implements Serializable {
|
||||||
String inputPath = parser.get("inputPath");
|
String inputPath = parser.get("inputPath");
|
||||||
log.info("inputPath: {}", inputPath);
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
String workingPath = parser.get("workingPath");
|
String workingDir = parser.get("workingDir");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingDir: {}", workingDir);
|
||||||
|
|
||||||
String contextId = parser.get("contextId");
|
String contextId = parser.get("contextId");
|
||||||
log.info("contextId: {}", contextId);
|
log.info("contextId: {}", contextId);
|
||||||
|
@ -67,12 +67,12 @@ public class CleanContextSparkJob implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
|
|
||||||
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath);
|
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingDir);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
|
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
|
||||||
String inputPath, Class<T> entityClazz, String workingPath) {
|
String inputPath, Class<T> entityClazz, String workingDir) {
|
||||||
Dataset<T> res = spark
|
Dataset<T> res = spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(inputPath)
|
.textFile(inputPath)
|
||||||
|
@ -106,11 +106,11 @@ public class CleanContextSparkJob implements Serializable {
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(workingPath);
|
.json(workingDir);
|
||||||
|
|
||||||
spark
|
spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(workingPath)
|
.textFile(workingDir)
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||||
Encoders.bean(entityClazz))
|
Encoders.bean(entityClazz))
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
|
||||||
|
|
||||||
|
public class MasterDuplicateAction {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MasterDuplicateAction.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
MasterDuplicateAction.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/graph/datasourcemaster_parameters.json")));
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String dbUrl = parser.get("postgresUrl");
|
||||||
|
log.info("postgresUrl: {}", dbUrl);
|
||||||
|
|
||||||
|
final String dbUser = parser.get("postgresUser");
|
||||||
|
log.info("postgresUser: {}", dbUser);
|
||||||
|
|
||||||
|
final String dbPassword = parser.get("postgresPassword");
|
||||||
|
log.info("postgresPassword: {}", dbPassword);
|
||||||
|
|
||||||
|
final String hdfsPath = parser.get("hdfsPath");
|
||||||
|
log.info("hdfsPath: {}", hdfsPath);
|
||||||
|
|
||||||
|
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||||
|
log.info("hdfsNameNode: {}", hdfsNameNode);
|
||||||
|
|
||||||
|
int rows = ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
|
||||||
|
|
||||||
|
log.info("written {} rows", rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,197 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class CleanCfHbSparkJob {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
CleanCountrySparkJob.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json"));
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
String inputPath = parser.get("inputPath");
|
||||||
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
|
String resolvedPath = parser.get("resolvedPath");
|
||||||
|
log.info("resolvedPath: {}", resolvedPath);
|
||||||
|
|
||||||
|
String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
String dsMasterDuplicatePath = parser.get("datasourceMasterDuplicate");
|
||||||
|
log.info("datasourceMasterDuplicate: {}", dsMasterDuplicatePath);
|
||||||
|
|
||||||
|
String graphTableClassName = parser.get("graphTableClassName");
|
||||||
|
log.info("graphTableClassName: {}", graphTableClassName);
|
||||||
|
|
||||||
|
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||||
|
HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration());
|
||||||
|
cleanCfHb(
|
||||||
|
spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends Result> void cleanCfHb(SparkSession spark, String inputPath, Class<T> entityClazz,
|
||||||
|
String resolvedPath, String masterDuplicatePath, String outputPath) {
|
||||||
|
|
||||||
|
// read the master-duplicate tuples
|
||||||
|
Dataset<MasterDuplicate> md = spark
|
||||||
|
.read()
|
||||||
|
.textFile(masterDuplicatePath)
|
||||||
|
.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
|
||||||
|
|
||||||
|
// prepare the resolved CF|HB references with the corresponding EMPTY master ID
|
||||||
|
Dataset<IdCfHbMapping> resolved = spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map(as(entityClazz), Encoders.bean(entityClazz))
|
||||||
|
.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
|
||||||
|
|
||||||
|
// set the EMPTY master ID/NAME and save it
|
||||||
|
resolved
|
||||||
|
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
|
||||||
|
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(resolvedPath);
|
||||||
|
|
||||||
|
// read again the resolved CF|HB mapping
|
||||||
|
Dataset<IdCfHbMapping> resolvedDS = spark
|
||||||
|
.read()
|
||||||
|
.textFile(resolvedPath)
|
||||||
|
.map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class));
|
||||||
|
|
||||||
|
// read the result table
|
||||||
|
Dataset<T> res = spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map(as(entityClazz), Encoders.bean(entityClazz));
|
||||||
|
|
||||||
|
// Join the results with the resolved CF|HB mapping, apply the mapping and save it
|
||||||
|
res
|
||||||
|
.joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left")
|
||||||
|
.groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING())
|
||||||
|
.mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
|
||||||
|
return t -> {
|
||||||
|
t._1().setMasterId(t._2().getMasterId());
|
||||||
|
t._1().setMasterName(t._2().getMasterName());
|
||||||
|
return t._1();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends Result> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
|
||||||
|
return r -> Stream
|
||||||
|
.concat(
|
||||||
|
r.getCollectedfrom().stream().map(KeyValue::getKey),
|
||||||
|
Stream
|
||||||
|
.concat(
|
||||||
|
r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
|
||||||
|
r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
|
||||||
|
.distinct()
|
||||||
|
.map(s -> asIdCfHbMapping(r.getId(), s))
|
||||||
|
.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends Result> MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T> getMapGroupsFunction() {
|
||||||
|
return new MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T>() {
|
||||||
|
@Override
|
||||||
|
public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) {
|
||||||
|
final Tuple2<T, IdCfHbMapping> first = values.next();
|
||||||
|
final T res = first._1();
|
||||||
|
|
||||||
|
updateResult(res, first._2());
|
||||||
|
values.forEachRemaining(t -> updateResult(res, t._2()));
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateResult(T res, IdCfHbMapping m) {
|
||||||
|
if (Objects.nonNull(m)) {
|
||||||
|
res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m));
|
||||||
|
res.getInstance().forEach(i -> {
|
||||||
|
updateKeyValue(i.getHostedby(), m);
|
||||||
|
updateKeyValue(i.getCollectedfrom(), m);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) {
|
||||||
|
if (kv.getKey().equals(a.getCfhb())) {
|
||||||
|
kv.setKey(a.getMasterId());
|
||||||
|
kv.setValue(a.getMasterName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) {
|
||||||
|
IdCfHbMapping m = new IdCfHbMapping(resultId);
|
||||||
|
m.setCfhb(cfHb);
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <R> MapFunction<String, R> as(Class<R> clazz) {
|
||||||
|
return s -> OBJECT_MAPPER.readValue(s, clazz);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class IdCfHbMapping implements Serializable {
|
||||||
|
|
||||||
|
private String resultId;
|
||||||
|
|
||||||
|
private String cfhb;
|
||||||
|
|
||||||
|
private String masterId;
|
||||||
|
|
||||||
|
private String masterName;
|
||||||
|
|
||||||
|
public IdCfHbMapping() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public IdCfHbMapping(String id) {
|
||||||
|
this.resultId = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getResultId() {
|
||||||
|
return resultId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResultId(String resultId) {
|
||||||
|
this.resultId = resultId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCfhb() {
|
||||||
|
return cfhb;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCfhb(String cfhb) {
|
||||||
|
this.cfhb = cfhb;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMasterId() {
|
||||||
|
return masterId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterId(String masterId) {
|
||||||
|
this.masterId = masterId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMasterName() {
|
||||||
|
return masterName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterName(String masterName) {
|
||||||
|
this.masterName = masterName;
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,8 +58,8 @@ public class CleanCountrySparkJob implements Serializable {
|
||||||
String inputPath = parser.get("inputPath");
|
String inputPath = parser.get("inputPath");
|
||||||
log.info("inputPath: {}", inputPath);
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
String workingPath = parser.get("workingPath");
|
String workingDir = parser.get("workingDir");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingDir: {}", workingDir);
|
||||||
|
|
||||||
String datasourcePath = parser.get("hostedBy");
|
String datasourcePath = parser.get("hostedBy");
|
||||||
log.info("datasourcePath: {}", datasourcePath);
|
log.info("datasourcePath: {}", datasourcePath);
|
||||||
|
@ -85,12 +85,12 @@ public class CleanCountrySparkJob implements Serializable {
|
||||||
spark -> {
|
spark -> {
|
||||||
|
|
||||||
cleanCountry(
|
cleanCountry(
|
||||||
spark, country, verifyParam, inputPath, entityClazz, workingPath, collectedfrom, datasourcePath);
|
spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
|
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
|
||||||
String inputPath, Class<T> entityClazz, String workingPath, String collectedfrom, String datasourcePath) {
|
String inputPath, Class<T> entityClazz, String workingDir, String collectedfrom, String datasourcePath) {
|
||||||
|
|
||||||
List<String> hostedBy = spark
|
List<String> hostedBy = spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -134,11 +134,11 @@ public class CleanCountrySparkJob implements Serializable {
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(workingPath);
|
.json(workingDir);
|
||||||
|
|
||||||
spark
|
spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(workingPath)
|
.textFile(workingDir)
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||||
Encoders.bean(entityClazz))
|
Encoders.bean(entityClazz))
|
||||||
|
|
|
@ -54,8 +54,8 @@ public class GetDatasourceFromCountry implements Serializable {
|
||||||
String inputPath = parser.get("inputPath");
|
String inputPath = parser.get("inputPath");
|
||||||
log.info("inputPath: {}", inputPath);
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
String workingPath = parser.get("workingPath");
|
String workingPath = parser.get("workingDir");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingDir: {}", workingPath);
|
||||||
|
|
||||||
String country = parser.get("country");
|
String country = parser.get("country");
|
||||||
log.info("country: {}", country);
|
log.info("country: {}", country);
|
||||||
|
@ -70,7 +70,7 @@ public class GetDatasourceFromCountry implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
|
private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath,
|
||||||
String workingPath) {
|
String workingDir) {
|
||||||
|
|
||||||
Dataset<Organization> organization = spark
|
Dataset<Organization> organization = spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -100,7 +100,7 @@ public class GetDatasourceFromCountry implements Serializable {
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(workingPath);
|
.json(workingDir);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -317,13 +317,13 @@
|
||||||
</switch>
|
</switch>
|
||||||
</decision>
|
</decision>
|
||||||
|
|
||||||
|
|
||||||
<fork name="fork_clean_context">
|
<fork name="fork_clean_context">
|
||||||
<path start="clean_publication_context"/>
|
<path start="clean_publication_context"/>
|
||||||
<path start="clean_dataset_context"/>
|
<path start="clean_dataset_context"/>
|
||||||
<path start="clean_otherresearchproduct_context"/>
|
<path start="clean_otherresearchproduct_context"/>
|
||||||
<path start="clean_software_context"/>
|
<path start="clean_software_context"/>
|
||||||
</fork>
|
</fork>
|
||||||
|
|
||||||
<action name="clean_publication_context">
|
<action name="clean_publication_context">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
@ -343,7 +343,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/publication</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -370,7 +370,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/dataset</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -397,7 +397,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/otherresearchproduct</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -424,7 +424,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/software</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -434,7 +434,6 @@
|
||||||
|
|
||||||
<join name="wait_clean_context" to="select_datasourceId_from_country"/>
|
<join name="wait_clean_context" to="select_datasourceId_from_country"/>
|
||||||
|
|
||||||
|
|
||||||
<action name="select_datasourceId_from_country">
|
<action name="select_datasourceId_from_country">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
@ -453,20 +452,20 @@
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/hostedby</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/hostedby</arg>
|
||||||
<arg>--country</arg><arg>${country}</arg>
|
<arg>--country</arg><arg>${country}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="fork_clean_country"/>
|
<ok to="fork_clean_country"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
|
|
||||||
<fork name="fork_clean_country">
|
<fork name="fork_clean_country">
|
||||||
<path start="clean_publication_country"/>
|
<path start="clean_publication_country"/>
|
||||||
<path start="clean_dataset_country"/>
|
<path start="clean_dataset_country"/>
|
||||||
<path start="clean_otherresearchproduct_country"/>
|
<path start="clean_otherresearchproduct_country"/>
|
||||||
<path start="clean_software_country"/>
|
<path start="clean_software_country"/>
|
||||||
</fork>
|
</fork>
|
||||||
|
|
||||||
<action name="clean_publication_country">
|
<action name="clean_publication_country">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
@ -486,7 +485,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/publication</arg>
|
||||||
<arg>--country</arg><arg>${country}</arg>
|
<arg>--country</arg><arg>${country}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||||
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
||||||
|
@ -515,7 +514,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/dataset</arg>
|
||||||
<arg>--country</arg><arg>${country}</arg>
|
<arg>--country</arg><arg>${country}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||||
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
||||||
|
@ -544,7 +543,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/otherresearchproduct</arg>
|
||||||
<arg>--country</arg><arg>${country}</arg>
|
<arg>--country</arg><arg>${country}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||||
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
||||||
|
@ -573,7 +572,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working/software</arg>
|
||||||
<arg>--country</arg><arg>${country}</arg>
|
<arg>--country</arg><arg>${country}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
|
||||||
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
||||||
|
@ -583,7 +582,202 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait_clean_country" to="End"/>
|
<join name="wait_clean_country" to="should_patch_datasource_ids"/>
|
||||||
|
|
||||||
|
<decision name="should_patch_datasource_ids">
|
||||||
|
<switch>
|
||||||
|
<case to="get_ds_master_duplicate">${wf:conf('shouldClean') eq true}</case>
|
||||||
|
<default to="End"/>
|
||||||
|
</switch>
|
||||||
|
</decision>
|
||||||
|
|
||||||
|
<action name="get_ds_master_duplicate">
|
||||||
|
<java>
|
||||||
|
<main-class>eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction</main-class>
|
||||||
|
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||||
|
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||||
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
|
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||||
|
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="fork_patch_cfhb"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<fork name="fork_patch_cfhb">
|
||||||
|
<path start="patch_publication_cfhb"/>
|
||||||
|
<path start="patch_dataset_cfhb"/>
|
||||||
|
<path start="patch_otherresearchproduct_cfhb"/>
|
||||||
|
<path start="patch_software_cfhb"/>
|
||||||
|
</fork>
|
||||||
|
|
||||||
|
<action name="patch_publication_cfhb">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>patch publication cfhb</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||||
|
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/publication</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/publication</arg>
|
||||||
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||||
|
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="wait_clean_cfhb"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="patch_dataset_cfhb">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>patch dataset cfhb</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||||
|
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/dataset</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/dataset</arg>
|
||||||
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||||
|
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="wait_clean_cfhb"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="patch_otherresearchproduct_cfhb">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>patch otherresearchproduct cfhb</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||||
|
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/otherresearchproduct</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/otherresearchproduct</arg>
|
||||||
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||||
|
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="wait_clean_cfhb"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="patch_software_cfhb">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>patch software cfhb</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||||
|
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/software</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingPath}/cfHbPatched/software</arg>
|
||||||
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||||
|
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="wait_clean_cfhb"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<join name="wait_clean_cfhb" to="fork_copy_cfhb_patched_results"/>
|
||||||
|
|
||||||
|
<fork name="fork_copy_cfhb_patched_results">
|
||||||
|
<path start="copy_cfhb_patched_publication"/>
|
||||||
|
<path start="copy_cfhb_patched_dataset"/>
|
||||||
|
<path start="copy_cfhb_patched_otherresearchproduct"/>
|
||||||
|
<path start="copy_cfhb_patched_software"/>
|
||||||
|
</fork>
|
||||||
|
|
||||||
|
<action name="copy_cfhb_patched_publication">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${graphOutputPath}/publication"/>
|
||||||
|
</prepare>
|
||||||
|
<arg>${workingDir}/cfHbPatched/publication</arg>
|
||||||
|
<arg>${graphOutputPath}/publication</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="copy_cfhb_patched_dataset">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${graphOutputPath}/dataset"/>
|
||||||
|
</prepare>
|
||||||
|
<arg>${workingDir}/cfHbPatched/dataset</arg>
|
||||||
|
<arg>${graphOutputPath}/dataset</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="copy_cfhb_patched_otherresearchproduct">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${graphOutputPath}/otherresearchproduct"/>
|
||||||
|
</prepare>
|
||||||
|
<arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
|
||||||
|
<arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="copy_cfhb_patched_software">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<prepare>
|
||||||
|
<delete path="${graphOutputPath}/software"/>
|
||||||
|
</prepare>
|
||||||
|
<arg>${workingDir}/cfHbPatched/software</arg>
|
||||||
|
<arg>${graphOutputPath}/software</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<join name="copy_wait" to="End"/>
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
|
|
||||||
</workflow-app>
|
</workflow-app>
|
|
@ -0,0 +1,32 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "pu",
|
||||||
|
"paramLongName": "postgresUrl",
|
||||||
|
"paramDescription": "the jdbc url to the postgres",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "uid",
|
||||||
|
"paramLongName": "postgresUser",
|
||||||
|
"paramDescription": "the postgres user",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "pwd",
|
||||||
|
"paramLongName": "postgresPassword",
|
||||||
|
"paramDescription": "the postgres password=",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "p",
|
||||||
|
"paramLongName": "hdfsPath",
|
||||||
|
"paramDescription": "the target path on HDFS",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "nn",
|
||||||
|
"paramLongName": "hdfsNameNode",
|
||||||
|
"paramDescription": "the HDFS nameNode",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,38 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "when true will stop SparkSession after job execution",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "in",
|
||||||
|
"paramLongName": "inputPath",
|
||||||
|
"paramDescription": "the path to the graph data dump to read",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "rp",
|
||||||
|
"paramLongName": "resolvedPath",
|
||||||
|
"paramDescription": "the path to store the resolved records",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "out",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path to store the output graph",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "class",
|
||||||
|
"paramLongName": "graphTableClassName",
|
||||||
|
"paramDescription": "class name moelling the graph table",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "md",
|
||||||
|
"paramLongName": "datasourceMasterDuplicate",
|
||||||
|
"paramDescription": "path to the file on HDFS holding the datasource id tuples [master, duplicate]",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -12,8 +12,8 @@
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "wp",
|
"paramName": "wd",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "workingDir",
|
||||||
"paramDescription": "the path to store the output graph",
|
"paramDescription": "the path to store the output graph",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
|
|
@ -12,8 +12,8 @@
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "wp",
|
"paramName": "wd",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "workingDir",
|
||||||
"paramDescription": "the path to store the output graph",
|
"paramDescription": "the path to store the output graph",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
|
|
@ -12,8 +12,8 @@
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "wp",
|
"paramName": "wd",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "workingDir",
|
||||||
"paramDescription": "the path to store the output graph",
|
"paramDescription": "the path to store the output graph",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
|
|
@ -0,0 +1,213 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
|
||||||
|
public class CleanCfHbSparkJobTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path testBaseTmpPath;
|
||||||
|
|
||||||
|
private static String resolvedPath;
|
||||||
|
|
||||||
|
private static String graphInputPath;
|
||||||
|
|
||||||
|
private static String graphOutputPath;
|
||||||
|
|
||||||
|
private static String dsMasterDuplicatePath;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException, URISyntaxException {
|
||||||
|
|
||||||
|
testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName());
|
||||||
|
log.info("using test base path {}", testBaseTmpPath);
|
||||||
|
|
||||||
|
final File entitiesSources = Paths
|
||||||
|
.get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI())
|
||||||
|
.toFile();
|
||||||
|
|
||||||
|
FileUtils
|
||||||
|
.copyDirectory(
|
||||||
|
entitiesSources,
|
||||||
|
testBaseTmpPath.resolve("input").resolve("entities").toFile());
|
||||||
|
|
||||||
|
FileUtils
|
||||||
|
.copyFileToDirectory(
|
||||||
|
Paths
|
||||||
|
.get(
|
||||||
|
CleanCfHbSparkJobTest.class
|
||||||
|
.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json")
|
||||||
|
.toURI())
|
||||||
|
.toFile(),
|
||||||
|
testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile());
|
||||||
|
|
||||||
|
graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
|
||||||
|
resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString();
|
||||||
|
graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString();
|
||||||
|
dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString();
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(CleanCfHbSparkJobTest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(testBaseTmpPath.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCleanCfHbSparkJob() throws Exception {
|
||||||
|
final String outputPath = graphOutputPath + "/dataset";
|
||||||
|
final String inputPath = graphInputPath + "/dataset";
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Dataset> records = read(spark, inputPath, Dataset.class);
|
||||||
|
Dataset d = records
|
||||||
|
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("Bacterial Protein Interaction Database - DUP", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals(
|
||||||
|
"Bacterial Protein Interaction Database - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
|
||||||
|
d = records
|
||||||
|
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("FILUR DATA - DUP", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals("FILUR DATA - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|re3data_____::6ffd7bc058f762912dc494cd9c175341", d.getInstance().get(0).getHostedby().getKey());
|
||||||
|
assertEquals("depositar - DUP", d.getInstance().get(0).getHostedby().getValue());
|
||||||
|
|
||||||
|
d = records
|
||||||
|
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals(
|
||||||
|
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
|
||||||
|
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
|
||||||
|
|
||||||
|
CleanCfHbSparkJob
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"--inputPath", inputPath,
|
||||||
|
"--outputPath", outputPath,
|
||||||
|
"--resolvedPath", resolvedPath + "/dataset",
|
||||||
|
"--graphTableClassName", Dataset.class.getCanonicalName(),
|
||||||
|
"--datasourceMasterDuplicate", dsMasterDuplicatePath
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));
|
||||||
|
|
||||||
|
records = read(spark, outputPath, Dataset.class);
|
||||||
|
|
||||||
|
assertEquals(3, records.count());
|
||||||
|
|
||||||
|
d = records
|
||||||
|
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("Bacterial Protein Interaction Database", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals("Bacterial Protein Interaction Database", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
|
||||||
|
d = records
|
||||||
|
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("FULIR Data", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals("FULIR Data", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|fairsharing_::3f647cadf56541fb9513cb63ec370187", d.getInstance().get(0).getHostedby().getKey());
|
||||||
|
assertEquals("depositar", d.getInstance().get(0).getHostedby().getValue());
|
||||||
|
|
||||||
|
d = records
|
||||||
|
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals(
|
||||||
|
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
|
||||||
|
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
|
||||||
|
|
||||||
|
d = records
|
||||||
|
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
|
||||||
|
.first();
|
||||||
|
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
|
||||||
|
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||||
|
assertEquals(
|
||||||
|
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||||
|
assertEquals(
|
||||||
|
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
|
||||||
|
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
private <R> org.apache.spark.sql.Dataset<R> read(SparkSession spark, String path, Class<R> clazz) {
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(path)
|
||||||
|
.map(as(clazz), Encoders.bean(clazz));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <R> MapFunction<String, R> as(Class<R> clazz) {
|
||||||
|
return s -> OBJECT_MAPPER.readValue(s, clazz);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1002,7 +1002,8 @@ class MappersTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testEOSCFuture_ROHub() throws IOException {
|
void testEOSCFuture_ROHub() throws IOException {
|
||||||
final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml")));
|
final String xml = IOUtils
|
||||||
|
.toString(Objects.requireNonNull(getClass().getResourceAsStream("photic-zone-transformed.xml")));
|
||||||
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
|
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
|
||||||
final OtherResearchProduct rocrate = (OtherResearchProduct) list.get(0);
|
final OtherResearchProduct rocrate = (OtherResearchProduct) list.get(0);
|
||||||
assertNotNull(rocrate.getEoscifguidelines());
|
assertNotNull(rocrate.getEoscifguidelines());
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,4 @@
|
||||||
|
{ "duplicateId" : "10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", "masterId" : "10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", "masterName" : "Bacterial Protein Interaction Database" }
|
||||||
|
{ "duplicateId" : "10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", "masterId" : "10|re3data_____::fc1db64b3964826913b1e9eafe830490", "masterName" : "FULIR Data" }
|
||||||
|
{ "duplicateId" : "10|re3data_____::6ffd7bc058f762912dc494cd9c175341", "masterId" : "10|fairsharing_::3f647cadf56541fb9513cb63ec370187", "masterName" : "depositar" }
|
||||||
|
{ "duplicateId" : "10|scindeksserb::07022f78a8cc6d1171092454ecdbb47c", "masterId" : "10|doajarticles::07022f78a8cc6d1171092454ecdbb47c", "masterName" : "Artefact" }
|
|
@ -1,13 +1,14 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.provision;
|
package eu.dnetlib.dhp.oa.provision;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
|
import java.io.IOException;
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
import java.io.StringReader;
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
|
import javax.xml.transform.Transformer;
|
||||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
import javax.xml.transform.TransformerException;
|
||||||
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
@ -18,71 +19,73 @@ import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import javax.xml.transform.Transformer;
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import javax.xml.transform.TransformerException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.StringReader;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
|
||||||
|
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||||
|
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
|
||||||
|
|
||||||
public class EOSCFuture_Test {
|
public class EOSCFuture_Test {
|
||||||
|
|
||||||
public static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
public static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
public static final String VERSION = "2021-04-15T10:05:53Z";
|
public static final String VERSION = "2021-04-15T10:05:53Z";
|
||||||
public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl";
|
public static final String DSID = "b9ee796a-c49f-4473-a708-e7d67b84c16d_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl";
|
||||||
|
|
||||||
private ContextMapper contextMapper;
|
private ContextMapper contextMapper;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
contextMapper = new ContextMapper();
|
contextMapper = new ContextMapper();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException {
|
||||||
|
|
||||||
@Test
|
final ContextMapper contextMapper = new ContextMapper();
|
||||||
public void testEOSC_ROHub() throws IOException, DocumentException, TransformerException {
|
|
||||||
|
|
||||||
final ContextMapper contextMapper = new ContextMapper();
|
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
|
||||||
|
XmlConverterJob.schemaLocation);
|
||||||
|
|
||||||
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
|
final OtherResearchProduct p = OBJECT_MAPPER
|
||||||
XmlConverterJob.schemaLocation);
|
.readValue(
|
||||||
|
IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")),
|
||||||
|
OtherResearchProduct.class);
|
||||||
|
|
||||||
final OtherResearchProduct p = OBJECT_MAPPER
|
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
|
||||||
.readValue(IOUtils.toString(getClass().getResourceAsStream("eosc-future/photic-zone.json")), OtherResearchProduct.class);
|
|
||||||
|
|
||||||
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
|
assertNotNull(xml);
|
||||||
|
|
||||||
assertNotNull(xml);
|
final Document doc = new SAXReader().read(new StringReader(xml));
|
||||||
|
|
||||||
final Document doc = new SAXReader().read(new StringReader(xml));
|
assertNotNull(doc);
|
||||||
|
System.out.println(doc.asXML());
|
||||||
|
|
||||||
assertNotNull(doc);
|
testRecordTransformation(xml);
|
||||||
System.out.println(doc.asXML());
|
}
|
||||||
|
|
||||||
|
private void testRecordTransformation(final String record) throws IOException, TransformerException {
|
||||||
|
final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
|
||||||
|
final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));
|
||||||
|
|
||||||
testRecordTransformation(xml);
|
final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt);
|
||||||
}
|
|
||||||
|
|
||||||
|
final Transformer tr = SaxonTransformerFactory.newInstance(transformer);
|
||||||
|
|
||||||
private void testRecordTransformation(final String record) throws IOException, TransformerException {
|
final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record);
|
||||||
final String fields = IOUtils.toString(getClass().getResourceAsStream("fields.xml"));
|
|
||||||
final String xslt = IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"));
|
|
||||||
|
|
||||||
final String transformer = XmlIndexingJob.getLayoutTransformer("DMF", fields, xslt);
|
final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID)
|
||||||
|
.parseDocument(indexRecordXML);
|
||||||
|
|
||||||
final Transformer tr = SaxonTransformerFactory.newInstance(transformer);
|
final String xmlDoc = ClientUtils.toXML(solrDoc);
|
||||||
|
|
||||||
final String indexRecordXML = XmlIndexingJob.toIndexRecord(tr, record);
|
Assertions.assertNotNull(xmlDoc);
|
||||||
|
System.out.println(xmlDoc);
|
||||||
final SolrInputDocument solrDoc = new StreamingInputDocumentFactory(VERSION, DSID)
|
}
|
||||||
.parseDocument(indexRecordXML);
|
|
||||||
|
|
||||||
final String xmlDoc = ClientUtils.toXML(solrDoc);
|
|
||||||
|
|
||||||
Assertions.assertNotNull(xmlDoc);
|
|
||||||
System.out.println(xmlDoc);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue