Compare commits

...

20 Commits

Author SHA1 Message Date
Claudio Atzori ee8a39e7d2 cleanup and refinements 2023-10-04 12:32:05 +02:00
Claudio Atzori 5919e488dd Merge branch 'beta' into importpoci 2023-10-03 10:43:53 +02:00
Miriam Baglioni d7fccdc64b fixed paths in wf to match the req of the pathname 2023-10-02 14:10:57 +02:00
Miriam Baglioni 9898470b0e Addressing comments in D-Net/dnet-hadoop#340\#issuecomment-10592 2023-10-02 12:54:16 +02:00
Giambattista Bloisi c412dc162b Fix bug in conversion from dedup json model to Spark Dataset of Rows: list of strings contained the json escaped representation of the value instead of the plain value, this caused instanceTypeMatch failures because of the leading and trailing double quotes 2023-10-02 11:34:51 +02:00
Claudio Atzori 5d09b7db8b Merge pull request 'SparkPropagateRelation relations do not propagate deletedByInference and invisible' (#333) from consistency_keep_mergerels into beta
Reviewed-on: D-Net/dnet-hadoop#333
2023-10-02 11:27:57 +02:00
Claudio Atzori 7b403a920f Merge branch 'beta' into consistency_keep_mergerels 2023-10-02 11:26:00 +02:00
Claudio Atzori dc86018a5f Merge branch 'merge_entities_job' into beta 2023-10-02 11:24:48 +02:00
Giambattista Bloisi 3c47920c78 Use asScala to convert java List to Scala Sequence 2023-10-02 11:04:47 +02:00
Claudio Atzori 7f244d9a7a code formatting 2023-10-02 11:04:36 +02:00
Giambattista Bloisi e239b81740 Fix defect #8997: GenerateEventsJob is generating huge amounts of logs because broker entity similarity calculation consistently failed 2023-10-02 11:04:18 +02:00
Miriam Baglioni e84f5b5e64 extended existing codo to accomodate import of POCI from open citation 2023-10-02 09:25:16 +02:00
Alessia Bardi 0935d7757c Use v5 of the UNIBI Gold ISSN list in test 2023-09-20 15:41:35 +02:00
Alessia Bardi cc7204a089 tests for d4science catalog 2023-09-20 15:38:32 +02:00
Sandro La Bruzzo 76476cdfb6 Added maven repo for dependencies that are not in maven central 2023-09-20 10:33:14 +02:00
Serafeim Chatzopoulos 395a4af020 Run CC and RAM sequentieally in dhp-impact-indicators WF 2023-09-13 08:59:40 +02:00
Claudio Atzori 8a6892cc63 [graph dedup] consistency wf should not remove the relations while dispatching the entities 2023-09-12 21:27:05 +02:00
Claudio Atzori 4786aa0e09 added Archive ouverte UNIGE (ETHZ.UNIGENF, opendoar____::1400) to the Datacite hostedBy_map 2023-09-07 11:21:07 +02:00
Giambattista Bloisi 2caaaec42d Include SparkCleanRelation logic in SparkPropagateRelation
SparkPropagateRelation includes merge relations
Revised tests for SparkPropagateRelation
2023-09-04 11:33:20 +02:00
Giambattista Bloisi 6cc7d8ca7b GroupEntities and DispatchEntites are now merged in GroupEntitiesSparkJob 2023-08-30 10:43:31 +02:00
34 changed files with 818 additions and 917 deletions

View File

@ -1,98 +0,0 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
Objects
.requireNonNull(
DispatchEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
dispatchEntities(spark, inputPath, outputPath, filterInvisible);
});
}
private static void dispatchEntities(
SparkSession spark,
String inputPath,
String outputPath,
boolean filterInvisible) {
Dataset<String> df = spark.read().textFile(inputPath);
ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
String entityType = entry.getKey();
Class<?> clazz = entry.getValue();
if (!entityType.equalsIgnoreCase("relation")) {
Dataset<Row> entityDF = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(
df
.filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
.map(
(MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
Encoders.STRING()));
if (filterInvisible) {
entityDF = entityDF.filter("dataInfo.invisible != true");
}
entityDF
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entityType);
}
});
}
}

View File

@ -2,36 +2,28 @@
package eu.dnetlib.dhp.oa.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
@ -39,13 +31,9 @@ import scala.Tuple2;
* Groups the graph content by entity identifier to ensure ID uniqueness
*/
public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final String ID_JPATH = "$.id";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
public static void main(String[] args) throws Exception {
@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob {
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String checkpointPath = parser.get("checkpointPath");
log.info("checkpointPath: {}", checkpointPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob {
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath);
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
});
}
private static void groupEntities(
SparkSession spark,
String inputPath,
String outputPath) {
String checkpointPath,
String outputPath,
boolean filterInvisible) {
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
String entityInputPath = inputPath + "/" + entity;
if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
continue;
}
allEntities = allEntities
.union(
((Dataset<OafEntity>) spark
.read()
.schema(Encoders.bean(entityClass).schema())
.json(entityInputPath)
.filter("length(id) > 0")
.as(Encoders.bean(entityClass)))
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
}
Dataset<?> groupedEntities = allEntities
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
.map(
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
t._2().getClass().getName(), t._2()),
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
groupedEntities = groupedEntities
.withColumn(
entity,
when(col("_1").equalTo(entityClass.getName()), col("_2")));
}
groupedEntities
.drop("_1", "_2")
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.save(checkpointPath);
ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
ModelSupport.entityTypes
.entrySet()
.stream()
.map(e -> parPool.submit(() -> {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();
final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark
.read()
.textFile(toSeq(listEntityPaths(inputPath, sc)))
.map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class))
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
.agg(aggregator)
.map(
(MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() +
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
Encoders.STRING())
.load(checkpointPath)
.select(col(entity).as("value"))
.filter("value IS NOT NULL")
.as(OAFENTITY_KRYO_ENC)
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(outputPath);
}
public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> {
@Override
public OafEntity zero() {
return null;
}
@Override
public OafEntity reduce(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.mergeEntities(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public OafEntity merge(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
@Override
public OafEntity finish(OafEntity j) {
return j;
}
@Override
public Encoder<OafEntity> bufferEncoder() {
return Encoders.kryo(OafEntity.class);
}
@Override
public Encoder<OafEntity> outputEncoder() {
return Encoders.kryo(OafEntity.class);
}
}
private static OafEntity parseOaf(String s) {
DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) {
case "10":
return parse(s, Datasource.class);
case "20":
return parse(s, Organization.class);
case "40":
return parse(s, Project.class);
case "50":
String resultType = dc.read("$.resulttype.classid");
switch (resultType) {
case "publication":
return parse(s, Publication.class);
case "dataset":
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return parse(s, Software.class);
case "other":
return parse(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
.option("compression", "gzip")
.json(outputPath + "/" + entity);
}))
.collect(Collectors.toList())
.forEach(t -> {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (IOException e) {
throw new IllegalArgumentException(e);
t.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream()
.filter(f -> !f.toLowerCase().contains("relation"))
.collect(Collectors.toList());
}
}

View File

@ -1,26 +0,0 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "path of the output graph",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true
}
]

View File

@ -8,13 +8,25 @@
{
"paramName": "gin",
"paramLongName": "graphInputPath",
"paramDescription": "the graph root path",
"paramDescription": "the input graph root path",
"paramRequired": true
},
{
"paramName": "cp",
"paramLongName": "checkpointPath",
"paramDescription": "checkpoint directory",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramDescription": "the output graph root path",
"paramRequired": true
},
{
"paramName": "fi",
"paramLongName": "filterInvisible",
"paramDescription": "if true filters out invisible entities",
"paramRequired": true
}
]

View File

@ -81,7 +81,7 @@ case class SparkModel(conf: DedupConfig) {
MapDocumentUtil.truncateList(
MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType),
fdef.getSize
).toArray
).asScala
case Type.StringConcat =>
val jpaths = CONCAT_REGEX.split(fdef.getPath)

View File

@ -1,6 +1,23 @@
package eu.dnetlib.pace.util;
/*
* Diff Match and Patch
* Copyright 2018 The diff-match-patch Authors.
* https://github.com/google/diff-match-patch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Diff Match and Patch
* Copyright 2018 The diff-match-patch Authors.

View File

@ -117,6 +117,11 @@ public class MapDocumentUtil {
return result;
}
if (type == Type.List && jresult instanceof List) {
((List<?>) jresult).forEach(x -> result.add(x.toString()));
return result;
}
if (jresult instanceof JSONArray) {
((JSONArray) jresult).forEach(it -> {
try {

View File

@ -12,6 +12,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
@ -30,15 +31,29 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
private static final String ID_PREFIX = "50|doi_________::";
// DOI-to-DOI citations
public static final String COCI = "COCI";
// PMID-to-PMID citations
public static final String POCI = "POCI";
private static final String DOI_PREFIX = "50|doi_________::";
private static final String PMID_PREFIX = "50|pmid________::";
private static final String TRUST = "0.91";
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws IOException, ParseException {
@ -62,7 +77,7 @@ public class CreateActionSetSparkJob implements Serializable {
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}", inputPath.toString());
log.info("inputPath {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
@ -76,41 +91,68 @@ public class CreateActionSetSparkJob implements Serializable {
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
extractContent(spark, inputPath, outputPath, shouldDuplicateRels);
});
spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels));
}
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) {
spark
getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI)
.union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath,
boolean shouldDuplicateRels, String prefix) {
return spark
.read()
.textFile(inputPath + "/*")
.textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*")
.map(
(MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
Encoders.bean(COCI.class))
.flatMap(
(FlatMapFunction<COCI, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
(FlatMapFunction<COCI, Relation>) value -> createRelation(
value, shouldDuplicateRels, prefix)
.iterator(),
Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null)
.filter((FilterFunction<Relation>) Objects::nonNull)
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
}
private static List<Relation> createRelation(COCI value, boolean duplicate) {
private static List<Relation> createRelation(COCI value, boolean duplicate, String p) {
List<Relation> relationList = new ArrayList<>();
String prefix;
String citing;
String cited;
String citing = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
final String cited = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
switch (p) {
case COCI:
prefix = DOI_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting()));
cited = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited()));
break;
case POCI:
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
cited = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCited()));
break;
default:
throw new IllegalStateException("Invalid prefix: " + p);
}
if (!citing.equals(cited)) {
relationList
@ -120,7 +162,7 @@ public class CreateActionSetSparkJob implements Serializable {
cited, ModelConstants.CITES));
if (duplicate && value.getCiting().endsWith(".refs")) {
citing = ID_PREFIX + IdentifierFactory
citing = prefix + IdentifierFactory
.md5(
CleaningFunctions
.normalizePidValue(
@ -132,59 +174,30 @@ public class CreateActionSetSparkJob implements Serializable {
return relationList;
}
private static Collection<Relation> getRelations(String citing, String cited) {
return Arrays
.asList(
getRelation(citing, cited, ModelConstants.CITES),
getRelation(cited, citing, ModelConstants.IS_CITED_BY));
}
public static Relation getRelation(
String source,
String target,
String relclass) {
Relation r = new Relation();
r.setCollectedfrom(getCollectedFrom());
r.setSource(source);
r.setTarget(target);
r.setRelClass(relclass);
r.setRelType(ModelConstants.RESULT_RESULT);
r.setSubRelType(ModelConstants.CITATION);
r
.setDataInfo(
getDataInfo());
return r;
}
String relClass) {
public static List<KeyValue> getCollectedFrom() {
KeyValue kv = new KeyValue();
kv.setKey(ModelConstants.OPENOCITATIONS_ID);
kv.setValue(ModelConstants.OPENOCITATIONS_NAME);
return Arrays.asList(kv);
}
public static DataInfo getDataInfo() {
DataInfo di = new DataInfo();
di.setInferred(false);
di.setDeletedbyinference(false);
di.setTrust(TRUST);
di
.setProvenanceaction(
getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS));
return di;
}
public static Qualifier getQualifier(String class_id, String class_name,
String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(class_id);
pa.setClassname(class_name);
pa.setSchemeid(qualifierSchema);
pa.setSchemename(qualifierSchema);
return pa;
return OafMapperUtils
.getRelation(
source,
target,
ModelConstants.RESULT_RESULT,
ModelConstants.CITATION,
relClass,
Arrays
.asList(
OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)),
OafMapperUtils
.dataInfo(
false, null, false, false,
OafMapperUtils
.qualifier(
OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
TRUST),
null);
}
}

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.*;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
@ -37,7 +38,7 @@ public class GetOpenCitationsRefs implements Serializable {
parser.parseArgument(args);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString());
log.info("inputFile {}", Arrays.asList(inputFile));
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
@ -45,6 +46,9 @@ public class GetOpenCitationsRefs implements Serializable {
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
final String prefix = parser.get("prefix");
log.info("prefix {}", prefix);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
@ -53,30 +57,31 @@ public class GetOpenCitationsRefs implements Serializable {
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
for (String file : inputFile) {
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem);
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix);
}
}
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem)
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix)
throws IOException {
final Path path = new Path(inputFile);
FSDataInputStream oc_zip = fileSystem.open(path);
int count = 1;
// int count = 1;
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory()) {
String fileName = entry.getName();
fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
count++;
// fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
fileName = fileName.substring(0, fileName.lastIndexOf("."));
// count++;
try (
FSDataOutputStream out = fileSystem
.create(new Path(workingPath + "/COCI/" + fileName + ".gz"));
.create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz"));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(zis, gzipOs);

View File

@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@ -42,13 +43,16 @@ public class ReadCOCI implements Serializable {
log.info("outputPath: {}", outputPath);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString());
log.info("inputFile {}", Arrays.asList(inputFile));
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
final String format = parser.get("format");
log.info("format {}", format);
SparkConf sconf = new SparkConf();
final String delimiter = Optional
@ -64,16 +68,17 @@ public class ReadCOCI implements Serializable {
workingPath,
inputFile,
outputPath,
delimiter);
delimiter,
format);
});
}
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath,
String delimiter) throws IOException {
String delimiter, String format) {
for (String inputFile : inputFiles) {
String p_string = workingPath + "/" + inputFile + ".gz";
String pString = workingPath + "/" + inputFile + ".gz";
Dataset<Row> cociData = spark
.read()
@ -82,14 +87,20 @@ public class ReadCOCI implements Serializable {
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(p_string)
.load(pString)
.repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> {
COCI coci = new COCI();
coci.setOci(row.getString(0));
if (format.equals("COCI")) {
coci.setCiting(row.getString(1));
coci.setCited(row.getString(2));
} else {
coci.setCiting(String.valueOf(row.getInt(1)));
coci.setCited(String.valueOf(row.getInt(2)));
}
coci.setOci(row.getString(0));
return coci;
}, Encoders.bean(COCI.class))
.write()

View File

@ -16,10 +16,11 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node",
"paramRequired": false
}, {
},
{
"paramName": "sdr",
"paramLongName": "shouldDuplicateRels",
"paramDescription": "the hdfs name node",
"paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy",
"paramRequired": false
}
}
]

View File

@ -16,5 +16,11 @@
"paramLongName": "hdfsNameNode",
"paramDescription": "the hdfs name node",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "prefix",
"paramDescription": "COCI or POCI",
"paramRequired": true
}
]

View File

@ -30,7 +30,12 @@
"paramLongName": "inputFile",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
}, {
"paramName": "f",
"paramLongName": "format",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
]

View File

@ -34,6 +34,7 @@
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="download">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
@ -46,7 +47,7 @@
</configuration>
<exec>download.sh</exec>
<argument>${filelist}</argument>
<argument>${workingPath}/Original</argument>
<argument>${workingPath}/${prefix}/Original</argument>
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
<file>download.sh</file>
<capture-output/>
@ -54,12 +55,14 @@
<ok to="extract"/>
<error to="Kill"/>
</action>
<action name="extract">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--inputFile</arg><arg>${inputFile}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}/${prefix}</arg>
<arg>--prefix</arg><arg>${prefix}</arg>
</java>
<ok to="read"/>
<error to="Kill"/>
@ -82,10 +85,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}/COCI</arg>
<arg>--outputPath</arg><arg>${workingPath}/COCI_JSON/</arg>
<arg>--workingPath</arg><arg>${workingPath}/${prefix}/${prefix}</arg>
<arg>--outputPath</arg><arg>${workingPath}/${prefix}/${prefix}_JSON/</arg>
<arg>--delimiter</arg><arg>${delimiter}</arg>
<arg>--inputFile</arg><arg>${inputFileCoci}</arg>
<arg>--format</arg><arg>${prefix}</arg>
</spark>
<ok to="create_actionset"/>
<error to="Kill"/>
@ -108,7 +112,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/COCI_JSON</arg>
<arg>--inputPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>

View File

@ -1,4 +1,9 @@
{
"ETHZ.UNIGENF": {
"openaire_id": "opendoar____::1400",
"datacite_name": "Uni Genf",
"official_name": "Archive ouverte UNIGE"
},
"GESIS.RKI": {
"openaire_id": "re3data_____::r3d100010436",
"datacite_name": "Forschungsdatenzentrum am Robert Koch Institut",

View File

@ -2,7 +2,9 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,10 +29,14 @@ public class TrustUtils {
static {
mapper = new ObjectMapper();
try {
dedupConfig = mapper
.readValue(
DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
DedupConfig.class);
dedupConfig = DedupConfig
.load(
IOUtils
.toString(
DedupConfig.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
StandardCharsets.UTF_8));
deduper = new SparkDeduper(dedupConfig);
} catch (final IOException e) {
log.error("Error loading dedupConfig, e");
@ -57,7 +63,7 @@ public class TrustUtils {
return TrustUtils.rescale(score, threshold);
} catch (final Exception e) {
log.error("Error computing score between results", e);
return BrokerConstants.MIN_TRUST;
throw new RuntimeException(e);
}
}

View File

@ -1,57 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.Objects;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
private static final Relation ZERO = new Relation();
@Override
public Relation zero() {
return ZERO;
}
@Override
public Relation reduce(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation merge(Relation b, Relation a) {
return mergeRel(b, a);
}
@Override
public Relation finish(Relation r) {
return r;
}
private Relation mergeRel(Relation b, Relation a) {
if (Objects.equals(b, ZERO)) {
return a;
}
if (Objects.equals(a, ZERO)) {
return b;
}
b.mergeFrom(a);
return b;
}
@Override
public Encoder<Relation> bufferEncoder() {
return Encoders.kryo(Relation.class);
}
@Override
public Encoder<Relation> outputEncoder() {
return Encoders.kryo(Relation.class);
}
}

View File

@ -1,78 +0,0 @@
package eu.dnetlib.dhp.oa.dedup
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.slf4j.LoggerFactory
object SparkCleanRelation {
private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation])
@throws[Exception]
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(
IOUtils.toString(
classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")
)
)
parser.parseArgument(args)
val conf = new SparkConf
new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")))
}
}
class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
extends AbstractSparkAction(parser, spark) {
override def run(isLookUpService: ISLookUpService): Unit = {
val graphBasePath = parser.get("graphBasePath")
val inputPath = parser.get("inputPath")
val outputPath = parser.get("outputPath")
SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath)
SparkCleanRelation.log.info("inputPath: '{}'", inputPath)
SparkCleanRelation.log.info("outputPath: '{}'", outputPath)
AbstractSparkAction.removeOutputDir(spark, outputPath)
val entities =
Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
idsSchema)
val ids = entities
.foldLeft(emptyIds)((ds, entity) => {
val entityPath = graphBasePath + '/' + entity
if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
ds.union(spark.read.schema(idsSchema).json(entityPath))
} else {
ds
}
})
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
.select("id")
.distinct()
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
AbstractSparkAction.save(
relations
.join(ids, col("source") === ids("id"), "leftsemi")
.join(ids, col("target") === ids("id"), "leftsemi"),
outputPath,
SaveMode.Overwrite
)
}
}

View File

@ -3,23 +3,19 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -70,73 +66,63 @@ public class SparkPropagateRelation extends AbstractSparkAction {
log.info("workingPath: '{}'", workingPath);
log.info("graphOutputPath: '{}'", graphOutputPath);
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
removeOutputDir(spark, outputRelationPath);
Dataset<Relation> mergeRels = spark
.read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC);
// <mergedObjectID, dedupID>
Dataset<Row> mergedIds = mergeRels
Dataset<Row> idsToMerge = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct()
.cache();
.distinct();
Dataset<Row> allRels = spark
.read()
.schema(REL_BEAN_ENC.schema())
.json(DedupUtility.createEntityPath(graphBasePath, "relation"));
.json(graphBasePath + "/relation");
Dataset<Relation> dedupedRels = allRels
.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
Dataset<Relation> processedRelations = distinctRelations(
dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
save(processedRelations, outputRelationPath, SaveMode.Overwrite);
}
private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
Relation existingRel = t._1();
.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
Relation rel = t._1();
String newSource = t._2();
String newTarget = t._3();
if (newSource == null && newTarget == null) {
return Collections.singleton(t._1()).iterator();
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
// update existing relation
if (existingRel.getDataInfo() == null) {
existingRel.setDataInfo(new DataInfo());
}
existingRel.getDataInfo().setDeletedbyinference(true);
// Create new relation inferred by dedupIDs
Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
inferredRel.getDataInfo().setDeletedbyinference(false);
if (newSource != null || newTarget != null) {
rel.getDataInfo().setDeletedbyinference(false);
if (newSource != null)
inferredRel.setSource(newSource);
rel.setSource(newSource);
if (newTarget != null)
inferredRel.setTarget(newTarget);
return Arrays.asList(existingRel, inferredRel).iterator();
rel.setTarget(newTarget);
}
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels
.filter(getRelationFilterFunction())
return rel;
}, REL_BEAN_ENC);
// ids of records that are both not deletedbyinference and not invisible
Dataset<Row> ids = validIds(spark, graphBasePath);
// filter relations that point to valid records, can force them to be visible
Dataset<Relation> cleanedRels = dedupedRels
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
.as(REL_BEAN_ENC)
.map((MapFunction<Relation, Relation>) r -> {
r.getDataInfo().setInvisible(false);
return r;
}, REL_KRYO_ENC);
Dataset<Relation> distinctRels = cleanedRels
.groupByKey(
(MapFunction<Relation, String>) r -> String
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
@ -146,13 +132,33 @@ public class SparkPropagateRelation extends AbstractSparkAction {
return b;
})
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
final String outputRelationPath = graphOutputPath + "/relation";
removeOutputDir(spark, outputRelationPath);
save(
distinctRels
.union(mergeRels)
.filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"),
outputRelationPath,
SaveMode.Overwrite);
}
private FilterFunction<Relation> getRelationFilterFunction() {
return r -> StringUtils.isNotBlank(r.getSource()) ||
StringUtils.isNotBlank(r.getTarget()) ||
StringUtils.isNotBlank(r.getRelType()) ||
StringUtils.isNotBlank(r.getSubRelType()) ||
StringUtils.isNotBlank(r.getRelClass());
static Dataset<Row> validIds(SparkSession spark, String graphBasePath) {
StructType idsSchema = StructType
.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
Dataset<Row> allIds = spark.emptyDataset(RowEncoder.apply(idsSchema));
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
String entityPath = graphBasePath + '/' + entityType.name();
if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) {
allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath));
}
}
return allIds
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
.select("id")
.distinct();
}
}

View File

@ -1,20 +0,0 @@
[
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "inputPath",
"paramDescription": "the path to the input relation to cleanup",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the output relation cleaned",
"paramRequired": true
}
]

View File

@ -100,35 +100,9 @@
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--graphOutputPath</arg><arg>${workingPath}/propagaterelation/</arg>
<arg>--graphOutputPath</arg><arg>${graphOutputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="CleanRelation"/>
<error to="Kill"/>
</action>
<action name="CleanRelation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCleanRelation</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--inputPath</arg><arg>${workingPath}/propagaterelation/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
</spark>
<ok to="group_entities"/>
<error to="Kill"/>
</action>
@ -152,31 +126,7 @@
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entitities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark>

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient;
@ -23,14 +22,13 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@ -46,8 +44,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ -62,6 +58,8 @@ public class SparkDedupTest implements Serializable {
private static String testGraphBasePath;
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static String testConsistencyGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
private static String whitelistPath;
private static List<String> whiteList;
@ -75,6 +73,7 @@ public class SparkDedupTest implements Serializable {
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
@ -83,6 +82,10 @@ public class SparkDedupTest implements Serializable {
.toAbsolutePath()
.toString();
testConsistencyGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
whitelistPath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI())
.toFile()
@ -674,22 +677,45 @@ public class SparkDedupTest implements Serializable {
assertEquals(mergedOrp, deletedOrp);
}
@Test
@Order(6)
void copyRelationsNoOpenorgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRelationsNoOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
});
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
System.out.println(outputRels.count());
// assertEquals(2382, outputRels.count());
}
@Test
@Order(7)
void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
String outputRelPath = testDedupGraphBasePath + "/propagaterelation";
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath
"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
});
new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(outputRelPath + "/relation").count();
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
// assertEquals(4860, relations);
System.out.println("relations = " + relations);
@ -699,95 +725,52 @@ public class SparkDedupTest implements Serializable {
.read()
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
.as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
.where("relClass == 'merges'")
.select(mergeRels.col("target"))
.distinct()
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
JavaRDD<String> toCheck = jsc
.textFile(outputRelPath + "/relation")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
.join(mergedIds)
.map(t -> t._2()._1())
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
.join(mergedIds)
.map(t -> t._2()._1());
Dataset<Row> inputRels = spark
.read()
.json(testDedupGraphBasePath + "/relation");
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
long updated = toCheck.count();
Dataset<Row> outputRels = spark
.read()
.json(testConsistencyGraphBasePath + "/relation");
assertEquals(updated, deletedbyinference);
assertEquals(
0, outputRels
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
.count());
assertEquals(
5, outputRels
.filter("relClass NOT IN ('merges', 'isMergedIn')")
.count());
assertEquals(5 + mergeRels.count(), outputRels.count());
}
@Test
@Order(8)
void testCleanBaseRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
// append dangling relations to be cleaned up
void testCleanedPropagatedRelations() throws Exception {
Dataset<Row> df_before = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testGraphBasePath + "/relation");
Dataset<Row> df_input = df_before
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
parser
.parseArgument(
new String[] {
"--graphBasePath", testGraphBasePath,
"--inputPath", testGraphBasePath + "/relation",
"--outputPath", testDedupGraphBasePath + "/relation"
});
new SparkCleanRelation(parser, spark).run(isLookUpService);
.json(testDedupGraphBasePath + "/relation");
Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_input.count());
assertNotEquals(df_input.count(), df_after.count());
assertEquals(5, df_after.count());
}
@Test
@Order(9)
void testCleanDedupedRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
// append dangling relations to be cleaned up
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath);
df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
parser
.parseArgument(
new String[] {
"--graphBasePath", testGraphBasePath,
"--inputPath", inputRelPath,
"--outputPath", testDedupGraphBasePath + "/relation"
});
new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
.json(testConsistencyGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_after.count());
assertEquals(0, df_after.count());
assertEquals(
0, df_after
.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true")
.count());
assertEquals(
5, df_after
.filter("relClass NOT IN ('merges', 'isMergedIn')")
.count());
}
@Test
@ -813,6 +796,7 @@ public class SparkDedupTest implements Serializable {
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
}
public boolean isDeletedByInference(String s) {

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ -15,10 +16,6 @@ import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@ -33,8 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ -44,11 +39,11 @@ public class SparkOpenorgsProvisionTest implements Serializable {
ISLookUpService isLookUpService;
private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath;
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static String testConsistencyGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
@BeforeAll
@ -64,6 +59,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testConsistencyGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@ -76,8 +74,13 @@ public class SparkOpenorgsProvisionTest implements Serializable {
.master("local[*]")
.config(conf)
.getOrCreate();
}
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath));
}
@BeforeEach
@ -186,26 +189,21 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkUpdateEntity(parser, spark).run(isLookUpService);
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
Dataset<Row> organizations = spark.read().json(testDedupGraphBasePath + "/organization");
long mergedOrgs = spark
Dataset<Row> mergedOrgs = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.where("relClass=='merges'")
.javaRDD()
.map(Relation::getTarget)
.distinct()
.count();
.select("target")
.distinct();
assertEquals(80, organizations);
assertEquals(80, organizations.count());
long deletedOrgs = jsc
.textFile(testDedupGraphBasePath + "/organization")
.filter(this::isDeletedByInference)
.count();
Dataset<Row> deletedOrgs = organizations
.filter("dataInfo.deletedbyinference = TRUE");
assertEquals(mergedOrgs, deletedOrgs);
assertEquals(mergedOrgs.count(), deletedOrgs.count());
}
@Test
@ -226,10 +224,9 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation");
assertEquals(2382, rels.count());
final Dataset<Row> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
assertEquals(2382, outputRels.count());
}
@Test
@ -244,51 +241,41 @@ public class SparkOpenorgsProvisionTest implements Serializable {
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath
});
new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4896, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark
.read()
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
.as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = mergeRels
Dataset<Row> inputRels = spark
.read()
.json(testDedupGraphBasePath + "/relation");
Dataset<Row> outputRels = spark
.read()
.json(testConsistencyGraphBasePath + "/relation");
final Dataset<Row> mergedIds = mergeRels
.where("relClass == 'merges'")
.select(mergeRels.col("target"))
.distinct()
.toJavaRDD()
.mapToPair(
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
.select(col("target").as("id"))
.distinct();
JavaRDD<String> toCheck = jsc
.textFile(testDedupGraphBasePath + "/relation")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
.join(mergedIds)
.map(t -> t._2()._1())
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
.join(mergedIds)
.map(t -> t._2()._1());
Dataset<Row> toUpdateRels = inputRels
.as("rel")
.join(mergedIds.as("s"), col("rel.source").equalTo(col("s.id")), "left_outer")
.join(mergedIds.as("t"), col("rel.target").equalTo(col("t.id")), "left_outer")
.filter("s.id IS NOT NULL OR t.id IS NOT NULL")
.distinct();
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
long updated = toCheck.count();
Dataset<Row> updatedRels = inputRels
.select("source", "target", "relClass")
.except(outputRels.select("source", "target", "relClass"));
assertEquals(updated, deletedbyinference);
assertEquals(toUpdateRels.count(), updatedRels.count());
assertEquals(140, outputRels.count());
}
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
public boolean isDeletedByInference(String s) {
return s.contains("\"deletedbyinference\":true");
}
}

View File

@ -96,30 +96,7 @@
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="dispatch_entities"/>
<error to="Kill"/>
</action>
<action name="dispatch_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch grouped entities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark>

View File

@ -1,16 +1,15 @@
package eu.dnetlib.dhp.oa.graph.group;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
@ -19,15 +18,13 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest {
@ -40,8 +37,9 @@ public class GroupEntitiesSparkJobTest {
private static Path workingDir;
private Path dataInputPath;
private Path groupEntityPath;
private Path dispatchEntityPath;
private Path checkpointPath;
private Path outputPath;
@BeforeAll
public static void beforeAll() throws IOException {
@ -58,8 +56,8 @@ public class GroupEntitiesSparkJobTest {
@BeforeEach
public void beforeEach() throws IOException, URISyntaxException {
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
groupEntityPath = workingDir.resolve("grouped_entity");
dispatchEntityPath = workingDir.resolve("dispatched_entity");
checkpointPath = workingDir.resolve("grouped_entity");
outputPath = workingDir.resolve("dispatched_entity");
}
@AfterAll
@ -71,44 +69,35 @@ public class GroupEntitiesSparkJobTest {
@Test
@Order(1)
void testGroupEntities() throws Exception {
GroupEntitiesSparkJob.main(new String[] {
GroupEntitiesSparkJob.main(new String[]{
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-checkpointPath",
checkpointPath.toString(),
"-outputPath",
groupEntityPath.toString()
outputPath.toString(),
"-filterInvisible",
Boolean.FALSE.toString()
});
Dataset<Result> output = spark
Dataset<OafEntity> checkpointTable = spark
.read()
.textFile(groupEntityPath.toString())
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
.load(checkpointPath.toString())
.selectExpr("COALESCE(*)")
.as(Encoders.kryo(OafEntity.class));
assertEquals(
1,
output
checkpointTable
.filter(
(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
.equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
}
@Test
@Order(2)
void testDispatchEntities() throws Exception {
DispatchEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
groupEntityPath.toString(),
"-outputPath",
dispatchEntityPath.resolve(".").toString(),
"-filterInvisible",
Boolean.TRUE.toString()
});
Dataset<Result> output = spark
.read()
@ -116,7 +105,7 @@ public class GroupEntitiesSparkJobTest {
DHPUtils
.toSeq(
HdfsSupport
.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration())))
.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
assertEquals(3, output.count());

View File

@ -49,7 +49,7 @@ public class DownloadCsvTest {
@Test
void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException {
String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv";
String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_5.csv";
final String outputFile = workingDir + "/unibi_gold.json";
new DownloadCSV()

View File

@ -1067,6 +1067,28 @@ class MappersTest {
System.out.println("***************");
}
@Test
public void testD4ScienceTraining() throws IOException {
final String xml = IOUtils
.toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-1-training.xml")));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
final OtherResearchProduct trainingMaterial = (OtherResearchProduct) list.get(0);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial));
System.out.println("***************");
}
@Test
public void testD4ScienceDataset() throws IOException {
final String xml = IOUtils
.toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-2-dataset.xml")));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
final Dataset trainingMaterial = (Dataset) list.get(0);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial));
System.out.println("***************");
}
@Test
void testNotWellFormed() throws IOException {
final String xml = IOUtils

View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<oai:header>
<dri:objIdentifier>alessia_____::104c2d4ba8878c16fa824dce5b1bea57</dri:objIdentifier>
<dri:recordIdentifier>12d8f77e-d66f-46f5-8d88-af7db23bc4c9</dri:recordIdentifier>
<dri:dateOfCollection>2023-09-08T10:12:35.864+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>alessia_____</oaf:datasourceprefix>
<dr:dateOfTransformation>2023-09-08T11:31:45.692+02:00</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<datacite:resource
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
<datacite:identifier identifierType="URL">http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists</datacite:identifier>
<datacite:alternateIdentifiers/>
<datacite:creators>
<datacite:creator>
<datacite:creatorName>BRAGHIERI MARCO</datacite:creatorName>
</datacite:creator>
</datacite:creators>
<datacite:titles>
<datacite:title>Visual Analytics for Data Scientists</datacite:title>
</datacite:titles>
<datacite:publisher>SoBigData++</datacite:publisher>
<datacite:publicationYear/>
<datacite:dates>
<datacite:date dateType="Issued"/>
</datacite:dates>
<datacite:resourceType resourceTypeGeneral="TrainingMaterial">TrainingMaterial</datacite:resourceType>
<datacite:descriptions>
<datacite:description descriptionType="Abstract">Participants to this module shall
- Learn the principles and rules underlying the design of visual data
representations and human-computer interactions
- Understand, adapt and apply representative visual analytics methods and systems for diverse types
of data and problems
- Analyse and evaluate the structure and properties
of data to select or devise appropriate methods for data exploration
- Combine visualization, interactive techniques, and computational
processing to develop practical data analysis for problem solving
(This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London).
The author did not intend to violate any copyright on figures or content. In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it</datacite:description>
</datacite:descriptions>
<datacite:subjects>
<datacite:subject>Visual analytics</datacite:subject>
</datacite:subjects>
<datacite:formats>
<datacite:format>Slides</datacite:format>
<datacite:format>Other</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>PDF</datacite:format>
<datacite:format>ZIP</datacite:format>
</datacite:formats>
</datacite:resource>
<oaf:accessrights>OPEN</oaf:accessrights>
<dr:CobjCategory type="other">0010</dr:CobjCategory>
<oaf:dateAccepted/>
<oaf:hostedBy id="alessia_____::alessia" name="Alessia"/>
<oaf:collectedFrom id="alessia_____::alessia" name="Alessia"/>
<oaf:license>other-open</oaf:license>
<oaf:projectid>corda__h2020::871042</oaf:projectid>
</oai:metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2023-09-08T10:12:35.864+02:00">
<baseURL>https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf" xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<oai:header>
<dri:objIdentifier>alessia_____::028879484548f4e1c630e1c503e35231</dri:objIdentifier>
<dri:recordIdentifier>4fed018e-c2ff-4afa-b7b5-1ca1beebf850</dri:recordIdentifier>
<dri:dateOfCollection>2023-09-08T12:14:27.615+02:00</dri:dateOfCollection>
<oaf:datasourceprefix>alessia_____</oaf:datasourceprefix>
<dr:dateOfTransformation>2023-09-08T12:14:51.7+02:00</dr:dateOfTransformation>
</oai:header>
<oai:metadata>
<datacite:resource
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-4 http://schema.datacite.org/meta/kernel-4/metadata.xsd">
<datacite:identifier identifierType="URL">http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration</datacite:identifier>
<datacite:alternateIdentifiers>
<datacite:alternateIdentifier type="URL"/>
</datacite:alternateIdentifiers>
<datacite:creators>
<datacite:creator>
<datacite:creatorName>Pappalardo, Luca</datacite:creatorName>
<datacite:affiliation/>
<datacite:nameIdentifier nameIdentifierScheme="ORCID" schemeURI="http://orcid.org">0000-0002-1547-6007</datacite:nameIdentifier>
</datacite:creator>
</datacite:creators>
<datacite:titles>
<datacite:title>City-to-city migration</datacite:title>
</datacite:titles>
<datacite:publisher>SoBigData++</datacite:publisher>
<datacite:publicationYear/>
<datacite:dates>
<datacite:date dateType="Issued">2018-02-15</datacite:date>
</datacite:dates>
<datacite:resourceType resourceTypeGeneral="Dataset">Dataset</datacite:resourceType>
<datacite:descriptions>
<datacite:description descriptionType="Abstract">Census data recording the migration of people between metropolitan areas in
the US</datacite:description>
</datacite:descriptions>
<datacite:subjects>
<datacite:subject>Human Mobility data</datacite:subject>
</datacite:subjects>
<datacite:formats/>
</datacite:resource>
<oaf:accessrights>OPEN</oaf:accessrights>
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
<oaf:dateAccepted>2018-02-15</oaf:dateAccepted>
<oaf:hostedBy id="alessia_____::alessia" name="Alessia"/>
<oaf:collectedFrom id="alessia_____::alessia" name="Alessia"/>
<oaf:license>AFL-3.0</oaf:license>
<oaf:projectid>corda__h2020::871042</oaf:projectid>
</oai:metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2023-09-08T12:14:27.615+02:00">
<baseURL>https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems</baseURL>
<identifier/>
<datestamp/>
<metadataNamespace/>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk"
classname="Harvested" schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</oai:record>

View File

@ -24,10 +24,7 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.*;
public class XmlRecordFactoryTest {
@ -196,4 +193,51 @@ public class XmlRecordFactoryTest {
assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue());
assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue());
}
@Test
public void testD4ScienceTraining() throws DocumentException, IOException {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(
IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json")),
OtherResearchProduct.class);
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
assertNotNull(xml);
final Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
System.out.println(doc.asXML());
}
@Test
public void testD4ScienceDataset() throws DocumentException, IOException {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
final OtherResearchProduct p = OBJECT_MAPPER
.readValue(
IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json")),
OtherResearchProduct.class);
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
assertNotNull(xml);
final Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
System.out.println(doc.asXML());
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -39,7 +39,8 @@
<switch>
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
<!-- If any different condition is set, go to the corresponding start -->
<case to="non-iterative-rankings">${wf:conf('resume') eq "rankings-start"}</case>
<case to="spark-cc">${wf:conf('resume') eq "cc"}</case>
<case to="spark-ram">${wf:conf('resume') eq "ram"}</case>
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
<case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case>
<case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case>
@ -89,18 +90,11 @@
<file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file>
</spark>
<ok to="non-iterative-rankings" />
<ok to="spark-cc"/>
<error to="openaire-graph-error" />
</action>
<!-- Citation Count and RAM are calculated in parallel-->
<fork name="non-iterative-rankings">
<path start="spark-cc"/>
<!-- <path start="spark-impulse"/> -->
<path start="spark-ram"/>
</fork>
<!-- Run Citation Count calculation -->
<action name="spark-cc">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -129,7 +123,7 @@
<file>${wfAppPath}/bip-ranker/CC.py#CC.py</file>
</spark>
<ok to="join-non-iterative-rankings" />
<ok to="spark-ram" />
<error to="cc-fail" />
</action>
@ -165,14 +159,11 @@
<file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file>
</spark>
<ok to="join-non-iterative-rankings" />
<ok to="spark-impulse" />
<error to="ram-fail" />
</action>
<!-- Join non-iterative methods -->
<join name="join-non-iterative-rankings" to="spark-impulse"/>
<action name="spark-impulse">
<spark xmlns="uri:oozie:spark-action:0.2">

10
pom.xml
View File

@ -112,6 +112,16 @@
<url>https://maven.d4science.org/nexus/content/repositories/dnet-deps</url>
<layout>default</layout>
</repository>
<repository>
<id>maven-restlet</id>
<name>Restlet repository</name>
<url>https://maven.restlet.talend.com</url>
</repository>
<repository>
<id>conjars</id>
<name>conjars</name>
<url>https://conjars.wensel.net/repo/</url>
</repository>
</repositories>
<dependencies>