Merge branch 'beta' into importpoci

This commit is contained in:
Claudio Atzori 2023-10-03 10:43:53 +02:00
commit 5919e488dd
16 changed files with 405 additions and 804 deletions

View File

@ -1,97 +0,0 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
Objects
.requireNonNull(
DispatchEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible));
}
private static void dispatchEntities(
SparkSession spark,
String inputPath,
String outputPath,
boolean filterInvisible) {
Dataset<String> df = spark.read().textFile(inputPath);
ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
String entityType = entry.getKey();
Class<?> clazz = entry.getValue();
final String entityPath = outputPath + "/" + entityType;
if (!entityType.equalsIgnoreCase("relation")) {
HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> entityDF = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(
df
.filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
.map(
(MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
Encoders.STRING()));
if (filterInvisible) {
entityDF = entityDF.filter("dataInfo.invisible != true");
}
entityDF
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(entityPath);
}
});
}
}

View File

@ -2,36 +2,28 @@
package eu.dnetlib.dhp.oa.merge; package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import java.io.IOException; import java.util.Map;
import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2; import scala.Tuple2;
@ -39,13 +31,9 @@ import scala.Tuple2;
* Groups the graph content by entity identifier to ensure ID uniqueness * Groups the graph content by entity identifier to ensure ID uniqueness
*/ */
public class GroupEntitiesSparkJob { public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final String ID_JPATH = "$.id"; private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob {
String graphInputPath = parser.get("graphInputPath"); String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath); log.info("graphInputPath: {}", graphInputPath);
String checkpointPath = parser.get("checkpointPath");
log.info("checkpointPath: {}", checkpointPath);
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath); groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
}); });
} }
private static void groupEntities( private static void groupEntities(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String outputPath) { String checkpointPath,
String outputPath,
boolean filterInvisible) {
final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn(); Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
.read() String entity = e.getKey().name();
.textFile(toSeq(listEntityPaths(inputPath, sc))) Class<? extends OafEntity> entityClass = e.getValue();
.map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class)) String entityInputPath = inputPath + "/" + entity;
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
.agg(aggregator) continue;
}
allEntities = allEntities
.union(
((Dataset<OafEntity>) spark
.read()
.schema(Encoders.bean(entityClass).schema())
.json(entityInputPath)
.filter("length(id) > 0")
.as(Encoders.bean(entityClass)))
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
}
Dataset<?> groupedEntities = allEntities
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
.map( .map(
(MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() + (MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
"|" + OBJECT_MAPPER.writeValueAsString(t._2()), t._2().getClass().getName(), t._2()),
Encoders.STRING()) Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
groupedEntities = groupedEntities
.withColumn(
entity,
when(col("_1").equalTo(entityClass.getName()), col("_2")));
}
groupedEntities
.drop("_1", "_2")
.write() .write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.text(outputPath); .option("compression", "gzip")
} .save(checkpointPath);
public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> { ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
@Override ModelSupport.entityTypes
public OafEntity zero() { .entrySet()
return null;
}
@Override
public OafEntity reduce(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.mergeEntities(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public OafEntity merge(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
@Override
public OafEntity finish(OafEntity j) {
return j;
}
@Override
public Encoder<OafEntity> bufferEncoder() {
return Encoders.kryo(OafEntity.class);
}
@Override
public Encoder<OafEntity> outputEncoder() {
return Encoders.kryo(OafEntity.class);
}
}
private static OafEntity parseOaf(String s) {
DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) {
case "10":
return parse(s, Datasource.class);
case "20":
return parse(s, Organization.class);
case "40":
return parse(s, Project.class);
case "50":
String resultType = dc.read("$.resulttype.classid");
switch (resultType) {
case "publication":
return parse(s, Publication.class);
case "dataset":
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return parse(s, Software.class);
case "other":
return parse(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream() .stream()
.filter(f -> !f.toLowerCase().contains("relation")) .map(e -> parPool.submit(() -> {
.collect(Collectors.toList()); String entity = e.getKey().name();
} Class<? extends OafEntity> entityClass = e.getValue();
spark
.read()
.load(checkpointPath)
.select(col(entity).as("value"))
.filter("value IS NOT NULL")
.as(OAFENTITY_KRYO_ENC)
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entity);
}))
.collect(Collectors.toList())
.forEach(t -> {
try {
t.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
} }

View File

@ -1,26 +0,0 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "path of the output graph",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true
}
]

View File

@ -8,13 +8,25 @@
{ {
"paramName": "gin", "paramName": "gin",
"paramLongName": "graphInputPath", "paramLongName": "graphInputPath",
"paramDescription": "the graph root path", "paramDescription": "the input graph root path",
"paramRequired": true
},
{
"paramName": "cp",
"paramLongName": "checkpointPath",
"paramDescription": "checkpoint directory",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the output merged graph root path", "paramDescription": "the output graph root path",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -81,7 +81,7 @@ case class SparkModel(conf: DedupConfig) {
MapDocumentUtil.truncateList( MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize fdef.getSize
).toArray ).asScala
case Type.StringConcat => case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath) val jpaths = CONCAT_REGEX.split(fdef.getPath)

View File

@ -117,6 +117,11 @@ public class MapDocumentUtil {
return result; return result;
} }
if (type == Type.List && jresult instanceof List) {
((List<?>) jresult).forEach(x -> result.add(x.toString()));
return result;
}
if (jresult instanceof JSONArray) { if (jresult instanceof JSONArray) {
((JSONArray) jresult).forEach(it -> { ((JSONArray) jresult).forEach(it -> {
try { try {

View File

@ -2,7 +2,9 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -27,10 +29,14 @@ public class TrustUtils {
static { static {
mapper = new ObjectMapper(); mapper = new ObjectMapper();
try { try {
dedupConfig = mapper dedupConfig = DedupConfig
.readValue( .load(
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), IOUtils
DedupConfig.class); .toString(
DedupConfig.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
StandardCharsets.UTF_8));
deduper = new SparkDeduper(dedupConfig); deduper = new SparkDeduper(dedupConfig);
} catch (final IOException e) { } catch (final IOException e) {
log.error("Error loading dedupConfig, e"); log.error("Error loading dedupConfig, e");
@ -57,7 +63,7 @@ public class TrustUtils {
return TrustUtils.rescale(score, threshold); return TrustUtils.rescale(score, threshold);
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error computing score between results", e); log.error("Error computing score between results", e);
return BrokerConstants.MIN_TRUST; throw new RuntimeException(e);
} }
} }

View File

@ -1,57 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.Objects;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
private static final Relation ZERO = new Relation();
@Override
public Relation zero() {
return ZERO;
}
@Override
public Relation reduce(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation merge(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation finish(Relation r) {
return r;
}
private Relation mergeRel(Relation b, Relation a) {
if (Objects.equals(b, ZERO)) {
return a;
}
if (Objects.equals(a, ZERO)) {
return b;
}
b.mergeFrom(a);
return b;
}
@Override
public Encoder<Relation> bufferEncoder() {
return Encoders.kryo(Relation.class);
}
@Override
public Encoder<Relation> outputEncoder() {
return Encoders.kryo(Relation.class);
}
}

View File

@ -1,78 +0,0 @@
package eu.dnetlib.dhp.oa.dedup
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.slf4j.LoggerFactory
object SparkCleanRelation {
private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
@throws[Exception]
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(
IOUtils.toString(
classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
)
)
parser.parseArgument(args)
val conf = new SparkConf
new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
}
}
class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
extends AbstractSparkAction(parser, spark) {
override def run(isLookUpService: ISLookUpService): Unit = {
val graphBasePath = parser.get("graphBasePath")
val inputPath = parser.get("inputPath")
val outputPath = parser.get("outputPath")
SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
AbstractSparkAction.removeOutputDir(spark, outputPath)
val entities =
Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
idsSchema)
val ids = entities
.foldLeft(emptyIds)((ds, entity) => {
val entityPath = graphBasePath + '/' + entity
if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
ds.union(spark.read.schema(idsSchema).json(entityPath))
} else {
ds
}
})
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
.select("id")
.distinct()
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
AbstractSparkAction.save(
relations
.join(ids, col("source") === ids("id"), "leftsemi")
.join(ids, col("target") === ids("id"), "leftsemi"),
outputPath,
SaveMode.Overwrite
)
}
}

View File

@ -3,23 +3,19 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -70,73 +66,63 @@ public class SparkPropagateRelation extends AbstractSparkAction {
log.info("workingPath: '{}'", workingPath); log.info("workingPath: '{}'", workingPath);
log.info("graphOutputPath: '{}'", graphOutputPath); log.info("graphOutputPath: '{}'", graphOutputPath);
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
removeOutputDir(spark, outputRelationPath);
Dataset<Relation> mergeRels = spark Dataset<Relation> mergeRels = spark
.read() .read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC); .as(REL_BEAN_ENC);
// <mergedObjectID, dedupID> // <mergedObjectID, dedupID>
Dataset<Row> mergedIds = mergeRels Dataset<Row> idsToMerge = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES)) .where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID")) .select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct() .distinct();
.cache();
Dataset<Row> allRels = spark Dataset<Row> allRels = spark
.read() .read()
.schema(REL_BEAN_ENC.schema()) .schema(REL_BEAN_ENC.schema())
.json(DedupUtility.createEntityPath(graphBasePath, "relation")); .json(graphBasePath + "/relation");
Dataset<Relation> dedupedRels = allRels Dataset<Relation> dedupedRels = allRels
.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer") .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer") .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID") .select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC); .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
Relation rel = t._1();
String newSource = t._2();
String newTarget = t._3();
Dataset<Relation> processedRelations = distinctRelations( if (rel.getDataInfo() == null) {
dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC))) rel.setDataInfo(new DataInfo());
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget())); }
save(processedRelations, outputRelationPath, SaveMode.Overwrite); if (newSource != null || newTarget != null) {
} rel.getDataInfo().setDeletedbyinference(false);
private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception { if (newSource != null)
Relation existingRel = t._1(); rel.setSource(newSource);
String newSource = t._2();
String newTarget = t._3();
if (newSource == null && newTarget == null) { if (newTarget != null)
return Collections.singleton(t._1()).iterator(); rel.setTarget(newTarget);
} }
// update existing relation return rel;
if (existingRel.getDataInfo() == null) { }, REL_BEAN_ENC);
existingRel.setDataInfo(new DataInfo());
}
existingRel.getDataInfo().setDeletedbyinference(true);
// Create new relation inferred by dedupIDs // ids of records that are both not deletedbyinference and not invisible
Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel); Dataset<Row> ids = validIds(spark, graphBasePath);
inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo())); // filter relations that point to valid records, can force them to be visible
inferredRel.getDataInfo().setDeletedbyinference(false); Dataset<Relation> cleanedRels = dedupedRels
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
.as(REL_BEAN_ENC)
.map((MapFunction<Relation, Relation>) r -> {
r.getDataInfo().setInvisible(false);
return r;
}, REL_KRYO_ENC);
if (newSource != null) Dataset<Relation> distinctRels = cleanedRels
inferredRel.setSource(newSource);
if (newTarget != null)
inferredRel.setTarget(newTarget);
return Arrays.asList(existingRel, inferredRel).iterator();
}
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels
.filter(getRelationFilterFunction())
.groupByKey( .groupByKey(
(MapFunction<Relation, String>) r -> String (MapFunction<Relation, String>) r -> String
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), .join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
@ -146,13 +132,33 @@ public class SparkPropagateRelation extends AbstractSparkAction {
return b; return b;
}) })
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC); .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
final String outputRelationPath = graphOutputPath + "/relation";
removeOutputDir(spark, outputRelationPath);
save(
distinctRels
.union(mergeRels)
.filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"),
outputRelationPath,
SaveMode.Overwrite);
} }
private FilterFunction<Relation> getRelationFilterFunction() { static Dataset<Row> validIds(SparkSession spark, String graphBasePath) {
return r -> StringUtils.isNotBlank(r.getSource()) || StructType idsSchema = StructType
StringUtils.isNotBlank(r.getTarget()) || .fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
StringUtils.isNotBlank(r.getRelType()) ||
StringUtils.isNotBlank(r.getSubRelType()) || Dataset<Row> allIds = spark.emptyDataset(RowEncoder.apply(idsSchema));
StringUtils.isNotBlank(r.getRelClass());
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
String entityPath = graphBasePath + '/' + entityType.name();
if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) {
allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath));
}
}
return allIds
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
.select("id")
.distinct();
} }
} }

View File

@ -1,20 +0,0 @@
[
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "inputPath",
"paramDescription": "the path to the input relation to cleanup",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the output relation cleaned",
"paramRequired": true
}
]

View File

@ -100,35 +100,9 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--graphOutputPath</arg><arg>${workingPath}/propagaterelation/</arg> <arg>--graphOutputPath</arg><arg>${graphOutputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</spark> </spark>
<ok to="CleanRelation"/>
<error to="Kill"/>
</action>
<action name="CleanRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCleanRelation</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--inputPath</arg><arg>${workingPath}/propagaterelation/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
</spark>
<ok to="group_entities"/> <ok to="group_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -152,31 +126,7 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entitities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count; import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -23,14 +22,13 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -46,8 +44,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ -62,6 +58,8 @@ public class SparkDedupTest implements Serializable {
private static String testGraphBasePath; private static String testGraphBasePath;
private static String testOutputBasePath; private static String testOutputBasePath;
private static String testDedupGraphBasePath; private static String testDedupGraphBasePath;
private static String testConsistencyGraphBasePath;
private static final String testActionSetId = "test-orchestrator"; private static final String testActionSetId = "test-orchestrator";
private static String whitelistPath; private static String whitelistPath;
private static List<String> whiteList; private static List<String> whiteList;
@ -75,6 +73,7 @@ public class SparkDedupTest implements Serializable {
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
.toFile() .toFile()
.getAbsolutePath(); .getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath() .toAbsolutePath()
.toString(); .toString();
@ -83,6 +82,10 @@ public class SparkDedupTest implements Serializable {
.toAbsolutePath() .toAbsolutePath()
.toString(); .toString();
testConsistencyGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
whitelistPath = Paths whitelistPath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI())
.toFile() .toFile()
@ -674,22 +677,45 @@ public class SparkDedupTest implements Serializable {
assertEquals(mergedOrp, deletedOrp); assertEquals(mergedOrp, deletedOrp);
} }
@Test
@Order(6)
void copyRelationsNoOpenorgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRelationsNoOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
});
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
System.out.println(outputRels.count());
// assertEquals(2382, outputRels.count());
}
@Test @Test
@Order(7) @Order(7)
void propagateRelationTest() throws Exception { void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")); classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
parser parser
.parseArgument( .parseArgument(
new String[] { new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath "-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
}); });
new SparkPropagateRelation(parser, spark).run(isLookUpService); new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(outputRelPath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
// assertEquals(4860, relations); // assertEquals(4860, relations);
System.out.println("relations = " + relations); System.out.println("relations = " + relations);
@ -699,95 +725,52 @@ public class SparkDedupTest implements Serializable {
.read() .read()
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
.as(Encoders.bean(Relation.class)); .as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
.where("relClass == 'merges'")
.select(mergeRels.col("target"))
.distinct()
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
JavaRDD<String> toCheck = jsc Dataset<Row> inputRels = spark
.textFile(outputRelPath + "/relation") .read()
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) .json(testDedupGraphBasePath + "/relation");
.join(mergedIds)
.map(t -> t._2()._1())
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
.join(mergedIds)
.map(t -> t._2()._1());
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); Dataset<Row> outputRels = spark
long updated = toCheck.count(); .read()
.json(testConsistencyGraphBasePath + "/relation");
assertEquals(updated, deletedbyinference); assertEquals(
0, outputRels
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
.count());
assertEquals(
5, outputRels
.filter("relClass NOT IN ('merges', 'isMergedIn')")
.count());
assertEquals(5 + mergeRels.count(), outputRels.count());
} }
@Test @Test
@Order(8) @Order(8)
void testCleanBaseRelations() throws Exception { void testCleanedPropagatedRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
// append dangling relations to be cleaned up
Dataset<Row> df_before = spark Dataset<Row> df_before = spark
.read() .read()
.schema(Encoders.bean(Relation.class).schema()) .schema(Encoders.bean(Relation.class).schema())
.json(testGraphBasePath + "/relation"); .json(testDedupGraphBasePath + "/relation");
Dataset<Row> df_input = df_before
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
parser
.parseArgument(
new String[] {
"--graphBasePath", testGraphBasePath,
"--inputPath", testGraphBasePath + "/relation",
"--outputPath", testDedupGraphBasePath + "/relation"
});
new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark Dataset<Row> df_after = spark
.read() .read()
.schema(Encoders.bean(Relation.class).schema()) .schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation"); .json(testConsistencyGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_input.count());
assertNotEquals(df_input.count(), df_after.count());
assertEquals(5, df_after.count());
}
@Test
@Order(9)
void testCleanDedupedRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
// append dangling relations to be cleaned up
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
parser
.parseArgument(
new String[] {
"--graphBasePath", testGraphBasePath,
"--inputPath", inputRelPath,
"--outputPath", testDedupGraphBasePath + "/relation"
});
new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_after.count()); assertNotEquals(df_before.count(), df_after.count());
assertEquals(0, df_after.count());
assertEquals(
0, df_after
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
.count());
assertEquals(
5, df_after
.filter("relClass NOT IN ('merges', 'isMergedIn')")
.count());
} }
@Test @Test
@ -813,6 +796,7 @@ public class SparkDedupTest implements Serializable {
public static void finalCleanUp() throws IOException { public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
} }
public boolean isDeletedByInference(String s) { public boolean isDeletedByInference(String s) {

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -15,10 +16,6 @@ import java.nio.file.Paths;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
@ -33,8 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ -44,11 +39,11 @@ public class SparkOpenorgsProvisionTest implements Serializable {
ISLookUpService isLookUpService; ISLookUpService isLookUpService;
private static SparkSession spark; private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath; private static String testGraphBasePath;
private static String testOutputBasePath; private static String testOutputBasePath;
private static String testDedupGraphBasePath; private static String testDedupGraphBasePath;
private static String testConsistencyGraphBasePath;
private static final String testActionSetId = "test-orchestrator"; private static final String testActionSetId = "test-orchestrator";
@BeforeAll @BeforeAll
@ -64,6 +59,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-") testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
.toAbsolutePath() .toAbsolutePath()
.toString(); .toString();
testConsistencyGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@ -76,8 +74,13 @@ public class SparkOpenorgsProvisionTest implements Serializable {
.master("local[*]") .master("local[*]")
.config(conf) .config(conf)
.getOrCreate(); .getOrCreate();
}
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
} }
@BeforeEach @BeforeEach
@ -186,26 +189,21 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkUpdateEntity(parser, spark).run(isLookUpService); new SparkUpdateEntity(parser, spark).run(isLookUpService);
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); Dataset<Row> organizations = spark.read().json(testDedupGraphBasePath + "/organization");
long mergedOrgs = spark Dataset<Row> mergedOrgs = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.where("relClass=='merges'") .where("relClass=='merges'")
.javaRDD() .select("target")
.map(Relation::getTarget) .distinct();
.distinct()
.count();
assertEquals(80, organizations); assertEquals(80, organizations.count());
long deletedOrgs = jsc Dataset<Row> deletedOrgs = organizations
.textFile(testDedupGraphBasePath + "/organization") .filter("dataInfo.deletedbyinference = TRUE");
.filter(this::isDeletedByInference)
.count();
assertEquals(mergedOrgs, deletedOrgs); assertEquals(mergedOrgs.count(), deletedOrgs.count());
} }
@Test @Test
@ -226,10 +224,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation"); final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
assertEquals(2382, rels.count());
assertEquals(2382, outputRels.count());
} }
@Test @Test
@ -244,51 +241,41 @@ public class SparkOpenorgsProvisionTest implements Serializable {
parser parser
.parseArgument( .parseArgument(
new String[] { new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath "-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
}); });
new SparkPropagateRelation(parser, spark).run(isLookUpService); new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4896, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
.read() .read()
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
.as(Encoders.bean(Relation.class)); .as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
Dataset<Row> inputRels = spark
.read()
.json(testDedupGraphBasePath + "/relation");
Dataset<Row> outputRels = spark
.read()
.json(testConsistencyGraphBasePath + "/relation");
final Dataset<Row> mergedIds = mergeRels
.where("relClass == 'merges'") .where("relClass == 'merges'")
.select(mergeRels.col("target")) .select(col("target").as("id"))
.distinct() .distinct();
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
JavaRDD<String> toCheck = jsc Dataset<Row> toUpdateRels = inputRels
.textFile(testDedupGraphBasePath + "/relation") .as("rel")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) .join(mergedIds.as("s"), col("rel.source").equalTo(col("s.id")), "left_outer")
.join(mergedIds) .join(mergedIds.as("t"), col("rel.target").equalTo(col("t.id")), "left_outer")
.map(t -> t._2()._1()) .filter("s.id IS NOT NULL OR t.id IS NOT NULL")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) .distinct();
.join(mergedIds)
.map(t -> t._2()._1());
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); Dataset<Row> updatedRels = inputRels
long updated = toCheck.count(); .select("source", "target", "relClass")
.except(outputRels.select("source", "target", "relClass"));
assertEquals(updated, deletedbyinference); assertEquals(toUpdateRels.count(), updatedRels.count());
assertEquals(140, outputRels.count());
} }
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
public boolean isDeletedByInference(String s) {
return s.contains("\"deletedbyinference\":true");
}
} }

View File

@ -96,30 +96,7 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>

View File

@ -1,16 +1,15 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import eu.dnetlib.dhp.common.HdfsSupport;
import java.net.URISyntaxException; import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import java.nio.file.Files; import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.nio.file.Path; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.nio.file.Paths; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -19,118 +18,108 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import com.fasterxml.jackson.databind.DeserializationFeature; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import eu.dnetlib.dhp.common.HdfsSupport; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
private static SparkSession spark; private static SparkSession spark;
private static ObjectMapper mapper = new ObjectMapper() private static ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static Path workingDir; private static Path workingDir;
private Path dataInputPath; private Path dataInputPath;
private Path groupEntityPath; private Path checkpointPath;
private Path dispatchEntityPath;
@BeforeAll private Path outputPath;
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
SparkConf conf = new SparkConf(); @BeforeAll
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); public static void beforeAll() throws IOException {
conf.setMaster("local"); workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate();
}
@BeforeEach SparkConf conf = new SparkConf();
public void beforeEach() throws IOException, URISyntaxException { conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); conf.setMaster("local");
groupEntityPath = workingDir.resolve("grouped_entity"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
dispatchEntityPath = workingDir.resolve("dispatched_entity"); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
} spark = SparkSession.builder().config(conf).getOrCreate();
}
@AfterAll @BeforeEach
public static void afterAll() throws IOException { public void beforeEach() throws IOException, URISyntaxException {
spark.stop(); dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
FileUtils.deleteDirectory(workingDir.toFile()); checkpointPath = workingDir.resolve("grouped_entity");
} outputPath = workingDir.resolve("dispatched_entity");
}
@Test @AfterAll
@Order(1) public static void afterAll() throws IOException {
void testGroupEntities() throws Exception { spark.stop();
GroupEntitiesSparkJob.main(new String[] { FileUtils.deleteDirectory(workingDir.toFile());
"-isSparkSessionManaged", }
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-outputPath",
groupEntityPath.toString()
});
Dataset<Result> output = spark @Test
.read() @Order(1)
.textFile(groupEntityPath.toString()) void testGroupEntities() throws Exception {
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) GroupEntitiesSparkJob.main(new String[]{
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); "-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-checkpointPath",
checkpointPath.toString(),
"-outputPath",
outputPath.toString(),
"-filterInvisible",
Boolean.FALSE.toString()
});
assertEquals( Dataset<OafEntity> checkpointTable = spark
1, .read()
output .load(checkpointPath.toString())
.filter( .selectExpr("COALESCE(*)")
(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" .as(Encoders.kryo(OafEntity.class));
.equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
}
@Test
@Order(2)
void testDispatchEntities() throws Exception {
DispatchEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
groupEntityPath.toString(),
"-outputPath",
dispatchEntityPath.resolve(".").toString(),
"-filterInvisible",
Boolean.TRUE.toString()
});
Dataset<Result> output = spark assertEquals(
.read() 1,
.textFile( checkpointTable
DHPUtils .filter(
.toSeq( (FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
HdfsSupport .equals(r.getId()) &&
.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration()))) r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); .count());
assertEquals(3, output.count());
assertEquals( Dataset<Result> output = spark
2, .read()
output .textFile(
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING()) DHPUtils
.filter((FilterFunction<String>) s -> s.equals("publication")) .toSeq(
.count()); HdfsSupport
assertEquals( .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
1, .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING()) assertEquals(3, output.count());
.filter((FilterFunction<String>) s -> s.equals("dataset")) assertEquals(
.count()); 2,
} output
} .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("publication"))
.count());
assertEquals(
1,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("dataset"))
.count());
}
}