523 lines
18 KiB
Java
523 lines
18 KiB
Java
|
|
package eu.dnetlib.dhp.oa.graph.dump.subset;
|
|
|
|
import java.io.IOException;
|
|
import java.nio.file.Files;
|
|
import java.nio.file.Path;
|
|
|
|
import org.apache.commons.io.FileUtils;
|
|
import org.apache.spark.SparkConf;
|
|
import org.apache.spark.api.java.JavaRDD;
|
|
import org.apache.spark.api.java.JavaSparkContext;
|
|
import org.apache.spark.api.java.function.FilterFunction;
|
|
import org.apache.spark.api.java.function.ForeachFunction;
|
|
import org.apache.spark.sql.Encoders;
|
|
import org.apache.spark.sql.SparkSession;
|
|
import org.junit.jupiter.api.AfterAll;
|
|
import org.junit.jupiter.api.Assertions;
|
|
import org.junit.jupiter.api.BeforeAll;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
|
|
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
|
import eu.dnetlib.dhp.oa.graph.dump.complete.Extractor;
|
|
import eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob;
|
|
import eu.dnetlib.dhp.oa.model.community.CommunityResult;
|
|
import eu.dnetlib.dhp.oa.model.graph.GraphResult;
|
|
import eu.dnetlib.dhp.oa.model.graph.ResearchCommunity;
|
|
import eu.dnetlib.dhp.schema.oaf.*;
|
|
|
|
/**
|
|
* @author miriam.baglioni
|
|
* @Date 16/11/22
|
|
*/
|
|
public class DumpSubsetTest {
|
|
/** JSON mapper used by the tests to parse the lines written by the jobs under test. */
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/** Local Spark session shared by all the tests of this class. */
private static SparkSession spark;

/** Temporary directory under which the jobs under test write their output. */
private static Path workingDir;

private static final Logger log = LoggerFactory.getLogger(DumpSubsetTest.class);

/**
 * Community identifier to community label fixture.
 * NOTE(review): no test in this class reads this map - confirm whether it can be dropped.
 */
private static final CommunityMap map = new CommunityMap();

static {
	map.put("egi", "EGI Federation");
	map.put("fet-fp7", "FET FP7");
	map.put("fet-h2020", "FET H2020");
	map.put("clarin", "CLARIN");
	map.put("fam", "Fisheries and Aquaculture Management");
	map.put("ni", "Neuroinformatics");
	map.put("mes", "European Marine Science");
	map.put("instruct", "Instruct-Eric");
	map.put("rda", "Research Data Alliance");
	map.put("elixir-gr", "ELIXIR GR");
	map.put("aginfra", "Agricultural and Food Sciences");
	map.put("dariah", "DARIAH EU");
	map.put("risis", "RISI");
	map.put("ee", "SDSN - Greece");
	map.put("oa-pg", "EC Post-Grant Open Access Pilot");
	map.put("beopen", "Transport Research");
	map.put("euromarine", "Euromarine");
	map.put("ifremer", "Ifremer");
	map.put("dh-ch", "Digital Humanities and Cultural Heritage");
	map.put("science-innovation-policy", "Science and Innovation Policy Studies");
	map.put("covid-19", "COVID-19");
	// NOTE(review): selectValidContextTest asserts on the acronym "enermaps";
	// confirm whether the key "enrmaps" is intentional.
	map.put("enrmaps", "Energy Research");
	map.put("epos", "EPOS");
}
|
|
|
|
@BeforeAll
|
|
public static void beforeAll() throws IOException {
|
|
workingDir = Files.createTempDirectory(DumpSubsetTest.class.getSimpleName());
|
|
log.info("using work dir {}", workingDir);
|
|
|
|
SparkConf conf = new SparkConf();
|
|
conf.setAppName(DumpSubsetTest.class.getSimpleName());
|
|
|
|
conf.setMaster("local[*]");
|
|
conf.set("spark.driver.host", "localhost");
|
|
conf.set("hive.metastore.local", "true");
|
|
conf.set("spark.ui.enabled", "false");
|
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
|
|
|
spark = SparkSession
|
|
.builder()
|
|
.appName(DumpSubsetTest.class.getSimpleName())
|
|
.config(conf)
|
|
.getOrCreate();
|
|
}
|
|
|
|
@AfterAll
|
|
public static void afterAll() throws IOException {
|
|
FileUtils.deleteDirectory(workingDir.toFile());
|
|
spark.stop();
|
|
}
|
|
|
|
@Test // Step 1
|
|
void testSelectionConstraints() throws Exception {
|
|
final String pathMap = "{\"author\" : \"$['author'][*]['fullname']\", " +
|
|
"\"title\" : \"$['title'][*]['value']\", \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\", "
|
|
+
|
|
"\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", "
|
|
+
|
|
"\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " +
|
|
"\"context\": \"['context'][*]['id']\"}";
|
|
final String constraint = "{\"criteria\":[{\"constraint\":[{\"verb\":\"lesser_than\",\"field\":\"dateofacceptance\",\"value\":\"2023-01-01\"},{\"verb\":\"greater_than\",\"field\":\"dateofacceptance\",\"value\":\"2006-12-31\"}]}]}";
|
|
|
|
final String sourcePath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/publication")
|
|
.getPath();
|
|
|
|
final String communityMapPath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
|
|
.getPath();
|
|
|
|
SparkDumpResult
|
|
.main(
|
|
new String[] {
|
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
"-sourcePath", sourcePath,
|
|
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
|
"-outputPath", workingDir.toString(),
|
|
"-communityMapPath", communityMapPath,
|
|
"-pathMap", pathMap,
|
|
"-selectionCriteria", constraint,
|
|
"-resultType", "publication",
|
|
"-masterDuplicatePath", getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence")
|
|
.getPath()
|
|
|
|
});
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
|
|
JavaRDD<GraphResult> tmp = sc
|
|
.textFile(workingDir.toString() + "/dump/publication")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
|
|
|
|
Assertions.assertEquals(16, tmp.count());
|
|
|
|
JavaRDD<Publication> tmp_pubs = sc
|
|
.textFile(workingDir.toString() + "/original/publication")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
|
|
|
JavaRDD<Publication> input = sc
|
|
.textFile(sourcePath)
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
|
|
|
Assertions
|
|
.assertTrue(
|
|
input
|
|
.filter(r -> r.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
|
.first()
|
|
.getCollectedfrom()
|
|
.stream()
|
|
.anyMatch(
|
|
cf -> cf.getKey().equals("10|openaire____::806360c771262b4d6770e7cdf04b5c5a")
|
|
&& cf.getValue().equals("ZENODO")));
|
|
Assertions.assertEquals(16, tmp_pubs.count());
|
|
Assertions
|
|
.assertTrue(
|
|
tmp_pubs
|
|
.filter(r -> r.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
|
.first()
|
|
.getCollectedfrom()
|
|
.stream()
|
|
.anyMatch(
|
|
cf -> cf.getKey().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8")
|
|
&& cf.getValue().equals("ZENODO")));
|
|
|
|
tmp_pubs.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p)));
|
|
|
|
}
|
|
|
|
@Test // Step 1
|
|
void testSelectionConstraintsCommunity() throws Exception {
|
|
final String pathMap = "{\"author\" : \"$['author'][*]['fullname']\", " +
|
|
"\"title\" : \"$['title'][*]['value']\", \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\", "
|
|
+
|
|
"\"contributor\" : \"$['contributor'][*]['value']\", \"description\" : \"$['description'][*]['value']\", "
|
|
+
|
|
"\"dateofacceptance\" : \"$['dateofacceptance']['value']\", " +
|
|
"\"context\": \"$['context'][*]['id']\"}";
|
|
final String constraint = "{\"criteria\":[{\"constraint\":[{\"verb\":\"equals\",\"field\":\"context\",\"value\":\"dh-ch\"}]}]}";
|
|
|
|
final String sourcePath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/publication")
|
|
.getPath();
|
|
|
|
final String communityMapPath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
|
|
.getPath();
|
|
|
|
SparkDumpResult
|
|
.main(
|
|
new String[] {
|
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
"-sourcePath", sourcePath,
|
|
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
|
|
"-outputPath", workingDir.toString(),
|
|
"-communityMapPath", communityMapPath,
|
|
"-pathMap", pathMap,
|
|
"-selectionCriteria", constraint,
|
|
"-resultType", "publication",
|
|
"-masterDuplicatePath", getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/empty")
|
|
.getPath()
|
|
|
|
});
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
|
|
JavaRDD<GraphResult> tmp = sc
|
|
.textFile(workingDir.toString() + "/dump/publication")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
|
|
|
|
Assertions.assertEquals(17, tmp.count());
|
|
|
|
JavaRDD<Publication> tmp_pubs = sc
|
|
.textFile(workingDir.toString() + "/original/publication")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
|
|
|
Assertions.assertEquals(17, tmp_pubs.count());
|
|
|
|
Assertions
|
|
.assertEquals(
|
|
17, tmp_pubs.filter(p -> p.getContext().stream().anyMatch(c -> c.getId().equals("dh-ch"))).count());
|
|
|
|
}
|
|
|
|
@Test // Step2
|
|
void testSelectSubset() throws Exception {
|
|
|
|
final String sourcePath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/")
|
|
.getPath();
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/original/publication");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/software")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/original/software");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/dataset")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/original/dataset");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/otherresearchproduct")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/original/otherresearchproduct");
|
|
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/publication");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/software");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/dataset");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct");
|
|
|
|
SparkSelectSubset
|
|
.main(
|
|
new String[] {
|
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
"-sourcePath", sourcePath,
|
|
"-outputPath", workingDir.toString()
|
|
|
|
});
|
|
|
|
JavaRDD<Relation> tmp = sc
|
|
.textFile(workingDir.toString() + "/original/relation")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
|
|
|
|
Assertions.assertEquals(20, tmp.count());
|
|
|
|
Assertions
|
|
.assertEquals(
|
|
6, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("50|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
3, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("20|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
3, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("50|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
4, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("50|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
1, tmp.filter(r -> r.getSource().startsWith("10|") && r.getTarget().startsWith("20|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("10|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("40|")).count());
|
|
Assertions
|
|
.assertEquals(
|
|
1, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("20|")).count());
|
|
|
|
JavaRDD<eu.dnetlib.dhp.schema.oaf.Datasource> tmp_datasource = sc
|
|
.textFile(workingDir.toString() + "/original/datasource")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));
|
|
Assertions.assertEquals(5, tmp_datasource.count());
|
|
Assertions
|
|
.assertEquals(
|
|
0,
|
|
tmp_datasource
|
|
.filter(d -> d.getId().equals("10|issn___print::0a79337eaf5145faa478785423273355"))
|
|
.count());
|
|
|
|
JavaRDD<Organization> tmp_organization = sc
|
|
.textFile(workingDir.toString() + "/original/organization")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
|
|
Assertions.assertEquals(3, tmp_organization.count());
|
|
|
|
JavaRDD<Project> tmp_project = sc
|
|
.textFile(workingDir.toString() + "/original/project")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, Project.class));
|
|
Assertions.assertEquals(3, tmp_project.count());
|
|
|
|
}
|
|
|
|
@Test
|
|
public void selectValidContextTest() throws Exception {
|
|
|
|
final String sourcePath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/")
|
|
.getPath();
|
|
|
|
final String communityMapPath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/communityMap")
|
|
.getPath();
|
|
|
|
final String contextPath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/context/community_infrastructure")
|
|
.getPath();
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
|
|
SparkSelectValidContext
|
|
.main(
|
|
new String[] {
|
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
"-sourcePath", sourcePath,
|
|
"-outputPath", workingDir.toString() + "/dump/community_infrastructure",
|
|
"-communityMapPath", communityMapPath,
|
|
"-contextPath", contextPath
|
|
|
|
});
|
|
|
|
JavaRDD<ResearchCommunity> tmp = sc
|
|
.textFile(workingDir.toString() + "/dump/community_infrastructure")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, ResearchCommunity.class));
|
|
|
|
Assertions.assertEquals(6, tmp.count());
|
|
|
|
Assertions.assertEquals(1, tmp.filter(cr -> cr.getAcronym().equals("enermaps")).count());
|
|
Assertions.assertEquals(1, tmp.filter(cr -> cr.getAcronym().equals("eutopia")).count());
|
|
Assertions.assertEquals(1, tmp.filter(cr -> cr.getAcronym().equals("dh-ch")).count());
|
|
Assertions.assertEquals(1, tmp.filter(cr -> cr.getAcronym().equals("beopen")).count());
|
|
Assertions.assertEquals(1, tmp.filter(cr -> cr.getAcronym().equals("neanias-underwater")).count());
|
|
Assertions.assertEquals(1, tmp.filter(cr -> cr.getAcronym().equals("sdsn-gr")).count());
|
|
|
|
}
|
|
|
|
@Test
|
|
public void selectValidRelationContextTest() throws Exception {
|
|
|
|
final String contextRelationPath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/working/relation")
|
|
.getPath();
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/publication");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/software");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/dataset");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct");
|
|
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/organization")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/organization");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/datasource")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/datasource");
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/project")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/project");
|
|
|
|
sc
|
|
.textFile(
|
|
getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/community_infrastructure")
|
|
.getPath())
|
|
.saveAsTextFile(workingDir.toString() + "/dump/communities_infrastructures");
|
|
|
|
SparkSelectValidRelationContext
|
|
.main(
|
|
new String[] {
|
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
"-sourcePath", workingDir.toString() + "/dump",
|
|
|
|
"-contextRelationPath", contextRelationPath
|
|
|
|
});
|
|
|
|
JavaRDD<eu.dnetlib.dhp.oa.model.graph.Relation> tmp = sc
|
|
.textFile(workingDir.toString() + "/dump/relation")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.oa.model.graph.Relation.class));
|
|
|
|
Assertions.assertEquals(10, tmp.count());
|
|
|
|
Assertions.assertEquals(5, tmp.filter(r -> r.getSource().getId().startsWith("00")).count());
|
|
Assertions.assertEquals(5, tmp.filter(r -> r.getTarget().getId().startsWith("00")).count());
|
|
|
|
Assertions.assertEquals(2, tmp.filter(r -> r.getSource().getId().startsWith("10")).count());
|
|
Assertions.assertEquals(2, tmp.filter(r -> r.getTarget().getId().startsWith("10")).count());
|
|
|
|
Assertions.assertEquals(1, tmp.filter(r -> r.getSource().getId().startsWith("40")).count());
|
|
Assertions.assertEquals(1, tmp.filter(r -> r.getTarget().getId().startsWith("40")).count());
|
|
|
|
Assertions.assertEquals(2, tmp.filter(r -> r.getSource().getId().startsWith("20")).count());
|
|
Assertions.assertEquals(2, tmp.filter(r -> r.getTarget().getId().startsWith("20")).count());
|
|
|
|
}
|
|
|
|
@Test
|
|
public void extractRelationFromResultTest() {
|
|
|
|
final String sourcePath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication")
|
|
.getPath();
|
|
|
|
final String communityMapPath = getClass()
|
|
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/communityMap")
|
|
.getPath();
|
|
|
|
Extractor ex = new Extractor();
|
|
ex
|
|
.run(
|
|
false, sourcePath, workingDir.toString() + "/relation",
|
|
// eu.dnetlib.dhp.schema.oaf.Publication.class, communityMapPath);
|
|
eu.dnetlib.dhp.schema.oaf.Publication.class, communityMapPath);
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
|
|
JavaRDD<eu.dnetlib.dhp.oa.model.graph.Relation> tmp = sc
|
|
.textFile(workingDir.toString() + "/relation")
|
|
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.oa.model.graph.Relation.class));
|
|
|
|
Assertions.assertEquals(102, tmp.count());
|
|
|
|
Assertions.assertEquals(51, tmp.filter(r -> r.getSource().getId().startsWith("50|")).count());
|
|
Assertions.assertEquals(39, tmp.filter(r -> r.getSource().getId().startsWith("10|")).count());
|
|
Assertions.assertEquals(12, tmp.filter(r -> r.getSource().getId().startsWith("00|")).count());
|
|
}
|
|
|
|
}
|