From 6cc7d8ca7b04f8aff859fda0ff04294b830608b5 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 24 Aug 2023 21:48:07 +0200 Subject: [PATCH 01/13] GroupEntities and DispatchEntites are now merged in GroupEntitiesSparkJob --- .../oa/merge/DispatchEntitiesSparkJob.java | 98 -------- .../dhp/oa/merge/GroupEntitiesSparkJob.java | 221 ++++++++---------- .../merge/dispatch_entities_parameters.json | 26 --- .../group_graph_entities_parameters.json | 16 +- .../dedup/consistency/oozie_app/workflow.xml | 26 +-- .../dhp/oa/graph/group/oozie_app/workflow.xml | 25 +- .../group/GroupEntitiesSparkJobTest.java | 201 ++++++++-------- 7 files changed, 203 insertions(+), 410 deletions(-) delete mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java delete mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java deleted file mode 100644 index 4d2ccb1788..0000000000 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ /dev/null @@ -1,98 +0,0 @@ - -package eu.dnetlib.dhp.oa.merge; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.common.ModelSupport; - -public class DispatchEntitiesSparkJob { - - private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - Objects - .requireNonNull( - DispatchEntitiesSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json"))); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); - - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); - log.info("filterInvisible: {}", filterInvisible); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchEntities(spark, inputPath, outputPath, filterInvisible); - }); - } - - private static void dispatchEntities( - SparkSession spark, - String inputPath, - String outputPath, - boolean filterInvisible) { - - Dataset df = spark.read().textFile(inputPath); - - ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> { - String entityType = entry.getKey(); - Class clazz = entry.getValue(); - - if (!entityType.equalsIgnoreCase("relation")) { - Dataset entityDF = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json( - df - .filter((FilterFunction) s -> s.startsWith(clazz.getName())) - .map( - (MapFunction) s -> StringUtils.substringAfter(s, "|"), - Encoders.STRING())); - - if (filterInvisible) { - entityDF = entityDF.filter("dataInfo.invisible != true"); - } - - entityDF - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + entityType); - } - }); - } -} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index e652bd5b69..87510c1086 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -2,36 +2,28 @@ package eu.dnetlib.dhp.oa.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.when; -import java.io.IOException; -import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.Option; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -39,13 +31,9 @@ import scala.Tuple2; * Groups the graph content by entity identifier to ensure ID uniqueness */ public class GroupEntitiesSparkJob { - private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); - private static final String ID_JPATH = "$.id"; - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); public static void main(String[] args) throws Exception { @@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob { String graphInputPath = parser.get("graphInputPath"); log.info("graphInputPath: {}", graphInputPath); + String checkpointPath = parser.get("checkpointPath"); + log.info("checkpointPath: {}", checkpointPath); + String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); + log.info("filterInvisible: {}", filterInvisible); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); @@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob { isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - groupEntities(spark, graphInputPath, outputPath); + groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible); }); } private static void groupEntities( SparkSession spark, String inputPath, - String outputPath) { + String checkpointPath, + String outputPath, + boolean filterInvisible) { - final TypedColumn aggregator = new GroupingAggregator().toColumn(); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - spark - .read() - .textFile(toSeq(listEntityPaths(inputPath, sc))) - .map((MapFunction) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class)) - .filter((FilterFunction) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e))) - .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) - .agg(aggregator) + Dataset allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC); + + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + String entityInputPath = inputPath + "/" + entity; + + if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) { + continue; + } + + allEntities = allEntities + .union( + ((Dataset) spark + .read() + .schema(Encoders.bean(entityClass).schema()) + .json(entityInputPath) + .filter("length(id) > 0") + .as(Encoders.bean(entityClass))) + .map((MapFunction) r -> r, OAFENTITY_KRYO_ENC)); + } + + Dataset groupedEntities = allEntities + .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) + .reduceGroups((ReduceFunction) (b, a) -> OafMapperUtils.mergeEntities(b, a)) .map( - (MapFunction, String>) t -> t._2().getClass().getName() + - "|" + OBJECT_MAPPER.writeValueAsString(t._2()), - Encoders.STRING()) + (MapFunction, Tuple2>) t -> new Tuple2( + t._2().getClass().getName(), t._2()), + Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); + + // pivot on "_1" (classname of the entity) + // created columns containing only entities of the same class + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + + groupedEntities = groupedEntities + .withColumn( + entity, + when(col("_1").equalTo(entityClass.getName()), col("_2"))); + } + + groupedEntities + .drop("_1", "_2") .write() - .option("compression", "gzip") .mode(SaveMode.Overwrite) - .text(outputPath); - } + .option("compression", "gzip") + .save(checkpointPath); - public static class GroupingAggregator extends Aggregator { + ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size()); - @Override - public OafEntity zero() { - return null; - } - - @Override - public OafEntity reduce(OafEntity b, OafEntity a) { - return mergeAndGet(b, a); - } - - private OafEntity mergeAndGet(OafEntity b, OafEntity a) { - if (Objects.nonNull(a) && Objects.nonNull(b)) { - return OafMapperUtils.mergeEntities(b, a); - } - return Objects.isNull(a) ? b : a; - } - - @Override - public OafEntity merge(OafEntity b, OafEntity a) { - return mergeAndGet(b, a); - } - - @Override - public OafEntity finish(OafEntity j) { - return j; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(OafEntity.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(OafEntity.class); - } - - } - - private static OafEntity parseOaf(String s) { - - DocumentContext dc = JsonPath - .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)); - final String id = dc.read(ID_JPATH); - if (StringUtils.isNotBlank(id)) { - - String prefix = StringUtils.substringBefore(id, "|"); - switch (prefix) { - case "10": - return parse(s, Datasource.class); - case "20": - return parse(s, Organization.class); - case "40": - return parse(s, Project.class); - case "50": - String resultType = dc.read("$.resulttype.classid"); - switch (resultType) { - case "publication": - return parse(s, Publication.class); - case "dataset": - return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); - case "software": - return parse(s, Software.class); - case "other": - return parse(s, OtherResearchProduct.class); - default: - throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); - } - default: - throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); - } - } else { - throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s)); - } - } - - private static OafEntity parse(String s, Class clazz) { - try { - return OBJECT_MAPPER.readValue(s, clazz); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - private static List listEntityPaths(String inputPath, JavaSparkContext sc) { - return HdfsSupport - .listFiles(inputPath, sc.hadoopConfiguration()) + ModelSupport.entityTypes + .entrySet() .stream() - .filter(f -> !f.toLowerCase().contains("relation")) - .collect(Collectors.toList()); - } + .map(e -> parPool.submit(() -> { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + spark + .read() + .load(checkpointPath) + .select(col(entity).as("value")) + .filter("value IS NOT NULL") + .as(OAFENTITY_KRYO_ENC) + .map((MapFunction) r -> r, (Encoder) Encoders.bean(entityClass)) + .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + entity); + })) + .collect(Collectors.toList()) + .forEach(t -> { + try { + t.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json deleted file mode 100644 index 60f11ac844..0000000000 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json +++ /dev/null @@ -1,26 +0,0 @@ -[ - { - "paramName": "issm", - "paramLongName": "isSparkSessionManaged", - "paramDescription": "when true will stop SparkSession after job execution", - "paramRequired": false - }, - { - "paramName": "i", - "paramLongName": "inputPath", - "paramDescription": "the source path", - "paramRequired": true - }, - { - "paramName": "o", - "paramLongName": "outputPath", - "paramDescription": "path of the output graph", - "paramRequired": true - }, - { - "paramName": "fi", - "paramLongName": "filterInvisible", - "paramDescription": "if true filters out invisible entities", - "paramRequired": true - } -] \ No newline at end of file diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json index e65acb3c43..58e3ca7113 100644 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json @@ -8,13 +8,25 @@ { "paramName": "gin", "paramLongName": "graphInputPath", - "paramDescription": "the graph root path", + "paramDescription": "the input graph root path", + "paramRequired": true + }, + { + "paramName": "cp", + "paramLongName": "checkpointPath", + "paramDescription": "checkpoint directory", "paramRequired": true }, { "paramName": "out", "paramLongName": "outputPath", - "paramDescription": "the output merged graph root path", + "paramDescription": "the output graph root path", + "paramRequired": true + }, + { + "paramName": "fi", + "paramLongName": "filterInvisible", + "paramDescription": "if true filters out invisible entities", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index b724e5d0bd..3640ee6ca5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -152,31 +152,7 @@ --conf spark.sql.shuffle.partitions=15000 --graphInputPath${graphBasePath} - --outputPath${workingPath}/grouped_entities - - - - - - - - yarn - cluster - Dispatch grouped entitities - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities + --checkpointPath${workingPath}/grouped_entities --outputPath${graphOutputPath} --filterInvisible${filterInvisible} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml index 219dc7331e..190788c9dd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml @@ -96,30 +96,7 @@ --conf spark.sql.shuffle.partitions=15000 --graphInputPath${graphBasePath} - --outputPath${workingPath}/grouped_entities - - - - - - - - yarn - cluster - Dispatch grouped entities - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities + --checkpointPath${workingPath}/grouped_entities --outputPath${graphOutputPath} --filterInvisible${filterInvisible} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java index 61baf80dca..b878e778eb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java @@ -1,16 +1,15 @@ package eu.dnetlib.dhp.oa.graph.group; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -19,118 +18,108 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob; -import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.DHPUtils; +import static org.junit.jupiter.api.Assertions.assertEquals; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class GroupEntitiesSparkJobTest { - private static SparkSession spark; + private static SparkSession spark; - private static ObjectMapper mapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - private static Path workingDir; - private Path dataInputPath; + private static Path workingDir; + private Path dataInputPath; - private Path groupEntityPath; - private Path dispatchEntityPath; + private Path checkpointPath; - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); + private Path outputPath; - SparkConf conf = new SparkConf(); - conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); - conf.setMaster("local"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - spark = SparkSession.builder().config(conf).getOrCreate(); - } + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); - @BeforeEach - public void beforeEach() throws IOException, URISyntaxException { - dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); - groupEntityPath = workingDir.resolve("grouped_entity"); - dispatchEntityPath = workingDir.resolve("dispatched_entity"); - } + SparkConf conf = new SparkConf(); + conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); + conf.setMaster("local"); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + spark = SparkSession.builder().config(conf).getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - spark.stop(); - FileUtils.deleteDirectory(workingDir.toFile()); - } + @BeforeEach + public void beforeEach() throws IOException, URISyntaxException { + dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); + checkpointPath = workingDir.resolve("grouped_entity"); + outputPath = workingDir.resolve("dispatched_entity"); + } - @Test - @Order(1) - void testGroupEntities() throws Exception { - GroupEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-graphInputPath", - dataInputPath.toString(), - "-outputPath", - groupEntityPath.toString() - }); + @AfterAll + public static void afterAll() throws IOException { + spark.stop(); + FileUtils.deleteDirectory(workingDir.toFile()); + } - Dataset output = spark - .read() - .textFile(groupEntityPath.toString()) - .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + @Test + @Order(1) + void testGroupEntities() throws Exception { + GroupEntitiesSparkJob.main(new String[]{ + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-graphInputPath", + dataInputPath.toString(), + "-checkpointPath", + checkpointPath.toString(), + "-outputPath", + outputPath.toString(), + "-filterInvisible", + Boolean.FALSE.toString() + }); - assertEquals( - 1, - output - .filter( - (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" - .equals(r.getId()) && - r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) - .count()); - } + Dataset checkpointTable = spark + .read() + .load(checkpointPath.toString()) + .selectExpr("COALESCE(*)") + .as(Encoders.kryo(OafEntity.class)); - @Test - @Order(2) - void testDispatchEntities() throws Exception { - DispatchEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - groupEntityPath.toString(), - "-outputPath", - dispatchEntityPath.resolve(".").toString(), - "-filterInvisible", - Boolean.TRUE.toString() - }); - Dataset output = spark - .read() - .textFile( - DHPUtils - .toSeq( - HdfsSupport - .listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration()))) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + assertEquals( + 1, + checkpointTable + .filter( + (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" + .equals(r.getId()) && + r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) + .count()); - assertEquals(3, output.count()); - assertEquals( - 2, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("publication")) - .count()); - assertEquals( - 1, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("dataset")) - .count()); - } -} + + Dataset output = spark + .read() + .textFile( + DHPUtils + .toSeq( + HdfsSupport + .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration()))) + .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + + assertEquals(3, output.count()); + assertEquals( + 2, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("publication")) + .count()); + assertEquals( + 1, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("dataset")) + .count()); + } +} \ No newline at end of file From 7de0164c269fcc1b4b15a37ca006e3383709aad6 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Mon, 4 Sep 2023 16:04:41 +0300 Subject: [PATCH 02/13] Fix import of affiliations relations from Crossref --- .../PrepareAffiliationRelations.java | 5 ++--- .../actionmanager/bipaffiliations/job.properties | 4 ++-- .../PrepareAffiliationRelationsTest.java | 8 ++++---- .../actionmanager/bipaffiliations/doi_to_ror.json | 13 +++++++------ 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index a9c610de7b..9ac610240c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -80,16 +80,15 @@ public class PrepareAffiliationRelations implements Serializable { // load and parse affiliation relations from HDFS Dataset df = spark .read() - .schema("`DOI` STRING, `Matchings` ARRAY,`Confidence`:DOUBLE>>") + .schema("`DOI` STRING, `Matchings` ARRAY>") .json(inputPath); // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) - .withColumn("rorid", functions.explode(new Column("matching.RORid"))) .select( new Column("DOI").as("doi"), - new Column("rorid"), + new Column("matching.RORid").as("rorid"), new Column("matching.Confidence").as("confidence")); // prepare action sets for affiliation relations diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index 43d86ee09f..d942e67723 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -31,5 +31,5 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen # The following is needed as a property of a workflow oozie.wf.application.path=${oozieTopWfApplicationPath} -inputPath=/user/schatz/affiliations/data-v3.1.json -outputPath=/tmp/crossref-affiliations-output-v3.1 +inputPath=/data/bip-affiliations/data.json +outputPath=/tmp/crossref-affiliations-output-v5 diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index 72aabde7f8..ed8e5fe0df 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -101,7 +101,7 @@ public class PrepareAffiliationRelationsTest { // ); // } // count the number of relations - assertEquals(16, tmp.count()); + assertEquals(20, tmp.count()); Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); dataset.createOrReplaceTempView("result"); @@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest { // verify that we have equal number of bi-directional relations Assertions .assertEquals( - 8, execVerification + 10, execVerification .filter( "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") .collectAsList() @@ -120,14 +120,14 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 8, execVerification + 10, execVerification .filter( "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") .collectAsList() .size()); // check confidence value of a specific relation - String sourceDOI = "10.1105/tpc.8.3.343"; + String sourceDOI = "10.1061/(asce)0733-9399(2002)128:7(759)"; final String sourceOpenaireId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json index 3b067dcc83..985a8d14b4 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json @@ -1,6 +1,7 @@ -{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]} -{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]} -{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]} -{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]} -{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]} -{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]} \ No newline at end of file +{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":"https:\/\/ror.org\/03yxnpp24","Confidence":0.7071067812},{"RORid":"https:\/\/ror.org\/01teme464","Confidence":0.89}]} +{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":"https:\/\/ror.org\/02k40bc56","Confidence":0.7071067812}]} +{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":"https:\/\/ror.org\/00qjgza05","Confidence":1}]} +{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":"https:\/\/ror.org\/035xkbk20","Confidence":1},{"RORid":"https:\/\/ror.org\/05apxxy63","Confidence":1}]} +{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":"https:\/\/ror.org\/04j198w64","Confidence":0.82}]} +{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(588)","Matchings":[{"RORid":"https:\/\/ror.org\/03m8km719","Confidence":0.8660254038},{"RORid":"https:\/\/ror.org\/02aze4h65","Confidence":0.87}]} +{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":"https:\/\/ror.org\/057xtrt18","Confidence":0.7071067812}]} \ No newline at end of file From 5b06c9d06fd592440e0a97bb4aaeabee33b0b08b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 4 Sep 2023 15:15:24 +0200 Subject: [PATCH 03/13] [graph raw] datainfo.invisible set as true only for entities --- .../raw/AbstractMdRecordToOafMapper.java | 45 +++++++++++-------- .../dhp/oa/graph/raw/OafToOafMapper.java | 9 ++-- .../dhp/oa/graph/raw/OdfToOafMapper.java | 17 +++---- 3 files changed, 38 insertions(+), 33 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index efb860d44d..b37e6a7553 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -117,14 +117,14 @@ public abstract class AbstractMdRecordToOafMapper { return Lists.newArrayList(); } - final DataInfo info = prepareDataInfo(doc, invisible); + final DataInfo entityInfo = prepareDataInfo(doc, invisible); final long lastUpdateTimestamp = new Date().getTime(); - final List instances = prepareInstances(doc, info, collectedFrom, hostedBy); + final List instances = prepareInstances(doc, entityInfo, collectedFrom, hostedBy); final String type = getResultType(doc, instances); - return createOafs(doc, type, instances, collectedFrom, info, lastUpdateTimestamp); + return createOafs(doc, type, instances, collectedFrom, entityInfo, lastUpdateTimestamp); } catch (DocumentException e) { log.error("Error with record:\n" + xml); return Lists.newArrayList(); @@ -184,13 +184,15 @@ public abstract class AbstractMdRecordToOafMapper { final List oafs = Lists.newArrayList(entity); + final DataInfo relationInfo = prepareDataInfo(doc, false); + if (!oafs.isEmpty()) { Set rels = Sets.newHashSet(); - rels.addAll(addProjectRels(doc, entity)); - rels.addAll(addOtherResultRels(doc, entity)); - rels.addAll(addRelations(doc, entity)); - rels.addAll(addAffiliations(doc, entity)); + rels.addAll(addProjectRels(doc, entity, relationInfo)); + rels.addAll(addOtherResultRels(doc, entity, relationInfo)); + rels.addAll(addRelations(doc, entity, relationInfo)); + rels.addAll(addAffiliations(doc, entity, relationInfo)); oafs.addAll(rels); } @@ -243,7 +245,7 @@ public abstract class AbstractMdRecordToOafMapper { private List addProjectRels( final Document doc, - final OafEntity entity) { + final OafEntity entity, DataInfo info) { final List res = new ArrayList<>(); @@ -262,18 +264,21 @@ public abstract class AbstractMdRecordToOafMapper { .add( OafMapperUtils .getRelation( - docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate)); + docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity.getCollectedfrom(), + info, entity.getLastupdatetimestamp(), validationdDate, null)); res .add( OafMapperUtils - .getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate)); + .getRelation( + projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity.getCollectedfrom(), info, + entity.getLastupdatetimestamp(), validationdDate, null)); } } return res; } - private List addRelations(Document doc, OafEntity entity) { + private List addRelations(Document doc, OafEntity entity, DataInfo info) { final List rels = Lists.newArrayList(); @@ -301,14 +306,16 @@ public abstract class AbstractMdRecordToOafMapper { .add( OafMapperUtils .getRelation( - entity.getId(), targetId, relType, subRelType, relClass, entity, - validationDate)); + entity.getId(), targetId, relType, subRelType, relClass, + entity.getCollectedfrom(), info, + entity.getLastupdatetimestamp(), validationDate, null)); rels .add( OafMapperUtils .getRelation( - targetId, entity.getId(), relType, subRelType, relClassInverse, entity, - validationDate)); + targetId, entity.getId(), relType, subRelType, relClassInverse, + entity.getCollectedfrom(), info, + entity.getLastupdatetimestamp(), validationDate, null)); } } } @@ -316,7 +323,7 @@ public abstract class AbstractMdRecordToOafMapper { return rels; } - private List addAffiliations(Document doc, OafEntity entity) { + private List addAffiliations(Document doc, OafEntity entity, DataInfo info) { final List rels = Lists.newArrayList(); for (Object o : doc.selectNodes("//datacite:affiliation[@affiliationIdentifierScheme='ROR']")) { @@ -345,14 +352,14 @@ public abstract class AbstractMdRecordToOafMapper { OafMapperUtils .getRelation( resultId, orgId, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION, - entity.getCollectedfrom(), entity.getDataInfo(), entity.getLastupdatetimestamp(), null, + entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, properties)); rels .add( OafMapperUtils .getRelation( orgId, resultId, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF, - entity.getCollectedfrom(), entity.getDataInfo(), entity.getLastupdatetimestamp(), null, + entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, properties)); } } @@ -361,7 +368,7 @@ public abstract class AbstractMdRecordToOafMapper { protected abstract List addOtherResultRels( final Document doc, - final OafEntity entity); + final OafEntity entity, DataInfo info); private void populateResultFields( final Result r, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java index 2271a0fff8..a9f9367af5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java @@ -4,7 +4,6 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; -import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.util.ArrayList; import java.util.HashSet; @@ -292,7 +291,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper { @Override protected List addOtherResultRels( final Document doc, - final OafEntity entity) { + final OafEntity entity, DataInfo info) { final String docId = entity.getId(); final List res = new ArrayList<>(); @@ -308,11 +307,13 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper { res .add( getRelation( - docId, otherId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity)); + docId, otherId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity.getCollectedfrom(), info, + entity.getLastupdatetimestamp(), null, null)); res .add( getRelation( - otherId, docId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity)); + otherId, docId, RESULT_RESULT, RELATIONSHIP, IS_RELATED_TO, entity.getCollectedfrom(), info, + entity.getLastupdatetimestamp(), null, null)); } } return res; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index 1faa2fe9b8..bbd1e7ab19 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -5,15 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty; -import java.io.UnsupportedEncodingException; -import java.net.MalformedURLException; -import java.net.URL; import java.net.URLDecoder; import java.util.*; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.validator.routines.UrlValidator; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.Node; @@ -27,7 +23,6 @@ import eu.dnetlib.dhp.schema.common.RelationInverse; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; public class OdfToOafMapper extends AbstractMdRecordToOafMapper { @@ -397,7 +392,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { @Override protected List addOtherResultRels( final Document doc, - final OafEntity entity) { + final OafEntity entity, DataInfo info) { final String docId = entity.getId(); @@ -413,7 +408,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { final String relType = ((Node) o).valueOf("@relationType"); String otherId = guessRelatedIdentifier(idType, originalId); if (StringUtils.isNotBlank(otherId)) { - res.addAll(getRelations(relType, docId, otherId, entity)); + res.addAll(getRelations(relType, docId, otherId, entity, info)); } } @@ -434,18 +429,20 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { } protected List getRelations(final String reltype, final String entityId, final String otherId, - final OafEntity entity) { + final OafEntity entity, DataInfo info) { final List res = new ArrayList<>(); RelationInverse rel = ModelSupport.findRelation(reltype); if (rel != null) { res .add( getRelation( - entityId, otherId, rel.getRelType(), rel.getSubReltype(), rel.getRelClass(), entity)); + entityId, otherId, rel.getRelType(), rel.getSubReltype(), rel.getRelClass(), + entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, null)); res .add( getRelation( - otherId, entityId, rel.getRelType(), rel.getSubReltype(), rel.getInverseRelClass(), entity)); + otherId, entityId, rel.getRelType(), rel.getSubReltype(), rel.getInverseRelClass(), + entity.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, null)); } return res; From 15666e86a8448f724e3c7876f480afac36b4a766 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 4 Sep 2023 15:56:06 +0200 Subject: [PATCH 04/13] added collectedfrom to the affiliation relations imported from Crossref --- .../bipaffiliations/PrepareAffiliationRelations.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 9ac610240c..603ad6339f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -120,8 +120,10 @@ public class PrepareAffiliationRelations implements Serializable { qualifier, Double.toString(row.getAs("confidence"))); + List collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); + // return bi-directional relations - return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); + return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); }) .map(p -> new AtomicAction(Relation.class, p)) @@ -132,7 +134,8 @@ public class PrepareAffiliationRelations implements Serializable { } - private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { + private static List getAffiliationRelationPair(String paperId, String affId, List collectedfrom, + DataInfo dataInfo) { return Arrays .asList( OafMapperUtils @@ -142,7 +145,7 @@ public class PrepareAffiliationRelations implements Serializable { ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION, ModelConstants.HAS_AUTHOR_INSTITUTION, - null, + collectedfrom, dataInfo, null), OafMapperUtils @@ -152,7 +155,7 @@ public class PrepareAffiliationRelations implements Serializable { ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION, ModelConstants.IS_AUTHOR_INSTITUTION_OF, - null, + collectedfrom, dataInfo, null)); } From 4786aa0e094fe848a5e7024b68c0d4e7c80ec65f Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 7 Sep 2023 11:20:35 +0200 Subject: [PATCH 05/13] added Archive ouverte UNIGE (ETHZ.UNIGENF, opendoar____::1400) to the Datacite hostedBy_map --- .../main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json index 9088d29600..d07cc33cb5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/datacite/hostedBy_map.json @@ -1,4 +1,9 @@ { + "ETHZ.UNIGENF": { + "openaire_id": "opendoar____::1400", + "datacite_name": "Uni Genf", + "official_name": "Archive ouverte UNIGE" + }, "GESIS.RKI": { "openaire_id": "re3data_____::r3d100010436", "datacite_name": "Forschungsdatenzentrum am Robert Koch Institut", From 8a6892cc638c84fb6b05db27fa9e2ed538899896 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 12 Sep 2023 14:34:28 +0200 Subject: [PATCH 06/13] [graph dedup] consistency wf should not remove the relations while dispatching the entities --- .../dhp/oa/merge/DispatchEntitiesSparkJob.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java index 4d2ccb1788..cf0a183d72 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java @@ -47,17 +47,14 @@ public class DispatchEntitiesSparkJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); + boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible")); log.info("filterInvisible: {}", filterInvisible); SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchEntities(spark, inputPath, outputPath, filterInvisible); - }); + spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible)); } private static void dispatchEntities( @@ -72,7 +69,9 @@ public class DispatchEntitiesSparkJob { String entityType = entry.getKey(); Class clazz = entry.getValue(); + final String entityPath = outputPath + "/" + entityType; if (!entityType.equalsIgnoreCase("relation")) { + HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration()); Dataset entityDF = spark .read() .schema(Encoders.bean(clazz).schema()) @@ -91,7 +90,7 @@ public class DispatchEntitiesSparkJob { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "/" + entityType); + .json(entityPath); } }); } From 395a4af020621633b9e33e05a6ee5ab5f089413b Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Tue, 12 Sep 2023 22:31:50 +0300 Subject: [PATCH 07/13] Run CC and RAM sequentieally in dhp-impact-indicators WF --- .../impact_indicators/oozie_app/workflow.xml | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 0d7d29bfe2..e43e7cf14a 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -39,7 +39,8 @@ - ${wf:conf('resume') eq "rankings-start"} + ${wf:conf('resume') eq "cc"} + ${wf:conf('resume') eq "ram"} ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} @@ -89,18 +90,11 @@ ${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py - + - - - - - - - @@ -129,7 +123,7 @@ ${wfAppPath}/bip-ranker/CC.py#CC.py - + @@ -165,14 +159,11 @@ ${wfAppPath}/bip-ranker/TAR.py#TAR.py - + - - - From 76476cdfb63c2c0570f0669e50970d43e9aecb16 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 20 Sep 2023 10:33:14 +0200 Subject: [PATCH 08/13] Added maven repo for dependencies that are not in maven central --- .../eu/dnetlib/pace/util/DiffPatchMatch.java | 17 +++++++++++++++++ pom.xml | 10 ++++++++++ 2 files changed, 27 insertions(+) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java index 84d49bd5ca..cfd9acd702 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java @@ -1,6 +1,23 @@ package eu.dnetlib.pace.util; +/* + * Diff Match and Patch + * Copyright 2018 The diff-match-patch Authors. + * https://github.com/google/diff-match-patch + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /* * Diff Match and Patch * Copyright 2018 The diff-match-patch Authors. diff --git a/pom.xml b/pom.xml index c6b65e27ae..9cd82a3431 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,16 @@ https://maven.d4science.org/nexus/content/repositories/dnet-deps default + + maven-restlet + Restlet repository + https://maven.restlet.talend.com + + + conjars + conjars + https://conjars.wensel.net/repo/ + From cc7204a08904a8b23ac8bd30be5f829ff93e7cc0 Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Tue, 19 Sep 2023 13:38:25 +0200 Subject: [PATCH 09/13] tests for d4science catalog --- .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 22 +++++ .../dhp/oa/graph/raw/d4science-1-training.xml | 93 +++++++++++++++++++ .../dhp/oa/graph/raw/d4science-2-dataset.xml | 72 ++++++++++++++ .../oa/provision/XmlRecordFactoryTest.java | 52 ++++++++++- .../oa/provision/d4science-1-training.json | 1 + .../dhp/oa/provision/d4science-2-dataset.json | 1 + 6 files changed, 237 insertions(+), 4 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 894ed33f77..b506d3a62a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -1067,6 +1067,28 @@ class MappersTest { System.out.println("***************"); } + @Test + public void testD4ScienceTraining() throws IOException { + final String xml = IOUtils + .toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-1-training.xml"))); + final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); + final OtherResearchProduct trainingMaterial = (OtherResearchProduct) list.get(0); + System.out.println("***************"); + System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial)); + System.out.println("***************"); + } + + @Test + public void testD4ScienceDataset() throws IOException { + final String xml = IOUtils + .toString(Objects.requireNonNull(getClass().getResourceAsStream("d4science-2-dataset.xml"))); + final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); + final Dataset trainingMaterial = (Dataset) list.get(0); + System.out.println("***************"); + System.out.println(new ObjectMapper().writeValueAsString(trainingMaterial)); + System.out.println("***************"); + } + @Test void testNotWellFormed() throws IOException { final String xml = IOUtils diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml new file mode 100644 index 0000000000..91f9f91184 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-1-training.xml @@ -0,0 +1,93 @@ + + + + alessia_____::104c2d4ba8878c16fa824dce5b1bea57 + 12d8f77e-d66f-46f5-8d88-af7db23bc4c9 + 2023-09-08T10:12:35.864+02:00 + alessia_____ + 2023-09-08T11:31:45.692+02:00 + + + + http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists + + + + BRAGHIERI MARCO + + + + Visual Analytics for Data Scientists + + SoBigData++ + + + + + TrainingMaterial + + Participants to this module shall + - Learn the principles and rules underlying the design of visual data + representations and human-computer interactions + - Understand, adapt and apply representative visual analytics methods and systems for diverse types + of data and problems + - Analyse and evaluate the structure and properties + of data to select or devise appropriate methods for data exploration + - Combine visualization, interactive techniques, and computational + processing to develop practical data analysis for problem solving + + (This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London). + + The author did not intend to violate any copyright on figures or content. In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it + + + Visual analytics + + + Slides + Other + PDF + PDF + PDF + PDF + PDF + PDF + PDF + PDF + PDF + PDF + ZIP + + + OPEN + 0010 + + + + other-open + corda__h2020::871042 + + + + + https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems + + + + + + + false + false + 0.9 + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml new file mode 100644 index 0000000000..48ceb6c135 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/d4science-2-dataset.xml @@ -0,0 +1,72 @@ + + + + alessia_____::028879484548f4e1c630e1c503e35231 + 4fed018e-c2ff-4afa-b7b5-1ca1beebf850 + 2023-09-08T12:14:27.615+02:00 + alessia_____ + 2023-09-08T12:14:51.7+02:00 + + + + http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration + + + + + + Pappalardo, Luca + + 0000-0002-1547-6007 + + + + City-to-city migration + + SoBigData++ + + + 2018-02-15 + + Dataset + + Census data recording the migration of people between metropolitan areas in + the US + + + Human Mobility data + + + + OPEN + 0021 + 2018-02-15 + + + AFL-3.0 + corda__h2020::871042 + + + + + https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems + + + + + + + false + false + 0.9 + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java index 761539780e..88bffd0e79 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java @@ -24,10 +24,7 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.*; public class XmlRecordFactoryTest { @@ -196,4 +193,51 @@ public class XmlRecordFactoryTest { assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemeid").getValue()); assertEquals("dnet:pid_types", ((Element) pids.get(0)).attribute("schemename").getValue()); } + + @Test + public void testD4ScienceTraining() throws DocumentException, IOException { + final ContextMapper contextMapper = new ContextMapper(); + + final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, + XmlConverterJob.schemaLocation); + + final OtherResearchProduct p = OBJECT_MAPPER + .readValue( + IOUtils.toString(getClass().getResourceAsStream("d4science-1-training.json")), + OtherResearchProduct.class); + + final String xml = xmlRecordFactory.build(new JoinedEntity<>(p)); + + assertNotNull(xml); + + final Document doc = new SAXReader().read(new StringReader(xml)); + + assertNotNull(doc); + System.out.println(doc.asXML()); + + } + + @Test + public void testD4ScienceDataset() throws DocumentException, IOException { + final ContextMapper contextMapper = new ContextMapper(); + + final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, + XmlConverterJob.schemaLocation); + + final OtherResearchProduct p = OBJECT_MAPPER + .readValue( + IOUtils.toString(getClass().getResourceAsStream("d4science-2-dataset.json")), + OtherResearchProduct.class); + + final String xml = xmlRecordFactory.build(new JoinedEntity<>(p)); + + assertNotNull(xml); + + final Document doc = new SAXReader().read(new StringReader(xml)); + + assertNotNull(doc); + System.out.println(doc.asXML()); + + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json new file mode 100644 index 0000000000..3ce397f10c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-1-training.json @@ -0,0 +1 @@ +{"collectedfrom":[{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1694165542374,"id":"50|alessia_____::104c2d4ba8878c16fa824dce5b1bea57","originalId":["12d8f77e-d66f-46f5-8d88-af7db23bc4c9","50|alessia_____::104c2d4ba8878c16fa824dce5b1bea57"],"pid":[],"dateofcollection":"2023-09-08T10:12:35.864+02:00","dateoftransformation":"2023-09-08T11:31:45.692+02:00","extraInfo":[],"oaiprovenance":{"originDescription":{"harvestDate":"2023-09-08T10:12:35.864+02:00","altered":true,"baseURL":"https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems","identifier":"","datestamp":"","metadataNamespace":""}},"measures":null,"processingchargeamount":null,"processingchargecurrency":null,"author":[{"fullname":"BRAGHIERI MARCO","name":"","surname":"","rank":1,"pid":[],"affiliation":[]}],"resulttype":{"classid":"other","classname":"other","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"language":{"classid":"","classname":"","schemeid":"dnet:languages","schemename":"dnet:languages"},"country":[],"subject":[{"value":"Visual analytics","qualifier":{"classid":"","classname":"","schemeid":"","schemename":""},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"title":[{"value":"Visual Analytics for Data Scientists","qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"relevantdate":[{"value":"","qualifier":{"classid":"Issued","classname":"Issued","schemeid":"dnet:dataCite_date","schemename":"dnet:dataCite_date"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"description":[{"value":"Participants to this module shall\n - Learn the principles and rules underlying the design of visual data\n representations and human-computer interactions\n - Understand, adapt and apply representative visual analytics methods and systems for diverse types\n of data and problems\n - Analyse and evaluate the structure and properties\n of data to select or devise appropriate methods for data exploration\n - Combine visualization, interactive techniques, and computational\n processing to develop practical data analysis for problem solving\n\n (This teaching material on Visual Analytics for Data Scientists is part of a MSc module at City University London).\n\n The author did not intend to violate any copyright on figures or content. In case you are the legal owner of any copyrighted content, please contact info@sobigdata.eu and we will immediately remove it","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"dateofacceptance":null,"publisher":{"value":"SoBigData++","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"embargoenddate":null,"source":[],"fulltext":[],"format":[{"value":"Slides","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"value":"Other","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"value":"PDF","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"value":"ZIP","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"contributor":[],"resourcetype":{"classid":"TrainingMaterial","classname":"TrainingMaterial","schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"coverage":[],"bestaccessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"context":[],"externalReference":[],"instance":[{"license":{"value":"other-open","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"accessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes","openAccessRoute":null},"instancetype":{"classid":"0010","classname":"Lecture","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"hostedby":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"url":["http://data.d4science.org/ctlg/ResourceCatalogue/visual_analytics_for_data_scientists"],"distributionlocation":null,"collectedfrom":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"pid":[],"alternateIdentifier":[],"dateofacceptance":null,"processingchargeamount":null,"processingchargecurrency":null,"refereed":{"classid":"","classname":"","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"measures":null,"fulltext":null}],"eoscifguidelines":[],"contactperson":[],"contactgroup":[],"tool":[]} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json new file mode 100644 index 0000000000..ea8465e361 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/d4science-2-dataset.json @@ -0,0 +1 @@ +{"collectedfrom":[{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1694507584675,"id":"50|alessia_____::028879484548f4e1c630e1c503e35231","originalId":["4fed018e-c2ff-4afa-b7b5-1ca1beebf850","50|alessia_____::028879484548f4e1c630e1c503e35231"],"pid":[],"dateofcollection":"2023-09-08T12:14:27.615+02:00","dateoftransformation":"2023-09-08T12:14:51.7+02:00","extraInfo":[],"oaiprovenance":{"originDescription":{"harvestDate":"2023-09-08T12:14:27.615+02:00","altered":true,"baseURL":"https%3A%2F%2Fapi.d4science.org%2Fcatalogue%2Fitems","identifier":"","datestamp":"","metadataNamespace":""}},"measures":null,"processingchargeamount":null,"processingchargecurrency":null,"author":[{"fullname":"Pappalardo, Luca","name":"Luca","surname":"Pappalardo","rank":1,"pid":[{"value":"0000-0002-1547-6007","qualifier":{"classid":"orcid_pending","classname":"Open Researcher and Contributor ID","schemeid":"dnet:pid_types","schemename":"dnet:pid_types"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"affiliation":[]}],"resulttype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:result_typologies","schemename":"dnet:result_typologies"},"language":{"classid":"","classname":"","schemeid":"dnet:languages","schemename":"dnet:languages"},"country":[],"subject":[{"value":"Human Mobility data","qualifier":{"classid":"","classname":"","schemeid":"","schemename":""},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"title":[{"value":"City-to-city migration","qualifier":{"classid":"main title","classname":"main title","schemeid":"dnet:dataCite_title","schemename":"dnet:dataCite_title"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"relevantdate":[{"value":"2018-02-15","qualifier":{"classid":"Issued","classname":"Issued","schemeid":"dnet:dataCite_date","schemename":"dnet:dataCite_date"},"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"description":[{"value":"Census data recording the migration of people between metropolitan areas in\n the US","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"dateofacceptance":{"value":"2018-02-15","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"publisher":{"value":"SoBigData++","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"embargoenddate":null,"source":[],"fulltext":[],"format":[],"contributor":[],"resourcetype":{"classid":"dataset","classname":"dataset","schemeid":"dnet:dataCite_resource","schemename":"dnet:dataCite_resource"},"coverage":[],"bestaccessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"context":[],"externalReference":[],"instance":[{"license":{"value":"AFL-3.0","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"accessright":{"classid":"OPEN","classname":"Open Access","schemeid":"dnet:access_modes","schemename":"dnet:access_modes","openAccessRoute":null},"instancetype":{"classid":"0021","classname":"Dataset","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"hostedby":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"url":["http://data.d4science.org/ctlg/ResourceCatalogue/city-to-city_migration"],"distributionlocation":null,"collectedfrom":{"key":"10|alessia_____::6332e88a4c7dba6f7743d3a7a0c6ea2c","value":"Alessia","dataInfo":null},"pid":[],"alternateIdentifier":[],"dateofacceptance":{"value":"2018-02-15","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"processingchargeamount":null,"processingchargecurrency":null,"refereed":{"classid":"","classname":"","schemeid":"dnet:review_levels","schemename":"dnet:review_levels"},"measures":null,"fulltext":null}],"eoscifguidelines":[],"storagedate":{"value":"2018-02-15","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":"","provenanceaction":{"classid":"sysimport:crosswalk","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"device":null,"size":null,"version":null,"lastmetadataupdate":null,"metadataversionnumber":null,"geolocation":[]} From 0935d7757cfc9c4efce6500e3e8f02792d56c2ad Mon Sep 17 00:00:00 2001 From: Alessia Bardi Date: Tue, 19 Sep 2023 14:47:01 +0200 Subject: [PATCH 10/13] Use v5 of the UNIBI Gold ISSN list in test --- .../eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java index 48f1e0c064..9bd32968ae 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/DownloadCsvTest.java @@ -49,7 +49,7 @@ public class DownloadCsvTest { @Test void getUnibiFileTest() throws CollectorException, IOException, ClassNotFoundException { - String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_4.csv"; + String fileURL = "https://pub.uni-bielefeld.de/download/2944717/2944718/issn_gold_oa_version_5.csv"; final String outputFile = workingDir + "/unibi_gold.json"; new DownloadCSV() From e239b81740bccfb90211464b5d422113eac2b783 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 20 Sep 2023 15:42:00 +0200 Subject: [PATCH 11/13] Fix defect #8997: GenerateEventsJob is generating huge amounts of logs because broker entity similarity calculation consistently failed --- .../eu/dnetlib/pace/model/SparkModel.scala | 4 ++-- .../dhp/broker/oa/util/TrustUtils.java | 22 +++++++++---------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala index 95325ace0a..3ba36aa227 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala @@ -78,10 +78,10 @@ case class SparkModel(conf: DedupConfig) { uv case Type.List | Type.JSON => - MapDocumentUtil.truncateList( + Seq(MapDocumentUtil.truncateList( MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), fdef.getSize - ).toArray + )) case Type.StringConcat => val jpaths = CONCAT_REGEX.split(fdef.getPath) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java index a6fa2b1a1c..6f197a8ce3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java @@ -1,18 +1,18 @@ package eu.dnetlib.dhp.broker.oa.util; -import java.io.IOException; - -import org.apache.spark.sql.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.SparkDeduper; import eu.dnetlib.pace.tree.support.TreeProcessor; +import org.apache.commons.io.IOUtils; +import org.apache.spark.sql.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; public class TrustUtils { @@ -27,10 +27,8 @@ public class TrustUtils { static { mapper = new ObjectMapper(); try { - dedupConfig = mapper - .readValue( - DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), - DedupConfig.class); + dedupConfig = DedupConfig.load(IOUtils.toString(DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), StandardCharsets.UTF_8)); + deduper = new SparkDeduper(dedupConfig); } catch (final IOException e) { log.error("Error loading dedupConfig, e"); @@ -57,7 +55,7 @@ public class TrustUtils { return TrustUtils.rescale(score, threshold); } catch (final Exception e) { log.error("Error computing score between results", e); - return BrokerConstants.MIN_TRUST; + throw new RuntimeException(e); } } From 7f244d9a7ad7e723d9c80503476e590835031c4d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 20 Sep 2023 15:53:21 +0200 Subject: [PATCH 12/13] code formatting --- .../dhp/broker/oa/util/TrustUtils.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java index 6f197a8ce3..67468c6f9c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java @@ -1,18 +1,20 @@ package eu.dnetlib.dhp.broker.oa.util; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.broker.objects.OaBrokerMainEntity; -import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.model.SparkDeduper; -import eu.dnetlib.pace.tree.support.TreeProcessor; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + import org.apache.commons.io.IOUtils; import org.apache.spark.sql.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.charset.StandardCharsets; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.SparkDeduper; +import eu.dnetlib.pace.tree.support.TreeProcessor; public class TrustUtils { @@ -27,7 +29,13 @@ public class TrustUtils { static { mapper = new ObjectMapper(); try { - dedupConfig = DedupConfig.load(IOUtils.toString(DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), StandardCharsets.UTF_8)); + dedupConfig = DedupConfig + .load( + IOUtils + .toString( + DedupConfig.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), + StandardCharsets.UTF_8)); deduper = new SparkDeduper(dedupConfig); } catch (final IOException e) { From 3c47920c7839b2004a2e8f9fbb4dbff8873d1e6d Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Wed, 20 Sep 2023 16:14:01 +0200 Subject: [PATCH 13/13] Use asScala to convert java List to Scala Sequence --- .../src/main/java/eu/dnetlib/pace/model/SparkModel.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala index 3ba36aa227..aa997c6e9f 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkModel.scala @@ -78,10 +78,10 @@ case class SparkModel(conf: DedupConfig) { uv case Type.List | Type.JSON => - Seq(MapDocumentUtil.truncateList( + MapDocumentUtil.truncateList( MapDocumentUtil.getJPathList(fdef.getPath, documentContext, fdef.getType), fdef.getSize - )) + ).asScala case Type.StringConcat => val jpaths = CONCAT_REGEX.split(fdef.getPath)