From f26378f426def7c33ddc6488692f01ffdba53e09 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 25 Nov 2022 17:52:46 +0100
Subject: [PATCH] [Dump Subset] change code to read from db

---
 .../subset/ReadMasterDuplicateFromDB.java     |  40 ++--
 .../graph/dump/subset/SparkSelectSubset.java  | 122 +++++-----
 .../oa/graph/dump/subset/DumpSubsetTest.java  | 220 +++++++++---------
 3 files changed, 196 insertions(+), 186 deletions(-)

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java
index d606800..c646d2a 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/ReadMasterDuplicateFromDB.java
@@ -1,20 +1,6 @@
 package eu.dnetlib.dhp.oa.graph.dump.subset;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.DBCursor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.DbClient;
-import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import java.io.BufferedWriter;
 import java.io.Closeable;
 import java.io.IOException;
@@ -24,7 +10,23 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.function.Function;

-public class ReadMasterDuplicateFromDB {
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.DBCursor;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+
+public class ReadMasterDuplicateFromDB {

 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@@ -58,9 +60,9 @@ public class ReadMasterDuplicateFromDB {
 	}

 	private static void execute(String dbUrl, String dbUser, String dbPassword, FSDataOutputStream fos) {
-		try(DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)){
-			try(BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))){
-				dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
+		try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
+			try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
+				dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
 			}

 		} catch (IOException e) {
@@ -68,7 +70,6 @@ public class ReadMasterDuplicateFromDB {
 		}
 	}

-
 	public static MasterDuplicate datasourceMasterMap(ResultSet rs) {
 		try {
 			MasterDuplicate dm = new MasterDuplicate();
@@ -84,7 +85,6 @@ public class ReadMasterDuplicateFromDB {
 		}
 	}

-
 	protected static void writeMap(final MasterDuplicate dm, BufferedWriter writer) {
 		try {
 			writer.write(OBJECT_MAPPER.writeValueAsString(dm));
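
Not part of the patch, for orientation only: with this change ReadMasterDuplicateFromDB queries the relational database through DbClient (dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer))) and writes every master/duplicate correspondence as one JSON line on the HDFS output stream, which SparkSelectSubset later reads back as a Dataset of MasterDuplicate. The snippet below is a minimal, self-contained sketch of that read-query-serialize loop using plain JDBC and Jackson; the query, the table name and the column names are illustrative assumptions, not the QUERY constant defined by the class.

// Illustrative sketch only: mirrors the pattern used by ReadMasterDuplicateFromDB
// (query the DB, map each row to a master/duplicate pair, write it as a JSON line),
// but with plain JDBC instead of DbClient so it can run standalone.
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ReadMasterDuplicateSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) throws Exception {
		String dbUrl = args[0], dbUser = args[1], dbPassword = args[2], outFile = args[3];
		// hypothetical query: the real class defines its own QUERY over the datasource tables
		String query = "SELECT id AS master, duplicate_id AS duplicate FROM dsm_dedup_services";
		try (Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPassword);
			Statement stm = con.createStatement();
			ResultSet rs = stm.executeQuery(query);
			BufferedWriter writer = new BufferedWriter(new FileWriter(outFile))) {
			while (rs.next()) {
				// one JSON line per master/duplicate correspondence, as consumed by SparkSelectSubset
				ObjectNode node = OBJECT_MAPPER.createObjectNode();
				node.put("master", rs.getString("master"));
				node.put("duplicate", rs.getString("duplicate"));
				writer.write(OBJECT_MAPPER.writeValueAsString(node));
				writer.newLine();
			}
		}
	}
}
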
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/SparkSelectSubset.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/SparkSelectSubset.java
index fa8ce5f..508e5d9 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/SparkSelectSubset.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/subset/SparkSelectSubset.java
@@ -80,7 +80,8 @@ public class SparkSelectSubset implements Serializable {

 	}

-	private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet, String masterDuplicatePath) {
+	private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet,
+		String masterDuplicatePath) {
 		Dataset<Relation> relation = Utils
 			.readPath(spark, inputPath + "/relation", Relation.class)
 			.filter(
@@ -202,12 +203,13 @@ public class SparkSelectSubset implements Serializable {
 				(FlatMapFunction<Publication, String>) p -> {
 					List<String> ret = new ArrayList<>();
 					p.getInstance().stream().forEach(i -> {
-						if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
+						if (Optional.ofNullable(i.getHostedby()).isPresent()
+							&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
 							ret.add(i.getHostedby().getKey());
 					});
-					if (Optional.ofNullable(p.getCollectedfrom()).isPresent()){
+					if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
 						p.getCollectedfrom().stream().forEach(cf -> {
-							if(Optional.ofNullable(cf.getKey()).isPresent())
+							if (Optional.ofNullable(cf.getKey()).isPresent())
 								ret.add(cf.getKey());
 						});
 					}
@@ -216,69 +218,73 @@ public class SparkSelectSubset implements Serializable {
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/original/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
-				.flatMap(
-					(FlatMapFunction<eu.dnetlib.dhp.schema.oaf.Dataset, String>) p -> {
-						List<String> ret = new ArrayList<>();
-						p.getInstance().stream().forEach(i -> {
-							if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
-								ret.add(i.getHostedby().getKey());
-						});
-						if (Optional.ofNullable(p.getCollectedfrom()).isPresent() ){
-							p.getCollectedfrom().stream().forEach(cf -> {
-								if(Optional.ofNullable(cf.getKey()).isPresent())
-									ret.add(cf.getKey());
-							});
-						}
-						return ret.iterator();
-					}, Encoders.STRING()))
+					.flatMap(
+						(FlatMapFunction<eu.dnetlib.dhp.schema.oaf.Dataset, String>) p -> {
+							List<String> ret = new ArrayList<>();
+							p.getInstance().stream().forEach(i -> {
+								if (Optional.ofNullable(i.getHostedby()).isPresent()
+									&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
+									ret.add(i.getHostedby().getKey());
+							});
+							if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
+								p.getCollectedfrom().stream().forEach(cf -> {
+									if (Optional.ofNullable(cf.getKey()).isPresent())
+										ret.add(cf.getKey());
+								});
+							}
+							return ret.iterator();
+						}, Encoders.STRING()))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/original/software", Software.class)
-				.flatMap(
-					(FlatMapFunction<Software, String>) p -> {
-						List<String> ret = new ArrayList<>();
-						p.getInstance().stream().forEach(i -> {
-							if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
-								ret.add(i.getHostedby().getKey());
-						});
-						if (Optional.ofNullable(p.getCollectedfrom()).isPresent()){
-							p.getCollectedfrom().stream().forEach(cf -> {
-								if(Optional.ofNullable(cf.getKey()).isPresent())
-									ret.add(cf.getKey());
-							});
-						}
-						return ret.iterator();
-					}, Encoders.STRING()))
+					.flatMap(
+						(FlatMapFunction<Software, String>) p -> {
+							List<String> ret = new ArrayList<>();
+							p.getInstance().stream().forEach(i -> {
+								if (Optional.ofNullable(i.getHostedby()).isPresent()
+									&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
+									ret.add(i.getHostedby().getKey());
+							});
+							if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
+								p.getCollectedfrom().stream().forEach(cf -> {
+									if (Optional.ofNullable(cf.getKey()).isPresent())
+										ret.add(cf.getKey());
+								});
+							}
+							return ret.iterator();
+						}, Encoders.STRING()))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/original/otherresearchproduct", OtherResearchProduct.class)
-				.flatMap(
-					(FlatMapFunction<OtherResearchProduct, String>) p -> {
-						List<String> ret = new ArrayList<>();
-						p.getInstance().stream().forEach(i -> {
-							if (Optional.ofNullable(i.getHostedby()).isPresent() && Optional.ofNullable(i.getHostedby().getKey()).isPresent())
-								ret.add(i.getHostedby().getKey());
-						});
-						if (Optional.ofNullable(p.getCollectedfrom()).isPresent()){
-							p.getCollectedfrom().stream().forEach(cf -> {
-								if(Optional.ofNullable(cf.getKey()).isPresent())
-									ret.add(cf.getKey());
-							});
-						}
-						return ret.iterator();
-					}, Encoders.STRING()))
+					.flatMap(
+						(FlatMapFunction<OtherResearchProduct, String>) p -> {
+							List<String> ret = new ArrayList<>();
+							p.getInstance().stream().forEach(i -> {
+								if (Optional.ofNullable(i.getHostedby()).isPresent()
+									&& Optional.ofNullable(i.getHostedby().getKey()).isPresent())
+									ret.add(i.getHostedby().getKey());
+							});
+							if (Optional.ofNullable(p.getCollectedfrom()).isPresent()) {
+								p.getCollectedfrom().stream().forEach(cf -> {
+									if (Optional.ofNullable(cf.getKey()).isPresent())
+										ret.add(cf.getKey());
+								});
+							}
+							return ret.iterator();
+						}, Encoders.STRING()))
 			.filter((FilterFunction<String>) s -> !s.equals(ModelConstants.UNKNOWN_REPOSITORY.getKey()))
 			.distinct();

 		Dataset<MasterDuplicate> masterDuplicate = Utils.readPath(spark, masterDuplicatePath, MasterDuplicate.class);

-		Dataset<String> cfhb = cfhb_orig.joinWith(masterDuplicate, masterDuplicate.col("duplicate").equalTo(cfhb_orig.col("value")), "left")
-			.map((MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> {
-				if (!Optional.ofNullable(t2._2()).isPresent()) {
-					return t2._1();
-				}
-				return t2._2().getMaster();
-			}, Encoders.STRING());
+		Dataset<String> cfhb = cfhb_orig
+			.joinWith(masterDuplicate, masterDuplicate.col("duplicate").equalTo(cfhb_orig.col("value")), "left")
+			.map((MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> {
+				if (!Optional.ofNullable(t2._2()).isPresent()) {
+					return t2._1();
+				}
+				return t2._2().getMaster();
+			}, Encoders.STRING());

 		datasource
 			.joinWith(cfhb, datasource.col("id").equalTo(cfhb.col("value")))
@@ -291,7 +297,9 @@ public class SparkSelectSubset implements Serializable {
 		cfhb.foreach((ForeachFunction<String>) s -> System.out.println("cf " + s));
 		datasource.foreach((ForeachFunction<Datasource>) ds -> System.out.println("ds " + ds.getId()));

-Utils.readPath(spark, outputPath + "/original/datasource", Datasource.class).foreach((ForeachFunction<Datasource>) ds -> System.out.println("dsID: " + ds.getId()));
+		Utils
+			.readPath(spark, outputPath + "/original/datasource", Datasource.class)
+			.foreach((ForeachFunction<Datasource>) ds -> System.out.println("dsID: " + ds.getId()));

 		Dataset<Project> project = Utils
 			.readPath(spark, inputPath + "/project", Project.class)
diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/subset/DumpSubsetTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/subset/DumpSubsetTest.java
index f4bb78c..ee4f3f7 100644
--- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/subset/DumpSubsetTest.java
+++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/subset/DumpSubsetTest.java
@@ -109,7 +109,7 @@ public class DumpSubsetTest {
 			"\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", "
 			+
 			"\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " +
- "\"context\": \"['context'][*]['id']\"}"; + "\"context\": \"['context'][*]['id']\"}"; final String constraint = "{\"criteria\":[{\"constraint\":[{\"verb\":\"lesser_than\",\"field\":\"dateofacceptance\",\"value\":\"2023-01-01\"},{\"verb\":\"greater_than\",\"field\":\"dateofacceptance\",\"value\":\"2006-12-31\"}]}]}"; final String sourcePath = getClass() @@ -153,51 +153,53 @@ public class DumpSubsetTest { @Test // Step 1 void testSelectionConstraintsCommunity() throws Exception { final String pathMap = "{\"author\" : \"$['author'][*]['fullname']\", " + - "\"title\" : \"$['title'][*]['value']\", \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\", " - + - "\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", " - + - "\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " + - "\"context\": \"$['context'][*]['id']\"}"; + "\"title\" : \"$['title'][*]['value']\", \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\", " + + + "\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", " + + + "\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " + + "\"context\": \"$['context'][*]['id']\"}"; final String constraint = "{\"criteria\":[{\"constraint\":[{\"verb\":\"equals\",\"field\":\"context\",\"value\":\"dh-ch\"}]}]}"; final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/publication") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/publication") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); SparkDumpResult - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-sourcePath", sourcePath, - "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", - "-outputPath", workingDir.toString(), - "-communityMapPath", communityMapPath, - "-pathMap", pathMap, - "-selectionCriteria", constraint, - "-resultType", "publication" + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", workingDir.toString(), + "-communityMapPath", communityMapPath, + "-pathMap", pathMap, + "-selectionCriteria", constraint, + "-resultType", "publication" - }); + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dump/publication") - .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); + .textFile(workingDir.toString() + "/dump/publication") + .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); Assertions.assertEquals(17, tmp.count()); JavaRDD tmp_pubs = sc - .textFile(workingDir.toString() + "/original/publication") - .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); + .textFile(workingDir.toString() + "/original/publication") + .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); Assertions.assertEquals(17, tmp_pubs.count()); - Assertions.assertEquals(17, tmp_pubs.filter(p -> p.getContext().stream().anyMatch(c -> c.getId().equals("dh-ch"))).count()); + Assertions + .assertEquals( + 17, tmp_pubs.filter(p -> p.getContext().stream().anyMatch(c -> c.getId().equals("dh-ch"))).count()); } @@ -265,7 +267,7 @@ public class 
DumpSubsetTest { "-isSparkSessionManaged", Boolean.FALSE.toString(), "-sourcePath", sourcePath, "-outputPath", workingDir.toString(), - "-masterDuplicatePath", getClass() + "-masterDuplicatePath", getClass() .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/empty") .getPath() @@ -329,123 +331,123 @@ public class DumpSubsetTest { void testSelectSubsetMaster() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/") + .getPath(); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/original/publication"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/original/publication"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/software") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/original/software"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/software") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/original/software"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/dataset") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/original/dataset"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/dataset") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/original/dataset"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/otherresearchproduct") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/original/otherresearchproduct"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/otherresearchproduct") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/original/otherresearchproduct"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/dump/publication"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/dump/publication"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/dump/software"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/dump/software"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/dump/dataset"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset") + .getPath()) + .saveAsTextFile(workingDir.toString() + "/dump/dataset"); sc - .textFile( - getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct") - .getPath()) - .saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct"); + .textFile( + getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct") + .getPath()) + 
.saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct"); SparkSelectSubset - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-sourcePath", sourcePath, - "-outputPath", workingDir.toString(), - "-masterDuplicatePath", getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence") - .getPath() + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-outputPath", workingDir.toString(), + "-masterDuplicatePath", getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence") + .getPath() - }); + }); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/original/relation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/original/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); Assertions.assertEquals(20, tmp.count()); Assertions - .assertEquals( - 6, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("50|")).count()); + .assertEquals( + 6, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("50|")).count()); Assertions - .assertEquals( - 3, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("20|")).count()); + .assertEquals( + 3, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("20|")).count()); Assertions - .assertEquals( - 3, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("50|")).count()); + .assertEquals( + 3, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("50|")).count()); Assertions - .assertEquals( - 4, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("50|")).count()); + .assertEquals( + 4, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("50|")).count()); Assertions - .assertEquals( - 1, tmp.filter(r -> r.getSource().startsWith("10|") && r.getTarget().startsWith("20|")).count()); + .assertEquals( + 1, tmp.filter(r -> r.getSource().startsWith("10|") && r.getTarget().startsWith("20|")).count()); Assertions - .assertEquals( - 1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("10|")).count()); + .assertEquals( + 1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("10|")).count()); Assertions - .assertEquals( - 1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("40|")).count()); + .assertEquals( + 1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("40|")).count()); Assertions - .assertEquals( - 1, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("20|")).count()); + .assertEquals( + 1, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("20|")).count()); JavaRDD tmp_datasource = sc - .textFile(workingDir.toString() + "/original/datasource") - .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); + .textFile(workingDir.toString() + "/original/datasource") + .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); Assertions.assertEquals(5, tmp_datasource.count()); - + Assertions - .assertEquals( - 1, - tmp_datasource - .filter(d -> d.getId().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8")) - .count()); + .assertEquals( + 1, + tmp_datasource + .filter(d -> d.getId().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8")) + .count()); JavaRDD 
tmp_organization = sc - .textFile(workingDir.toString() + "/original/organization") - .map(item -> OBJECT_MAPPER.readValue(item, Organization.class)); + .textFile(workingDir.toString() + "/original/organization") + .map(item -> OBJECT_MAPPER.readValue(item, Organization.class)); Assertions.assertEquals(3, tmp_organization.count()); JavaRDD tmp_project = sc - .textFile(workingDir.toString() + "/original/project") - .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); + .textFile(workingDir.toString() + "/original/project") + .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); Assertions.assertEquals(3, tmp_project.count()); }
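
Not part of the patch, for orientation only: the cfhb block re-wrapped in the SparkSelectSubset hunk above left-joins the collectedfrom/hostedby datasource identifiers gathered from the dumped results with the master/duplicate correspondences read from masterDuplicatePath, so that a duplicate datasource id is replaced by its master id while ids without a correspondence pass through unchanged. The sketch below reproduces that joinWith("left") + map pattern in a self-contained form; the local[*] session, the sample identifiers and the MasterDuplicate bean declared here are assumptions for illustration, not project classes.

// Minimal, runnable sketch of the duplicate -> master remapping used in SparkSelectSubset.
// Sample ids and the nested bean are made up; the join/"left"/map pattern mirrors the patch.
import java.util.Arrays;
import java.util.Optional;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class MasterDuplicateRemapSketch {

	public static class MasterDuplicate implements java.io.Serializable {
		private String master;
		private String duplicate;

		public String getMaster() { return master; }
		public void setMaster(String master) { this.master = master; }
		public String getDuplicate() { return duplicate; }
		public void setDuplicate(String duplicate) { this.duplicate = duplicate; }
	}

	private static MasterDuplicate md(String master, String duplicate) {
		MasterDuplicate dm = new MasterDuplicate();
		dm.setMaster(master);
		dm.setDuplicate(duplicate);
		return dm;
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("remap-sketch").getOrCreate();

		// collectedfrom/hostedby ids gathered from the dumped results (hypothetical values)
		Dataset<String> cfhbOrig = spark
			.createDataset(Arrays.asList("10|duplicate_ds", "10|master_only_ds"), Encoders.STRING());

		// duplicate -> master correspondences, as produced by ReadMasterDuplicateFromDB
		Dataset<MasterDuplicate> masterDuplicate = spark
			.createDataset(Arrays.asList(md("10|master_ds", "10|duplicate_ds")), Encoders.bean(MasterDuplicate.class));

		// left join: ids with a correspondence are replaced by the master id, the others pass through
		Dataset<String> cfhb = cfhbOrig
			.joinWith(masterDuplicate, masterDuplicate.col("duplicate").equalTo(cfhbOrig.col("value")), "left")
			.map((MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> {
				if (!Optional.ofNullable(t2._2()).isPresent()) {
					return t2._1();
				}
				return t2._2().getMaster();
			}, Encoders.STRING());

		cfhb.show(); // expected rows: 10|master_ds and 10|master_only_ds
		spark.stop();
	}
}
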