Merge branch 'beta' into consistency_keep_mergerels

This commit is contained in:
Claudio Atzori 2023-10-02 11:26:00 +02:00
commit 7b403a920f
27 changed files with 550 additions and 487 deletions

View File

@ -1,98 +0,0 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
/**
 * Spark job that reads text lines of the form {@code <entity class name>|<json>} from
 * {@code inputPath} and writes, for every entity type registered in
 * {@link ModelSupport#oafTypes} except {@code relation}, the matching JSON records to
 * {@code outputPath/<entityType>} as gzip-compressed JSON.
 *
 * <p>Command line parameters are described by the classpath resource
 * {@code /eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json}.
 */
public class DispatchEntitiesSparkJob {

	private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);

	/** Classpath location of the JSON document describing the accepted parameters. */
	private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json";

	/** Separator between the class-name prefix and the JSON payload of each input line. */
	private static final String PREFIX_SEPARATOR = "|";

	/**
	 * Job entry point: parses the arguments, removes any pre-existing output and dispatches
	 * the entities.
	 *
	 * @param args command line arguments ({@code isSparkSessionManaged}, {@code inputPath},
	 *             {@code outputPath}, {@code filterInvisible})
	 * @throws Exception when the parameter definition cannot be read, the arguments cannot be
	 *                   parsed, or the Spark job fails
	 */
	public static void main(String[] args) throws Exception {
		final String jsonConfiguration;
		// Close the resource stream deterministically and decode it with an explicit charset
		// instead of relying on the platform default.
		try (InputStream parameters = Objects
			.requireNonNull(
				DispatchEntitiesSparkJob.class.getResourceAsStream(PARAMETERS_RESOURCE),
				"missing classpath resource: " + PARAMETERS_RESOURCE)) {
			jsonConfiguration = IOUtils.toString(parameters, StandardCharsets.UTF_8);
		}

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		// The Spark session is managed (i.e. handled by runWithSparkSession) unless stated otherwise.
		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String inputPath = parser.get("inputPath");
		log.info("inputPath: {}", inputPath);

		String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
		log.info("filterInvisible: {}", filterInvisible);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
				dispatchEntities(spark, inputPath, outputPath, filterInvisible);
			});
	}

	/**
	 * Splits the prefixed input lines by entity class and stores each group under its own
	 * sub-directory of {@code outputPath}.
	 *
	 * @param spark           the active Spark session
	 * @param inputPath       path of the text input, one {@code <class name>|<json>} record per line
	 * @param outputPath      root path of the output; one sub-directory per entity type is written
	 * @param filterInvisible when {@code true} only rows satisfying
	 *                        {@code dataInfo.invisible != true} are kept
	 */
	private static void dispatchEntities(
		SparkSession spark,
		String inputPath,
		String outputPath,
		boolean filterInvisible) {

		Dataset<String> df = spark.read().textFile(inputPath);

		// One independent write is issued per entity type; the parallel stream submits them concurrently.
		ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
			String entityType = entry.getKey();
			if (entityType.equalsIgnoreCase("relation")) {
				// relations are not dispatched by this job
				return;
			}

			Class<?> clazz = entry.getValue();
			// Match the class name together with the separator, so that a class name that happens
			// to be a prefix of another one (or a line lacking the separator) is never selected.
			final String linePrefix = clazz.getName() + PREFIX_SEPARATOR;

			Dataset<String> jsonLines = df
				.filter((FilterFunction<String>) s -> s.startsWith(linePrefix))
				.map(
					(MapFunction<String, String>) s -> StringUtils.substringAfter(s, PREFIX_SEPARATOR),
					Encoders.STRING());

			Dataset<Row> entityDF = spark
				.read()
				.schema(Encoders.bean(clazz).schema())
				.json(jsonLines);

			if (filterInvisible) {
				// NOTE(review): under SQL three-valued logic rows whose dataInfo.invisible is NULL
				// are presumably dropped as well - confirm this is intended.
				entityDF = entityDF.filter("dataInfo.invisible != true");
			}

			entityDF
				.write()
				.mode(SaveMode.Overwrite)
				.option("compression", "gzip")
				.json(outputPath + "/" + entityType);
		});
	}
}

View File

@ -2,36 +2,28 @@
package eu.dnetlib.dhp.oa.merge; package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import java.io.IOException; import java.util.Map;
import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2; import scala.Tuple2;
@ -39,13 +31,9 @@ import scala.Tuple2;
* Groups the graph content by entity identifier to ensure ID uniqueness * Groups the graph content by entity identifier to ensure ID uniqueness
*/ */
public class GroupEntitiesSparkJob { public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final String ID_JPATH = "$.id"; private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob {
String graphInputPath = parser.get("graphInputPath"); String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath); log.info("graphInputPath: {}", graphInputPath);
String checkpointPath = parser.get("checkpointPath");
log.info("checkpointPath: {}", checkpointPath);
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath); groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
}); });
} }
private static void groupEntities( private static void groupEntities(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String outputPath) { String checkpointPath,
String outputPath,
boolean filterInvisible) {
final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn(); Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
.read() String entity = e.getKey().name();
.textFile(toSeq(listEntityPaths(inputPath, sc))) Class<? extends OafEntity> entityClass = e.getValue();
.map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class)) String entityInputPath = inputPath + "/" + entity;
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
.agg(aggregator) continue;
}
allEntities = allEntities
.union(
((Dataset<OafEntity>) spark
.read()
.schema(Encoders.bean(entityClass).schema())
.json(entityInputPath)
.filter("length(id) > 0")
.as(Encoders.bean(entityClass)))
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
}
Dataset<?> groupedEntities = allEntities
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
.map( .map(
(MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() + (MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
"|" + OBJECT_MAPPER.writeValueAsString(t._2()), t._2().getClass().getName(), t._2()),
Encoders.STRING()) Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
groupedEntities = groupedEntities
.withColumn(
entity,
when(col("_1").equalTo(entityClass.getName()), col("_2")));
}
groupedEntities
.drop("_1", "_2")
.write() .write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.text(outputPath); .option("compression", "gzip")
} .save(checkpointPath);
public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> { ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
@Override ModelSupport.entityTypes
public OafEntity zero() { .entrySet()
return null;
}
@Override
public OafEntity reduce(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.mergeEntities(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public OafEntity merge(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
@Override
public OafEntity finish(OafEntity j) {
return j;
}
@Override
public Encoder<OafEntity> bufferEncoder() {
return Encoders.kryo(OafEntity.class);
}
@Override
public Encoder<OafEntity> outputEncoder() {
return Encoders.kryo(OafEntity.class);
}
}
private static OafEntity parseOaf(String s) {
DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) {
case "10":
return parse(s, Datasource.class);
case "20":
return parse(s, Organization.class);
case "40":
return parse(s, Project.class);
case "50":
String resultType = dc.read("$.resulttype.classid");
switch (resultType) {
case "publication":
return parse(s, Publication.class);
case "dataset":
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return parse(s, Software.class);
case "other":
return parse(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream() .stream()
.filter(f -> !f.toLowerCase().contains("relation")) .map(e -> parPool.submit(() -> {
.collect(Collectors.toList()); String entity = e.getKey().name();
} Class<? extends OafEntity> entityClass = e.getValue();
spark
.read()
.load(checkpointPath)
.select(col(entity).as("value"))
.filter("value IS NOT NULL")
.as(OAFENTITY_KRYO_ENC)
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entity);
}))
.collect(Collectors.toList())
.forEach(t -> {
try {
t.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
} }

View File

@ -1,26 +0,0 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "path of the output graph",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true
}
]

View File

@ -8,13 +8,25 @@
{ {
"paramName": "gin", "paramName": "gin",
"paramLongName": "graphInputPath", "paramLongName": "graphInputPath",
"paramDescription": "the graph root path", "paramDescription": "the input graph root path",
"paramRequired": true
},
{
"paramName": "cp",
"paramLongName": "checkpointPath",
"paramDescription": "checkpoint directory",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "out", "paramName": "out",
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the output merged graph root path", "paramDescription": "the output graph root path",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -81,7 +81,7 @@ case class SparkModel(conf: DedupConfig) {
MapDocumentUtil.truncateList( MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize fdef.getSize
).toArray ).asScala
case Type.StringConcat => case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath) val jpaths = CONCAT_REGEX.split(fdef.getPath)

View File

@ -1,6 +1,23 @@
package eu.dnetlib.pace.util; package eu.dnetlib.pace.util;
/*
* Diff Match and Patch
* Copyright 2018 The diff-match-patch Authors.
* https://github.com/google/diff-match-patch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* /*
* Diff Match and Patch * Diff Match and Patch
* Copyright 2018 The diff-match-patch Authors. * Copyright 2018 The diff-match-patch Authors.

View File

@ -80,16 +80,15 @@ public class PrepareAffiliationRelations implements Serializable {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>") .schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
.json(inputPath); .json(inputPath);
// unroll nested arrays // unroll nested arrays
df = df df = df
.withColumn("matching", functions.explode(new Column("Matchings"))) .withColumn("matching", functions.explode(new Column("Matchings")))
.withColumn("rorid", functions.explode(new Column("matching.RORid")))
.select( .select(
new Column("DOI").as("doi"), new Column("DOI").as("doi"),
new Column("rorid"), new Column("matching.RORid").as("rorid"),
new Column("matching.Confidence").as("confidence")); new Column("matching.Confidence").as("confidence"));
// prepare action sets for affiliation relations // prepare action sets for affiliation relations
@ -121,8 +120,10 @@ public class PrepareAffiliationRelations implements Serializable {
qualifier, qualifier,
Double.toString(row.getAs("confidence"))); Double.toString(row.getAs("confidence")));
List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
// return bi-directional relations // return bi-directional relations
return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
}) })
.map(p -> new AtomicAction(Relation.class, p)) .map(p -> new AtomicAction(Relation.class, p))
@ -133,7 +134,8 @@ public class PrepareAffiliationRelations implements Serializable {
} }
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
DataInfo dataInfo) {
return Arrays return Arrays
.asList( .asList(
OafMapperUtils OafMapperUtils
@ -143,7 +145,7 @@ public class PrepareAffiliationRelations implements Serializable {
ModelConstants.RESULT_ORGANIZATION, ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION, ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION, ModelConstants.HAS_AUTHOR_INSTITUTION,
null, collectedfrom,
dataInfo, dataInfo,
null), null),
OafMapperUtils OafMapperUtils
@ -153,7 +155,7 @@ public class PrepareAffiliationRelations implements Serializable {
ModelConstants.RESULT_ORGANIZATION, ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION, ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF, ModelConstants.IS_AUTHOR_INSTITUTION_OF,
null, collectedfrom,
dataInfo, dataInfo,
null)); null));
} }

View File

@ -31,5 +31,5 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
# The following is needed as a property of a workflow # The following is needed as a property of a workflow
oozie.wf.application.path=${oozieTopWfApplicationPath} oozie.wf.application.path=${oozieTopWfApplicationPath}
inputPath=/user/schatz/affiliations/data-v3.1.json inputPath=/data/bip-affiliations/data.json
outputPath=/tmp/crossref-affiliations-output-v3.1 outputPath=/tmp/crossref-affiliations-output-v5

View File

@ -1,4 +1,9 @@
{ {
"ETHZ.UNIGENF": {
"openaire_id": "opendoar____::1400",
"datacite_name": "Uni Genf",
"official_name": "Archive ouverte UNIGE"
},
"GESIS.RKI": { "GESIS.RKI": {
"openaire_id": "re3data_____::r3d100010436", "openaire_id": "re3data_____::r3d100010436",
"datacite_name": "Forschungsdatenzentrum am Robert Koch Institut", "datacite_name": "Forschungsdatenzentrum am Robert Koch Institut",

View File

@ -101,7 +101,7 @@ public class PrepareAffiliationRelationsTest {
// ); // );
// } // }
// count the number of relations // count the number of relations
assertEquals(16, tmp.count()); assertEquals(20, tmp.count());
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result"); dataset.createOrReplaceTempView("result");
@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest {
// verify that we have equal number of bi-directional relations // verify that we have equal number of bi-directional relations
Assertions Assertions
.assertEquals( .assertEquals(
8, execVerification 10, execVerification
.filter( .filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
.collectAsList() .collectAsList()
@ -120,14 +120,14 @@ public class PrepareAffiliationRelationsTest {
Assertions Assertions
.assertEquals( .assertEquals(
8, execVerification 10, execVerification
.filter( .filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
.collectAsList() .collectAsList()
.size()); .size());
// check confidence value of a specific relation // check confidence value of a specific relation
String sourceDOI = "10.1105/tpc.8.3.343"; String sourceDOI = "10.1061/(asce)0733-9399(2002)128:7(759)";
final String sourceOpenaireId = ID_PREFIX final String sourceOpenaireId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));

View File

@ -1,6 +1,7 @@
{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]} {"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":"https:\/\/ror.org\/03yxnpp24","Confidence":0.7071067812},{"RORid":"https:\/\/ror.org\/01teme464","Confidence":0.89}]}
{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]} {"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":"https:\/\/ror.org\/02k40bc56","Confidence":0.7071067812}]}
{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]} {"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":"https:\/\/ror.org\/00qjgza05","Confidence":1}]}
{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]} {"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":"https:\/\/ror.org\/035xkbk20","Confidence":1},{"RORid":"https:\/\/ror.org\/05apxxy63","Confidence":1}]}
{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]} {"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":"https:\/\/ror.org\/04j198w64","Confidence":0.82}]}
{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]} {"DOI":"10.1061\/(asce)0733-9372(2002)128:7(588)","Matchings":[{"RORid":"https:\/\/ror.org\/03m8km719","Confidence":0.8660254038},{"RORid":"https:\/\/ror.org\/02aze4h65","Confidence":0.87}]}
{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":"https:\/\/ror.org\/057xtrt18","Confidence":0.7071067812}]}

View File

@ -2,7 +2,9 @@
package eu.dnetlib.dhp.broker.oa.util; package eu.dnetlib.dhp.broker.oa.util;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -27,10 +29,14 @@ public class TrustUtils {
static { static {
mapper = new ObjectMapper(); mapper = new ObjectMapper();
try { try {
dedupConfig = mapper dedupConfig = DedupConfig
.readValue( .load(
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), IOUtils
DedupConfig.class); .toString(
DedupConfig.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
StandardCharsets.UTF_8));
deduper = new SparkDeduper(dedupConfig); deduper = new SparkDeduper(dedupConfig);
} catch (final IOException e) { } catch (final IOException e) {
log.error("Error loading dedupConfig, e"); log.error("Error loading dedupConfig, e");
@ -57,7 +63,7 @@ public class TrustUtils {
return TrustUtils.rescale(score, threshold); return TrustUtils.rescale(score, threshold);
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error computing score between results", e); log.error("Error computing score between results", e);
return BrokerConstants.MIN_TRUST; throw new RuntimeException(e);
} }
} }

View File

@ -126,31 +126,7 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entitities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>

View File

@ -117,14 +117,14 @@ public abstract class AbstractMdRecordToOafMapper {
return Lists.newArrayList(); return Lists.newArrayList();
} }
final DataInfo info = prepareDataInfo(doc, invisible); final DataInfo entityInfo = prepareDataInfo(doc, invisible);
final long lastUpdateTimestamp = new Date().getTime(); final long lastUpdateTimestamp = new Date().getTime();
final List<Instance> instances = prepareInstances(doc, info, collectedFrom, hostedBy); final List<Instance> instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy);
final String type = getResultType(doc, instances); final String type = getResultType(doc, instances);
return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp);
} catch (DocumentException e) { } catch (DocumentException e) {
log.error("Error with record:\n" + xml); log.error("Error with record:\n" + xml);
return Lists.newArrayList(); return Lists.newArrayList();
@ -184,13 +184,15 @@ public abstract class AbstractMdRecordToOafMapper {
final List<Oaf> oafs = Lists.newArrayList(entity); final List<Oaf> oafs = Lists.newArrayList(entity);
final DataInfo relationInfo = prepareDataInfo(doc, false);
if (!oafs.isEmpty()) { if (!oafs.isEmpty()) {
Set<Oaf> rels = Sets.newHashSet(); Set<Oaf> rels = Sets.newHashSet();
rels.addAll(addProjectRels(doc, entity)); rels.addAll(addProjectRels(doc, entity, relationInfo));
rels.addAll(addOtherResultRels(doc, entity)); rels.addAll(addOtherResultRels(doc, entity, relationInfo));
rels.addAll(addRelations(doc, entity)); rels.addAll(addRelations(doc, entity, relationInfo));
rels.addAll(addAffiliations(doc, entity)); rels.addAll(addAffiliations(doc, entity, relationInfo));
oafs.addAll(rels); oafs.addAll(rels);
} }
@ -243,7 +245,7 @@ public abstract class AbstractMdRecordToOafMapper {
private List<Oaf> addProjectRels( private List<Oaf> addProjectRels(
final Document doc, final Document doc,
final OafEntity entity) { final OafEntity entity, DataInfo info) {
final List<Oaf> res = new ArrayList<>(); final List<Oaf> res = new ArrayList<>();
@ -262,18 +264,21 @@ public abstract class AbstractMdRecordToOafMapper {
.add( .add(
OafMapperUtils OafMapperUtils
.getRelation( .getRelation(
docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate)); docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity.getCollectedfrom(),
info, entity.getLastupdatetimestamp(), validationdDate, null));
res res
.add( .add(
OafMapperUtils OafMapperUtils
.getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate)); .getRelation(
projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity.getCollectedfrom(), info,
entity.getLastupdatetimestamp(), validationdDate, null));
} }
} }
return res; return res;
} }
private List<Oaf> addRelations(Document doc, OafEntity entity) { private List<Oaf> addRelations(Document doc, OafEntity entity, DataInfo info) {
final List<Oaf> rels = Lists.newArrayList(); final List<Oaf> rels = Lists.newArrayList();
@ -301,14 +306,16 @@ public abstract class AbstractMdRecordToOafMapper {
.add( .add(
OafMapperUtils OafMapperUtils
.getRelation( .getRelation(
entity.getId(), targetId, relType, subRelType, relClass, entity, entity.getId(), targetId, relType, subRelType, relClass,
validationDate)); entity.getCollectedfrom(), info,
entity.getLastupdatetimestamp(), validationDate, null));
rels rels
.add( .add(
OafMapperUtils OafMapperUtils
.getRelation( .getRelation(
targetId, entity.getId(), relType, subRelType, relClassInverse, entity, targetId, entity.getId(), relType, subRelType, relClassInverse,
validationDate)); entity.getCollectedfrom(), info,
entity.getLastupdatetimestamp(), validationDate, null));
} }
} }
} }
@ -316,7 +323,7 @@ public abstract class AbstractMdRecordToOafMapper {
return rels; return rels;
} }
private List<Oaf> addAffiliations(Document doc, OafEntity entity) { private List<Oaf> addAffiliations(Document doc, OafEntity entity, DataInfo info) {
final List<Oaf> rels = Lists.newArrayList(); final List<Oaf> rels = Lists.newArrayList();
for (Object o : doc.selectNodes("//datacite:affiliation[@affiliationIdentifierScheme='ROR']")) { for (Object o : doc.selectNodes("//datacite:affiliation[@affiliationIdentifierScheme='ROR']")) {
@ -345,14 +352,14 @@ public abstract class AbstractMdRecordToOafMapper {
OafMapperUtils OafMapperUtils
.getRelation( .getRelation(
resultId, orgId, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION, resultId, orgId, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION,
entity.getCollectedfrom(), entity.getDataInfo(), entity.getLastupdatetimestamp(), null, entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null,
properties)); properties));
rels rels
.add( .add(
OafMapperUtils OafMapperUtils
.getRelation( .getRelation(
orgId, resultId, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF, orgId, resultId, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF,
entity.getCollectedfrom(), entity.getDataInfo(), entity.getLastupdatetimestamp(), null, entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null,
properties)); properties));
} }
} }
@ -361,7 +368,7 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract List<Oaf> addOtherResultRels( protected abstract List<Oaf> addOtherResultRels(
final Document doc, final Document doc,
final OafEntity entity); final OafEntity entity, DataInfo info);
private void populateResultFields( private void populateResultFields(
final Result r, final Result r,

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -292,7 +291,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override @Override
protected List<Oaf> addOtherResultRels( protected List<Oaf> addOtherResultRels(
final Document doc, final Document doc,
final OafEntity entity) { final OafEntity entity, DataInfo info) {
final String docId = entity.getId(); final String docId = entity.getId();
final List<Oaf> res = new ArrayList<>(); final List<Oaf> res = new ArrayList<>();
@ -308,11 +307,13 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
res res
.add( .add(
getRelation( getRelation(
docId, otherId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity)); docId, otherId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity.getCollectedfrom(), info,
entity.getLastupdatetimestamp(), null, null));
res res
.add( .add(
getRelation( getRelation(
otherId, docId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity)); otherId, docId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity.getCollectedfrom(), info,
entity.getLastupdatetimestamp(), null, null));
} }
} }
return res; return res;

View File

@ -5,15 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.UrlValidator;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.Node; import org.dom4j.Node;
@ -27,7 +23,6 @@ import eu.dnetlib.dhp.schema.common.RelationInverse;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
public class OdfToOafMapper extends AbstractMdRecordToOafMapper { public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@ -397,7 +392,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override @Override
protected List<Oaf> addOtherResultRels( protected List<Oaf> addOtherResultRels(
final Document doc, final Document doc,
final OafEntity entity) { final OafEntity entity, DataInfo info) {
final String docId = entity.getId(); final String docId = entity.getId();
@ -413,7 +408,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final String relType = ((Node) o).valueOf("@relationType"); final String relType = ((Node) o).valueOf("@relationType");
String otherId = guessRelatedIdentifier(idType, originalId); String otherId = guessRelatedIdentifier(idType, originalId);
if (StringUtils.isNotBlank(otherId)) { if (StringUtils.isNotBlank(otherId)) {
res.addAll(getRelations(relType, docId, otherId, entity)); res.addAll(getRelations(relType, docId, otherId, entity, info));
} }
} }
@ -434,18 +429,20 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
} }
protected List<Oaf> getRelations(final String reltype, final String entityId, final String otherId, protected List<Oaf> getRelations(final String reltype, final String entityId, final String otherId,
final OafEntity entity) { final OafEntity entity, DataInfo info) {
final List<Oaf> res = new ArrayList<>(); final List<Oaf> res = new ArrayList<>();
RelationInverse rel = ModelSupport.findRelation(reltype); RelationInverse rel = ModelSupport.findRelation(reltype);
if (rel != null) { if (rel != null) {
res res
.add( .add(
getRelation( getRelation(
entityId, otherId, rel.getRelType(), rel.getSubReltype(), rel.getRelClass(), entity)); entityId, otherId, rel.getRelType(), rel.getSubReltype(), rel.getRelClass(),
entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, null));
res res
.add( .add(
getRelation( getRelation(
otherId, entityId, rel.getRelType(), rel.getSubReltype(), rel.getInverseRelClass(), entity)); otherId, entityId, rel.getRelType(), rel.getSubReltype(), rel.getInverseRelClass(),
entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, null));
} }
return res; return res;

View File

@ -96,30 +96,7 @@
--conf spark.sql.shuffle.partitions=15000 --conf spark.sql.shuffle.partitions=15000
</spark-opts> </spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>

View File

@ -1,16 +1,15 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import eu.dnetlib.dhp.common.HdfsSupport;
import java.net.URISyntaxException; import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import java.nio.file.Files; import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.nio.file.Path; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.nio.file.Paths; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -19,118 +18,108 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import com.fasterxml.jackson.databind.DeserializationFeature; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import eu.dnetlib.dhp.common.HdfsSupport; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
private static SparkSession spark; private static SparkSession spark;
private static ObjectMapper mapper = new ObjectMapper() private static ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static Path workingDir; private static Path workingDir;
private Path dataInputPath; private Path dataInputPath;
private Path groupEntityPath; private Path checkpointPath;
private Path dispatchEntityPath;
@BeforeAll private Path outputPath;
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
SparkConf conf = new SparkConf(); @BeforeAll
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); public static void beforeAll() throws IOException {
conf.setMaster("local"); workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate();
}
@BeforeEach SparkConf conf = new SparkConf();
public void beforeEach() throws IOException, URISyntaxException { conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); conf.setMaster("local");
groupEntityPath = workingDir.resolve("grouped_entity"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
dispatchEntityPath = workingDir.resolve("dispatched_entity"); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
} spark = SparkSession.builder().config(conf).getOrCreate();
}
@AfterAll @BeforeEach
public static void afterAll() throws IOException { public void beforeEach() throws IOException, URISyntaxException {
spark.stop(); dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
FileUtils.deleteDirectory(workingDir.toFile()); checkpointPath = workingDir.resolve("grouped_entity");
} outputPath = workingDir.resolve("dispatched_entity");
}
@Test @AfterAll
@Order(1) public static void afterAll() throws IOException {
void testGroupEntities() throws Exception { spark.stop();
GroupEntitiesSparkJob.main(new String[] { FileUtils.deleteDirectory(workingDir.toFile());
"-isSparkSessionManaged", }
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-outputPath",
groupEntityPath.toString()
});
Dataset<Result> output = spark @Test
.read() @Order(1)
.textFile(groupEntityPath.toString()) void testGroupEntities() throws Exception {
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) GroupEntitiesSparkJob.main(new String[]{
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); "-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-checkpointPath",
checkpointPath.toString(),
"-outputPath",
outputPath.toString(),
"-filterInvisible",
Boolean.FALSE.toString()
});
assertEquals( Dataset<OafEntity> checkpointTable = spark
1, .read()
output .load(checkpointPath.toString())
.filter( .selectExpr("COALESCE(*)")
(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" .as(Encoders.kryo(OafEntity.class));
.equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
}
@Test
@Order(2)
void testDispatchEntities() throws Exception {
DispatchEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
groupEntityPath.toString(),
"-outputPath",
dispatchEntityPath.resolve(".").toString(),
"-filterInvisible",
Boolean.TRUE.toString()
});
Dataset<Result> output = spark assertEquals(
.read() 1,
.textFile( checkpointTable
DHPUtils .filter(
.toSeq( (FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
HdfsSupport .equals(r.getId()) &&
.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration()))) r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); .count());
assertEquals(3, output.count());
assertEquals( Dataset<Result> output = spark
2, .read()
output .textFile(
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING()) DHPUtils
.filter((FilterFunction<String>) s -> s.equals("publication")) .toSeq(
.count()); HdfsSupport
assertEquals( .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
1, .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING()) assertEquals(3, output.count());
.filter((FilterFunction<String>) s -> s.equals("dataset")) assertEquals(
.count()); 2,
} output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("publication"))
.count());
assertEquals(
1,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("dataset"))
.count());
}
} }

View File

@ -49,7 +49,7 @@ public class DownloadCsvTest {
@Test @Test
void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException { void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException {
String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv"; String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_5.csv";
final String outputFile = workingDir + "/unibi_gold.json"; final String outputFile = workingDir + "/unibi_gold.json";
new DownloadCSV() new DownloadCSV()

View File

@ -1067,6 +1067,28 @@ class MappersTest {
System.out.println("***************"); System.out.println("***************");
} }
/**
 * Smoke test: maps the {@code d4science-1-training.xml} test resource with {@link OdfToOafMapper}
 * and prints the first mapped object, cast to {@link OtherResearchProduct}, as JSON.
 * No assertions are made beyond the mapping, the cast and the serialization not throwing.
 */
@Test
public void testD4ScienceTraining() throws IOException {
	final String record = IOUtils
		.toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-1-training.xml")));

	final OdfToOafMapper odfMapper = new OdfToOafMapper(vocs, false, true);
	final List<Oaf> mapped = odfMapper.processMdRecord(record);

	// the first element is expected to be the mapped entity itself
	final OtherResearchProduct product = (OtherResearchProduct) mapped.get(0);

	final String separator = "***************";
	System.out.println(separator);
	final String json = new ObjectMapper().writeValueAsString(product);
	System.out.println(json);
	System.out.println(separator);
}
/**
 * Smoke test: maps the {@code d4science-2-dataset.xml} test resource with {@link OdfToOafMapper}
 * and prints the first mapped object, cast to {@link Dataset}, as JSON.
 * No assertions are made beyond the mapping, the cast and the serialization not throwing.
 */
@Test
public void testD4ScienceDataset() throws IOException {
	final String record = IOUtils
		.toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-2-dataset.xml")));

	final OdfToOafMapper odfMapper = new OdfToOafMapper(vocs, false, true);
	final List<Oaf> mapped = odfMapper.processMdRecord(record);

	// the first element is expected to be the mapped entity itself
	final Dataset dataset = (Dataset) mapped.get(0);

	final String separator = "***************";
	System.out.println(separator);
	final String json = new ObjectMapper().writeValueAsString(dataset);
	System.out.println(json);
	System.out.println(separator);
}
@Test @Test
void testNotWellFormed() throws IOException { void testNotWellFormed() throws IOException {
final String xml = IOUtils final String xml = IOUtils

View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<oai:header>
<dri:objIdentifier>alessia_____::104c2d4ba8878c16fa824dce5b1bea57</dri:objIdentifier>
<dri:recordIdentifier>12d8f77e-d66f-46f5-8d88-af7db23bc4c9</dri:recordIdentifier>
<dri:dateOfCollection>2023-09-08T10:12:35.864+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>alessia_____</oaf:datasourceprefix>
<dr:dateOfTransformation>2023-09-08T11:31:45.692+02:00</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<datacite:resource
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
<datacite:identifier identifierType="URL">http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists</datacite:identifier>
<datacite:alternateIdentifiers/>
<datacite:creators>
<datacite:creator>
<datacite:creatorName>BRAGHIERI MARCO</datacite:creatorName>
</datacite:creator>
</datacite:creators>
<datacite:titles>
<datacite:title>Visual Analytics for Data Scientists</datacite:title>
</datacite:titles>
<datacite:publisher>SoBigData++</datacite:publisher>
<datacite:publicationYear/>
<datacite:dates>
<datacite:date dateType="Issued"/>
</datacite:dates>
<datacite:resourceType resourceTypeGeneral="TrainingMaterial">TrainingMaterial</datacite:resourceType>
<datacite:descriptions>
<datacite:description descriptionType="Abstract">Participants to this module shall
- Learn the principles and rules underlying the design of visual data
representations and human-computer interactions
- Understand, adapt and apply representative visual analytics methods and systems for diverse types
of data and problems
- Analyse and evaluate the structure and properties
of data to select or devise appropriate methods for data exploration
- Combine visualization, interactive techniques, and computational
processing to develop practical data analysis for problem solving
(This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London).
The author did not intend to violate any copyright on figures or content. In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it</datacite:description>
</datacite:descriptions>
<datacite:subjects>
<datacite:subject>Visual analytics</datacite:subject>
</datacite:subjects>
<datacite:formats>
<datacite:format>Slides</datacite:format>
<datacite:format>Other</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>ZIP</datacite:format>
</datacite:formats>
</datacite:resource>
<oaf:accessrights>OPEN</oaf:accessrights>
<dr:CobjCategory type="other">0010</dr:CobjCategory>
<oaf:dateAccepted/>
<oaf:hostedBy id="alessia_____::alessia" name="Alessia"/>
<oaf:collectedFrom id="alessia_____::alessia" name="Alessia"/>
<oaf:license>other-open</oaf:license>
<oaf:projectid>corda__h2020::871042</oaf:projectid>
</oai:metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2023-09-08T10:12:35.864+02:00">
<baseURL>https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<oai:header>
<dri:objIdentifier>alessia_____::028879484548f4e1c630e1c503e35231</dri:objIdentifier>
<dri:recordIdentifier>4fed018e-c2ff-4afa-b7b5-1ca1beebf850</dri:recordIdentifier>
<dri:dateOfCollection>2023-09-08T12:14:27.615+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>alessia_____</oaf:datasourceprefix>
<dr:dateOfTransformation>2023-09-08T12:14:51.7+02:00</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<datacite:resource
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
<datacite:identifier identifierType="URL">http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration</datacite:identifier>
<datacite:alternateIdentifiers>
<datacite:alternateIdentifier type="URL"/>
</datacite:alternateIdentifiers>
<datacite:creators>
<datacite:creator>
<datacite:creatorName>Pappalardo, Luca</datacite:creatorName>
<datacite:affiliation/>
<datacite:nameIdentifier nameIdentifierScheme="ORCID" schemeURI="http://orcid.org">0000-0002-1547-6007</datacite:nameIdentifier>
</datacite:creator>
</datacite:creators>
<datacite:titles>
<datacite:title>City-to-city migration</datacite:title>
</datacite:titles>
<datacite:publisher>SoBigData++</datacite:publisher>
<datacite:publicationYear/>
<datacite:dates>
<datacite:date dateType="Issued">2018-02-15</datacite:date>
</datacite:dates>
<datacite:resourceType resourceTypeGeneral="Dataset">Dataset</datacite:resourceType>
<datacite:descriptions>
<datacite:description descriptionType="Abstract">Census data recording the migration of people between metropolitan areas in
the US</datacite:description>
</datacite:descriptions>
<datacite:subjects>
<datacite:subject>Human Mobility data</datacite:subject>
</datacite:subjects>
<datacite:formats/>
</datacite:resource>
<oaf:accessrights>OPEN</oaf:accessrights>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
<oaf:dateAccepted>2018-02-15</oaf:dateAccepted>
<oaf:hostedBy id="alessia_____::alessia" name="Alessia"/>
<oaf:collectedFrom id="alessia_____::alessia" name="Alessia"/>
<oaf:license>AFL-3.0</oaf:license>
<oaf:projectid>corda__h2020::871042</oaf:projectid>
</oai:metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2023-09-08T12:14:27.615+02:00">
<baseURL>https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -24,10 +24,7 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class XmlRecordFactoryTest { public class XmlRecordFactoryTest {
@ -196,4 +193,51 @@ public class XmlRecordFactoryTest {
assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue()); assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue());
assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue()); assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue());
} }
/**
 * Builds the XML record for the {@link OtherResearchProduct} deserialized from
 * {@code d4science-1-training.json}, checks that the result is non-null and can be parsed
 * by {@link SAXReader}, and prints the parsed document.
 */
@Test
public void testD4ScienceTraining() throws DocumentException, IOException {
	final ContextMapper contextMapper = new ContextMapper();
	final XmlRecordFactory recordFactory = new XmlRecordFactory(
		contextMapper, false, XmlConverterJob.schemaLocation);

	final String json = IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json"));
	final OtherResearchProduct product = OBJECT_MAPPER.readValue(json, OtherResearchProduct.class);

	final String recordXml = recordFactory.build(new JoinedEntity<>(product));
	assertNotNull(recordXml);

	// parsing doubles as a well-formedness check on the generated record
	final SAXReader reader = new SAXReader();
	final Document parsed = reader.read(new StringReader(recordXml));
	assertNotNull(parsed);

	System.out.println(parsed.asXML());
}
/**
 * Builds the XML record for the entity deserialized from {@code d4science-2-dataset.json},
 * checks that the result is non-null and can be parsed by {@link SAXReader}, and prints the
 * parsed document.
 */
@Test
public void testD4ScienceDataset() throws DocumentException, IOException {
	final ContextMapper contextMapper = new ContextMapper();
	final XmlRecordFactory recordFactory = new XmlRecordFactory(
		contextMapper, false, XmlConverterJob.schemaLocation);

	final String json = IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json"));
	// NOTE(review): the resource is named "dataset" but is read as OtherResearchProduct —
	// confirm this is intended rather than a copy-paste from the training-material test.
	final OtherResearchProduct product = OBJECT_MAPPER.readValue(json, OtherResearchProduct.class);

	final String recordXml = recordFactory.build(new JoinedEntity<>(product));
	assertNotNull(recordXml);

	// parsing doubles as a well-formedness check on the generated record
	final SAXReader reader = new SAXReader();
	final Document parsed = reader.read(new StringReader(recordXml));
	assertNotNull(parsed);

	System.out.println(parsed.asXML());
}
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -39,7 +39,8 @@
<switch> <switch>
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms --> <!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
<!-- If any different condition is set, go to the corresponding start --> <!-- If any different condition is set, go to the corresponding start -->
<case to="non-iterative-rankings">${wf:conf('resume') eq "rankings-start"}</case> <case to="spark-cc">${wf:conf('resume') eq "cc"}</case>
<case to="spark-ram">${wf:conf('resume') eq "ram"}</case>
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case> <case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
<case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case> <case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case>
<case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case> <case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case>
@ -89,18 +90,11 @@
<file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file> <file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file>
</spark> </spark>
<ok to="non-iterative-rankings" /> <ok to="spark-cc"/>
<error to="openaire-graph-error" /> <error to="openaire-graph-error" />
</action> </action>
<!-- Citation Count and RAM are calculated in parallel-->
<fork name="non-iterative-rankings">
<path start="spark-cc"/>
<!-- <path start="spark-impulse"/> -->
<path start="spark-ram"/>
</fork>
<!-- Run Citation Count calculation --> <!-- Run Citation Count calculation -->
<action name="spark-cc"> <action name="spark-cc">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -129,7 +123,7 @@
<file>${wfAppPath}/bip-ranker/CC.py#CC.py</file> <file>${wfAppPath}/bip-ranker/CC.py#CC.py</file>
</spark> </spark>
<ok to="join-non-iterative-rankings" /> <ok to="spark-ram" />
<error to="cc-fail" /> <error to="cc-fail" />
</action> </action>
@ -165,14 +159,11 @@
<file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file> <file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file>
</spark> </spark>
<ok to="join-non-iterative-rankings" /> <ok to="spark-impulse" />
<error to="ram-fail" /> <error to="ram-fail" />
</action> </action>
<!-- Join non-iterative methods -->
<join name="join-non-iterative-rankings" to="spark-impulse"/>
<action name="spark-impulse"> <action name="spark-impulse">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">

10
pom.xml
View File

@ -112,6 +112,16 @@
<url>https://maven.d4science.org/nexus/content/repositories/dnet-deps</url> <url>https://maven.d4science.org/nexus/content/repositories/dnet-deps</url>
<layout>default</layout> <layout>default</layout>
</repository> </repository>
<repository>
<id>maven-restlet</id>
<name>Restlet repository</name>
<url>https://maven.restlet.talend.com</url>
</repository>
<repository>
<id>conjars</id>
<name>conjars</name>
<url>https://conjars.wensel.net/repo/</url>
</repository>
</repositories> </repositories>
<dependencies> <dependencies>