dump #50

Merged
claudio.atzori merged 98 commits from miriam.baglioni/dnet-hadoop:dump into master 2020-11-04 18:07:01 +01:00
1 changed files with 14 additions and 7 deletions
Showing only changes of commit d407852ac2 - Show all commits

View File

@ -7,10 +7,13 @@ import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
@ -28,7 +31,7 @@ import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
@Disabled
//@Disabled
public class DumpJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -165,6 +168,10 @@ public class DumpJobTest {
Assertions.assertEquals(90, verificationDataset.count());
// verificationDataset
// .filter("id = '50|DansKnawCris::1a960e20087cb46b93588e4e184e8a58'")
// .foreach((ForeachFunction<CommunityResult>) rec -> System.out.println(OBJECT_MAPPER.writeValueAsString(rec)));
Assertions
.assertTrue(
verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset
@ -213,20 +220,20 @@ public class DumpJobTest {
.run(
// false, sourcePath, workingDir.toString() + "/result", communityMapPath, Dataset.class,
false, sourcePath, workingDir.toString() + "/result", communityMapPath, Dataset.class,
Result.class, true);
GraphResult.class, true);
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<eu.dnetlib.dhp.schema.dump.oaf.Result> tmp = sc
JavaRDD<eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult> tmp = sc
.textFile(workingDir.toString() + "/result")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Result.class));
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult.class));
org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.oaf.Result> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Result.class));
org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult.class));
Assertions.assertEquals(5, verificationDataset.count());
verificationDataset.show(false);
verificationDataset.foreach((ForeachFunction<GraphResult>) res -> System.out.println(OBJECT_MAPPER.writeValueAsString(res)));
}
@Test