[Graph Dump] New funded products dump #222

Merged
claudio.atzori merged 5 commits from dump_new_funded_products into master 2022-07-01 10:51:37 +02:00
10 changed files with 304 additions and 287 deletions
Showing only changes of commit 0cb1c70788


@@ -255,7 +255,8 @@ public class ZenodoAPIClient implements Serializable {
	private void setDepositionId(String concept_rec_id, Integer page) throws IOException, MissingConceptDoiException {
		ZenodoModelList zenodoModelList = new Gson()
			.fromJson(getPrevDepositions(String.valueOf(page)), ZenodoModelList.class);
		for (ZenodoModel zm : zenodoModelList) {
			if (zm.getConceptrecid().equals(concept_rec_id)) {
@@ -263,8 +264,9 @@ public class ZenodoAPIClient implements Serializable {
				return;
			}
		}
		if (zenodoModelList.size() == 0)
			throw new MissingConceptDoiException(
				"The concept record id specified was missing in the list of depositions");
		setDepositionId(concept_rec_id, page + 1);
	}
@@ -278,11 +280,11 @@ public class ZenodoAPIClient implements Serializable {
		String url = urlBuilder.build().toString();
		Request request = new Request.Builder()
			.url(url)
			.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
			.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
			.get()
			.build();
		try (Response response = httpClient.newCall(request).execute()) {
@@ -295,7 +297,6 @@ public class ZenodoAPIClient implements Serializable {
	}
	private String getBucket(String url) throws IOException {
		OkHttpClient httpClient = new OkHttpClient.Builder()
			.connectTimeout(600, TimeUnit.SECONDS)
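
The reformatted setDepositionId keeps the same recursive pagination: fetch one page of previous depositions, stop when the concept record id is found, fail on an empty page, otherwise recurse on the next page. A minimal sketch of that pattern for orientation; Deposition and fetchPage are hypothetical placeholders, not the Zenodo client's actual types or calls.

	// Hedged illustration of the pagination logic in setDepositionId; Deposition and
	// fetchPage(page) are placeholders and do not exist in the real ZenodoAPIClient.
	private String findDepositionId(String conceptRecId, int page) throws IOException, MissingConceptDoiException {
		List<Deposition> depositions = fetchPage(page); // one page of previously published depositions
		for (Deposition d : depositions) {
			if (d.getConceptRecId().equals(conceptRecId)) {
				return d.getId(); // match found: no further pages are requested
			}
		}
		if (depositions.isEmpty()) {
			// an empty page means every deposition has been inspected without a match
			throw new MissingConceptDoiException(
				"The concept record id specified was missing in the list of depositions");
		}
		return findDepositionId(conceptRecId, page + 1); // otherwise try the next page
	}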


@@ -142,7 +142,8 @@ class TransformationJobTest extends AbstractVocabularyTest {
	@Test
	@DisplayName("Test TransformSparkJobNode.main with oaiOpenaire_datacite (v4)")
	void transformTestITGv4OAIdatacite(@TempDir
	final Path testDir) throws Exception {
		try (SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate()) {
@@ -152,7 +153,9 @@ class TransformationJobTest extends AbstractVocabularyTest {
				.getFile();
			final String mdstore_output = testDir.toString() + "/version";
			mockupTrasformationRule(
				"simpleTRule",
				"/eu/dnetlib/dhp/transform/scripts/xslt_cleaning_oaiOpenaire_datacite_ExchangeLandingpagePid.xsl");
			final Map<String, String> parameters = Stream.of(new String[][] {
				{
@@ -203,7 +206,8 @@ class TransformationJobTest extends AbstractVocabularyTest {
	@Test
	@DisplayName("Test TransformSparkJobNode.main")
	void transformTest(@TempDir
	final Path testDir) throws Exception {
		try (SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate()) {


@@ -103,7 +103,7 @@ public class SparkBulkTagJob {
		ResultTagger resultTagger = new ResultTagger();
		readPath(spark, inputPath, resultClazz)
			.map(patchResult(), Encoders.bean(resultClazz))
			.filter(Objects::nonNull)
			.map(
				(MapFunction<R, R>) value -> resultTagger
					.enrichContextCriteria(


@@ -1,10 +1,14 @@
package eu.dnetlib.dhp.oa.graph.dump.funderresults;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -14,29 +18,32 @@ import org.apache.spark.sql.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import eu.dnetlib.dhp.schema.dump.oaf.community.Funder;
import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
/**
 * Splits the dumped results by funder and stores them in a folder named as the funder nsp (for all the funders, but the EC
 * for the EC it specifies also the fundingStream (FP7 or H2020)
 */
public class SparkDumpFunderResults implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(SparkDumpFunderResults.class);
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkDumpFunderResults.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/funder_result_parameters.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);
		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);
@@ -44,30 +51,32 @@ public class SparkDumpFunderResults implements Serializable {
		log.info("outputPath: {}", outputPath);
		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				writeResultProjectList(spark, inputPath, outputPath);
			});
	}
	private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath) {
		Dataset<CommunityResult> result = Utils
			.readPath(spark, inputPath + "/publication", CommunityResult.class)
			.union(Utils.readPath(spark, inputPath + "/dataset", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
			.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
		log.info("Number of result {}", result.count());
		Dataset<String> tmp = result
			.flatMap((FlatMapFunction<CommunityResult, String>) cr -> cr.getProjects().stream().map(p -> {
				return getFunderName(p);
			}).collect(Collectors.toList()).iterator(), Encoders.STRING())
			.distinct();
		List<String> funderList = tmp.collectAsList();
		funderList.forEach(funder -> {
			dumpResults(funder, result, outputPath);
		});
	}
	@NotNull
	private static String getFunderName(Project p) {
		Optional<Funder> ofunder = Optional.ofNullable(p.getFunder());
@@ -97,23 +106,24 @@ public class SparkDumpFunderResults implements Serializable {
			return fName;
		}
	}
	private static void dumpResults(String funder, Dataset<CommunityResult> results, String outputPath) {
		results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
			if (!Optional.ofNullable(r.getProjects()).isPresent()) {
				return null;
			}
			for (Project p : r.getProjects()) {
				String fName = getFunderName(p);
				if (fName.equalsIgnoreCase(funder)) {
					return r;
				}
			}
			return null;
		}, Encoders.bean(CommunityResult.class))
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/" + funder);
	}
}
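
The hunk above elides the body of getFunderName. Based on the class Javadoc and the folder names asserted in SplitPerFunderTest later in this diff (EC_FP7, EC_H2020, NIH, NWO, ...), here is a hedged reconstruction of what that mapping likely does; the getter names (getShortName, getFundingStream) and the UNKNOWN fallback are assumptions, not the merged code.

	// Hedged sketch of the elided funder-name mapping; getter names and the fallback
	// are inferred from the Javadoc and the tests, not copied from the merged code.
	private static String getFunderName(Project p) {
		Optional<Funder> ofunder = Optional.ofNullable(p.getFunder());
		if (!ofunder.isPresent()) {
			return "UNKNOWN"; // placeholder: the real method derives a name for every project
		}
		String shortName = ofunder.get().getShortName();
		if ("EC".equalsIgnoreCase(shortName)) {
			// the EC dump is further split by funding stream, as the Javadoc states
			String stream = ofunder.get().getFundingStream();
			return stream != null && stream.startsWith("H2020") ? "EC_H2020" : "EC_FP7";
		}
		return shortName;
	}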


@@ -45,18 +45,18 @@ public class SparkResultLinkedToProject implements Serializable {
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkResultLinkedToProject.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/input_parameters_link_prj.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);
		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
		final String inputPath = parser.get("sourcePath");
@@ -78,41 +78,41 @@ public class SparkResultLinkedToProject implements Serializable {
		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				writeResultsLinkedToProjects(
					communityMapPath, spark, inputClazz, inputPath, outputPath, resultProjectsPath);
			});
	}
	private static <R extends Result> void writeResultsLinkedToProjects(String communityMapPath, SparkSession spark,
		Class<R> inputClazz,
		String inputPath, String outputPath, String resultProjectsPath) {
		Dataset<R> results = Utils
			.readPath(spark, inputPath, inputClazz)
			.filter(
				(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
					!r.getDataInfo().getInvisible());
		Dataset<ResultProject> resultProjectDataset = Utils
			.readPath(spark, resultProjectsPath, ResultProject.class);
		CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
		results
			.joinWith(resultProjectDataset, results.col("id").equalTo(resultProjectDataset.col("resultId")))
			.map((MapFunction<Tuple2<R, ResultProject>, CommunityResult>) t2 -> {
				CommunityResult cr = (CommunityResult) ResultMapper
					.map(
						t2._1(),
						communityMap, Constants.DUMPTYPE.FUNDER.getType());
				cr.setProjects(t2._2().getProjectsList());
				return cr;
			}, Encoders.bean(CommunityResult.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);
	}
}
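
For orientation, a sketch of how the two dump steps chain, mirroring the parameter names exercised by ResultLinkedToProjectTest and SplitPerFunderTest later in this diff. All path values are illustrative placeholders, this is not the production workflow configuration, and the package of SparkResultLinkedToProject is assumed to match the funderresults one shown above.

	// Illustrative only: chaining the two jobs with the flags the tests in this PR use.
	import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults;
	import eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject;

	public class FunderDumpSketch {
		public static void main(String[] args) throws Exception {
			String preparedInfoPath = "/tmp/dump/preparedInfo"; // output of SparkPrepareResultProject (placeholder)
			String communityMapPath = "/tmp/dump/communityMap"; // placeholder
			String workingPath = "/tmp/dump/funder"; // placeholder

			// 1) keep only results linked to at least one project and attach the project list
			//    (run analogously for dataset, otherresearchproduct and software)
			SparkResultLinkedToProject.main(new String[] {
				"-isSparkSessionManaged", Boolean.FALSE.toString(),
				"-sourcePath", "/tmp/graph/publication",
				"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
				"-graphPath", preparedInfoPath,
				"-communityMapPath", communityMapPath,
				"-outputPath", workingPath + "/publication"
			});

			// 2) split the CommunityResult records into one gzip-compressed JSON folder per funder
			SparkDumpFunderResults.main(new String[] {
				"-isSparkSessionManaged", Boolean.FALSE.toString(),
				"-sourcePath", workingPath,
				"-outputPath", workingPath + "/split"
			});
		}
	}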


@@ -2,9 +2,11 @@
package eu.dnetlib.dhp.oa.graph.dump.projectssubset;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@@ -14,6 +16,7 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Project;


@@ -35,12 +35,12 @@ public class PrepareResultProjectJobTest {
	private static Path workingDir;
	private static final Logger log = LoggerFactory
		.getLogger(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class);
	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files
			.createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);
		SparkConf conf = new SparkConf();
@@ -54,10 +54,10 @@ public class PrepareResultProjectJobTest {
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
		spark = SparkSession
			.builder()
			.appName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}
	@AfterAll
@@ -70,23 +70,23 @@ public class PrepareResultProjectJobTest {
	void testNoMatch() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/no_match")
			.getPath();
		SparkPrepareResultProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
		org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));
		assertEquals(0, verificationDataset.count());
@@ -96,37 +96,37 @@ public class PrepareResultProjectJobTest {
	void testMatchOne() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_one")
			.getPath();
		SparkPrepareResultProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
		org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));
		assertEquals(1, verificationDataset.count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count());
		verificationDataset.createOrReplaceTempView("table");
		Dataset<Row> check = spark
			.sql(
				"Select projList.provenance.provenance " +
					"from table " +
					"lateral view explode (projectsList) pl as projList");
		assertEquals(1, check.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
@@ -138,88 +138,88 @@ public class PrepareResultProjectJobTest {
	void testMatch() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match")
			.getPath();
		SparkPrepareResultProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
		org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));
		assertEquals(2, verificationDataset.count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count());
		verificationDataset.createOrReplaceTempView("dataset");
		String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance "
			+ "from dataset "
			+ "lateral view explode(projectsList) p as MyT ";
		org.apache.spark.sql.Dataset<Row> resultExplodedProvenance = spark.sql(query);
		assertEquals(3, resultExplodedProvenance.count());
		assertEquals(
			2,
			resultExplodedProvenance
				.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'")
				.count());
		assertEquals(
			2,
			resultExplodedProvenance
				.filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::03376222b28a3aebf2730ac514818d04' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
				.count());
		assertEquals(
			3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
	}
@@ -227,98 +227,98 @@ public class PrepareResultProjectJobTest {
	public void testMatchValidated() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_validatedRels")
			.getPath();
		SparkPrepareResultProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
		org.apache.spark.sql.Dataset<ResultProject> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(ResultProject.class));
		assertEquals(2, verificationDataset.count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count());
		assertEquals(
			1,
			verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count());
		verificationDataset.createOrReplaceTempView("dataset");
		String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym , MyT.provenance.provenance provenance, "
			+
			"MyT.validated.validatedByFunder, MyT.validated.validationDate "
			+ "from dataset "
			+ "lateral view explode(projectsList) p as MyT ";
		org.apache.spark.sql.Dataset<Row> resultExplodedProvenance = spark.sql(query);
		assertEquals(3, resultExplodedProvenance.count());
		assertEquals(3, resultExplodedProvenance.filter("validatedByFunder = true").count());
		assertEquals(
			2,
			resultExplodedProvenance
				.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'")
				.count());
		assertEquals(
			2,
			resultExplodedProvenance
				.filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " +
						"and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " +
						"and validatedByFunder = true " +
						"and validationDate = '2021-08-06'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' " +
						"and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' " +
						"and validatedByFunder = true and validationDate = '2021-08-04'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'")
				.count());
		assertEquals(
			1,
			resultExplodedProvenance
				.filter(
					"project = '40|aka_________::03376222b28a3aebf2730ac514818d04' " +
						"and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' " +
						"and validatedByFunder = true and validationDate = '2021-08-05'")
				.count());
		assertEquals(
			3, resultExplodedProvenance.filter("provenance = 'sysimport:crosswalk:entityregistry'").count());
	}
@@ -326,20 +326,20 @@ public class PrepareResultProjectJobTest {
	void testMatchx() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match")
			.getPath();
		SparkPrepareResultProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<ResultProject> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class));
		tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
	}


@@ -35,15 +35,15 @@ public class ResultLinkedToProjectTest {
	private static Path workingDir;
	private static final Logger log = LoggerFactory
		.getLogger(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class);
	private static final HashMap<String, String> map = new HashMap<>();
	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files
			.createTempDirectory(
				eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);
		SparkConf conf = new SparkConf();
@@ -57,10 +57,10 @@ public class ResultLinkedToProjectTest {
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
		spark = SparkSession
			.builder()
			.appName(eu.dnetlib.dhp.oa.graph.dump.funderresult.ResultLinkedToProjectTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}
	@AfterAll
@@ -73,32 +73,32 @@ public class ResultLinkedToProjectTest {
	void testNoMatch() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/nomatch/papers.json")
			.getPath();
		final String graphPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
			.getPath();
		final String communityMapPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
			.getPath();
		SparkResultLinkedToProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath,
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
			"-graphPath", graphPath,
			"-communityMapPath", communityMapPath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<CommunityResult> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(0, tmp.count());
@@ -108,32 +108,32 @@ public class ResultLinkedToProjectTest {
	void testMatchOne() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/match/papers.json")
			.getPath();
		final String graphPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/preparedInfo")
			.getPath();
		final String communityMapPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/communityMapPath")
			.getPath();
		SparkResultLinkedToProject.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/preparedInfo",
			"-sourcePath", sourcePath,
			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
			"-graphPath", graphPath,
			"-communityMapPath", communityMapPath
		});
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<CommunityResult> tmp = sc
			.textFile(workingDir.toString() + "/preparedInfo")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());


@@ -56,10 +56,10 @@ public class SplitPerFunderTest {
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
		spark = SparkSession
			.builder()
			.appName(SplitPerFunderTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}
	@AfterAll
@@ -72,13 +72,13 @@ public class SplitPerFunderTest {
	void test1() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/funderresource/ext")
			.getPath();
		SparkDumpFunderResults.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/split",
			"-sourcePath", sourcePath
		});
@@ -86,64 +86,64 @@ public class SplitPerFunderTest {
		// FP7 3 and H2020 3
		JavaRDD<CommunityResult> tmp = sc
			.textFile(workingDir.toString() + "/split/EC_FP7")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		org.apache.spark.sql.Dataset<CommunityResult> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(CommunityResult.class));
		Assertions.assertEquals(3, verificationDataset.count());
		Assertions
			.assertEquals(
				1, verificationDataset.filter("id = '50|dedup_wf_001::0d16b1714ab3077df73893a8ea57d776'").count());
		// CIHR 2
		tmp = sc
			.textFile(workingDir.toString() + "/split/CIHR")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(2, tmp.count());
		// NWO 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/NWO")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// NIH 3
		tmp = sc
			.textFile(workingDir.toString() + "/split/NIH")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(2, tmp.count());
		// NSF 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/NSF")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// SNSF 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/SNSF")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// NHMRC 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/NHMRC")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
		// H2020 3
		tmp = sc
			.textFile(workingDir.toString() + "/split/EC_H2020")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(3, tmp.count());
		// MZOS 1
		tmp = sc
			.textFile(workingDir.toString() + "/split/MZOS")
			.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
		Assertions.assertEquals(1, tmp.count());
	}


@@ -30,7 +30,6 @@ public class ProjectSubsetTest {
	private static final Logger log = LoggerFactory
		.getLogger(eu.dnetlib.dhp.oa.graph.dump.projectssubset.ProjectSubsetTest.class);
	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files