diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
index 87510c108..99981bf6a 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
@@ -33,7 +33,7 @@ import scala.Tuple2;
 public class GroupEntitiesSparkJob {
 
 	private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
 
-    private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
+	private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
 
 	public static void main(String[] args) throws Exception {
@@ -114,7 +114,7 @@ public class GroupEntitiesSparkJob {
 				Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
 
 		// pivot on "_1" (classname of the entity)
-        // created columns containing only entities of the same class
+		// created columns containing only entities of the same class
 		for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
 			String entity = e.getKey().name();
 			Class entityClass = e.getValue();
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 739295c91..cb1c70059 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -67,60 +67,60 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 		log.info("graphOutputPath: '{}'", graphOutputPath);
 
 		Dataset<Relation> mergeRels = spark
-            .read()
-            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
-            .as(REL_BEAN_ENC);
+			.read()
+			.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
+			.as(REL_BEAN_ENC);
 
 		// <mergedObjectID, dedupID>
 		Dataset<Row> idsToMerge = mergeRels
-            .where(col("relClass").equalTo(ModelConstants.MERGES))
-            .select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
-            .distinct();
+			.where(col("relClass").equalTo(ModelConstants.MERGES))
+			.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
+			.distinct();
 
 		Dataset<Row> allRels = spark
-            .read()
-            .schema(REL_BEAN_ENC.schema())
-            .json(graphBasePath + "/relation");
+			.read()
+			.schema(REL_BEAN_ENC.schema())
+			.json(graphBasePath + "/relation");
 
 		Dataset<Relation> dedupedRels = allRels
-            .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
-            .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
-            .select("_1._1", "_1._2.dedupID", "_2.dedupID")
-            .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
-            .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
-                Relation rel = t._1();
-                String newSource = t._2();
-                String newTarget = t._3();
+			.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
+			.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
+			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
+			.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
+			.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
+				Relation rel = t._1();
+				String newSource = t._2();
+				String newTarget = t._3();
 
-                if (rel.getDataInfo() == null) {
-                    rel.setDataInfo(new DataInfo());
-                }
+				if (rel.getDataInfo() == null) {
+					rel.setDataInfo(new DataInfo());
+				}
 
-                if (newSource != null || newTarget != null) {
-                    rel.getDataInfo().setDeletedbyinference(false);
+				if (newSource != null || newTarget != null) {
+					rel.getDataInfo().setDeletedbyinference(false);
 
-                    if (newSource != null)
-                        rel.setSource(newSource);
+					if (newSource != null)
+						rel.setSource(newSource);
 
-                    if (newTarget != null)
-                        rel.setTarget(newTarget);
-                }
+					if (newTarget != null)
+						rel.setTarget(newTarget);
+				}
 
-                return rel;
-            }, REL_BEAN_ENC);
+				return rel;
+			}, REL_BEAN_ENC);
 
 		// ids of records that are both not deletedbyinference and not invisible
 		Dataset<Row> ids = validIds(spark, graphBasePath);
 
 		// filter relations that point to valid records, can force them to be visible
 		Dataset<Relation> cleanedRels = dedupedRels
-            .join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
-            .join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
-            .as(REL_BEAN_ENC)
-            .map((MapFunction<Relation, Relation>) r -> {
-                r.getDataInfo().setInvisible(false);
-                return r;
-            }, REL_KRYO_ENC);
+			.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
+			.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
+			.as(REL_BEAN_ENC)
+			.map((MapFunction<Relation, Relation>) r -> {
+				r.getDataInfo().setInvisible(false);
+				return r;
+			}, REL_KRYO_ENC);
 
 		Dataset<Relation> distinctRels = cleanedRels
 			.groupByKey(
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
index b878e778e..0887adf45 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
@@ -1,14 +1,14 @@
 
 package eu.dnetlib.dhp.oa.graph.group;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.utils.DHPUtils;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -18,108 +18,108 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class GroupEntitiesSparkJobTest {
 
-    private static SparkSession spark;
+	private static SparkSession spark;
 
-    private static ObjectMapper mapper = new ObjectMapper()
-        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+	private static ObjectMapper mapper = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
-    private static Path workingDir;
-    private Path dataInputPath;
+	private static Path workingDir;
+	private Path dataInputPath;
 
-    private Path checkpointPath;
+	private Path checkpointPath;
 
-    private Path outputPath;
+	private Path outputPath;
 
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
 
-        SparkConf conf = new SparkConf();
-        conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
-        conf.setMaster("local");
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
-        spark = SparkSession.builder().config(conf).getOrCreate();
-    }
+		SparkConf conf = new SparkConf();
+		conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
+		conf.setMaster("local");
+		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+		spark = SparkSession.builder().config(conf).getOrCreate();
+	}
 
-    @BeforeEach
-    public void beforeEach() throws IOException, URISyntaxException {
-        dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
-        checkpointPath = workingDir.resolve("grouped_entity");
-        outputPath = workingDir.resolve("dispatched_entity");
-    }
+	@BeforeEach
+	public void beforeEach() throws IOException, URISyntaxException {
+		dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
+		checkpointPath = workingDir.resolve("grouped_entity");
+		outputPath = workingDir.resolve("dispatched_entity");
+	}
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        spark.stop();
-        FileUtils.deleteDirectory(workingDir.toFile());
-    }
+	@AfterAll
+	public static void afterAll() throws IOException {
+		spark.stop();
+		FileUtils.deleteDirectory(workingDir.toFile());
+	}
 
-    @Test
-    @Order(1)
-    void testGroupEntities() throws Exception {
-        GroupEntitiesSparkJob.main(new String[]{
-            "-isSparkSessionManaged",
-            Boolean.FALSE.toString(),
-            "-graphInputPath",
-            dataInputPath.toString(),
-            "-checkpointPath",
-            checkpointPath.toString(),
-            "-outputPath",
-            outputPath.toString(),
-            "-filterInvisible",
-            Boolean.FALSE.toString()
-        });
+	@Test
+	@Order(1)
+	void testGroupEntities() throws Exception {
+		GroupEntitiesSparkJob.main(new String[] {
+			"-isSparkSessionManaged",
+			Boolean.FALSE.toString(),
+			"-graphInputPath",
+			dataInputPath.toString(),
+			"-checkpointPath",
+			checkpointPath.toString(),
+			"-outputPath",
+			outputPath.toString(),
+			"-filterInvisible",
+			Boolean.FALSE.toString()
+		});
 
-        Dataset<OafEntity> checkpointTable = spark
-            .read()
-            .load(checkpointPath.toString())
-            .selectExpr("COALESCE(*)")
-            .as(Encoders.kryo(OafEntity.class));
+		Dataset<OafEntity> checkpointTable = spark
+			.read()
+			.load(checkpointPath.toString())
+			.selectExpr("COALESCE(*)")
+			.as(Encoders.kryo(OafEntity.class));
+
+		assertEquals(
+			1,
+			checkpointTable
+				.filter(
+					(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
+						.equals(r.getId()) &&
+						r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
+				.count());
 
-        assertEquals(
-            1,
-            checkpointTable
-                .filter(
-                    (FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
-                        .equals(r.getId()) &&
-                        r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
-                .count());
+		Dataset<Result> output = spark
+			.read()
+			.textFile(
+				DHPUtils
+					.toSeq(
+						HdfsSupport
+							.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
+			.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
 
-
-        Dataset<Result> output = spark
-            .read()
-            .textFile(
-                DHPUtils
-                    .toSeq(
-                        HdfsSupport
-                            .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
-            .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
-
-        assertEquals(3, output.count());
-        assertEquals(
-            2,
-            output
-                .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
-                .filter((FilterFunction<String>) s -> s.equals("publication"))
-                .count());
-        assertEquals(
-            1,
-            output
-                .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
-                .filter((FilterFunction<String>) s -> s.equals("dataset"))
-                .count());
-    }
-}
\ No newline at end of file
+		assertEquals(3, output.count());
+		assertEquals(
+			2,
+			output
+				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
+				.filter((FilterFunction<String>) s -> s.equals("publication"))
+				.count());
+		assertEquals(
+			1,
+			output
+				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
+				.filter((FilterFunction<String>) s -> s.equals("dataset"))
+				.count());
+	}
+}
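
The core of the SparkPropagateRelation hunk above is the double joinWith that rewrites relation endpoints onto their dedup ids. The following stand-alone sketch illustrates that join pattern outside the patch; the Rel bean, the sample ids, and the local-mode session are invented for the example and are not part of dnet-hadoop.

import static org.apache.spark.sql.functions.col;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import scala.Tuple3;

public class PropagateRelationSketch {

	// Minimal stand-in for eu.dnetlib.dhp.schema.oaf.Relation: just the two endpoints.
	public static class Rel implements Serializable {
		private String source;
		private String target;

		public Rel() {
		}

		public Rel(String source, String target) {
			this.source = source;
			this.target = target;
		}

		public String getSource() {
			return source;
		}

		public void setSource(String source) {
			this.source = source;
		}

		public String getTarget() {
			return target;
		}

		public void setTarget(String target) {
			this.target = target;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local").appName("propagate-sketch").getOrCreate();

		Encoder<Rel> relEnc = Encoders.bean(Rel.class);

		// Two raw relations; the record "B" has been merged into the dedup record "D".
		Dataset<Rel> rels = spark
			.createDataset(Arrays.asList(new Rel("A", "B"), new Rel("B", "C")), relEnc);

		// <mergedObjectID, dedupID>, the same shape idsToMerge has in the patch.
		Dataset<Row> idsToMerge = spark
			.createDataset(
				Arrays.asList(new Tuple2<>("B", "D")),
				Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
			.toDF("mergedObjectID", "dedupID");

		// Left-outer join once on source and once on target, then rewrite whichever
		// endpoint matched the mapping; endpoints with no match come back as null.
		Dataset<Rel> rewritten = rels
			.joinWith(idsToMerge, rels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
			.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
			.as(Encoders.tuple(relEnc, Encoders.STRING(), Encoders.STRING()))
			.map((MapFunction<Tuple3<Rel, String, String>, Rel>) t -> {
				Rel rel = t._1();
				if (t._2() != null)
					rel.setSource(t._2());
				if (t._3() != null)
					rel.setTarget(t._3());
				return rel;
			}, relEnc);

		rewritten.show(); // A->B becomes A->D, and B->C becomes D->C
		spark.stop();
	}
}

The left_outer join direction is the design point worth noting: relations whose endpoints were never merged survive untouched (both joined dedupID columns are null), so the rewrite is a no-op for them, while merged endpoints are redirected without dropping any rows.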