From 618d2de2da0b9749b5eca6097faee25706d08956 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 13 Jul 2021 17:10:02 +0200 Subject: [PATCH] minor changes and refactoring --- .../dhp/oa/graph/dump/ResultMapper.java | 2 +- .../dump/complete/CreateContextRelation.java | 8 +- .../dump/complete/DumpGraphEntities.java | 4 +- .../dhp/oa/graph/dump/complete/Extractor.java | 4 +- .../SparkSelectValidRelationsJob.java | 188 +++++++++--------- .../dhp/oa/graph/dump/DumpJobTest.java | 36 ++-- .../graph/dump/complete/CreateEntityTest.java | 10 +- .../dump/complete/SelectRelationTest.java | 99 ++++----- 8 files changed, 176 insertions(+), 175 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java index 47baee30d..fe9b4e443 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java @@ -428,7 +428,7 @@ public class ResultMapper implements Serializable { if (oPca.isPresent() && oPcc.isPresent()) { Field pca = oPca.get(); Field pcc = oPcc.get(); - if(!pca.getValue().trim().equals("") && !pcc.getValue().trim().equals("")){ + if (!pca.getValue().trim().equals("") && !pcc.getValue().trim().equals("")) { APC apc = new APC(); apc.setCurrency(oPcc.get().getValue()); apc.setAmount(oPca.get().getValue()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java index 102406315..59f660525 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java @@ -70,10 +70,10 @@ public class CreateContextRelation implements Serializable { cce.execute(Process::getRelation, CONTEX_RELATION_DATASOURCE, ModelSupport.getIdPrefix(Datasource.class)); log.info("Creating relations for projects... "); -// cce -// .execute( -// Process::getRelation, CONTEX_RELATION_PROJECT, -// ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class)); + cce + .execute( + Process::getRelation, CONTEX_RELATION_PROJECT, + ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class)); cce.close(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java index 530f7a003..b843ec365 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpGraphEntities.java @@ -453,7 +453,7 @@ public class DumpGraphEntities implements Serializable { .map( (MapFunction) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) - .filter(Objects::nonNull) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -461,7 +461,7 @@ public class DumpGraphEntities implements Serializable { } private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) { - if(org.getDataInfo().getDeletedbyinference()) + if (org.getDataInfo().getDeletedbyinference()) return null; Organization organization = new Organization(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java index 31886d1b1..f76f92013 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java @@ -129,7 +129,7 @@ public class Extractor implements Serializable { return relationList.iterator(); }, Encoders.bean(Relation.class)) - .write() + .write() .option("compression", "gzip") .mode(SaveMode.Overwrite) .json(outputPath); @@ -147,7 +147,7 @@ public class Extractor implements Serializable { .map( paction -> Provenance .newInstance( - paction.getClassid(), + paction.getClassname(), dinfo.getTrust())) .orElse( Provenance diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java index 77402ea1b..e729e13df 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.graph.dump.complete; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; @@ -25,113 +26,112 @@ import eu.dnetlib.dhp.schema.oaf.*; * with this view for both the source and the target */ - public class SparkSelectValidRelationsJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkSelectValidRelationsJob.class); + private static final Logger log = LoggerFactory.getLogger(SparkSelectValidRelationsJob.class); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkSelectValidRelationsJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/complete/input_relationdump_parameters.json")); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkSelectValidRelationsJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/complete/input_relationdump_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - selectValidRelation(spark, inputPath, outputPath); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + selectValidRelation(spark, inputPath, outputPath); - }); + }); - } + } - private static void selectValidRelation(SparkSession spark, String inputPath, String outputPath) { - Dataset relation = Utils.readPath(spark, inputPath + "/relation", Relation.class); - Dataset publication = Utils.readPath(spark, inputPath + "/publication", Publication.class); - Dataset dataset = Utils - .readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - Dataset software = Utils.readPath(spark, inputPath + "/software", Software.class); - Dataset other = Utils - .readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class); - Dataset organization = Utils.readPath(spark, inputPath + "/organization", Organization.class); - Dataset project = Utils.readPath(spark, inputPath + "/project", Project.class); - Dataset datasource = Utils.readPath(spark, inputPath + "/datasource", Datasource.class); + private static void selectValidRelation(SparkSession spark, String inputPath, String outputPath) { + Dataset relation = Utils.readPath(spark, inputPath + "/relation", Relation.class); + Dataset publication = Utils.readPath(spark, inputPath + "/publication", Publication.class); + Dataset dataset = Utils + .readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + Dataset software = Utils.readPath(spark, inputPath + "/software", Software.class); + Dataset other = Utils + .readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class); + Dataset organization = Utils.readPath(spark, inputPath + "/organization", Organization.class); + Dataset project = Utils.readPath(spark, inputPath + "/project", Project.class); + Dataset datasource = Utils.readPath(spark, inputPath + "/datasource", Datasource.class); - relation.createOrReplaceTempView("relation"); - publication.createOrReplaceTempView("publication"); - dataset.createOrReplaceTempView("dataset"); - other.createOrReplaceTempView("other"); - software.createOrReplaceTempView("software"); - organization.createOrReplaceTempView("organization"); - project.createOrReplaceTempView("project"); - datasource.createOrReplaceTempView("datasource"); + relation.createOrReplaceTempView("relation"); + publication.createOrReplaceTempView("publication"); + dataset.createOrReplaceTempView("dataset"); + other.createOrReplaceTempView("other"); + software.createOrReplaceTempView("software"); + organization.createOrReplaceTempView("organization"); + project.createOrReplaceTempView("project"); + datasource.createOrReplaceTempView("datasource"); - spark - .sql( - "SELECT id " + - "FROM publication " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + - "UNION ALL " + - "SELECT id " + - "FROM dataset " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + - "UNION ALL " + - "SELECT id " + - "FROM other " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + - "UNION ALL " + - "SELECT id " + - "FROM software " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + - "UNION ALL " + - "SELECT id " + - "FROM organization " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + - "UNION ALL " + - "SELECT id " + - "FROM project " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + - "UNION ALL " + - "SELECT id " + - "FROM datasource " + - "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " ) - .createOrReplaceTempView("identifiers"); + spark + .sql( + "SELECT id " + + "FROM publication " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + + "UNION ALL " + + "SELECT id " + + "FROM dataset " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + + "UNION ALL " + + "SELECT id " + + "FROM other " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + + "UNION ALL " + + "SELECT id " + + "FROM software " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + + "UNION ALL " + + "SELECT id " + + "FROM organization " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + + "UNION ALL " + + "SELECT id " + + "FROM project " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " + + "UNION ALL " + + "SELECT id " + + "FROM datasource " + + "WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false ") + .createOrReplaceTempView("identifiers"); - spark - .sql( - "SELECT relation.* " + - "FROM relation " + - "JOIN identifiers i1 " + - "ON source = i1.id " + - "JOIN identifiers i2 " + - "ON target = i2.id " + - "WHERE datainfo.deletedbyinference = false") - .as(Encoders.bean(Relation.class)) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(outputPath); - ; + spark + .sql( + "SELECT relation.* " + + "FROM relation " + + "JOIN identifiers i1 " + + "ON source = i1.id " + + "JOIN identifiers i2 " + + "ON target = i2.id " + + "WHERE datainfo.deletedbyinference = false") + .as(Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + ; - } -} \ No newline at end of file + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java index f0842c844..6b64a4124 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java @@ -412,40 +412,42 @@ public class DumpJobTest { @Test public void testArticlePCA() { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca") + .getPath(); final String communityMapPath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json") + .getPath(); DumpProducts dump = new DumpProducts(); dump - .run( - // false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, - false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, - GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); + .run( + // false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, + false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class, + GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/result") - .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); + .createDataset(tmp.rdd(), Encoders.bean(GraphResult.class)); Assertions.assertEquals(23, verificationDataset.count()); - //verificationDataset.show(false); + // verificationDataset.show(false); Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count()); verificationDataset.createOrReplaceTempView("check"); - org.apache.spark.sql.Dataset temp = spark.sql("select id " + - "from check " + - "lateral view explode (instance) i as inst " + - "where inst.articleprocessingcharge is not null"); + org.apache.spark.sql.Dataset temp = spark + .sql( + "select id " + + "from check " + + "lateral view explode (instance) i as inst " + + "where inst.articleprocessingcharge is not null"); Assertions.assertTrue(temp.count() == 2); @@ -453,8 +455,6 @@ public class DumpJobTest { Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1); - - // verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset // .filter("bestAccessright.code = 'c_abf2' and bestAccessright.label = 'OPEN'") // .count() diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java index 3ecbd1894..679c09393 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java @@ -97,7 +97,7 @@ public class CreateEntityTest { Assertions.assertEquals(12, riList.size()); riList.stream().forEach(c -> { - switch (c.getOriginalId()) { + switch (c.getAcronym()) { case "mes": Assertions .assertTrue(c.getType().equals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_COMMUNITY)); @@ -115,9 +115,9 @@ public class CreateEntityTest { String .format( "%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX, - DHPUtils.md5(c.getOriginalId())))); + DHPUtils.md5(c.getAcronym())))); Assertions.assertTrue(c.getZenodo_community().equals("https://zenodo.org/communities/oac_mes")); - Assertions.assertTrue("mes".equals(c.getOriginalId())); + Assertions.assertTrue("mes".equals(c.getAcronym())); break; case "clarin": Assertions @@ -130,9 +130,9 @@ public class CreateEntityTest { String .format( "%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX, - DHPUtils.md5(c.getOriginalId())))); + DHPUtils.md5(c.getAcronym())))); Assertions.assertTrue(c.getZenodo_community().equals("https://zenodo.org/communities/oac_clarin")); - Assertions.assertTrue("clarin".equals(c.getOriginalId())); + Assertions.assertTrue("clarin".equals(c.getAcronym())); break; } // TODO add check for all the others Entities diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/SelectRelationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/SelectRelationTest.java index 248bd7256..3e4ab93c5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/SelectRelationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/SelectRelationTest.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.graph.dump.complete; import java.io.IOException; @@ -28,72 +29,72 @@ import net.sf.saxon.expr.instruct.ForEach; public class SelectRelationTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; + private static Path workingDir; - private static final Logger log = LoggerFactory - .getLogger(SelectRelationTest.class); + private static final Logger log = LoggerFactory + .getLogger(SelectRelationTest.class); - private static HashMap map = new HashMap<>(); + private static HashMap map = new HashMap<>(); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files - .createTempDirectory(SelectRelationTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(SelectRelationTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(SelectRelationTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(SelectRelationTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(SelectRelationTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(SelectRelationTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - public void test1() throws Exception { + @Test + public void test1() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/selectrelations") - .getPath(); + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/selectrelations") + .getPath(); - SparkSelectValidRelationsJob.main(new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-outputPath", workingDir.toString() + "/relation", - "-sourcePath", sourcePath - }); + SparkSelectValidRelationsJob.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/relation", + "-sourcePath", sourcePath + }); // dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/relation") - .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.oaf.Relation.class)); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.oaf.Relation.class)); - org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.oaf.Relation.class)); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.oaf.Relation.class)); - Assertions.assertTrue(verificationDataset.count() == 7); + Assertions.assertTrue(verificationDataset.count() == 7); - } + } -} \ No newline at end of file +}