graph cleaning refactoring #282
|
@ -1,122 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class CleanContextSparkJob implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
CleanContextSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
String workingDir = parser.get("workingDir");
|
||||
log.info("workingDir: {}", workingDir);
|
||||
|
||||
String contextId = parser.get("contextId");
|
||||
log.info("contextId: {}", contextId);
|
||||
|
||||
String verifyParam = parser.get("verifyParam");
|
||||
log.info("verifyParam: {}", verifyParam);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingDir);
|
||||
});
|
||||
}
|
||||
|
||||
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
|
||||
String inputPath, Class<T> entityClazz, String workingDir) {
|
||||
Dataset<T> res = spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||
Encoders.bean(entityClazz));
|
||||
|
||||
res.map((MapFunction<T, T>) r -> {
|
||||
if (!r
|
||||
.getTitle()
|
||||
.stream()
|
||||
.filter(
|
||||
t -> t
|
||||
.getQualifier()
|
||||
.getClassid()
|
||||
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
|
||||
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
|
||||
return r;
|
||||
}
|
||||
r
|
||||
.setContext(
|
||||
r
|
||||
.getContext()
|
||||
.stream()
|
||||
.filter(
|
||||
c -> !c.getId().split("::")[0]
|
||||
.equalsIgnoreCase(contextId))
|
||||
.collect(Collectors.toList()));
|
||||
return r;
|
||||
}, Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir);
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(workingDir)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||
Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath);
|
||||
}
|
||||
}
|
|
@ -4,10 +4,8 @@ package eu.dnetlib.dhp.oa.graph.clean;
|
|||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -29,8 +27,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.cfhb.IdCfHbMapping;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
|
@ -38,6 +34,7 @@ import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
|||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
|
||||
|
@ -55,17 +52,17 @@ public class CleanGraphSparkJob {
|
|||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
CleanGraphSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json"));
|
||||
.toString(
|
||||
CleanGraphSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String isLookupUrl = parser.get("isLookupUrl");
|
||||
|
@ -76,7 +73,8 @@ public class CleanGraphSparkJob {
|
|||
new CleanGraphSparkJob(parser).run(isSparkSessionManaged, isLookup);
|
||||
}
|
||||
|
||||
public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) throws ISLookUpException, ClassNotFoundException {
|
||||
public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService)
|
||||
throws ISLookUpException, ClassNotFoundException {
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
@ -99,9 +97,10 @@ public class CleanGraphSparkJob {
|
|||
String country = parser.get("country");
|
||||
log.info("country: {}", country);
|
||||
|
||||
String[] verifyCountryParam = Optional.ofNullable(parser.get("verifyCountryParam"))
|
||||
.map(s -> s.split(";"))
|
||||
.orElse(new String[]{});
|
||||
String[] verifyCountryParam = Optional
|
||||
.ofNullable(parser.get("verifyCountryParam"))
|
||||
.map(s -> s.split(";"))
|
||||
.orElse(new String[] {});
|
||||
log.info("verifyCountryParam: {}", verifyCountryParam);
|
||||
|
||||
String collectedfrom = parser.get("collectedfrom");
|
||||
|
@ -111,9 +110,9 @@ public class CleanGraphSparkJob {
|
|||
log.info("masterDuplicatePath: {}", dsMasterDuplicatePath);
|
||||
|
||||
Boolean deepClean = Optional
|
||||
.ofNullable(parser.get("deepClean"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.FALSE);
|
||||
.ofNullable(parser.get("deepClean"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.FALSE);
|
||||
log.info("deepClean: {}", deepClean);
|
||||
|
||||
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
|
||||
|
@ -123,14 +122,14 @@ public class CleanGraphSparkJob {
|
|||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(CleanGraphSparkJob.class.getSimpleName() + "#" + entityClazz.getSimpleName());
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||
cleanGraphTable(
|
||||
spark, vocs, inputPath, entityClazz, outputPath, contextId, verifyParam, datasourcePath, country,
|
||||
verifyCountryParam, collectedfrom, dsMasterDuplicatePath, deepClean);
|
||||
});
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||
cleanGraphTable(
|
||||
spark, vocs, inputPath, entityClazz, outputPath, contextId, verifyParam, datasourcePath, country,
|
||||
verifyCountryParam, collectedfrom, dsMasterDuplicatePath, deepClean);
|
||||
});
|
||||
}
|
||||
|
||||
private static <T extends Oaf> void cleanGraphTable(
|
||||
|
@ -172,33 +171,33 @@ public class CleanGraphSparkJob {
|
|||
.map(as(clazz), Encoders.bean(clazz))
|
||||
.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
|
||||
|
||||
// set the EMPTY master ID/NAME and save it
|
||||
resolved
|
||||
// set the EMPTY master ID/NAME
|
||||
Dataset<IdCfHbMapping> resolvedDs = resolved
|
||||
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
|
||||
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
|
||||
.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()));
|
||||
|
||||
// load the hostedby mapping
|
||||
Set<String> hostedBy = Sets
|
||||
.newHashSet(
|
||||
spark
|
||||
.read()
|
||||
.textFile(datasourcePath)
|
||||
.collectAsList());
|
||||
.newHashSet(
|
||||
spark
|
||||
.read()
|
||||
.textFile(datasourcePath)
|
||||
.collectAsList());
|
||||
|
||||
// perform the deep cleaning steps
|
||||
final Dataset<T> cleaned_deep = cleaned_basic
|
||||
.map(
|
||||
(MapFunction<T, T>) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam),
|
||||
Encoders.bean(clazz))
|
||||
.map(
|
||||
(MapFunction<T, T>) value -> GraphCleaningFunctions
|
||||
.cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country),
|
||||
Encoders.bean(clazz));
|
||||
.map(
|
||||
(MapFunction<T, T>) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam),
|
||||
Encoders.bean(clazz))
|
||||
.map(
|
||||
(MapFunction<T, T>) value -> GraphCleaningFunctions
|
||||
.cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country),
|
||||
Encoders.bean(clazz));
|
||||
|
||||
// Join the results with the resolved CF|HB mapping, apply the mapping and save it
|
||||
cleaned_deep
|
||||
.joinWith(resolved, cleaned_deep.col("id").equalTo(resolved.col("resultId")), "left")
|
||||
.joinWith(resolvedDs, cleaned_deep.col("id").equalTo(resolvedDs.col("resultId")), "left")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> ((Result) t._1()).getId(), Encoders.STRING())
|
||||
.mapGroups(getMapGroupsFunction(), Encoders.bean(clazz))
|
||||
|
@ -302,8 +301,8 @@ public class CleanGraphSparkJob {
|
|||
|
||||
private Stream<KeyValue> filter(List<KeyValue> kvs) {
|
||||
return kvs
|
||||
.stream()
|
||||
.filter(kv -> StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue()));
|
||||
.stream()
|
||||
.filter(kv -> StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue()));
|
||||
}
|
||||
|
||||
private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) {
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.country;
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -21,7 +20,6 @@ import org.slf4j.LoggerFactory;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import scala.Tuple2;
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
@ -1,227 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class CleanCfHbSparkJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
CleanCountrySparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_clean_cfhb_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
String resolvedPath = parser.get("resolvedPath");
|
||||
log.info("resolvedPath: {}", resolvedPath);
|
||||
|
||||
String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
String dsMasterDuplicatePath = parser.get("masterDuplicatePath");
|
||||
log.info("masterDuplicatePath: {}", dsMasterDuplicatePath);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||
HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration());
|
||||
cleanCfHb(
|
||||
spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static <T extends Result> void cleanCfHb(SparkSession spark, String inputPath, Class<T> entityClazz,
|
||||
String resolvedPath, String masterDuplicatePath, String outputPath) {
|
||||
|
||||
// read the master-duplicate tuples
|
||||
Dataset<MasterDuplicate> md = spark
|
||||
.read()
|
||||
.textFile(masterDuplicatePath)
|
||||
.map(as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
|
||||
|
||||
// prepare the resolved CF|HB references with the corresponding EMPTY master ID
|
||||
Dataset<IdCfHbMapping> resolved = spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map(as(entityClazz), Encoders.bean(entityClazz))
|
||||
.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
|
||||
|
||||
// set the EMPTY master ID/NAME and save it
|
||||
resolved
|
||||
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
|
||||
.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
|
||||
.filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(resolvedPath);
|
||||
|
||||
// read again the resolved CF|HB mapping
|
||||
Dataset<IdCfHbMapping> resolvedDS = spark
|
||||
.read()
|
||||
.textFile(resolvedPath)
|
||||
.map(as(IdCfHbMapping.class), Encoders.bean(IdCfHbMapping.class));
|
||||
|
||||
// read the result table
|
||||
Dataset<T> res = spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map(as(entityClazz), Encoders.bean(entityClazz));
|
||||
|
||||
// Join the results with the resolved CF|HB mapping, apply the mapping and save it
|
||||
res
|
||||
.joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left")
|
||||
.groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING())
|
||||
.mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
}
|
||||
|
||||
private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
|
||||
return t -> {
|
||||
final IdCfHbMapping mapping = t._1();
|
||||
Optional
|
||||
.ofNullable(t._2())
|
||||
.ifPresent(t2 -> {
|
||||
mapping.setMasterId(t2.getMasterId());
|
||||
mapping.setMasterName(t2.getMasterName());
|
||||
|
||||
});
|
||||
return mapping;
|
||||
};
|
||||
}
|
||||
|
||||
private static <T extends Result> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
|
||||
return r -> Stream
|
||||
.concat(
|
||||
Optional
|
||||
.ofNullable(r.getCollectedfrom())
|
||||
.map(cf -> cf.stream().map(KeyValue::getKey))
|
||||
.orElse(Stream.empty()),
|
||||
Stream
|
||||
.concat(
|
||||
Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instances -> instances
|
||||
.stream()
|
||||
.map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse("")))
|
||||
.orElse(Stream.empty())
|
||||
.filter(StringUtils::isNotBlank),
|
||||
Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instances -> instances
|
||||
.stream()
|
||||
.map(
|
||||
i -> Optional
|
||||
.ofNullable(i.getCollectedfrom())
|
||||
.map(KeyValue::getKey)
|
||||
.orElse("")))
|
||||
.orElse(Stream.empty())
|
||||
.filter(StringUtils::isNotBlank)))
|
||||
.distinct()
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.map(cfHb -> asIdCfHbMapping(r.getId(), cfHb))
|
||||
.iterator();
|
||||
}
|
||||
|
||||
private static <T extends Result> MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T> getMapGroupsFunction() {
|
||||
return new MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T>() {
|
||||
@Override
|
||||
public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) {
|
||||
final Tuple2<T, IdCfHbMapping> first = values.next();
|
||||
final T res = first._1();
|
||||
|
||||
updateResult(res, first._2());
|
||||
values.forEachRemaining(t -> updateResult(res, t._2()));
|
||||
return res;
|
||||
}
|
||||
|
||||
private void updateResult(T res, IdCfHbMapping m) {
|
||||
if (Objects.nonNull(m)) {
|
||||
res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m));
|
||||
res.getInstance().forEach(i -> {
|
||||
updateKeyValue(i.getHostedby(), m);
|
||||
updateKeyValue(i.getCollectedfrom(), m);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) {
|
||||
if (kv.getKey().equals(a.getCfhb())) {
|
||||
kv.setKey(a.getMasterId());
|
||||
kv.setValue(a.getMasterName());
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) {
|
||||
IdCfHbMapping m = new IdCfHbMapping(resultId);
|
||||
m.setCfhb(cfHb);
|
||||
return m;
|
||||
}
|
||||
|
||||
private static <R> MapFunction<String, R> as(Class<R> clazz) {
|
||||
return s -> OBJECT_MAPPER.readValue(s, clazz);
|
||||
}
|
||||
}
|
|
@ -1,211 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.country;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.swing.text.html.Option;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 20/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Country;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
|
||||
public class CleanCountrySparkJob implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanCountrySparkJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
CleanCountrySparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
|
||||
String workingDir = parser.get("workingDir");
|
||||
log.info("workingDir: {}", workingDir);
|
||||
|
||||
String datasourcePath = parser.get("hostedBy");
|
||||
log.info("datasourcePath: {}", datasourcePath);
|
||||
|
||||
String country = parser.get("country");
|
||||
log.info("country: {}", country);
|
||||
|
||||
String[] verifyParam = parser.get("verifyParam").split(";");
|
||||
log.info("verifyParam: {}", verifyParam);
|
||||
|
||||
String collectedfrom = parser.get("collectedfrom");
|
||||
log.info("collectedfrom: {}", collectedfrom);
|
||||
|
||||
String graphTableClassName = parser.get("graphTableClassName");
|
||||
log.info("graphTableClassName: {}", graphTableClassName);
|
||||
|
||||
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
cleanCountry(
|
||||
spark, country, verifyParam, inputPath, entityClazz, workingDir, collectedfrom, datasourcePath);
|
||||
});
|
||||
}
|
||||
|
||||
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
|
||||
String inputPath, Class<T> entityClazz, String workingDir, String collectedfrom, String datasourcePath) {
|
||||
|
||||
List<String> hostedBy = spark
|
||||
.read()
|
||||
.textFile(datasourcePath)
|
||||
.collectAsList();
|
||||
|
||||
Dataset<T> res = spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||
Encoders.bean(entityClazz));
|
||||
|
||||
res.map((MapFunction<T, T>) r -> {
|
||||
if (r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
|
||||
!r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
|
||||
return r;
|
||||
}
|
||||
|
||||
List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
|
||||
if (ids
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> p
|
||||
.getQualifier()
|
||||
.getClassid()
|
||||
.equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) {
|
||||
r
|
||||
.setCountry(
|
||||
r
|
||||
.getCountry()
|
||||
.stream()
|
||||
.filter(
|
||||
c -> toTakeCountry(c, country))
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
}
|
||||
|
||||
return r;
|
||||
}, Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir);
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(workingDir)
|
||||
.map(
|
||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||
Encoders.bean(entityClazz))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(inputPath);
|
||||
}
|
||||
|
||||
private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
|
||||
final Stream<StructuredProperty> resultPids = Optional
|
||||
.ofNullable(r.getPid())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty());
|
||||
|
||||
final Stream<StructuredProperty> instancePids = Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instance -> instance
|
||||
.stream()
|
||||
.flatMap(
|
||||
i -> Optional
|
||||
.ofNullable(i.getPid())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty())))
|
||||
.orElse(Stream.empty());
|
||||
|
||||
final Stream<StructuredProperty> instanceAltIds = Optional
|
||||
.ofNullable(r.getInstance())
|
||||
.map(
|
||||
instance -> instance
|
||||
.stream()
|
||||
.flatMap(
|
||||
i -> Optional
|
||||
.ofNullable(i.getAlternateIdentifier())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty())))
|
||||
.orElse(Stream.empty());
|
||||
|
||||
return Stream
|
||||
.concat(
|
||||
Stream.concat(resultPids, instancePids),
|
||||
instanceAltIds);
|
||||
}
|
||||
|
||||
private static boolean pidInParam(String value, String[] verifyParam) {
|
||||
for (String s : verifyParam)
|
||||
if (value.startsWith(s))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean toTakeCountry(Country c, String country) {
|
||||
// If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be
|
||||
// inserted via propagation
|
||||
if (!Optional.ofNullable(c.getDataInfo()).isPresent())
|
||||
return true;
|
||||
if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
|
||||
return true;
|
||||
return !(c
|
||||
.getClassid()
|
||||
.equalsIgnoreCase(country) &&
|
||||
c.getDataInfo().getInferenceprovenance().equals("propagation"));
|
||||
}
|
||||
|
||||
}
|
|
@ -99,7 +99,7 @@
|
|||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Select datasource ID from country</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry</class>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.GetDatasourceFromCountry</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
|
|
|
@ -1,289 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class CleanContextTest {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(CleanContextTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(CleanContextTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(CleanContextTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResultClean() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json")
|
||||
.getPath();
|
||||
final String prefix = "gcube ";
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(sourcePath)
|
||||
.map(
|
||||
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
|
||||
Encoders.bean(Publication.class))
|
||||
.write()
|
||||
.json(workingDir.toString() + "/publication");
|
||||
|
||||
CleanContextSparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/publication",
|
||||
"--graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"--workingDir", workingDir.toString() + "/working",
|
||||
"--contextId", "sobigdata",
|
||||
"--verifyParam", "gCube "
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<Publication> tmp = sc
|
||||
.textFile(workingDir.toString() + "/publication")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
||||
|
||||
Assertions.assertEquals(7, tmp.count());
|
||||
|
||||
// original result with sobigdata context and gcube as starting string in the main title for the publication
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
|
||||
// original result with sobigdata context without gcube as starting string in the main title for the publication
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"sobigdata::projects::2",
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.get(0)
|
||||
.getId());
|
||||
|
||||
// original result with sobigdata context with gcube as starting string in the subtitle
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"sobigdata::projects::2",
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.get(0)
|
||||
.getId());
|
||||
List<StructuredProperty> titles = tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getTitle();
|
||||
Assertions.assertEquals(1, titles.size());
|
||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||
Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
|
||||
|
||||
// original result with sobigdata context with gcube not as starting string in the main title
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"sobigdata::projects::1",
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.get(0)
|
||||
.getId());
|
||||
titles = tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getTitle();
|
||||
Assertions.assertEquals(1, titles.size());
|
||||
Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()));
|
||||
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
||||
|
||||
// original result with sobigdata in context and also other contexts with gcube as starting string for the main
|
||||
// title
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"dh-ch",
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.get(0)
|
||||
.getId());
|
||||
titles = tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getTitle();
|
||||
Assertions.assertEquals(1, titles.size());
|
||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
||||
|
||||
// original result with multiple main title one of which whith gcube as starting string and with 2 contextes
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"dh-ch",
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.get(0)
|
||||
.getId());
|
||||
titles = tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getTitle();
|
||||
Assertions.assertEquals(2, titles.size());
|
||||
Assertions
|
||||
.assertTrue(
|
||||
titles
|
||||
.stream()
|
||||
.anyMatch(
|
||||
t -> t.getQualifier().getClassid().equals("main title")
|
||||
&& t.getValue().toLowerCase().startsWith(prefix)));
|
||||
|
||||
// original result without sobigdata in context with gcube as starting string for the main title
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.size());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"dh-ch",
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getContext()
|
||||
.get(0)
|
||||
.getId());
|
||||
titles = tmp
|
||||
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getTitle();
|
||||
Assertions.assertEquals(2, titles.size());
|
||||
|
||||
Assertions
|
||||
.assertTrue(
|
||||
titles
|
||||
.stream()
|
||||
.anyMatch(
|
||||
t -> t.getQualifier().getClassid().equals("main title")
|
||||
&& t.getValue().toLowerCase().startsWith(prefix)));
|
||||
|
||||
}
|
||||
}
|
|
@ -1,190 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 20/07/22
|
||||
*/
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
|
||||
public class CleanCountryTest {
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(CleanCountryTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(CleanCountryTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(CleanCountryTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResultClean() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_country.json")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(sourcePath)
|
||||
.map(
|
||||
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
|
||||
Encoders.bean(Publication.class))
|
||||
.write()
|
||||
.json(workingDir.toString() + "/publication");
|
||||
|
||||
CleanCountrySparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/publication",
|
||||
"--graphTableClassName", Publication.class.getCanonicalName(),
|
||||
"--workingDir", workingDir.toString() + "/working",
|
||||
"--country", "NL",
|
||||
"--verifyParam", "10.17632",
|
||||
"--collectedfrom", "NARCIS",
|
||||
"--hostedBy", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
|
||||
.getPath()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<Publication> tmp = sc
|
||||
.textFile(workingDir.toString() + "/publication")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
||||
|
||||
Assertions.assertEquals(8, tmp.count());
|
||||
|
||||
// original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
|
||||
// original result with NL country and pid not starting with Mendely prefix
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
|
||||
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not
|
||||
// inserted with propagation
|
||||
Assertions
|
||||
.assertEquals(
|
||||
1,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
|
||||
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with
|
||||
// propagation
|
||||
Assertions
|
||||
.assertEquals(
|
||||
0,
|
||||
tmp
|
||||
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6ag"))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getCountry()
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatasetClean() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json")
|
||||
.getPath();
|
||||
|
||||
spark
|
||||
.read()
|
||||
.textFile(sourcePath)
|
||||
.map(
|
||||
(MapFunction<String, Dataset>) r -> OBJECT_MAPPER.readValue(r, Dataset.class),
|
||||
Encoders.bean(Dataset.class))
|
||||
.write()
|
||||
.json(workingDir.toString() + "/dataset");
|
||||
|
||||
CleanCountrySparkJob.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", workingDir.toString() + "/dataset",
|
||||
"-graphTableClassName", Dataset.class.getCanonicalName(),
|
||||
"-workingDir", workingDir.toString() + "/working",
|
||||
"-country", "NL",
|
||||
"-verifyParam", "10.17632",
|
||||
"-collectedfrom", "NARCIS",
|
||||
"-hostedBy", getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
|
||||
.getPath()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
JavaRDD<Dataset> tmp = sc
|
||||
.textFile(workingDir.toString() + "/dataset")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||
|
||||
Assertions.assertEquals(1, tmp.count());
|
||||
|
||||
Assertions.assertEquals(0, tmp.first().getCountry().size());
|
||||
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,213 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.graph.clean.cfhb;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
|
||||
public class CleanCfHbSparkJobTest {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path testBaseTmpPath;
|
||||
|
||||
private static String resolvedPath;
|
||||
|
||||
private static String graphInputPath;
|
||||
|
||||
private static String graphOutputPath;
|
||||
|
||||
private static String dsMasterDuplicatePath;
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException, URISyntaxException {
|
||||
|
||||
testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName());
|
||||
log.info("using test base path {}", testBaseTmpPath);
|
||||
|
||||
final File entitiesSources = Paths
|
||||
.get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI())
|
||||
.toFile();
|
||||
|
||||
FileUtils
|
||||
.copyDirectory(
|
||||
entitiesSources,
|
||||
testBaseTmpPath.resolve("input").resolve("entities").toFile());
|
||||
|
||||
FileUtils
|
||||
.copyFileToDirectory(
|
||||
Paths
|
||||
.get(
|
||||
CleanCfHbSparkJobTest.class
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json")
|
||||
.toURI())
|
||||
.toFile(),
|
||||
testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile());
|
||||
|
||||
graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
|
||||
resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString();
|
||||
graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString();
|
||||
dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString();
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(CleanCfHbSparkJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(testBaseTmpPath.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCleanCfHbSparkJob() throws Exception {
|
||||
final String outputPath = graphOutputPath + "/dataset";
|
||||
final String inputPath = graphInputPath + "/dataset";
|
||||
|
||||
org.apache.spark.sql.Dataset<Dataset> records = read(spark, inputPath, Dataset.class);
|
||||
Dataset d = records
|
||||
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
|
||||
.first();
|
||||
assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("Bacterial Protein Interaction Database - DUP", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals(
|
||||
"Bacterial Protein Interaction Database - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
|
||||
d = records
|
||||
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
|
||||
.first();
|
||||
assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("FILUR DATA - DUP", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals("FILUR DATA - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
assertEquals(
|
||||
"10|re3data_____::6ffd7bc058f762912dc494cd9c175341", d.getInstance().get(0).getHostedby().getKey());
|
||||
assertEquals("depositar - DUP", d.getInstance().get(0).getHostedby().getValue());
|
||||
|
||||
d = records
|
||||
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
|
||||
.first();
|
||||
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals(
|
||||
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
assertEquals(
|
||||
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
|
||||
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
|
||||
|
||||
CleanCfHbSparkJob
|
||||
.main(
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--inputPath", inputPath,
|
||||
"--outputPath", outputPath,
|
||||
"--resolvedPath", resolvedPath + "/dataset",
|
||||
"--graphTableClassName", Dataset.class.getCanonicalName(),
|
||||
"--masterDuplicatePath", dsMasterDuplicatePath
|
||||
});
|
||||
|
||||
assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));
|
||||
|
||||
records = read(spark, outputPath, Dataset.class);
|
||||
|
||||
assertEquals(3, records.count());
|
||||
|
||||
d = records
|
||||
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
|
||||
.first();
|
||||
assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("Bacterial Protein Interaction Database", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals("Bacterial Protein Interaction Database", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
|
||||
d = records
|
||||
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
|
||||
.first();
|
||||
assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("FULIR Data", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals("FULIR Data", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
assertEquals(
|
||||
"10|fairsharing_::3f647cadf56541fb9513cb63ec370187", d.getInstance().get(0).getHostedby().getKey());
|
||||
assertEquals("depositar", d.getInstance().get(0).getHostedby().getValue());
|
||||
|
||||
d = records
|
||||
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
|
||||
.first();
|
||||
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals(
|
||||
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
assertEquals(
|
||||
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
|
||||
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
|
||||
|
||||
d = records
|
||||
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
|
||||
.first();
|
||||
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
|
||||
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
|
||||
assertEquals(
|
||||
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
|
||||
assertEquals(
|
||||
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
|
||||
assertEquals(
|
||||
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
|
||||
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
|
||||
}
|
||||
|
||||
private <R> org.apache.spark.sql.Dataset<R> read(SparkSession spark, String path, Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(path)
|
||||
.map(as(clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
private static <R> MapFunction<String, R> as(Class<R> clazz) {
|
||||
return s -> OBJECT_MAPPER.readValue(s, clazz);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue