Added test classes and resources; removed one step from the workflow since it was not needed.

Miriam Baglioni 2023-05-04 12:05:10 +02:00
parent 011b7737ad
commit 1fb840ff28
14 changed files with 264 additions and 13 deletions

View File

@@ -70,6 +70,10 @@ public class SparkDumpCommunityProducts implements Serializable {
             .ofNullable(parser.get("communityMapPath"))
             .orElse(null);
+        String dumpType = Optional
+            .ofNullable(parser.get("dumpType"))
+            .orElse(null);
         Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
         SparkConf conf = new SparkConf();
@@ -80,7 +84,7 @@ public class SparkDumpCommunityProducts implements Serializable {
             spark -> {
                 Utils.removeOutputDir(spark, outputPath);
                 resultDump(
-                    spark, inputPath, outputPath, communityMapPath, inputClazz);
+                    spark, inputPath, outputPath, communityMapPath, inputClazz, dumpType);
             });
     }
@@ -90,7 +94,8 @@ public class SparkDumpCommunityProducts implements Serializable {
         String inputPath,
         String outputPath,
         String communityMapPath,
-        Class<I> inputClazz) {
+        Class<I> inputClazz,
+        String dumpType) {
         CommunityMap communityMap = null;
         if (!StringUtils.isEmpty(communityMapPath))
@@ -100,7 +105,7 @@ public class SparkDumpCommunityProducts implements Serializable {
         Utils
             .readPath(spark, inputPath, inputClazz)
             .map(
-                (MapFunction<I, CommunityResult>) value -> execMap(value, finalCommunityMap),
+                (MapFunction<I, CommunityResult>) value -> execMap(value, finalCommunityMap, dumpType),
                 Encoders.bean(CommunityResult.class))
             .filter((FilterFunction<CommunityResult>) value -> value != null)
             .map(
@@ -113,7 +118,7 @@ public class SparkDumpCommunityProducts implements Serializable {
     }

     private static <I extends OafEntity, O extends eu.dnetlib.dhp.oa.model.Result> O execMap(I value,
-        CommunityMap communityMap) throws NoAvailableEntityTypeException, CardinalityTooHighException {
+        CommunityMap communityMap, String dumpType) throws NoAvailableEntityTypeException, CardinalityTooHighException {
         Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
         if (Boolean.FALSE.equals(odInfo.isPresent())) {
@@ -123,7 +128,7 @@ public class SparkDumpCommunityProducts implements Serializable {
             || Boolean.TRUE.equals(odInfo.get().getInvisible())) {
             return null;
         }
-        if (communityMap != null) {
+        if (StringUtils.isEmpty(dumpType)) {
             Set<String> communities = communityMap.keySet();
             Optional<List<Context>> inputContext = Optional
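
Reading of the change above: an empty dumpType keeps the original community dump (results filtered through the community map), while a non-empty value such as country skips the community filtering entirely. A minimal, self-contained sketch of that branch; keepResult and belongsToSomeCommunity are illustrative names, not methods from this patch:

import java.util.Map;
import java.util.Set;

// Illustration only: method names are hypothetical, not from SparkDumpCommunityProducts.
public class DumpTypeBranchSketch {

    // stand-in for the community filtering done by the community dump
    static boolean belongsToSomeCommunity(Set<String> resultContexts, Map<String, String> communityMap) {
        return resultContexts.stream().anyMatch(communityMap::containsKey);
    }

    static boolean keepResult(Set<String> resultContexts, Map<String, String> communityMap, String dumpType) {
        if (dumpType == null || dumpType.isEmpty()) {
            // community dump: keep only results linked to a configured community
            return belongsToSomeCommunity(resultContexts, communityMap);
        }
        // country dump (dumpType = "country"): no community filtering
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> communityMap = Map.of("dh-ch", "Digital Humanities and Cultural Heritage");
        System.out.println(keepResult(Set.of("beopen"), communityMap, null));      // false
        System.out.println(keepResult(Set.of("dh-ch"), communityMap, null));       // true
        System.out.println(keepResult(Set.of("beopen"), communityMap, "country")); // true
    }
}

This also explains the guard change in the last hunk: for a country dump the community map may legitimately be absent, so the branch is now selected on dumpType rather than on communityMap != null.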

View File

@@ -116,7 +116,7 @@ public class SparkFindResultsRelatedToCountry implements Serializable {
             .readPath(spark, inputPath + "/relation", Relation.class)
             .filter(
                 (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
-                    r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION));
+                    r.getRelClass().equals(ModelConstants.IS_AUTHOR_INSTITUTION_OF));
         organizationsInCountry
             .joinWith(
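
The one-line fix above inverts the direction of the author-institution relation. In the new test resources, isAuthorInstitutionOf records carry the organization as source and the result as target, which is the side the organizationsInCountry join appears to match on; the result-to-organization direction would never pair with an organization id. A toy illustration with invented ids (Rel stands in for the Relation schema class):

import java.util.List;
import java.util.stream.Collectors;

// Toy data: ids are invented; Rel is a stand-in for eu.dnetlib.dhp.schema.oaf.Relation.
public class RelationDirectionSketch {

    record Rel(String source, String target, String relClass) {
    }

    public static void main(String[] args) {
        List<String> orgsInCountry = List.of("20|org::pt1");
        List<Rel> rels = List.of(
            new Rel("50|res::a", "20|org::pt1", "hasAuthorInstitution"),   // result -> organization
            new Rel("20|org::pt1", "50|res::a", "isAuthorInstitutionOf")); // organization -> result

        // match the relation SOURCE against organization ids and keep the target:
        // only the organization -> result direction can ever match
        List<String> related = rels.stream()
            .filter(r -> r.relClass().equals("isAuthorInstitutionOf"))
            .filter(r -> orgsInCountry.contains(r.source()))
            .map(Rel::target)
            .collect(Collectors.toList());

        System.out.println(related); // [50|res::a]
    }
}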

View File

@@ -80,7 +80,8 @@
         </configuration>
     </global>
-    <start to="save_community_map" />
+    <!-- <start to="save_community_map" />-->
+    <start to="make_archive" />
     <action name="save_community_map">
         <java>
@@ -236,7 +237,7 @@
         <error to="Kill"/>
     </action>
-    <join name="join_dump" to="select_subset"/>
+    <join name="join_dump" to="fork_dump_community"/>
     <action name="select_subset">
         <spark xmlns="uri:oozie:spark-action:0.2">
@@ -294,6 +295,7 @@
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
             <arg>--outputPath</arg><arg>${workingDir}/dump/publication</arg>
             <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
+            <arg>--dumpType</arg><arg>country</arg>
         </spark>
         <ok to="join_dump_comm"/>
         <error to="Kill"/>
@@ -320,6 +322,7 @@
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
             <arg>--outputPath</arg><arg>${workingDir}/dump/dataset</arg>
             <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
+            <arg>--dumpType</arg><arg>country</arg>
         </spark>
         <ok to="join_dump_comm"/>
         <error to="Kill"/>
@@ -346,6 +349,7 @@
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
             <arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
             <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
+            <arg>--dumpType</arg><arg>country</arg>
         </spark>
         <ok to="join_dump_comm"/>
         <error to="Kill"/>
@@ -372,6 +376,7 @@
             <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
             <arg>--outputPath</arg><arg>${workingDir}/dump/software</arg>
             <arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
+            <arg>--dumpType</arg><arg>country</arg>
         </spark>
         <ok to="join_dump_comm"/>
         <error to="Kill"/>
@@ -396,7 +401,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
             </spark-opts>
-            <arg>--sourcePath</arg><arg>${outputPath}/original</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
         </spark>
         <ok to="fork_extendWithProject"/>
@@ -513,8 +518,18 @@
     </action>
-    <join name="join_extend" to="End"/>
+    <join name="join_extend" to="make_archive"/>
+    <action name="make_archive">
+        <java>
+            <main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
+            <arg>--hdfsPath</arg><arg>${outputPath}/tar</arg>
+            <arg>--nameNode</arg><arg>${nameNode}</arg>
+            <arg>--sourcePath</arg><arg>${outputPath}/dump</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
     <kill name="Kill">
         <message>Sub-workflow dump complete failed with error message ${wf:errorMessage()}
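
The new terminal make_archive step packages ${outputPath}/dump into a tar under ${outputPath}/tar via eu.dnetlib.dhp.oa.graph.dump.MakeTar; note that the <start> node at the top of the workflow is temporarily pointed straight at it, with the original start commented out, presumably to exercise the new action in isolation. MakeTar itself is not part of this diff; a hypothetical, reduced version of such a step (non-recursive, single tar, no size limits) could look like:

import java.io.IOException;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Hypothetical, reduced MakeTar-like step: streams the files directly under
// sourcePath into a single tar written back to HDFS.
public class MakeTarSketch {

    public static void main(String[] args) throws IOException {
        String nameNode = args[0];   // e.g. hdfs://namenode:8020
        String sourcePath = args[1]; // e.g. the dump directory
        String targetFile = args[2]; // e.g. the tar file to create

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", nameNode);
        FileSystem fs = FileSystem.get(conf);

        try (TarArchiveOutputStream tar = new TarArchiveOutputStream(
            fs.create(new Path(targetFile), true))) {
            for (FileStatus f : fs.listStatus(new Path(sourcePath))) {
                if (!f.isFile()) {
                    continue; // the real step would recurse into directories
                }
                TarArchiveEntry entry = new TarArchiveEntry(f.getPath().getName());
                entry.setSize(f.getLen());
                tar.putArchiveEntry(entry);
                try (FSDataInputStream in = fs.open(f.getPath())) {
                    IOUtils.copyBytes(in, tar, 4096, false);
                }
                tar.closeArchiveEntry();
            }
        }
    }
}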

View File

@@ -23,13 +23,13 @@
     "paramName": "sd",
     "paramLongName": "singleDeposition",
     "paramDescription": "true if the dump should be created for a single community",
-    "paramRequired": true
+    "paramRequired": false
   },
   {
     "paramName": "ci",
     "paramLongName": "communityId",
     "paramDescription": "the id of the community for which to create the dump",
-    "paramRequired": true
+    "paramRequired": false
   }
 ]
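
With paramRequired now false, singleDeposition and communityId may be missing from the invocation, so the job has to read them defensively, the same Optional.ofNullable(...).orElse(...) pattern the first file uses for dumpType. A small stand-alone sketch, with a plain Map standing in for the argument parser:

import java.util.Map;
import java.util.Optional;

// The Map stands in for the argument parser; with paramRequired=false both
// values may be absent, so they are read with a null-safe default.
public class OptionalParamSketch {

    public static void main(String[] args) {
        Map<String, String> parsed = Map.of(); // neither -sd nor -ci supplied

        boolean singleDeposition = Optional
            .ofNullable(parsed.get("singleDeposition"))
            .map(Boolean::valueOf)
            .orElse(false);
        String communityId = Optional
            .ofNullable(parsed.get("communityId"))
            .orElse(null);

        System.out.println(singleDeposition + " " + communityId); // prints: false null
    }
}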

View File

@@ -0,0 +1,96 @@
package eu.dnetlib.dhp.oa.graph.dump.country;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author miriam.baglioni
 * @Date 02/05/23
 */
public class FindResultRelatedToCountryTest {

    private static SparkSession spark;

    private static Path workingDir;

    private static final Logger log = LoggerFactory
        .getLogger(FindResultRelatedToCountryTest.class);

    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files
            .createTempDirectory(FindResultRelatedToCountryTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
        conf.setAppName(FindResultRelatedToCountryTest.class.getSimpleName());
        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession
            .builder()
            .appName(FindResultRelatedToCountryTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }

    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }

    @Test
    void test1() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/country/graph")
            .getPath();

        SparkFindResultsRelatedToCountry.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/resultWithCountry",
            "-sourcePath", sourcePath,
            "-country", "PT"
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<String> tmp = sc
            .textFile(workingDir.toString() + "/resultWithCountry");

        // the three results expected to be related to the selected country (PT)
        Assertions.assertEquals(3, tmp.count());
        Assertions.assertTrue(tmp.collect().contains("50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70af"));
        Assertions.assertTrue(tmp.collect().contains("50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cbb"));
        Assertions.assertTrue(tmp.collect().contains("50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70ag"));
    }
}

View File

@@ -0,0 +1,115 @@
package eu.dnetlib.dhp.oa.graph.dump.country;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.Publication;

/**
 * @author miriam.baglioni
 * @Date 02/05/23
 */
public class FindResultWithCountryTest {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;

    private static Path workingDir;

    private static final Logger log = LoggerFactory
        .getLogger(FindResultWithCountryTest.class);

    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files
            .createTempDirectory(FindResultWithCountryTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
        conf.setAppName(FindResultWithCountryTest.class.getSimpleName());
        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession
            .builder()
            .appName(FindResultWithCountryTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }

    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }

    @Test
    void test1() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/country/graph/publication")
            .getPath();

        // copy the list of result ids related to the country into the working dir
        spark
            .read()
            .textFile(
                getClass()
                    .getResource("/eu/dnetlib/dhp/oa/graph/dump/country/resultWithCountry")
                    .getPath())
            .write()
            .text(workingDir.toString() + "/resWithCountry");

        SparkFindResultWithCountry.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-outputPath", workingDir.toString() + "/out",
            "-sourcePath", sourcePath,
            "-resultType", "publication",
            "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
            "-resultWithCountry", workingDir.toString() + "/resWithCountry"
        });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Publication> tmp = sc
            .textFile(workingDir.toString() + "/out/original/publication")
            .map(item -> OBJECT_MAPPER.readValue(item, Publication.class));

        // only the publications whose id appears in resultWithCountry are kept
        Assertions.assertEquals(2, tmp.count());

        Assertions
            .assertTrue(
                tmp.map(p -> p.getId()).collect().contains("50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70af"));
        Assertions
            .assertTrue(
                tmp.map(p -> p.getId()).collect().contains("50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70ag"));
    }
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,8 @@
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"produces","relType":"datasourceOrganization","source":"40|fct_________::037de16e31bf518b1b2a836ee37ce983","subRelType":"provision","target":"50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70ag","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"isAuthorInstitutionOf","relType":"datasourceOrganization","source":"20|anr_________::17811d09a4f30a15d4fe207d46861b53","subRelType":"provision","target":"50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cbb","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"isAuthorInstitutionOf","relType":"datasourceOrganization","source":"20|corda_______::810928466bdce1665c4f1838ca9e2844","subRelType":"provision","target":"50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cba","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"produces","relType":"datasourceOrganization","source":"40|anr_________::658a5d80c4b3e1175781fe2e2294cffa","subRelType":"provision","target":"50|____EGIAppDB::ce6d2420f693a96a29d1400970393969","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::690b3aaf177a4c70b81bacd8d023cbdc","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::7a71f278237d1ab35088efda03fa007a","subRelType":"provision","target":"20|doajarticles::03748bcb5d754c951efec9700e18a56d","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::8b75543067b50076e70764917e188178","subRelType":"provision","target":"20|doajarticles::50cb15ff7a6a3f8531f063770179e346","validated":false}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1680789947124,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::9f3ff882f023209d9ffb4dc32b77d376","subRelType":"provision","target":"20|doajarticles::ffc1811633b3222e4764c7b0517f83e8","validated":false}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,3 @@
50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70af
50|06cdd3ff4700::93859bd27121c3ee7c6ee4bfb1790cbb
50|06cdd3ff4700::136eb030ccb020e170df9627fa1a70ag