From 6cc7d8ca7b04f8aff859fda0ff04294b830608b5 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 24 Aug 2023 21:48:07 +0200 Subject: [PATCH] GroupEntities and DispatchEntites are now merged in GroupEntitiesSparkJob --- .../oa/merge/DispatchEntitiesSparkJob.java | 98 -------- .../dhp/oa/merge/GroupEntitiesSparkJob.java | 221 ++++++++---------- .../merge/dispatch_entities_parameters.json | 26 --- .../group_graph_entities_parameters.json | 16 +- .../dedup/consistency/oozie_app/workflow.xml | 26 +-- .../dhp/oa/graph/group/oozie_app/workflow.xml | 25 +- .../group/GroupEntitiesSparkJobTest.java | 201 ++++++++-------- 7 files changed, 203 insertions(+), 410 deletions(-) delete mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java delete mode 100644 dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java deleted file mode 100644 index 4d2ccb178..000000000 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ /dev/null @@ -1,98 +0,0 @@ - -package eu.dnetlib.dhp.oa.merge; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.common.ModelSupport; - -public class DispatchEntitiesSparkJob { - - private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); - - public static void main(String[] args) throws Exception { - - String jsonConfiguration = IOUtils - .toString( - Objects - .requireNonNull( - DispatchEntitiesSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json"))); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); - - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); - log.info("filterInvisible: {}", filterInvisible); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchEntities(spark, inputPath, outputPath, filterInvisible); - }); - } - - private static void dispatchEntities( - SparkSession spark, - String inputPath, - String outputPath, - boolean filterInvisible) { - - Dataset df = spark.read().textFile(inputPath); - - ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> { - String entityType = entry.getKey(); - Class clazz = entry.getValue(); - - if (!entityType.equalsIgnoreCase("relation")) { - Dataset entityDF = spark - .read() - .schema(Encoders.bean(clazz).schema()) - .json( - df - .filter((FilterFunction) s -> s.startsWith(clazz.getName())) - .map( - (MapFunction) s -> StringUtils.substringAfter(s, "|"), - Encoders.STRING())); - - if (filterInvisible) { - entityDF = entityDF.filter("dataInfo.invisible != true"); - } - - entityDF - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/" + entityType); - } - }); - } -} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index e652bd5b6..87510c108 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -2,36 +2,28 @@ package eu.dnetlib.dhp.oa.merge; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.when; -import java.io.IOException; -import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.Option; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -39,13 +31,9 @@ import scala.Tuple2; * Groups the graph content by entity identifier to ensure ID uniqueness */ public class GroupEntitiesSparkJob { - private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); - private static final String ID_JPATH = "$.id"; - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); public static void main(String[] args) throws Exception { @@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob { String graphInputPath = parser.get("graphInputPath"); log.info("graphInputPath: {}", graphInputPath); + String checkpointPath = parser.get("checkpointPath"); + log.info("checkpointPath: {}", checkpointPath); + String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); + log.info("filterInvisible: {}", filterInvisible); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.registerKryoClasses(ModelSupport.getOafModelClasses()); @@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob { isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - groupEntities(spark, graphInputPath, outputPath); + groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible); }); } private static void groupEntities( SparkSession spark, String inputPath, - String outputPath) { + String checkpointPath, + String outputPath, + boolean filterInvisible) { - final TypedColumn aggregator = new GroupingAggregator().toColumn(); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - spark - .read() - .textFile(toSeq(listEntityPaths(inputPath, sc))) - .map((MapFunction) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class)) - .filter((FilterFunction) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e))) - .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) - .agg(aggregator) + Dataset allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC); + + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + String entityInputPath = inputPath + "/" + entity; + + if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) { + continue; + } + + allEntities = allEntities + .union( + ((Dataset) spark + .read() + .schema(Encoders.bean(entityClass).schema()) + .json(entityInputPath) + .filter("length(id) > 0") + .as(Encoders.bean(entityClass))) + .map((MapFunction) r -> r, OAFENTITY_KRYO_ENC)); + } + + Dataset groupedEntities = allEntities + .groupByKey((MapFunction) OafEntity::getId, Encoders.STRING()) + .reduceGroups((ReduceFunction) (b, a) -> OafMapperUtils.mergeEntities(b, a)) .map( - (MapFunction, String>) t -> t._2().getClass().getName() + - "|" + OBJECT_MAPPER.writeValueAsString(t._2()), - Encoders.STRING()) + (MapFunction, Tuple2>) t -> new Tuple2( + t._2().getClass().getName(), t._2()), + Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); + + // pivot on "_1" (classname of the entity) + // created columns containing only entities of the same class + for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + + groupedEntities = groupedEntities + .withColumn( + entity, + when(col("_1").equalTo(entityClass.getName()), col("_2"))); + } + + groupedEntities + .drop("_1", "_2") .write() - .option("compression", "gzip") .mode(SaveMode.Overwrite) - .text(outputPath); - } + .option("compression", "gzip") + .save(checkpointPath); - public static class GroupingAggregator extends Aggregator { + ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size()); - @Override - public OafEntity zero() { - return null; - } - - @Override - public OafEntity reduce(OafEntity b, OafEntity a) { - return mergeAndGet(b, a); - } - - private OafEntity mergeAndGet(OafEntity b, OafEntity a) { - if (Objects.nonNull(a) && Objects.nonNull(b)) { - return OafMapperUtils.mergeEntities(b, a); - } - return Objects.isNull(a) ? b : a; - } - - @Override - public OafEntity merge(OafEntity b, OafEntity a) { - return mergeAndGet(b, a); - } - - @Override - public OafEntity finish(OafEntity j) { - return j; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(OafEntity.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(OafEntity.class); - } - - } - - private static OafEntity parseOaf(String s) { - - DocumentContext dc = JsonPath - .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)); - final String id = dc.read(ID_JPATH); - if (StringUtils.isNotBlank(id)) { - - String prefix = StringUtils.substringBefore(id, "|"); - switch (prefix) { - case "10": - return parse(s, Datasource.class); - case "20": - return parse(s, Organization.class); - case "40": - return parse(s, Project.class); - case "50": - String resultType = dc.read("$.resulttype.classid"); - switch (resultType) { - case "publication": - return parse(s, Publication.class); - case "dataset": - return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); - case "software": - return parse(s, Software.class); - case "other": - return parse(s, OtherResearchProduct.class); - default: - throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); - } - default: - throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); - } - } else { - throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s)); - } - } - - private static OafEntity parse(String s, Class clazz) { - try { - return OBJECT_MAPPER.readValue(s, clazz); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - private static List listEntityPaths(String inputPath, JavaSparkContext sc) { - return HdfsSupport - .listFiles(inputPath, sc.hadoopConfiguration()) + ModelSupport.entityTypes + .entrySet() .stream() - .filter(f -> !f.toLowerCase().contains("relation")) - .collect(Collectors.toList()); - } + .map(e -> parPool.submit(() -> { + String entity = e.getKey().name(); + Class entityClass = e.getValue(); + spark + .read() + .load(checkpointPath) + .select(col(entity).as("value")) + .filter("value IS NOT NULL") + .as(OAFENTITY_KRYO_ENC) + .map((MapFunction) r -> r, (Encoder) Encoders.bean(entityClass)) + .filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + entity); + })) + .collect(Collectors.toList()) + .forEach(t -> { + try { + t.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } } diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json deleted file mode 100644 index 60f11ac84..000000000 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json +++ /dev/null @@ -1,26 +0,0 @@ -[ - { - "paramName": "issm", - "paramLongName": "isSparkSessionManaged", - "paramDescription": "when true will stop SparkSession after job execution", - "paramRequired": false - }, - { - "paramName": "i", - "paramLongName": "inputPath", - "paramDescription": "the source path", - "paramRequired": true - }, - { - "paramName": "o", - "paramLongName": "outputPath", - "paramDescription": "path of the output graph", - "paramRequired": true - }, - { - "paramName": "fi", - "paramLongName": "filterInvisible", - "paramDescription": "if true filters out invisible entities", - "paramRequired": true - } -] \ No newline at end of file diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json index e65acb3c4..58e3ca711 100644 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json @@ -8,13 +8,25 @@ { "paramName": "gin", "paramLongName": "graphInputPath", - "paramDescription": "the graph root path", + "paramDescription": "the input graph root path", + "paramRequired": true + }, + { + "paramName": "cp", + "paramLongName": "checkpointPath", + "paramDescription": "checkpoint directory", "paramRequired": true }, { "paramName": "out", "paramLongName": "outputPath", - "paramDescription": "the output merged graph root path", + "paramDescription": "the output graph root path", + "paramRequired": true + }, + { + "paramName": "fi", + "paramLongName": "filterInvisible", + "paramDescription": "if true filters out invisible entities", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index b724e5d0b..3640ee6ca 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -152,31 +152,7 @@ --conf spark.sql.shuffle.partitions=15000 --graphInputPath${graphBasePath} - --outputPath${workingPath}/grouped_entities - - - - - - - - yarn - cluster - Dispatch grouped entitities - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities + --checkpointPath${workingPath}/grouped_entities --outputPath${graphOutputPath} --filterInvisible${filterInvisible} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml index 219dc7331..190788c9d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml @@ -96,30 +96,7 @@ --conf spark.sql.shuffle.partitions=15000 --graphInputPath${graphBasePath} - --outputPath${workingPath}/grouped_entities - - - - - - - - yarn - cluster - Dispatch grouped entities - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities + --checkpointPath${workingPath}/grouped_entities --outputPath${graphOutputPath} --filterInvisible${filterInvisible} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java index 61baf80dc..b878e778e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java @@ -1,16 +1,15 @@ package eu.dnetlib.dhp.oa.graph.group; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -19,118 +18,108 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob; -import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.DHPUtils; +import static org.junit.jupiter.api.Assertions.assertEquals; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class GroupEntitiesSparkJobTest { - private static SparkSession spark; + private static SparkSession spark; - private static ObjectMapper mapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - private static Path workingDir; - private Path dataInputPath; + private static Path workingDir; + private Path dataInputPath; - private Path groupEntityPath; - private Path dispatchEntityPath; + private Path checkpointPath; - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); + private Path outputPath; - SparkConf conf = new SparkConf(); - conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); - conf.setMaster("local"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - spark = SparkSession.builder().config(conf).getOrCreate(); - } + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); - @BeforeEach - public void beforeEach() throws IOException, URISyntaxException { - dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); - groupEntityPath = workingDir.resolve("grouped_entity"); - dispatchEntityPath = workingDir.resolve("dispatched_entity"); - } + SparkConf conf = new SparkConf(); + conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); + conf.setMaster("local"); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + spark = SparkSession.builder().config(conf).getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - spark.stop(); - FileUtils.deleteDirectory(workingDir.toFile()); - } + @BeforeEach + public void beforeEach() throws IOException, URISyntaxException { + dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); + checkpointPath = workingDir.resolve("grouped_entity"); + outputPath = workingDir.resolve("dispatched_entity"); + } - @Test - @Order(1) - void testGroupEntities() throws Exception { - GroupEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-graphInputPath", - dataInputPath.toString(), - "-outputPath", - groupEntityPath.toString() - }); + @AfterAll + public static void afterAll() throws IOException { + spark.stop(); + FileUtils.deleteDirectory(workingDir.toFile()); + } - Dataset output = spark - .read() - .textFile(groupEntityPath.toString()) - .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + @Test + @Order(1) + void testGroupEntities() throws Exception { + GroupEntitiesSparkJob.main(new String[]{ + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-graphInputPath", + dataInputPath.toString(), + "-checkpointPath", + checkpointPath.toString(), + "-outputPath", + outputPath.toString(), + "-filterInvisible", + Boolean.FALSE.toString() + }); - assertEquals( - 1, - output - .filter( - (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" - .equals(r.getId()) && - r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) - .count()); - } + Dataset checkpointTable = spark + .read() + .load(checkpointPath.toString()) + .selectExpr("COALESCE(*)") + .as(Encoders.kryo(OafEntity.class)); - @Test - @Order(2) - void testDispatchEntities() throws Exception { - DispatchEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - groupEntityPath.toString(), - "-outputPath", - dispatchEntityPath.resolve(".").toString(), - "-filterInvisible", - Boolean.TRUE.toString() - }); - Dataset output = spark - .read() - .textFile( - DHPUtils - .toSeq( - HdfsSupport - .listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration()))) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + assertEquals( + 1, + checkpointTable + .filter( + (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" + .equals(r.getId()) && + r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) + .count()); - assertEquals(3, output.count()); - assertEquals( - 2, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("publication")) - .count()); - assertEquals( - 1, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("dataset")) - .count()); - } -} + + Dataset output = spark + .read() + .textFile( + DHPUtils + .toSeq( + HdfsSupport + .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration()))) + .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); + + assertEquals(3, output.count()); + assertEquals( + 2, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("publication")) + .count()); + assertEquals( + 1, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("dataset")) + .count()); + } +} \ No newline at end of file