Merge branch 'beta' into SWH_integration

This commit is contained in:
Claudio Atzori 2023-10-06 12:21:51 +02:00
commit 73c49b8d26
34 changed files with 786 additions and 902 deletions

1
.gitignore vendored
View File

@ -26,3 +26,4 @@ spark-warehouse
/**/*.log /**/*.log
/**/.factorypath /**/.factorypath
/**/.scalafmt.conf /**/.scalafmt.conf
/.java-version

View File

@ -1,97 +0,0 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
Objects
.requireNonNull(
DispatchEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible));
}
private static void dispatchEntities(
SparkSession spark,
String inputPath,
String outputPath,
boolean filterInvisible) {
Dataset<String> df = spark.read().textFile(inputPath);
ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
String entityType = entry.getKey();
Class<?> clazz = entry.getValue();
final String entityPath = outputPath + "/" + entityType;
if (!entityType.equalsIgnoreCase("relation")) {
HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> entityDF = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(
df
.filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
.map(
(MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
Encoders.STRING()));
if (filterInvisible) {
entityDF = entityDF.filter("dataInfo.invisible != true");
}
entityDF
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(entityPath);
}
});
}
}

View File

@ -2,36 +2,28 @@
package eu.dnetlib.dhp.oa.merge; package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import java.io.IOException; import java.util.Map;
import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2; import scala.Tuple2;
@ -39,13 +31,9 @@ import scala.Tuple2;
* Groups the graph content by entity identifier to ensure ID uniqueness * Groups the graph content by entity identifier to ensure ID uniqueness
*/ */
public class GroupEntitiesSparkJob { public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final String ID_JPATH = "$.id"; private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob {
String graphInputPath = parser.get("graphInputPath"); String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath); log.info("graphInputPath: {}", graphInputPath);
String checkpointPath = parser.get("checkpointPath");
log.info("checkpointPath: {}", checkpointPath);
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath); groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
}); });
} }
private static void groupEntities( private static void groupEntities(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String outputPath) { String checkpointPath,
String outputPath,
boolean filterInvisible) {
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
String entityInputPath = inputPath + "/" + entity;
if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
continue;
}
allEntities = allEntities
.union(
((Dataset<OafEntity>) spark
.read()
.schema(Encoders.bean(entityClass).schema())
.json(entityInputPath)
.filter("length(id) > 0")
.as(Encoders.bean(entityClass)))
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
}
Dataset<?> groupedEntities = allEntities
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
t._2().getClass().getName(), t._2()),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
groupedEntities = groupedEntities
.withColumn(
entity,
when(col("_1").equalTo(entityClass.getName()), col("_2")));
}
groupedEntities
.drop("_1", "_2")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.save(checkpointPath);
ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
ModelSupport.entityTypes
.entrySet()
.stream()
.map(e -> parPool.submit(() -> {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark spark
.read() .read()
.textFile(toSeq(listEntityPaths(inputPath, sc))) .load(checkpointPath)
.map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class)) .select(col(entity).as("value"))
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e))) .filter("value IS NOT NULL")
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) .as(OAFENTITY_KRYO_ENC)
.agg(aggregator) .map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
.map( .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
(MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() +
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
Encoders.STRING())
.write() .write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.text(outputPath); .option("compression", "gzip")
} .json(outputPath + "/" + entity);
}))
public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> { .collect(Collectors.toList())
.forEach(t -> {
@Override
public OafEntity zero() {
return null;
}
@Override
public OafEntity reduce(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.mergeEntities(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public OafEntity merge(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
@Override
public OafEntity finish(OafEntity j) {
return j;
}
@Override
public Encoder<OafEntity> bufferEncoder() {
return Encoders.kryo(OafEntity.class);
}
@Override
public Encoder<OafEntity> outputEncoder() {
return Encoders.kryo(OafEntity.class);
}
}
private static OafEntity parseOaf(String s) {
DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) {
case "10":
return parse(s, Datasource.class);
case "20":
return parse(s, Organization.class);
case "40":
return parse(s, Project.class);
case "50":
String resultType = dc.read("$.resulttype.classid");
switch (resultType) {
case "publication":
return parse(s, Publication.class);
case "dataset":
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return parse(s, Software.class);
case "other":
return parse(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
try { try {
return OBJECT_MAPPER.readValue(s, clazz); t.get();
} catch (IOException e) { } catch (InterruptedException | ExecutionException e) {
throw new IllegalArgumentException(e); throw new RuntimeException(e);
} }
});
} }
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream()
.filter(f -> !f.toLowerCase().contains("relation"))
.collect(Collectors.toList());
}
} }

View File

@ -1,26 +0,0 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "path of the output graph",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true
}
]

View File

@ -8,13 +8,25 @@
{ {
"paramName": "gin", "paramName": "gin",
"paramLongName": "graphInputPath", "paramLongName": "graphInputPath",
"paramDescription": "the graph root path", "paramDescription": "the input graph root path",
"paramRequired": true
},
{
"paramName": "cp",
"paramLongName": "checkpointPath",
"paramDescription": "checkpoint directory",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the output merged graph root path", "paramDescription": "the output graph root path",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -81,7 +81,7 @@ case class SparkModel(conf: DedupConfig) {
MapDocumentUtil.truncateList( MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize fdef.getSize
).toArray ).asScala
case Type.StringConcat => case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath) val jpaths = CONCAT_REGEX.split(fdef.getPath)

View File

@ -1,6 +1,23 @@
package eu.dnetlib.pace.util; package eu.dnetlib.pace.util;
/*
* Diff Match and Patch
* Copyright 2018 The diff-match-patch Authors.
* https://github.com/google/diff-match-patch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* /*
* Diff Match and Patch * Diff Match and Patch
* Copyright 2018 The diff-match-patch Authors. * Copyright 2018 The diff-match-patch Authors.

View File

@ -117,6 +117,11 @@ public class MapDocumentUtil {
return result; return result;
} }
if (type == Type.List && jresult instanceof List) {
((List<?>) jresult).forEach(x -> result.add(x.toString()));
return result;
}
if (jresult instanceof JSONArray) { if (jresult instanceof JSONArray) {
((JSONArray) jresult).forEach(it -> { ((JSONArray) jresult).forEach(it -> {
try { try {

View File

@ -12,6 +12,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -30,12 +31,16 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2; import scala.Tuple2;
public class CreateActionSetSparkJob implements Serializable { public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
private static final String ID_PREFIX = "50|doi_________::"; private static final String DOI_PREFIX = "50|doi_________::";
private static final String PMID_PREFIX = "50|pmid________::";
private static final String TRUST = "0.91"; private static final String TRUST = "0.91";
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
@ -84,34 +89,56 @@ public class CreateActionSetSparkJob implements Serializable {
private static void extractContent(SparkSession spark, String inputPath, String outputPath, private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) { boolean shouldDuplicateRels) {
spark
getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "COCI")
.union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "POCI"))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath,
boolean shouldDuplicateRels, String prefix) {
return spark
.read() .read()
.textFile(inputPath + "/*") .textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*")
.map( .map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class), (MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class)) Encoders.bean(COCI.class))
.flatMap( .flatMap(
(FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(), (FlatMapFunction<COCI, Relation>) value -> createRelation(
value, shouldDuplicateRels, prefix)
.iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null) .filter((FilterFunction<Relation>) value -> value != null)
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)) .map(p -> new AtomicAction(p.getClass(), p))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))));
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
private static List<Relation> createRelation(COCI value, boolean duplicate) { private static List<Relation> createRelation(COCI value, boolean duplicate, String p) {
List<Relation> relationList = new ArrayList<>(); List<Relation> relationList = new ArrayList<>();
String prefix;
String citing = ID_PREFIX String citing;
String cited;
if (p.equals("COCI")) {
prefix = DOI_PREFIX;
citing = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
final String cited = ID_PREFIX cited = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
} else {
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCiting()));
cited = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCited()));
}
if (!citing.equals(cited)) { if (!citing.equals(cited)) {
relationList relationList
.add( .add(
@ -120,7 +147,7 @@ public class CreateActionSetSparkJob implements Serializable {
cited, ModelConstants.CITES)); cited, ModelConstants.CITES));
if (duplicate && value.getCiting().endsWith(".refs")) { if (duplicate && value.getCiting().endsWith(".refs")) {
citing = ID_PREFIX + IdentifierFactory citing = prefix + IdentifierFactory
.md5( .md5(
CleaningFunctions CleaningFunctions
.normalizePidValue( .normalizePidValue(
@ -132,59 +159,30 @@ public class CreateActionSetSparkJob implements Serializable {
return relationList; return relationList;
} }
private static Collection<Relation> getRelations(String citing, String cited) {
return Arrays
.asList(
getRelation(citing, cited, ModelConstants.CITES),
getRelation(cited, citing, ModelConstants.IS_CITED_BY));
}
public static Relation getRelation( public static Relation getRelation(
String source, String source,
String target, String target,
String relclass) { String relclass) {
Relation r = new Relation();
r.setCollectedfrom(getCollectedFrom()); return OafMapperUtils
r.setSource(source); .getRelation(
r.setTarget(target); source,
r.setRelClass(relclass); target,
r.setRelType(ModelConstants.RESULT_RESULT); ModelConstants.RESULT_RESULT,
r.setSubRelType(ModelConstants.CITATION); ModelConstants.CITATION,
r relclass,
.setDataInfo( Arrays
getDataInfo()); .asList(
return r; OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)),
OafMapperUtils
.dataInfo(
false, null, false, false,
OafMapperUtils
.qualifier(
OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
TRUST),
null);
} }
public static List<KeyValue> getCollectedFrom() {
KeyValue kv = new KeyValue();
kv.setKey(ModelConstants.OPENOCITATIONS_ID);
kv.setValue(ModelConstants.OPENOCITATIONS_NAME);
return Arrays.asList(kv);
}
public static DataInfo getDataInfo() {
DataInfo di = new DataInfo();
di.setInferred(false);
di.setDeletedbyinference(false);
di.setTrust(TRUST);
di
.setProvenanceaction(
getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS));
return di;
}
public static Qualifier getQualifier(String class_id, String class_name,
String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(class_id);
pa.setClassname(class_name);
pa.setSchemeid(qualifierSchema);
pa.setSchemename(qualifierSchema);
return pa;
}
} }

View File

@ -45,6 +45,9 @@ public class GetOpenCitationsRefs implements Serializable {
final String hdfsNameNode = parser.get("hdfsNameNode"); final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode); log.info("hdfsNameNode {}", hdfsNameNode);
final String prefix = parser.get("prefix");
log.info("prefix {}", prefix);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode); conf.set("fs.defaultFS", hdfsNameNode);
@ -53,30 +56,31 @@ public class GetOpenCitationsRefs implements Serializable {
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs(); GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
for (String file : inputFile) { for (String file : inputFile) {
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem); ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix);
} }
} }
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem) private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix)
throws IOException { throws IOException {
final Path path = new Path(inputFile); final Path path = new Path(inputFile);
FSDataInputStream oc_zip = fileSystem.open(path); FSDataInputStream oc_zip = fileSystem.open(path);
int count = 1; // int count = 1;
try (ZipInputStream zis = new ZipInputStream(oc_zip)) { try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
ZipEntry entry = null; ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) { while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory()) { if (!entry.isDirectory()) {
String fileName = entry.getName(); String fileName = entry.getName();
fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; // fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
count++; fileName = fileName.substring(0, fileName.lastIndexOf("."));
// count++;
try ( try (
FSDataOutputStream out = fileSystem FSDataOutputStream out = fileSystem
.create(new Path(workingPath + "/COCI/" + fileName + ".gz")); .create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz"));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(zis, gzipOs); IOUtils.copy(zis, gzipOs);

View File

@ -49,6 +49,9 @@ public class ReadCOCI implements Serializable {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath); log.info("workingPath {}", workingPath);
final String format = parser.get("format");
log.info("format {}", format);
SparkConf sconf = new SparkConf(); SparkConf sconf = new SparkConf();
final String delimiter = Optional final String delimiter = Optional
@ -64,13 +67,14 @@ public class ReadCOCI implements Serializable {
workingPath, workingPath,
inputFile, inputFile,
outputPath, outputPath,
delimiter); delimiter,
format);
}); });
} }
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath, String outputPath,
String delimiter) throws IOException { String delimiter, String format) throws IOException {
for (String inputFile : inputFiles) { for (String inputFile : inputFiles) {
String p_string = workingPath + "/" + inputFile + ".gz"; String p_string = workingPath + "/" + inputFile + ".gz";
@ -87,9 +91,15 @@ public class ReadCOCI implements Serializable {
cociData.map((MapFunction<Row, COCI>) row -> { cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI(); COCI coci = new COCI();
coci.setOci(row.getString(0)); if (format.equals("COCI")) {
coci.setCiting(row.getString(1)); coci.setCiting(row.getString(1));
coci.setCited(row.getString(2)); coci.setCited(row.getString(2));
} else {
coci.setCiting(String.valueOf(row.getInt(1)));
coci.setCited(String.valueOf(row.getInt(2)));
}
coci.setOci(row.getString(0));
return coci; return coci;
}, Encoders.bean(COCI.class)) }, Encoders.bean(COCI.class))
.write() .write()

View File

@ -21,5 +21,10 @@
"paramLongName": "shouldDuplicateRels", "paramLongName": "shouldDuplicateRels",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": false "paramRequired": false
},{
"paramName": "p",
"paramLongName": "prefix",
"paramDescription": "the hdfs name node",
"paramRequired": true
} }
] ]

View File

@ -16,5 +16,11 @@
"paramLongName": "hdfsNameNode", "paramLongName": "hdfsNameNode",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": true "paramRequired": true
},
{
"paramName": "p",
"paramLongName": "prefix",
"paramDescription": "COCI or POCI",
"paramRequired": true
} }
] ]

View File

@ -30,7 +30,12 @@
"paramLongName": "inputFile", "paramLongName": "inputFile",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": true "paramRequired": true
} }, {
"paramName": "f",
"paramLongName": "format",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
] ]

View File

@ -46,7 +46,7 @@
</configuration> </configuration>
<exec>download.sh</exec> <exec>download.sh</exec>
<argument>${filelist}</argument> <argument>${filelist}</argument>
<argument>${workingPath}/Original</argument> <argument>${workingPath}/${prefix}/Original</argument>
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var> <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
<file>download.sh</file> <file>download.sh</file>
<capture-output/> <capture-output/>
@ -59,7 +59,8 @@
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class> <main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--inputFile</arg><arg>${inputFile}</arg> <arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}/${prefix}</arg>
<arg>--prefix</arg><arg>${prefix}</arg>
</java> </java>
<ok to="read"/> <ok to="read"/>
<error to="Kill"/> <error to="Kill"/>
@ -82,10 +83,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg> <arg>--workingPath</arg><arg>${workingPath}/${prefix}/${prefix}</arg>
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON/</arg> <arg>--outputPath</arg><arg>${workingPath}/${prefix}/${prefix}_JSON/</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg> <arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--inputFile</arg><arg>${inputFileCoci}</arg> <arg>--inputFile</arg><arg>${inputFileCoci}</arg>
<arg>--format</arg><arg>${prefix}</arg>
</spark> </spark>
<ok to="create_actionset"/> <ok to="create_actionset"/>
<error to="Kill"/> <error to="Kill"/>
@ -108,8 +110,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg> <arg>--inputPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--prefix</arg><arg>${prefix}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -2,7 +2,9 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -27,10 +29,14 @@ public class TrustUtils {
static { static {
mapper = new ObjectMapper(); mapper = new ObjectMapper();
try { try {
dedupConfig = mapper dedupConfig = DedupConfig
.readValue( .load(
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), IOUtils
DedupConfig.class); .toString(
DedupConfig.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
StandardCharsets.UTF_8));
deduper = new SparkDeduper(dedupConfig); deduper = new SparkDeduper(dedupConfig);
} catch (final IOException e) { } catch (final IOException e) {
log.error("Error loading dedupConfig, e"); log.error("Error loading dedupConfig, e");
@ -57,7 +63,7 @@ public class TrustUtils {
return TrustUtils.rescale(score, threshold); return TrustUtils.rescale(score, threshold);
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error computing score between results", e); log.error("Error computing score between results", e);
return BrokerConstants.MIN_TRUST; throw new RuntimeException(e);
} }
} }

View File

@ -1,57 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.Objects;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
private static final Relation ZERO = new Relation();
@Override
public Relation zero() {
return ZERO;
}
@Override
public Relation reduce(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation merge(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation finish(Relation r) {
return r;
}
private Relation mergeRel(Relation b, Relation a) {
if (Objects.equals(b, ZERO)) {
return a;
}
if (Objects.equals(a, ZERO)) {
return b;
}
b.mergeFrom(a);
return b;
}
@Override
public Encoder<Relation> bufferEncoder() {
return Encoders.kryo(Relation.class);
}
@Override
public Encoder<Relation> outputEncoder() {
return Encoders.kryo(Relation.class);
}
}

View File

@ -1,78 +0,0 @@
package eu.dnetlib.dhp.oa.dedup
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.slf4j.LoggerFactory
object SparkCleanRelation {
private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
@throws[Exception]
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(
IOUtils.toString(
classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
)
)
parser.parseArgument(args)
val conf = new SparkConf
new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
}
}
class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
extends AbstractSparkAction(parser, spark) {
override def run(isLookUpService: ISLookUpService): Unit = {
val graphBasePath = parser.get("graphBasePath")
val inputPath = parser.get("inputPath")
val outputPath = parser.get("outputPath")
SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
AbstractSparkAction.removeOutputDir(spark, outputPath)
val entities =
Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
idsSchema)
val ids = entities
.foldLeft(emptyIds)((ds, entity) => {
val entityPath = graphBasePath + '/' + entity
if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
ds.union(spark.read.schema(idsSchema).json(entityPath))
} else {
ds
}
})
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
.select("id")
.distinct()
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
AbstractSparkAction.save(
relations
.join(ids, col("source") === ids("id"), "leftsemi")
.join(ids, col("target") === ids("id"), "leftsemi"),
outputPath,
SaveMode.Overwrite
)
}
}

View File

@ -3,23 +3,19 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -70,73 +66,63 @@ public class SparkPropagateRelation extends AbstractSparkAction {
log.info("workingPath: '{}'", workingPath); log.info("workingPath: '{}'", workingPath);
log.info("graphOutputPath: '{}'", graphOutputPath); log.info("graphOutputPath: '{}'", graphOutputPath);
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
removeOutputDir(spark, outputRelationPath);
Dataset<Relation> mergeRels = spark Dataset<Relation> mergeRels = spark
.read() .read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC); .as(REL_BEAN_ENC);
// <mergedObjectID, dedupID> // <mergedObjectID, dedupID>
Dataset<Row> mergedIds = mergeRels Dataset<Row> idsToMerge = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES)) .where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID")) .select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct() .distinct();
.cache();
Dataset<Row> allRels = spark Dataset<Row> allRels = spark
.read() .read()
.schema(REL_BEAN_ENC.schema()) .schema(REL_BEAN_ENC.schema())
.json(DedupUtility.createEntityPath(graphBasePath, "relation")); .json(graphBasePath + "/relation");
Dataset<Relation> dedupedRels = allRels Dataset<Relation> dedupedRels = allRels
.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer") .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer") .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID") .select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC); .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
Relation rel = t._1();
Dataset<Relation> processedRelations = distinctRelations(
dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
save(processedRelations, outputRelationPath, SaveMode.Overwrite);
}
private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
Relation existingRel = t._1();
String newSource = t._2(); String newSource = t._2();
String newTarget = t._3(); String newTarget = t._3();
if (newSource == null && newTarget == null) { if (rel.getDataInfo() == null) {
return Collections.singleton(t._1()).iterator(); rel.setDataInfo(new DataInfo());
} }
// update existing relation if (newSource != null || newTarget != null) {
if (existingRel.getDataInfo() == null) { rel.getDataInfo().setDeletedbyinference(false);
existingRel.setDataInfo(new DataInfo());
}
existingRel.getDataInfo().setDeletedbyinference(true);
// Create new relation inferred by dedupIDs
Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
inferredRel.getDataInfo().setDeletedbyinference(false);
if (newSource != null) if (newSource != null)
inferredRel.setSource(newSource); rel.setSource(newSource);
if (newTarget != null) if (newTarget != null)
inferredRel.setTarget(newTarget); rel.setTarget(newTarget);
return Arrays.asList(existingRel, inferredRel).iterator();
} }
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) { return rel;
return rels }, REL_BEAN_ENC);
.filter(getRelationFilterFunction())
// ids of records that are both not deletedbyinference and not invisible
Dataset<Row> ids = validIds(spark, graphBasePath);
// filter relations that point to valid records, can force them to be visible
Dataset<Relation> cleanedRels = dedupedRels
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
.as(REL_BEAN_ENC)
.map((MapFunction<Relation, Relation>) r -> {
r.getDataInfo().setInvisible(false);
return r;
}, REL_KRYO_ENC);
Dataset<Relation> distinctRels = cleanedRels
.groupByKey( .groupByKey(
(MapFunction<Relation, String>) r -> String (MapFunction<Relation, String>) r -> String
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), .join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
@ -146,13 +132,33 @@ public class SparkPropagateRelation extends AbstractSparkAction {
return b; return b;
}) })
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC); .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
final String outputRelationPath = graphOutputPath + "/relation";
removeOutputDir(spark, outputRelationPath);
save(
distinctRels
.union(mergeRels)
.filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"),
outputRelationPath,
SaveMode.Overwrite);
} }
private FilterFunction<Relation> getRelationFilterFunction() { static Dataset<Row> validIds(SparkSession spark, String graphBasePath) {
return r -> StringUtils.isNotBlank(r.getSource()) || StructType idsSchema = StructType
StringUtils.isNotBlank(r.getTarget()) || .fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
StringUtils.isNotBlank(r.getRelType()) ||
StringUtils.isNotBlank(r.getSubRelType()) || Dataset<Row> allIds = spark.emptyDataset(RowEncoder.apply(idsSchema));
StringUtils.isNotBlank(r.getRelClass());
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
String entityPath = graphBasePath + '/' + entityType.name();
if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) {
allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath));
}
}
return allIds
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
.select("id")
.distinct();
} }
} }

View File

@ -1,20 +0,0 @@
[
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "inputPath",
"paramDescription": "the path to the input relation to cleanup",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the output relation cleaned",
"paramRequired": true
}
]

View File

@ -100,35 +100,9 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg> <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--graphOutputPath</arg><arg>${workingPath}/propagaterelation/</arg> <arg>--graphOutputPath</arg><arg>${graphOutputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</spark> </spark>
<ok to="CleanRelation"/>
<error to="Kill"/>
</action>
<action name="CleanRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCleanRelation</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--inputPath</arg><arg>${workingPath}/propagaterelation/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
</spark>
<ok to="group_entities"/> <ok to="group_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -152,31 +126,7 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entitities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count; import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -23,14 +22,13 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -46,8 +44,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ -62,6 +58,8 @@ public class SparkDedupTest implements Serializable {
private static String testGraphBasePath; private static String testGraphBasePath;
private static String testOutputBasePath; private static String testOutputBasePath;
private static String testDedupGraphBasePath; private static String testDedupGraphBasePath;
private static String testConsistencyGraphBasePath;
private static final String testActionSetId = "test-orchestrator"; private static final String testActionSetId = "test-orchestrator";
private static String whitelistPath; private static String whitelistPath;
private static List<String> whiteList; private static List<String> whiteList;
@ -75,6 +73,7 @@ public class SparkDedupTest implements Serializable {
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
.toFile() .toFile()
.getAbsolutePath(); .getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath() .toAbsolutePath()
.toString(); .toString();
@ -83,6 +82,10 @@ public class SparkDedupTest implements Serializable {
.toAbsolutePath() .toAbsolutePath()
.toString(); .toString();
testConsistencyGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
whitelistPath = Paths whitelistPath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI())
.toFile() .toFile()
@ -674,22 +677,45 @@ public class SparkDedupTest implements Serializable {
assertEquals(mergedOrp, deletedOrp); assertEquals(mergedOrp, deletedOrp);
} }
@Test
@Order(6)
void copyRelationsNoOpenorgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRelationsNoOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
});
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
System.out.println(outputRels.count());
// assertEquals(2382, outputRels.count());
}
@Test @Test
@Order(7) @Order(7)
void propagateRelationTest() throws Exception { void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")); classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
parser parser
.parseArgument( .parseArgument(
new String[] { new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath "-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
}); });
new SparkPropagateRelation(parser, spark).run(isLookUpService); new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(outputRelPath + "/relation").count(); long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
// assertEquals(4860, relations); // assertEquals(4860, relations);
System.out.println("relations = " + relations); System.out.println("relations = " + relations);
@ -699,95 +725,52 @@ public class SparkDedupTest implements Serializable {
.read() .read()
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
.as(Encoders.bean(Relation.class)); .as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
.where("relClass == 'merges'")
.select(mergeRels.col("target"))
.distinct()
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
JavaRDD<String> toCheck = jsc Dataset<Row> inputRels = spark
.textFile(outputRelPath + "/relation") .read()
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) .json(testDedupGraphBasePath + "/relation");
.join(mergedIds)
.map(t -> t._2()._1())
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
.join(mergedIds)
.map(t -> t._2()._1());
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); Dataset<Row> outputRels = spark
long updated = toCheck.count(); .read()
.json(testConsistencyGraphBasePath + "/relation");
assertEquals(updated, deletedbyinference); assertEquals(
0, outputRels
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
.count());
assertEquals(
5, outputRels
.filter("relClass NOT IN ('merges', 'isMergedIn')")
.count());
assertEquals(5 + mergeRels.count(), outputRels.count());
} }
@Test @Test
@Order(8) @Order(8)
void testCleanBaseRelations() throws Exception { void testCleanedPropagatedRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
// append dangling relations to be cleaned up
Dataset<Row> df_before = spark Dataset<Row> df_before = spark
.read() .read()
.schema(Encoders.bean(Relation.class).schema()) .schema(Encoders.bean(Relation.class).schema())
.json(testGraphBasePath + "/relation"); .json(testDedupGraphBasePath + "/relation");
Dataset<Row> df_input = df_before
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
parser
.parseArgument(
new String[] {
"--graphBasePath", testGraphBasePath,
"--inputPath", testGraphBasePath + "/relation",
"--outputPath", testDedupGraphBasePath + "/relation"
});
new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark Dataset<Row> df_after = spark
.read() .read()
.schema(Encoders.bean(Relation.class).schema()) .schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation"); .json(testConsistencyGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_input.count());
assertNotEquals(df_input.count(), df_after.count());
assertEquals(5, df_after.count());
}
@Test
@Order(9)
void testCleanDedupedRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
// append dangling relations to be cleaned up
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
parser
.parseArgument(
new String[] {
"--graphBasePath", testGraphBasePath,
"--inputPath", inputRelPath,
"--outputPath", testDedupGraphBasePath + "/relation"
});
new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_after.count()); assertNotEquals(df_before.count(), df_after.count());
assertEquals(0, df_after.count());
assertEquals(
0, df_after
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
.count());
assertEquals(
5, df_after
.filter("relClass NOT IN ('merges', 'isMergedIn')")
.count());
} }
@Test @Test
@ -813,6 +796,7 @@ public class SparkDedupTest implements Serializable {
public static void finalCleanUp() throws IOException { public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
} }
public boolean isDeletedByInference(String s) { public boolean isDeletedByInference(String s) {

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -15,10 +16,6 @@ import java.nio.file.Paths;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
@ -33,8 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ -44,11 +39,11 @@ public class SparkOpenorgsProvisionTest implements Serializable {
ISLookUpService isLookUpService; ISLookUpService isLookUpService;
private static SparkSession spark; private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath; private static String testGraphBasePath;
private static String testOutputBasePath; private static String testOutputBasePath;
private static String testDedupGraphBasePath; private static String testDedupGraphBasePath;
private static String testConsistencyGraphBasePath;
private static final String testActionSetId = "test-orchestrator"; private static final String testActionSetId = "test-orchestrator";
@BeforeAll @BeforeAll
@ -64,6 +59,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-") testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
.toAbsolutePath() .toAbsolutePath()
.toString(); .toString();
testConsistencyGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@ -76,8 +74,13 @@ public class SparkOpenorgsProvisionTest implements Serializable {
.master("local[*]") .master("local[*]")
.config(conf) .config(conf)
.getOrCreate(); .getOrCreate();
}
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
} }
@BeforeEach @BeforeEach
@ -186,26 +189,21 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkUpdateEntity(parser, spark).run(isLookUpService); new SparkUpdateEntity(parser, spark).run(isLookUpService);
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); Dataset<Row> organizations = spark.read().json(testDedupGraphBasePath + "/organization");
long mergedOrgs = spark Dataset<Row> mergedOrgs = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.where("relClass=='merges'") .where("relClass=='merges'")
.javaRDD() .select("target")
.map(Relation::getTarget) .distinct();
.distinct()
.count();
assertEquals(80, organizations); assertEquals(80, organizations.count());
long deletedOrgs = jsc Dataset<Row> deletedOrgs = organizations
.textFile(testDedupGraphBasePath + "/organization") .filter("dataInfo.deletedbyinference = TRUE");
.filter(this::isDeletedByInference)
.count();
assertEquals(mergedOrgs, deletedOrgs); assertEquals(mergedOrgs.count(), deletedOrgs.count());
} }
@Test @Test
@ -226,10 +224,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation"); final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
assertEquals(2382, rels.count());
assertEquals(2382, outputRels.count());
} }
@Test @Test
@ -244,51 +241,41 @@ public class SparkOpenorgsProvisionTest implements Serializable {
parser parser
.parseArgument( .parseArgument(
new String[] { new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath "-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
}); });
new SparkPropagateRelation(parser, spark).run(isLookUpService); new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4896, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark final Dataset<Relation> mergeRels = spark
.read() .read()
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
.as(Encoders.bean(Relation.class)); .as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
Dataset<Row> inputRels = spark
.read()
.json(testDedupGraphBasePath + "/relation");
Dataset<Row> outputRels = spark
.read()
.json(testConsistencyGraphBasePath + "/relation");
final Dataset<Row> mergedIds = mergeRels
.where("relClass == 'merges'") .where("relClass == 'merges'")
.select(mergeRels.col("target")) .select(col("target").as("id"))
.distinct() .distinct();
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
JavaRDD<String> toCheck = jsc Dataset<Row> toUpdateRels = inputRels
.textFile(testDedupGraphBasePath + "/relation") .as("rel")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) .join(mergedIds.as("s"), col("rel.source").equalTo(col("s.id")), "left_outer")
.join(mergedIds) .join(mergedIds.as("t"), col("rel.target").equalTo(col("t.id")), "left_outer")
.map(t -> t._2()._1()) .filter("s.id IS NOT NULL OR t.id IS NOT NULL")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) .distinct();
.join(mergedIds)
.map(t -> t._2()._1());
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); Dataset<Row> updatedRels = inputRels
long updated = toCheck.count(); .select("source", "target", "relClass")
.except(outputRels.select("source", "target", "relClass"));
assertEquals(updated, deletedbyinference); assertEquals(toUpdateRels.count(), updatedRels.count());
assertEquals(140, outputRels.count());
} }
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
public boolean isDeletedByInference(String s) {
return s.contains("\"deletedbyinference\":true");
}
} }

View File

@ -96,30 +96,7 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>

View File

@ -1,16 +1,15 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import eu.dnetlib.dhp.common.HdfsSupport;
import java.net.URISyntaxException; import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import java.nio.file.Files; import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.nio.file.Path; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.nio.file.Paths; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -19,15 +18,13 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import com.fasterxml.jackson.databind.DeserializationFeature; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import eu.dnetlib.dhp.common.HdfsSupport; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
@ -40,8 +37,9 @@ public class GroupEntitiesSparkJobTest {
private static Path workingDir; private static Path workingDir;
private Path dataInputPath; private Path dataInputPath;
private Path groupEntityPath; private Path checkpointPath;
private Path dispatchEntityPath;
private Path outputPath;
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
@ -58,8 +56,8 @@ public class GroupEntitiesSparkJobTest {
@BeforeEach @BeforeEach
public void beforeEach() throws IOException, URISyntaxException { public void beforeEach() throws IOException, URISyntaxException {
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
groupEntityPath = workingDir.resolve("grouped_entity"); checkpointPath = workingDir.resolve("grouped_entity");
dispatchEntityPath = workingDir.resolve("dispatched_entity"); outputPath = workingDir.resolve("dispatched_entity");
} }
@AfterAll @AfterAll
@ -71,44 +69,35 @@ public class GroupEntitiesSparkJobTest {
@Test @Test
@Order(1) @Order(1)
void testGroupEntities() throws Exception { void testGroupEntities() throws Exception {
GroupEntitiesSparkJob.main(new String[] { GroupEntitiesSparkJob.main(new String[]{
"-isSparkSessionManaged", "-isSparkSessionManaged",
Boolean.FALSE.toString(), Boolean.FALSE.toString(),
"-graphInputPath", "-graphInputPath",
dataInputPath.toString(), dataInputPath.toString(),
"-checkpointPath",
checkpointPath.toString(),
"-outputPath", "-outputPath",
groupEntityPath.toString() outputPath.toString(),
"-filterInvisible",
Boolean.FALSE.toString()
}); });
Dataset<Result> output = spark Dataset<OafEntity> checkpointTable = spark
.read() .read()
.textFile(groupEntityPath.toString()) .load(checkpointPath.toString())
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) .selectExpr("COALESCE(*)")
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); .as(Encoders.kryo(OafEntity.class));
assertEquals( assertEquals(
1, 1,
output checkpointTable
.filter( .filter(
(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" (FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
.equals(r.getId()) && .equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count()); .count());
}
@Test
@Order(2)
void testDispatchEntities() throws Exception {
DispatchEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
groupEntityPath.toString(),
"-outputPath",
dispatchEntityPath.resolve(".").toString(),
"-filterInvisible",
Boolean.TRUE.toString()
});
Dataset<Result> output = spark Dataset<Result> output = spark
.read() .read()
@ -116,7 +105,7 @@ public class GroupEntitiesSparkJobTest {
DHPUtils DHPUtils
.toSeq( .toSeq(
HdfsSupport HdfsSupport
.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration()))) .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
assertEquals(3, output.count()); assertEquals(3, output.count());

View File

@ -49,7 +49,7 @@ public class DownloadCsvTest {
@Test @Test
void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException { void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException {
String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv"; String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_5.csv";
final String outputFile = workingDir + "/unibi_gold.json"; final String outputFile = workingDir + "/unibi_gold.json";
new DownloadCSV() new DownloadCSV()

View File

@ -1067,6 +1067,28 @@ class MappersTest {
System.out.println("***************"); System.out.println("***************");
} }
@Test
public void testD4ScienceTraining() throws IOException {
final String xml = IOUtils
.toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-1-training.xml")));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
final OtherResearchProduct trainingMaterial = (OtherResearchProduct) list.get(0);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial));
System.out.println("***************");
}
@Test
public void testD4ScienceDataset() throws IOException {
final String xml = IOUtils
.toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-2-dataset.xml")));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
final Dataset trainingMaterial = (Dataset) list.get(0);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial));
System.out.println("***************");
}
@Test @Test
void testNotWellFormed() throws IOException { void testNotWellFormed() throws IOException {
final String xml = IOUtils final String xml = IOUtils

View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<oai:header>
<dri:objIdentifier>alessia_____::104c2d4ba8878c16fa824dce5b1bea57</dri:objIdentifier>
<dri:recordIdentifier>12d8f77e-d66f-46f5-8d88-af7db23bc4c9</dri:recordIdentifier>
<dri:dateOfCollection>2023-09-08T10:12:35.864+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>alessia_____</oaf:datasourceprefix>
<dr:dateOfTransformation>2023-09-08T11:31:45.692+02:00</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<datacite:resource
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
<datacite:identifier identifierType="URL">http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists</datacite:identifier>
<datacite:alternateIdentifiers/>
<datacite:creators>
<datacite:creator>
<datacite:creatorName>BRAGHIERI MARCO</datacite:creatorName>
</datacite:creator>
</datacite:creators>
<datacite:titles>
<datacite:title>Visual Analytics for Data Scientists</datacite:title>
</datacite:titles>
<datacite:publisher>SoBigData++</datacite:publisher>
<datacite:publicationYear/>
<datacite:dates>
<datacite:date dateType="Issued"/>
</datacite:dates>
<datacite:resourceType resourceTypeGeneral="TrainingMaterial">TrainingMaterial</datacite:resourceType>
<datacite:descriptions>
<datacite:description descriptionType="Abstract">Participants to this module shall
- Learn the principles and rules underlying the design of visual data
representations and human-computer interactions
- Understand, adapt and apply representative visual analytics methods and systems for diverse types
of data and problems
- Analyse and evaluate the structure and properties
of data to select or devise appropriate methods for data exploration
- Combine visualization, interactive techniques, and computational
processing to develop practical data analysis for problem solving
(This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London).
The author did not intend to violate any copyright on figures or content. In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it</datacite:description>
</datacite:descriptions>
<datacite:subjects>
<datacite:subject>Visual analytics</datacite:subject>
</datacite:subjects>
<datacite:formats>
<datacite:format>Slides</datacite:format>
<datacite:format>Other</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>ZIP</datacite:format>
</datacite:formats>
</datacite:resource>
<oaf:accessrights>OPEN</oaf:accessrights>
<dr:CobjCategory type="other">0010</dr:CobjCategory>
<oaf:dateAccepted/>
<oaf:hostedBy id="alessia_____::alessia" name="Alessia"/>
<oaf:collectedFrom id="alessia_____::alessia" name="Alessia"/>
<oaf:license>other-open</oaf:license>
<oaf:projectid>corda__h2020::871042</oaf:projectid>
</oai:metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2023-09-08T10:12:35.864+02:00">
<baseURL>https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<oai:header>
<dri:objIdentifier>alessia_____::028879484548f4e1c630e1c503e35231</dri:objIdentifier>
<dri:recordIdentifier>4fed018e-c2ff-4afa-b7b5-1ca1beebf850</dri:recordIdentifier>
<dri:dateOfCollection>2023-09-08T12:14:27.615+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>alessia_____</oaf:datasourceprefix>
<dr:dateOfTransformation>2023-09-08T12:14:51.7+02:00</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<datacite:resource
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
<datacite:identifier identifierType="URL">http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration</datacite:identifier>
<datacite:alternateIdentifiers>
<datacite:alternateIdentifier type="URL"/>
</datacite:alternateIdentifiers>
<datacite:creators>
<datacite:creator>
<datacite:creatorName>Pappalardo, Luca</datacite:creatorName>
<datacite:affiliation/>
<datacite:nameIdentifier nameIdentifierScheme="ORCID" schemeURI="http://orcid.org">0000-0002-1547-6007</datacite:nameIdentifier>
</datacite:creator>
</datacite:creators>
<datacite:titles>
<datacite:title>City-to-city migration</datacite:title>
</datacite:titles>
<datacite:publisher>SoBigData++</datacite:publisher>
<datacite:publicationYear/>
<datacite:dates>
<datacite:date dateType="Issued">2018-02-15</datacite:date>
</datacite:dates>
<datacite:resourceType resourceTypeGeneral="Dataset">Dataset</datacite:resourceType>
<datacite:descriptions>
<datacite:description descriptionType="Abstract">Census data recording the migration of people between metropolitan areas in
the US</datacite:description>
</datacite:descriptions>
<datacite:subjects>
<datacite:subject>Human Mobility data</datacite:subject>
</datacite:subjects>
<datacite:formats/>
</datacite:resource>
<oaf:accessrights>OPEN</oaf:accessrights>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
<oaf:dateAccepted>2018-02-15</oaf:dateAccepted>
<oaf:hostedBy id="alessia_____::alessia" name="Alessia"/>
<oaf:collectedFrom id="alessia_____::alessia" name="Alessia"/>
<oaf:license>AFL-3.0</oaf:license>
<oaf:projectid>corda__h2020::871042</oaf:projectid>
</oai:metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2023-09-08T12:14:27.615+02:00">
<baseURL>https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -24,10 +24,7 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class XmlRecordFactoryTest { public class XmlRecordFactoryTest {
@ -196,4 +193,51 @@ public class XmlRecordFactoryTest {
assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue()); assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue());
assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue()); assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue());
} }
@Test
public void testD4ScienceTraining() throws DocumentException, IOException {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(
IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json")),
OtherResearchProduct.class);
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
assertNotNull(xml);
final Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
System.out.println(doc.asXML());
}
@Test
public void testD4ScienceDataset() throws DocumentException, IOException {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(
IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json")),
OtherResearchProduct.class);
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
assertNotNull(xml);
final Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
System.out.println(doc.asXML());
}
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -39,7 +39,8 @@
<switch> <switch>
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms --> <!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
<!-- If any different condition is set, go to the corresponding start --> <!-- If any different condition is set, go to the corresponding start -->
<case to="non-iterative-rankings">${wf:conf('resume') eq "rankings-start"}</case> <case to="spark-cc">${wf:conf('resume') eq "cc"}</case>
<case to="spark-ram">${wf:conf('resume') eq "ram"}</case>
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case> <case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
<case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case> <case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case>
<case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case> <case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case>
@ -89,18 +90,11 @@
<file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file> <file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file>
</spark> </spark>
<ok to="non-iterative-rankings" /> <ok to="spark-cc"/>
<error to="openaire-graph-error" /> <error to="openaire-graph-error" />
</action> </action>
<!-- Citation Count and RAM are calculated in parallel-->
<fork name="non-iterative-rankings">
<path start="spark-cc"/>
<!-- <path start="spark-impulse"/> -->
<path start="spark-ram"/>
</fork>
<!-- Run Citation Count calculation --> <!-- Run Citation Count calculation -->
<action name="spark-cc"> <action name="spark-cc">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -129,7 +123,7 @@
<file>${wfAppPath}/bip-ranker/CC.py#CC.py</file> <file>${wfAppPath}/bip-ranker/CC.py#CC.py</file>
</spark> </spark>
<ok to="join-non-iterative-rankings" /> <ok to="spark-ram" />
<error to="cc-fail" /> <error to="cc-fail" />
</action> </action>
@ -165,14 +159,11 @@
<file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file> <file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file>
</spark> </spark>
<ok to="join-non-iterative-rankings" /> <ok to="spark-impulse" />
<error to="ram-fail" /> <error to="ram-fail" />
</action> </action>
<!-- Join non-iterative methods -->
<join name="join-non-iterative-rankings" to="spark-impulse"/>
<action name="spark-impulse"> <action name="spark-impulse">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">

10
pom.xml
View File

@ -112,6 +112,16 @@
<url>https://maven.d4science.org/nexus/content/repositories/dnet-deps</url> <url>https://maven.d4science.org/nexus/content/repositories/dnet-deps</url>
<layout>default</layout> <layout>default</layout>
</repository> </repository>
<repository>
<id>maven-restlet</id>
<name>Restlet repository</name>
<url>https://maven.restlet.talend.com</url>
</repository>
<repository>
<id>conjars</id>
<name>conjars</name>
<url>https://conjars.wensel.net/repo/</url>
</repository>
</repositories> </repositories>
<dependencies> <dependencies>