diff --git a/.gitignore b/.gitignore index 73d9179fa..14cd4d345 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ spark-warehouse /**/*.log /**/.factorypath /**/.scalafmt.conf +/.java-version diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java deleted file mode 100644 index cf0a183d7..000000000 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ /dev/null @@ -1,97 +0,0 @@ - -package eu.dnetlib.dhp.oa.merge; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.common.ModelSupport; - -public class DispatchEntitiesSparkJob { - - private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - Objects - .requireNonNull( - DispatchEntitiesSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json"))); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("inputPath"); - 
log.info("inputPath: {}", inputPath); - - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible")); - log.info("filterInvisible: {}", filterInvisible); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible)); - } - - private static void dispatchEntities( - SparkSession spark, - String inputPath, - String outputPath, - boolean filterInvisible) { - - Dataset df = spark.read().textFile(inputPath); - - ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> { - String entityType = entry.getKey(); - Class clazz = entry.getValue(); - - final String entityPath = outputPath + "/" + entityType; - if (!entityType.equalsIgnoreCase("relation")) { - HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration()); - Dataset entityDF = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json( - df - .filter((FilterFunction) s -> s.startsWith(clazz.getName())) - .map( - (MapFunction) s -> StringUtils.substringAfter(s, "|"), - Encoders.STRING())); - - if (filterInvisible) { - entityDF = entityDF.filter("dataInfo.invisible != true"); - } - - entityDF - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(entityPath); - } - }); - } -} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index e652bd5b6..87510c108 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -2,36 +2,28 @@ package eu.dnetlib.dhp.oa.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; +import static 
org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.when; -import java.io.IOException; -import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.Option; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -39,13 +31,9 @@ import scala.Tuple2; * Groups the graph content by entity identifier to ensure ID uniqueness */ public class GroupEntitiesSparkJob { - private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); - private static final String ID_JPATH = "$.id"; - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); public 
static void main(String[] args) throws Exception { @@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob { String graphInputPath = parser.get("graphInputPath"); log.info("graphInputPath: {}", graphInputPath); + String checkpointPath = parser.get("checkpointPath"); + log.info("checkpointPath: {}", checkpointPath); + String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); + log.info("filterInvisible: {}", filterInvisible); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); @@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob { isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - groupEntities(spark, graphInputPath, outputPath); + groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible); }); } private static void groupEntities( SparkSession spark, String inputPath, - String outputPath) { + String checkpointPath, + String outputPath, + boolean filterInvisible) { - final TypedColumn aggregator = new GroupingAggregator().toColumn(); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - spark - .read() - .textFile(toSeq(listEntityPaths(inputPath, sc))) - .map((MapFunction) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class)) - .filter((FilterFunction) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e))) - .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) - .agg(aggregator) + Dataset allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC); + + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + String entityInputPath = inputPath + "/" + entity; + + if (!HdfsSupport.exists(entityInputPath, 
spark.sparkContext().hadoopConfiguration())) { + continue; + } + + allEntities = allEntities + .union( + ((Dataset) spark + .read() + .schema(Encoders.bean(entityClass).schema()) + .json(entityInputPath) + .filter("length(id) > 0") + .as(Encoders.bean(entityClass))) + .map((MapFunction) r -> r, OAFENTITY_KRYO_ENC)); + } + + Dataset groupedEntities = allEntities + .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) + .reduceGroups((ReduceFunction) (b, a) -> OafMapperUtils.mergeEntities(b, a)) .map( - (MapFunction, String>) t -> t._2().getClass().getName() + - "|" + OBJECT_MAPPER.writeValueAsString(t._2()), - Encoders.STRING()) + (MapFunction, Tuple2>) t -> new Tuple2( + t._2().getClass().getName(), t._2()), + Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); + + // pivot on "_1" (classname of the entity) + // created columns containing only entities of the same class + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + + groupedEntities = groupedEntities + .withColumn( + entity, + when(col("_1").equalTo(entityClass.getName()), col("_2"))); + } + + groupedEntities + .drop("_1", "_2") .write() - .option("compression", "gzip") .mode(SaveMode.Overwrite) - .text(outputPath); - } + .option("compression", "gzip") + .save(checkpointPath); - public static class GroupingAggregator extends Aggregator { + ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size()); - @Override - public OafEntity zero() { - return null; - } - - @Override - public OafEntity reduce(OafEntity b, OafEntity a) { - return mergeAndGet(b, a); - } - - private OafEntity mergeAndGet(OafEntity b, OafEntity a) { - if (Objects.nonNull(a) && Objects.nonNull(b)) { - return OafMapperUtils.mergeEntities(b, a); - } - return Objects.isNull(a) ? 
b : a; - } - - @Override - public OafEntity merge(OafEntity b, OafEntity a) { - return mergeAndGet(b, a); - } - - @Override - public OafEntity finish(OafEntity j) { - return j; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(OafEntity.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(OafEntity.class); - } - - } - - private static OafEntity parseOaf(String s) { - - DocumentContext dc = JsonPath - .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)); - final String id = dc.read(ID_JPATH); - if (StringUtils.isNotBlank(id)) { - - String prefix = StringUtils.substringBefore(id, "|"); - switch (prefix) { - case "10": - return parse(s, Datasource.class); - case "20": - return parse(s, Organization.class); - case "40": - return parse(s, Project.class); - case "50": - String resultType = dc.read("$.resulttype.classid"); - switch (resultType) { - case "publication": - return parse(s, Publication.class); - case "dataset": - return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); - case "software": - return parse(s, Software.class); - case "other": - return parse(s, OtherResearchProduct.class); - default: - throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); - } - default: - throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); - } - } else { - throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s)); - } - } - - private static OafEntity parse(String s, Class clazz) { - try { - return OBJECT_MAPPER.readValue(s, clazz); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - private static List listEntityPaths(String inputPath, JavaSparkContext sc) { - return HdfsSupport - .listFiles(inputPath, sc.hadoopConfiguration()) + ModelSupport.entityTypes + .entrySet() .stream() - .filter(f -> !f.toLowerCase().contains("relation")) - .collect(Collectors.toList()); - } + .map(e -> 
parPool.submit(() -> { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + spark + .read() + .load(checkpointPath) + .select(col(entity).as("value")) + .filter("value IS NOT NULL") + .as(OAFENTITY_KRYO_ENC) + .map((MapFunction) r -> r, (Encoder) Encoders.bean(entityClass)) + .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + entity); + })) + .collect(Collectors.toList()) + .forEach(t -> { + try { + t.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json deleted file mode 100644 index 60f11ac84..000000000 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json +++ /dev/null @@ -1,26 +0,0 @@ -[ - { - "paramName": "issm", - "paramLongName": "isSparkSessionManaged", - "paramDescription": "when true will stop SparkSession after job execution", - "paramRequired": false - }, - { - "paramName": "i", - "paramLongName": "inputPath", - "paramDescription": "the source path", - "paramRequired": true - }, - { - "paramName": "o", - "paramLongName": "outputPath", - "paramDescription": "path of the output graph", - "paramRequired": true - }, - { - "paramName": "fi", - "paramLongName": "filterInvisible", - "paramDescription": "if true filters out invisible entities", - "paramRequired": true - } -] \ No newline at end of file diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json index e65acb3c4..58e3ca711 100644 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json +++ 
b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json @@ -8,13 +8,25 @@ { "paramName": "gin", "paramLongName": "graphInputPath", - "paramDescription": "the graph root path", + "paramDescription": "the input graph root path", + "paramRequired": true + }, + { + "paramName": "cp", + "paramLongName": "checkpointPath", + "paramDescription": "checkpoint directory", "paramRequired": true }, { "paramName": "out", "paramLongName": "outputPath", - "paramDescription": "the output merged graph root path", + "paramDescription": "the output graph root path", + "paramRequired": true + }, + { + "paramName": "fi", + "paramLongName": "filterInvisible", + "paramDescription": "if true filters out invisible entities", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala index 95325ace0..aa997c6e9 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala @@ -81,7 +81,7 @@ case class SparkModel(conf: DedupConfig) { MapDocumentUtil.truncateList( MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), fdef.getSize - ).toArray + ).asScala case Type.StringConcat => val jpaths = CONCAT_REGEX.split(fdef.getPath) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java index 84d49bd5c..cfd9acd70 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java @@ -1,6 +1,23 @@ package eu.dnetlib.pace.util; +/* + * Diff Match and Patch + * Copyright 2018 The diff-match-patch Authors. 
+ * https://github.com/google/diff-match-patch + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /* * Diff Match and Patch * Copyright 2018 The diff-match-patch Authors. diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java index a59b6248b..28244cb3b 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/MapDocumentUtil.java @@ -117,6 +117,11 @@ public class MapDocumentUtil { return result; } + if (type == Type.List && jresult instanceof List) { + ((List) jresult).forEach(x -> result.add(x.toString())); + return result; + } + if (jresult instanceof JSONArray) { ((JSONArray) jresult).forEach(it -> { try { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index 4c658e52f..e3a9833b3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -12,6 +12,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import 
org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; @@ -30,12 +31,16 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; public class CreateActionSetSparkJob implements Serializable { public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; - private static final String ID_PREFIX = "50|doi_________::"; + private static final String DOI_PREFIX = "50|doi_________::"; + + private static final String PMID_PREFIX = "50|pmid________::"; private static final String TRUST = "0.91"; private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); @@ -84,33 +89,55 @@ public class CreateActionSetSparkJob implements Serializable { private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { - spark + + getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "COCI") + .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "POCI")) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + + } + + private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath, + boolean shouldDuplicateRels, String prefix) { + return spark .read() - .textFile(inputPath + "/*") + .textFile(inputPath + "/" + prefix + "/" + prefix + "_JSON/*") .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), Encoders.bean(COCI.class)) .flatMap( - (FlatMapFunction) 
value -> createRelation(value, shouldDuplicateRels).iterator(), + (FlatMapFunction) value -> createRelation( + value, shouldDuplicateRels, prefix) + .iterator(), Encoders.bean(Relation.class)) .filter((FilterFunction) value -> value != null) .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - + new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } - private static List createRelation(COCI value, boolean duplicate) { + private static List createRelation(COCI value, boolean duplicate, String p) { List relationList = new ArrayList<>(); + String prefix; + String citing; + String cited; + if (p.equals("COCI")) { + prefix = DOI_PREFIX; + citing = prefix + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); + cited = prefix + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); - String citing = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); - final String cited = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); + } else { + prefix = PMID_PREFIX; + citing = prefix + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCiting())); + cited = prefix + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCited())); + + } if (!citing.equals(cited)) { relationList @@ -120,7 +147,7 @@ public class CreateActionSetSparkJob implements Serializable { cited, ModelConstants.CITES)); if (duplicate && value.getCiting().endsWith(".refs")) { - citing = ID_PREFIX + IdentifierFactory + citing = prefix + IdentifierFactory .md5( CleaningFunctions .normalizePidValue( @@ -132,59 +159,30 @@ public class CreateActionSetSparkJob implements Serializable { return 
relationList; } - private static Collection getRelations(String citing, String cited) { - - return Arrays - .asList( - getRelation(citing, cited, ModelConstants.CITES), - getRelation(cited, citing, ModelConstants.IS_CITED_BY)); - } - public static Relation getRelation( String source, String target, String relclass) { - Relation r = new Relation(); - r.setCollectedfrom(getCollectedFrom()); - r.setSource(source); - r.setTarget(target); - r.setRelClass(relclass); - r.setRelType(ModelConstants.RESULT_RESULT); - r.setSubRelType(ModelConstants.CITATION); - r - .setDataInfo( - getDataInfo()); - return r; + + return OafMapperUtils + .getRelation( + source, + target, + ModelConstants.RESULT_RESULT, + ModelConstants.CITATION, + relclass, + Arrays + .asList( + OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + TRUST), + null); + } - - public static List getCollectedFrom() { - KeyValue kv = new KeyValue(); - kv.setKey(ModelConstants.OPENOCITATIONS_ID); - kv.setValue(ModelConstants.OPENOCITATIONS_NAME); - - return Arrays.asList(kv); - } - - public static DataInfo getDataInfo() { - DataInfo di = new DataInfo(); - di.setInferred(false); - di.setDeletedbyinference(false); - di.setTrust(TRUST); - - di - .setProvenanceaction( - getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS)); - return di; - } - - public static Qualifier getQualifier(String class_id, String class_name, - String qualifierSchema) { - Qualifier pa = new Qualifier(); - pa.setClassid(class_id); - pa.setClassname(class_name); - pa.setSchemeid(qualifierSchema); - pa.setSchemename(qualifierSchema); - return pa; - } - } diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index 3530c9980..60dc998ef 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -45,6 +45,9 @@ public class GetOpenCitationsRefs implements Serializable { final String hdfsNameNode = parser.get("hdfsNameNode"); log.info("hdfsNameNode {}", hdfsNameNode); + final String prefix = parser.get("prefix"); + log.info("prefix {}", prefix); + Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); @@ -53,30 +56,31 @@ public class GetOpenCitationsRefs implements Serializable { GetOpenCitationsRefs ocr = new GetOpenCitationsRefs(); for (String file : inputFile) { - ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem); + ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem, prefix); } } - private void doExtract(String inputFile, String workingPath, FileSystem fileSystem) + private void doExtract(String inputFile, String workingPath, FileSystem fileSystem, String prefix) throws IOException { final Path path = new Path(inputFile); FSDataInputStream oc_zip = fileSystem.open(path); - int count = 1; + // int count = 1; try (ZipInputStream zis = new ZipInputStream(oc_zip)) { ZipEntry entry = null; while ((entry = zis.getNextEntry()) != null) { if (!entry.isDirectory()) { String fileName = entry.getName(); - fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; - count++; + // fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count; + fileName = fileName.substring(0, fileName.lastIndexOf(".")); + // count++; try ( FSDataOutputStream out = fileSystem - .create(new 
Path(workingPath + "/COCI/" + fileName + ".gz")); + .create(new Path(workingPath + "/" + prefix + "/" + fileName + ".gz")); GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) { IOUtils.copy(zis, gzipOs); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index 4293ca187..3d384de9d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -49,6 +49,9 @@ public class ReadCOCI implements Serializable { final String workingPath = parser.get("workingPath"); log.info("workingPath {}", workingPath); + final String format = parser.get("format"); + log.info("format {}", format); + SparkConf sconf = new SparkConf(); final String delimiter = Optional @@ -64,13 +67,14 @@ public class ReadCOCI implements Serializable { workingPath, inputFile, outputPath, - delimiter); + delimiter, + format); }); } private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, String outputPath, - String delimiter) throws IOException { + String delimiter, String format) throws IOException { for (String inputFile : inputFiles) { String p_string = workingPath + "/" + inputFile + ".gz"; @@ -87,9 +91,15 @@ public class ReadCOCI implements Serializable { cociData.map((MapFunction) row -> { COCI coci = new COCI(); + if (format.equals("COCI")) { + coci.setCiting(row.getString(1)); + coci.setCited(row.getString(2)); + } else { + coci.setCiting(String.valueOf(row.getInt(1))); + coci.setCited(String.valueOf(row.getInt(2))); + } coci.setOci(row.getString(0)); - coci.setCiting(row.getString(1)); - coci.setCited(row.getString(2)); + return coci; }, Encoders.bean(COCI.class)) .write() diff --git 
a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json index 308e02026..e25d1f4b8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json @@ -21,5 +21,10 @@ "paramLongName": "shouldDuplicateRels", "paramDescription": "the hdfs name node", "paramRequired": false +},{ + "paramName": "p", + "paramLongName": "prefix", + "paramDescription": "COCI or POCI", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json index 4910ad11d..96db7eeb7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json @@ -16,5 +16,11 @@ "paramLongName": "hdfsNameNode", "paramDescription": "the hdfs name node", "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "prefix", + "paramDescription": "COCI or POCI", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json index b57cb5d9a..fa840089d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json +++
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -30,7 +30,12 @@ "paramLongName": "inputFile", "paramDescription": "the hdfs name node", "paramRequired": true - } + }, { + "paramName": "f", + "paramLongName": "format", + "paramDescription": "the format of the input data: COCI or POCI", + "paramRequired": true +} ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index 0f01039f7..d87dfa2ba 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -46,7 +46,7 @@ download.sh ${filelist} - ${workingPath}/Original + ${workingPath}/${prefix}/Original HADOOP_USER_NAME=${wf:user()} download.sh @@ -59,7 +59,8 @@ eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs --hdfsNameNode${nameNode} --inputFile${inputFile} - --workingPath${workingPath} + --workingPath${workingPath}/${prefix} + --prefix${prefix} @@ -82,10 +83,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --workingPath${workingPath}/COCI - --outputPath${workingPath}/COCI_JSON/ + --workingPath${workingPath}/${prefix}/${prefix} + --outputPath${workingPath}/${prefix}/${prefix}_JSON/ --delimiter${delimiter} --inputFile${inputFileCoci} + --format${prefix} @@ -108,8 +110,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${workingPath}/COCI_JSON + --inputPath${workingPath} --outputPath${outputPath} + --prefix${prefix} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java index a6fa2b1a1..67468c6f9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java @@ -2,7 +2,9 @@ package eu.dnetlib.dhp.broker.oa.util; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.commons.io.IOUtils; import org.apache.spark.sql.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,10 +29,14 @@ public class TrustUtils { static { mapper = new ObjectMapper(); try { - dedupConfig = mapper - .readValue( - DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), - DedupConfig.class); + dedupConfig = DedupConfig + .load( + IOUtils + .toString( + DedupConfig.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), + StandardCharsets.UTF_8)); + deduper = new SparkDeduper(dedupConfig); } catch (final IOException e) { log.error("Error loading dedupConfig, e"); @@ -57,7 +63,7 @@ public class TrustUtils { return TrustUtils.rescale(score, threshold); } catch (final Exception e) { log.error("Error computing score between results", e); - return BrokerConstants.MIN_TRUST; + throw new RuntimeException(e); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java deleted file mode 100644 index 96d783dbf..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java +++ /dev/null @@ -1,57 +0,0 @@ - -package eu.dnetlib.dhp.oa.dedup; - -import java.util.Objects; - -import org.apache.spark.sql.Encoder; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.expressions.Aggregator; - -import 
eu.dnetlib.dhp.schema.oaf.Relation; - -public class RelationAggregator extends Aggregator { - - private static final Relation ZERO = new Relation(); - - @Override - public Relation zero() { - return ZERO; - } - - @Override - public Relation reduce(Relation b, Relation a) { - return mergeRel(b, a); - } - - @Override - public Relation merge(Relation b, Relation a) { - return mergeRel(b, a); - } - - @Override - public Relation finish(Relation r) { - return r; - } - - private Relation mergeRel(Relation b, Relation a) { - if (Objects.equals(b, ZERO)) { - return a; - } - if (Objects.equals(a, ZERO)) { - return b; - } - - b.mergeFrom(a); - return b; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(Relation.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(Relation.class); - } -} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala deleted file mode 100644 index 5d8da42c2..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala +++ /dev/null @@ -1,78 +0,0 @@ -package eu.dnetlib.dhp.oa.dedup - -import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.common.HdfsSupport -import eu.dnetlib.dhp.schema.oaf.Relation -import eu.dnetlib.dhp.utils.ISLookupClientFactory -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService -import org.apache.commons.io.IOUtils -import org.apache.spark.SparkConf -import org.apache.spark.sql._ -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} -import org.slf4j.LoggerFactory - -object SparkCleanRelation { - private val log = LoggerFactory.getLogger(classOf[SparkCleanRelation]) - - @throws[Exception] - def main(args: Array[String]): Unit = { - val parser = new ArgumentApplicationParser( - 
IOUtils.toString( - classOf[SparkCleanRelation].getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json") - ) - ) - parser.parseArgument(args) - val conf = new SparkConf - - new SparkCleanRelation(parser, AbstractSparkAction.getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))) - } -} - -class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession) - extends AbstractSparkAction(parser, spark) { - override def run(isLookUpService: ISLookUpService): Unit = { - val graphBasePath = parser.get("graphBasePath") - val inputPath = parser.get("inputPath") - val outputPath = parser.get("outputPath") - - SparkCleanRelation.log.info("graphBasePath: '{}'", graphBasePath) - SparkCleanRelation.log.info("inputPath: '{}'", inputPath) - SparkCleanRelation.log.info("outputPath: '{}'", outputPath) - - AbstractSparkAction.removeOutputDir(spark, outputPath) - - val entities = - Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct") - - val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>") - - val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"), - idsSchema) - - val ids = entities - .foldLeft(emptyIds)((ds, entity) => { - val entityPath = graphBasePath + '/' + entity - if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) { - ds.union(spark.read.schema(idsSchema).json(entityPath)) - } else { - ds - } - }) - .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true") - .select("id") - .distinct() - - val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath) - .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true") - - AbstractSparkAction.save( - relations - .join(ids, col("source") === ids("id"), "leftsemi") - .join(ids, col("target") === ids("id"), "leftsemi"), - 
outputPath, - SaveMode.Overwrite - ) - } -} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 175ebf8a6..739295c91 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -3,23 +3,19 @@ package eu.dnetlib.dhp.oa.dedup; import static org.apache.spark.sql.functions.col; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.Objects; - -import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.DataInfo; @@ -70,73 +66,63 @@ public class SparkPropagateRelation extends AbstractSparkAction { log.info("workingPath: '{}'", workingPath); log.info("graphOutputPath: '{}'", graphOutputPath); - final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation"); - removeOutputDir(spark, outputRelationPath); - Dataset mergeRels = spark - .read() - .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) - .as(REL_BEAN_ENC); + 
.read() + .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) + .as(REL_BEAN_ENC); // - Dataset mergedIds = mergeRels - .where(col("relClass").equalTo(ModelConstants.MERGES)) - .select(col("source").as("dedupID"), col("target").as("mergedObjectID")) - .distinct() - .cache(); + Dataset idsToMerge = mergeRels + .where(col("relClass").equalTo(ModelConstants.MERGES)) + .select(col("source").as("dedupID"), col("target").as("mergedObjectID")) + .distinct(); Dataset allRels = spark - .read() - .schema(REL_BEAN_ENC.schema()) - .json(DedupUtility.createEntityPath(graphBasePath, "relation")); + .read() + .schema(REL_BEAN_ENC.schema()) + .json(graphBasePath + "/relation"); Dataset dedupedRels = allRels - .joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer") - .joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer") - .select("_1._1", "_1._2.dedupID", "_2.dedupID") - .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) - .flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC); + .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") + .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") + .select("_1._1", "_1._2.dedupID", "_2.dedupID") + .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) + .map((MapFunction, Relation>) t -> { + Relation rel = t._1(); + String newSource = t._2(); + String newTarget = t._3(); - Dataset processedRelations = distinctRelations( - dedupedRels.union(mergeRels.map((MapFunction) r -> r, REL_KRYO_ENC))) - .filter((FilterFunction) r -> !Objects.equals(r.getSource(), r.getTarget())); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } - save(processedRelations, outputRelationPath, SaveMode.Overwrite); - } + if (newSource != null || newTarget != null) { + rel.getDataInfo().setDeletedbyinference(false); - 
private static Iterator addInferredRelations(Tuple3 t) throws Exception { - Relation existingRel = t._1(); - String newSource = t._2(); - String newTarget = t._3(); + if (newSource != null) + rel.setSource(newSource); - if (newSource == null && newTarget == null) { - return Collections.singleton(t._1()).iterator(); - } + if (newTarget != null) + rel.setTarget(newTarget); + } - // update existing relation - if (existingRel.getDataInfo() == null) { - existingRel.setDataInfo(new DataInfo()); - } - existingRel.getDataInfo().setDeletedbyinference(true); + return rel; + }, REL_BEAN_ENC); - // Create new relation inferred by dedupIDs - Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel); + // ids of records that are both not deletedbyinference and not invisible + Dataset ids = validIds(spark, graphBasePath); - inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo())); - inferredRel.getDataInfo().setDeletedbyinference(false); + // filter relations that point to valid records, can force them to be visible + Dataset cleanedRels = dedupedRels + .join(ids, col("source").equalTo(ids.col("id")), "leftsemi") + .join(ids, col("target").equalTo(ids.col("id")), "leftsemi") + .as(REL_BEAN_ENC) + .map((MapFunction) r -> { + r.getDataInfo().setInvisible(false); + return r; + }, REL_KRYO_ENC); - if (newSource != null) - inferredRel.setSource(newSource); - - if (newTarget != null) - inferredRel.setTarget(newTarget); - - return Arrays.asList(existingRel, inferredRel).iterator(); - } - - private Dataset distinctRelations(Dataset rels) { - return rels - .filter(getRelationFilterFunction()) + Dataset distinctRels = cleanedRels .groupByKey( (MapFunction) r -> String .join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), @@ -146,13 +132,33 @@ public class SparkPropagateRelation extends AbstractSparkAction { return b; }) .map((MapFunction, Relation>) Tuple2::_2, REL_BEAN_ENC); + + final String outputRelationPath = 
graphOutputPath + "/relation"; + removeOutputDir(spark, outputRelationPath); + save( + distinctRels + .union(mergeRels) + .filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"), + outputRelationPath, + SaveMode.Overwrite); } - private FilterFunction getRelationFilterFunction() { - return r -> StringUtils.isNotBlank(r.getSource()) || - StringUtils.isNotBlank(r.getTarget()) || - StringUtils.isNotBlank(r.getRelType()) || - StringUtils.isNotBlank(r.getSubRelType()) || - StringUtils.isNotBlank(r.getRelClass()); + static Dataset validIds(SparkSession spark, String graphBasePath) { + StructType idsSchema = StructType + .fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>"); + + Dataset allIds = spark.emptyDataset(RowEncoder.apply(idsSchema)); + + for (EntityType entityType : ModelSupport.entityTypes.keySet()) { + String entityPath = graphBasePath + '/' + entityType.name(); + if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) { + allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath)); + } + } + + return allIds + .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true") + .select("id") + .distinct(); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json deleted file mode 100644 index 860539ad9..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json +++ /dev/null @@ -1,20 +0,0 @@ -[ - { - "paramName": "i", - "paramLongName": "graphBasePath", - "paramDescription": "the base path of raw graph", - "paramRequired": true - }, - { - "paramName": "w", - "paramLongName": "inputPath", - "paramDescription": "the path to the input relation to cleanup", - "paramRequired": true - }, - { - "paramName": 
"o", - "paramLongName": "outputPath", - "paramDescription": "the path of the output relation cleaned", - "paramRequired": true - } -] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index b724e5d0b..5e2fc0a01 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -100,35 +100,9 @@ --conf spark.sql.shuffle.partitions=15000 --graphBasePath${graphBasePath} - --graphOutputPath${workingPath}/propagaterelation/ + --graphOutputPath${graphOutputPath} --workingPath${workingPath} - - - - - - - yarn - cluster - Clean Relations - eu.dnetlib.dhp.oa.dedup.SparkCleanRelation - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15000 - - --graphBasePath${graphBasePath} - --inputPath${workingPath}/propagaterelation/relation - --outputPath${graphOutputPath}/relation - @@ -152,31 +126,7 @@ --conf spark.sql.shuffle.partitions=15000 --graphInputPath${graphBasePath} - --outputPath${workingPath}/grouped_entities - - - - - - - - yarn - cluster - Dispatch grouped entitities - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - 
--executor-memory=${sparkExecutorMemory} - --conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities + --checkpointPath${workingPath}/grouped_entities --outputPath${graphOutputPath} --filterInvisible${filterInvisible} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 38bd72a5e..6c4935637 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup; import static java.nio.file.Files.createTempDirectory; -import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.count; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.lenient; @@ -23,14 +22,13 @@ import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import 
org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -46,8 +44,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; -import scala.Tuple2; @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -62,6 +58,8 @@ public class SparkDedupTest implements Serializable { private static String testGraphBasePath; private static String testOutputBasePath; private static String testDedupGraphBasePath; + private static String testConsistencyGraphBasePath; + private static final String testActionSetId = "test-orchestrator"; private static String whitelistPath; private static List whiteList; @@ -75,6 +73,7 @@ public class SparkDedupTest implements Serializable { .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) .toFile() .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") .toAbsolutePath() .toString(); @@ -83,6 +82,10 @@ public class SparkDedupTest implements Serializable { .toAbsolutePath() .toString(); + testConsistencyGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + whitelistPath = Paths .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()) .toFile() @@ -674,22 +677,45 @@ public class SparkDedupTest implements Serializable { assertEquals(mergedOrp, deletedOrp); } + @Test + @Order(6) + void copyRelationsNoOpenorgsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyRelationsNoOpenorgs.class + .getResourceAsStream( + 
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + }); + + new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); + + final Dataset outputRels = spark.read().text(testDedupGraphBasePath + "/relation"); + + System.out.println(outputRels.count()); + // assertEquals(2382, outputRels.count()); + } + @Test @Order(7) void propagateRelationTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")); - String outputRelPath = testDedupGraphBasePath + "/propagaterelation"; parser .parseArgument( new String[] { - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", outputRelPath + "-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath }); new SparkPropagateRelation(parser, spark).run(isLookUpService); - long relations = jsc.textFile(outputRelPath + "/relation").count(); + long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); // assertEquals(4860, relations); System.out.println("relations = " + relations); @@ -699,95 +725,52 @@ public class SparkDedupTest implements Serializable { .read() .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) .as(Encoders.bean(Relation.class)); - final JavaPairRDD mergedIds = mergeRels - .where("relClass == 'merges'") - .select(mergeRels.col("target")) - .distinct() - .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2(r.getString(0), "d")); - JavaRDD toCheck = jsc - .textFile(outputRelPath + "/relation") - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()) - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()); + Dataset inputRels = 
spark + .read() + .json(testDedupGraphBasePath + "/relation"); - long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); - long updated = toCheck.count(); + Dataset outputRels = spark + .read() + .json(testConsistencyGraphBasePath + "/relation"); - assertEquals(updated, deletedbyinference); + assertEquals( + 0, outputRels + .filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true") + .count()); + + assertEquals( + 5, outputRels + .filter("relClass NOT IN ('merges', 'isMergedIn')") + .count()); + + assertEquals(5 + mergeRels.count(), outputRels.count()); } @Test @Order(8) - void testCleanBaseRelations() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); - - // append dangling relations to be cleaned up + void testCleanedPropagatedRelations() throws Exception { Dataset df_before = spark .read() .schema(Encoders.bean(Relation.class).schema()) - .json(testGraphBasePath + "/relation"); - Dataset df_input = df_before - .unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a"))) - .unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a"))); - df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp"); - - parser - .parseArgument( - new String[] { - "--graphBasePath", testGraphBasePath, - "--inputPath", testGraphBasePath + "/relation", - "--outputPath", testDedupGraphBasePath + "/relation" - }); - - new SparkCleanRelation(parser, spark).run(isLookUpService); + .json(testDedupGraphBasePath + "/relation"); Dataset df_after = spark .read() .schema(Encoders.bean(Relation.class).schema()) - .json(testDedupGraphBasePath + "/relation"); - - assertNotEquals(df_before.count(), df_input.count()); - assertNotEquals(df_input.count(), df_after.count()); - assertEquals(5, df_after.count()); - } - - @Test - @Order(9) - void testCleanDedupedRelations() throws 
Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); - - String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation"; - - // append dangling relations to be cleaned up - Dataset df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputRelPath); - - df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false); - - parser - .parseArgument( - new String[] { - "--graphBasePath", testGraphBasePath, - "--inputPath", inputRelPath, - "--outputPath", testDedupGraphBasePath + "/relation" - }); - - new SparkCleanRelation(parser, spark).run(isLookUpService); - - Dataset df_after = spark - .read() - .schema(Encoders.bean(Relation.class).schema()) - .json(testDedupGraphBasePath + "/relation"); + .json(testConsistencyGraphBasePath + "/relation"); assertNotEquals(df_before.count(), df_after.count()); - assertEquals(0, df_after.count()); + + assertEquals( + 0, df_after + .filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true") + .count()); + + assertEquals( + 5, df_after + .filter("relClass NOT IN ('merges', 'isMergedIn')") + .count()); } @Test @@ -813,6 +796,7 @@ public class SparkDedupTest implements Serializable { public static void finalCleanUp() throws IOException { FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath)); } public boolean isDeletedByInference(String s) { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java index a0bf6b37e..73e768cf1 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java +++ 
b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup; import static java.nio.file.Files.createTempDirectory; +import static org.apache.spark.sql.functions.col; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.lenient; @@ -15,10 +16,6 @@ import java.nio.file.Paths; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -33,8 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; -import scala.Tuple2; @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -44,11 +39,11 @@ public class SparkOpenorgsProvisionTest implements Serializable { ISLookUpService isLookUpService; private static SparkSession spark; - private static JavaSparkContext jsc; private static String testGraphBasePath; private static String testOutputBasePath; private static String testDedupGraphBasePath; + private static String testConsistencyGraphBasePath; private static final String testActionSetId = "test-orchestrator"; @BeforeAll @@ -64,6 +59,9 @@ public class SparkOpenorgsProvisionTest implements Serializable { testDedupGraphBasePath = createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-") .toAbsolutePath() .toString(); + testConsistencyGraphBasePath = 
createTempDirectory(SparkOpenorgsProvisionTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); @@ -76,8 +74,13 @@ public class SparkOpenorgsProvisionTest implements Serializable { .master("local[*]") .config(conf) .getOrCreate(); + } - jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + FileUtils.deleteDirectory(new File(testConsistencyGraphBasePath)); } @BeforeEach @@ -186,26 +189,21 @@ public class SparkOpenorgsProvisionTest implements Serializable { new SparkUpdateEntity(parser, spark).run(isLookUpService); - long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); + Dataset organizations = spark.read().json(testDedupGraphBasePath + "/organization"); - long mergedOrgs = spark + Dataset mergedOrgs = spark .read() .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") - .as(Encoders.bean(Relation.class)) .where("relClass=='merges'") - .javaRDD() - .map(Relation::getTarget) - .distinct() - .count(); + .select("target") + .distinct(); - assertEquals(80, organizations); + assertEquals(80, organizations.count()); - long deletedOrgs = jsc - .textFile(testDedupGraphBasePath + "/organization") - .filter(this::isDeletedByInference) - .count(); + Dataset deletedOrgs = organizations + .filter("dataInfo.deletedbyinference = TRUE"); - assertEquals(mergedOrgs, deletedOrgs); + assertEquals(mergedOrgs.count(), deletedOrgs.count()); } @Test @@ -226,10 +224,9 @@ public class SparkOpenorgsProvisionTest implements Serializable { new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); - final JavaRDD rels = jsc.textFile(testDedupGraphBasePath + "/relation"); - - assertEquals(2382, 
rels.count()); + final Dataset outputRels = spark.read().text(testDedupGraphBasePath + "/relation"); + assertEquals(2382, outputRels.count()); } @Test @@ -244,51 +241,41 @@ public class SparkOpenorgsProvisionTest implements Serializable { parser .parseArgument( new String[] { - "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath + "-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath }); new SparkPropagateRelation(parser, spark).run(isLookUpService); - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - - assertEquals(4896, relations); - - // check deletedbyinference final Dataset mergeRels = spark .read() .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")) .as(Encoders.bean(Relation.class)); - final JavaPairRDD mergedIds = mergeRels + + Dataset inputRels = spark + .read() + .json(testDedupGraphBasePath + "/relation"); + + Dataset outputRels = spark + .read() + .json(testConsistencyGraphBasePath + "/relation"); + + final Dataset mergedIds = mergeRels .where("relClass == 'merges'") - .select(mergeRels.col("target")) - .distinct() - .toJavaRDD() - .mapToPair( - (PairFunction) r -> new Tuple2(r.getString(0), "d")); + .select(col("target").as("id")) + .distinct(); - JavaRDD toCheck = jsc - .textFile(testDedupGraphBasePath + "/relation") - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()) - .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json)) - .join(mergedIds) - .map(t -> t._2()._1()); + Dataset toUpdateRels = inputRels + .as("rel") + .join(mergedIds.as("s"), col("rel.source").equalTo(col("s.id")), "left_outer") + .join(mergedIds.as("t"), col("rel.target").equalTo(col("t.id")), "left_outer") + .filter("s.id IS NOT NULL OR t.id IS NOT NULL") + .distinct(); - long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); - long 
updated = toCheck.count(); + Dataset updatedRels = inputRels + .select("source", "target", "relClass") + .except(outputRels.select("source", "target", "relClass")); - assertEquals(updated, deletedbyinference); + assertEquals(toUpdateRels.count(), updatedRels.count()); + assertEquals(140, outputRels.count()); } - - @AfterAll - public static void finalCleanUp() throws IOException { - FileUtils.deleteDirectory(new File(testOutputBasePath)); - FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); - } - - public boolean isDeletedByInference(String s) { - return s.contains("\"deletedbyinference\":true"); - } - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml index 219dc7331..190788c9d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml @@ -96,30 +96,7 @@ --conf spark.sql.shuffle.partitions=15000 --graphInputPath${graphBasePath} - --outputPath${workingPath}/grouped_entities - - - - - - - - yarn - cluster - Dispatch grouped entities - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities + --checkpointPath${workingPath}/grouped_entities --outputPath${graphOutputPath} --filterInvisible${filterInvisible} diff --git 
a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java index 61baf80dc..b878e778e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java @@ -1,16 +1,15 @@ package eu.dnetlib.dhp.oa.graph.group; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -19,118 +18,108 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob; -import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import 
eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.DHPUtils; +import static org.junit.jupiter.api.Assertions.assertEquals; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class GroupEntitiesSparkJobTest { - private static SparkSession spark; + private static SparkSession spark; - private static ObjectMapper mapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - private static Path workingDir; - private Path dataInputPath; + private static Path workingDir; + private Path dataInputPath; - private Path groupEntityPath; - private Path dispatchEntityPath; + private Path checkpointPath; - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); + private Path outputPath; - SparkConf conf = new SparkConf(); - conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); - conf.setMaster("local"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - spark = SparkSession.builder().config(conf).getOrCreate(); - } + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); - @BeforeEach - public void beforeEach() throws IOException, URISyntaxException { - dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); - groupEntityPath = workingDir.resolve("grouped_entity"); - dispatchEntityPath = workingDir.resolve("dispatched_entity"); - } + SparkConf conf = new SparkConf(); + conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); + conf.setMaster("local"); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + 
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + spark = SparkSession.builder().config(conf).getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - spark.stop(); - FileUtils.deleteDirectory(workingDir.toFile()); - } + @BeforeEach + public void beforeEach() throws IOException, URISyntaxException { + dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); + checkpointPath = workingDir.resolve("grouped_entity"); + outputPath = workingDir.resolve("dispatched_entity"); + } - @Test - @Order(1) - void testGroupEntities() throws Exception { - GroupEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-graphInputPath", - dataInputPath.toString(), - "-outputPath", - groupEntityPath.toString() - }); + @AfterAll + public static void afterAll() throws IOException { + spark.stop(); + FileUtils.deleteDirectory(workingDir.toFile()); + } - Dataset output = spark - .read() - .textFile(groupEntityPath.toString()) - .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + @Test + @Order(1) + void testGroupEntities() throws Exception { + GroupEntitiesSparkJob.main(new String[]{ + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-graphInputPath", + dataInputPath.toString(), + "-checkpointPath", + checkpointPath.toString(), + "-outputPath", + outputPath.toString(), + "-filterInvisible", + Boolean.FALSE.toString() + }); - assertEquals( - 1, - output - .filter( - (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" - .equals(r.getId()) && - r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) - .count()); - } + Dataset checkpointTable = spark + .read() + .load(checkpointPath.toString()) + .selectExpr("COALESCE(*)") + .as(Encoders.kryo(OafEntity.class)); - @Test - @Order(2) - void 
testDispatchEntities() throws Exception { - DispatchEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - groupEntityPath.toString(), - "-outputPath", - dispatchEntityPath.resolve(".").toString(), - "-filterInvisible", - Boolean.TRUE.toString() - }); - Dataset output = spark - .read() - .textFile( - DHPUtils - .toSeq( - HdfsSupport - .listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration()))) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + assertEquals( + 1, + checkpointTable + .filter( + (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" + .equals(r.getId()) && + r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) + .count()); - assertEquals(3, output.count()); - assertEquals( - 2, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("publication")) - .count()); - assertEquals( - 1, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("dataset")) - .count()); - } -} + + Dataset output = spark + .read() + .textFile( + DHPUtils + .toSeq( + HdfsSupport + .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration()))) + .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + + assertEquals(3, output.count()); + assertEquals( + 2, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("publication")) + .count()); + assertEquals( + 1, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("dataset")) + .count()); + } +} \ No newline at end of file diff --git 
a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java index 48f1e0c06..9bd32968a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java @@ -49,7 +49,7 @@ public class DownloadCsvTest { @Test void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException { - String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv"; + String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_5.csv"; final String outputFile = workingDir + "/unibi_gold.json"; new DownloadCSV() diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 894ed33f7..b506d3a62 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -1067,6 +1067,28 @@ class MappersTest { System.out.println("***************"); } + @Test + public void testD4ScienceTraining() throws IOException { + final String xml = IOUtils + .toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-1-training.xml"))); + final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); + final OtherResearchProduct trainingMaterial = (OtherResearchProduct) list.get(0); + System.out.println("***************"); + System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial)); + System.out.println("***************"); + } + + @Test + public void testD4ScienceDataset() throws IOException { + final String xml 
= IOUtils + .toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-2-dataset.xml"))); + final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); + final Dataset trainingMaterial = (Dataset) list.get(0); + System.out.println("***************"); + System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial)); + System.out.println("***************"); + } + @Test void testNotWellFormed() throws IOException { final String xml = IOUtils diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml new file mode 100644 index 000000000..91f9f9118 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml @@ -0,0 +1,93 @@ + + + + alessia_____::104c2d4ba8878c16fa824dce5b1bea57 + 12d8f77e-d66f-46f5-8d88-af7db23bc4c9 + 2023-09-08T10:12:35.864+02:00 + alessia_____ + 2023-09-08T11:31:45.692+02:00 + + + + http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists + + + + BRAGHIERI MARCO + + + + Visual Analytics for Data Scientists + + SoBigData++ + + + + + TrainingMaterial + + Participants to this module shall + - Learn the principles and rules underlying the design of visual data + representations and human-computer interactions + - Understand, adapt and apply representative visual analytics methods and systems for diverse types + of data and problems + - Analyse and evaluate the structure and properties + of data to select or devise appropriate methods for data exploration + - Combine visualization, interactive techniques, and computational + processing to develop practical data analysis for problem solving + + (This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London). 
+ + The author did not intend to violate any copyright on figures or content. In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it + + + Visual analytics + + + Slides + Other + PDF + PDF + PDF + PDF + PDF + PDF + PDF + PDF + PDF + PDF + ZIP + + + OPEN + 0010 + + + + other-open + corda__h2020::871042 + + + + + https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems + + + + + + + false + false + 0.9 + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml new file mode 100644 index 000000000..48ceb6c13 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml @@ -0,0 +1,72 @@ + + + + alessia_____::028879484548f4e1c630e1c503e35231 + 4fed018e-c2ff-4afa-b7b5-1ca1beebf850 + 2023-09-08T12:14:27.615+02:00 + alessia_____ + 2023-09-08T12:14:51.7+02:00 + + + + http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration + + + + + + Pappalardo, Luca + + 0000-0002-1547-6007 + + + + City-to-city migration + + SoBigData++ + + + 2018-02-15 + + Dataset + + Census data recording the migration of people between metropolitan areas in + the US + + + Human Mobility data + + + + OPEN + 0021 + 2018-02-15 + + + AFL-3.0 + corda__h2020::871042 + + + + + https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems + + + + + + + false + false + 0.9 + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java index 761539780..88bffd0e7 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java +++ 
b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java @@ -24,10 +24,7 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.*; public class XmlRecordFactoryTest { @@ -196,4 +193,51 @@ public class XmlRecordFactoryTest { assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue()); assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue()); } + + @Test + public void testD4ScienceTraining() throws DocumentException, IOException { + final ContextMapper contextMapper = new ContextMapper(); + + final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, + XmlConverterJob.schemaLocation); + + final OtherResearchProduct p = OBJECT_MAPPER + .readValue( + IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json")), + OtherResearchProduct.class); + + final String xml = xmlRecordFactory.build(new JoinedEntity<>(p)); + + assertNotNull(xml); + + final Document doc = new SAXReader().read(new StringReader(xml)); + + assertNotNull(doc); + System.out.println(doc.asXML()); + + } + + @Test + public void testD4ScienceDataset() throws DocumentException, IOException { + final ContextMapper contextMapper = new ContextMapper(); + + final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, + XmlConverterJob.schemaLocation); + + final OtherResearchProduct p = OBJECT_MAPPER + .readValue( + IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json")), + OtherResearchProduct.class); + + final String xml = 
xmlRecordFactory.build(new JoinedEntity<>(p)); + + assertNotNull(xml); + + final Document doc = new SAXReader().read(new StringReader(xml)); + + assertNotNull(doc); + System.out.println(doc.asXML()); + + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json new file mode 100644 index 000000000..3ce397f10 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json @@ -0,0 +1 @@ +{"collectedfrom":[{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1694165542374,"id":"50|alessia_____::104c2d4ba8878c16fa824dce5b1bea57","originalId":["12d8f77e-d66f-46f5-8d88-af7db23bc4c9","50|alessia_____::104c2d4ba8878c16fa824dce5b1bea57"],"pid":[],"dateofcollection":"2023-09-08T10:12:35.864+02:00","dateoftransformation":"2023-09-08T11:31:45.692+02:00","extraInfo":[],"oaiprovenance":{"originDescription":{"harvestDate":"2023-09-08T10:12:35.864+02:00","altered":true,"baseURL":"https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems","identifier":"","datestamp":"","metadataNamespace":""}},"measures":null,"processingchargeamount":null,"processingchargecurrency":null,"author":[{"fullname":"BRAGHIERI MARCO","name":"","surname":"","rank":1,"pid":[],"affiliation":[]}],"resulttype":{"classid":"other","classname":"other","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"language":{"classid":"","classname":"","schemeid":"dnet:languages","schemename":"dnet:languages"},"country":[],"subject":[{"value":"Visual 
analytics","qualifier":{"classid":"","classname":"","schemeid":"","schemename":""},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"title":[{"value":"Visual Analytics for Data Scientists","qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"relevantdate":[{"value":"","qualifier":{"classid":"Issued","classname":"Issued","schemeid":"dnet:dataCite_date","schemename":"dnet:dataCite_date"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"description":[{"value":"Participants to this module shall\n - Learn the principles and rules underlying the design of visual data\n representations and human-computer interactions\n - Understand, adapt and apply representative visual analytics methods and systems for diverse types\n of data and problems\n - Analyse and evaluate the structure and properties\n of data to select or devise appropriate methods for data exploration\n - Combine visualization, interactive techniques, and computational\n processing to develop practical data analysis for problem solving\n\n (This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London).\n\n The author did not intend to violate any copyright on figures or content. 
In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"dateofacceptance":null,"publisher":{"value":"SoBigData++","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"embargoenddate":null,"source":[],"fulltext":[],"format":[{"value":"Slides","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"value":"Other","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"value":"PDF","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"value":"ZIP","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"contributor":[],"resourcetype":{"classid":"TrainingMaterial","classname":"TrainingMaterial"
,"schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"coverage":[],"bestaccessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"context":[],"externalReference":[],"instance":[{"license":{"value":"other-open","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"accessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes","openAccessRoute":null},"instancetype":{"classid":"0010","classname":"Lecture","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"hostedby":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"url":["http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists"],"distributionlocation":null,"collectedfrom":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"pid":[],"alternateIdentifier":[],"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":{"classid":"","classname":"","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"measures":null,"fulltext":null}],"eoscifguidelines":[],"contactperson":[],"contactgroup":[],"tool":[]} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json new file mode 100644 index 000000000..ea8465e36 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json @@ -0,0 +1 @@ 
+{"collectedfrom":[{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1694507584675,"id":"50|alessia_____::028879484548f4e1c630e1c503e35231","originalId":["4fed018e-c2ff-4afa-b7b5-1ca1beebf850","50|alessia_____::028879484548f4e1c630e1c503e35231"],"pid":[],"dateofcollection":"2023-09-08T12:14:27.615+02:00","dateoftransformation":"2023-09-08T12:14:51.7+02:00","extraInfo":[],"oaiprovenance":{"originDescription":{"harvestDate":"2023-09-08T12:14:27.615+02:00","altered":true,"baseURL":"https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems","identifier":"","datestamp":"","metadataNamespace":""}},"measures":null,"processingchargeamount":null,"processingchargecurrency":null,"author":[{"fullname":"Pappalardo, Luca","name":"Luca","surname":"Pappalardo","rank":1,"pid":[{"value":"0000-0002-1547-6007","qualifier":{"classid":"orcid_pending","classname":"Open Researcher and Contributor ID","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"affiliation":[]}],"resulttype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"language":{"classid":"","classname":"","schemeid":"dnet:languages","schemename":"dnet:languages"},"country":[],"subject":[{"value":"Human Mobility 
data","qualifier":{"classid":"","classname":"","schemeid":"","schemename":""},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"title":[{"value":"City-to-city migration","qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"relevantdate":[{"value":"2018-02-15","qualifier":{"classid":"Issued","classname":"Issued","schemeid":"dnet:dataCite_date","schemename":"dnet:dataCite_date"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"description":[{"value":"Census data recording the migration of people between metropolitan areas in\n the 
US","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"dateofacceptance":{"value":"2018-02-15","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"publisher":{"value":"SoBigData++","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"embargoenddate":null,"source":[],"fulltext":[],"format":[],"contributor":[],"resourcetype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"coverage":[],"bestaccessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"context":[],"externalReference":[],"instance":[{"license":{"value":"AFL-3.0","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"accessright":{"classid":"OPEN","classname":"Open 
Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes","openAccessRoute":null},"instancetype":{"classid":"0021","classname":"Dataset","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"hostedby":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"url":["http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration"],"distributionlocation":null,"collectedfrom":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"pid":[],"alternateIdentifier":[],"dateofacceptance":{"value":"2018-02-15","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"processingchargeamount":null,"processingchargecurrency":null,"refereed":{"classid":"","classname":"","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"measures":null,"fulltext":null}],"eoscifguidelines":[],"storagedate":{"value":"2018-02-15","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"device":null,"size":null,"version":null,"lastmetadataupdate":null,"metadataversionnumber":null,"geolocation":[]} diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 0d7d29bfe..e43e7cf14 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ 
b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -39,7 +39,8 @@ - ${wf:conf('resume') eq "rankings-start"} + ${wf:conf('resume') eq "cc"} + ${wf:conf('resume') eq "ram"} ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} @@ -89,18 +90,11 @@ ${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py - + - - - - - - - @@ -129,7 +123,7 @@ ${wfAppPath}/bip-ranker/CC.py#CC.py - + @@ -165,14 +159,11 @@ ${wfAppPath}/bip-ranker/TAR.py#TAR.py - + - - - diff --git a/pom.xml b/pom.xml index c6b65e27a..9cd82a343 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,16 @@ https://maven.d4science.org/nexus/content/repositories/dnet-deps default + + maven-restlet + Restlet repository + https://maven.restlet.talend.com + + + conjars + conjars + https://conjars.wensel.net/repo/ +