1
0
Fork 0

[Clean Country] added logic not to remove country from result if it exist a hosting datasource with that country. Moreover the country will be removed only if added with propagation

This commit is contained in:
Miriam Baglioni 2022-07-22 16:23:12 +02:00
parent 2c933f1158
commit a12d28c644
3 changed files with 130 additions and 12 deletions

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.oa.graph.clean;
package eu.dnetlib.dhp.oa.graph.clean.country;
/**
* @author miriam.baglioni
@ -7,6 +7,7 @@ package eu.dnetlib.dhp.oa.graph.clean;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.apache.commons.io.IOUtils;
@ -21,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -54,10 +56,13 @@ public class CleanCountrySparkJob implements Serializable {
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String datasourcePath = parser.get("datasourcePath");
log.info("datasourcePath: {}", datasourcePath);
String country = parser.get("country");
log.info("country: {}", country);
String verifyParam = parser.get("verifyParam");
String[] verifyParam = parser.get("verifyParam").split(";");
log.info("verifyParam: {}", verifyParam);
String collectedfrom = parser.get("collectedfrom");
@ -74,12 +79,17 @@ public class CleanCountrySparkJob implements Serializable {
isSparkSessionManaged,
spark -> {
cleanCountry(spark, country, verifyParam, inputPath, entityClazz, workingPath,collectedfrom);
cleanCountry(spark, country, verifyParam, inputPath, entityClazz, workingPath,collectedfrom, datasourcePath);
});
}
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String verifyParam,
String inputPath, Class<T> entityClazz, String workingPath, String collectedfrom) {
private static <T extends Result> void cleanCountry(SparkSession spark, String country, String[] verifyParam,
String inputPath, Class<T> entityClazz, String workingPath, String collectedfrom, String datasourcePath) {
List<String> hostedBy = spark.read().textFile(datasourcePath)
// .filter((FilterFunction<String>) ds -> !ds.equals(collectedfrom))
.collectAsList();
Dataset<T> res = spark
.read()
.textFile(inputPath)
@ -89,11 +99,14 @@ public class CleanCountrySparkJob implements Serializable {
;
res.map((MapFunction<T, T>) r -> {
for(StructuredProperty p: r.getPid()){
if (p.getQualifier().getClassid().equalsIgnoreCase("doi") &&
p.getValue().startsWith(verifyParam) && r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equalsIgnoreCase(collectedfrom))) {
if(r.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
!r.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))){
return r;
}
r
if(r.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equals("doi") && pidInParam(p.getValue(), verifyParam))
&& r.getCountry().stream().anyMatch(c -> c.getClassid().equals(country) && c.getDataInfo().getInferenceprovenance().equals("propagation")))
{ r
.setCountry(
r
.getCountry()
@ -102,7 +115,7 @@ public class CleanCountrySparkJob implements Serializable {
c -> !c.getClassid()
.equalsIgnoreCase(country))
.collect(Collectors.toList()));
}
}
return r;
@ -124,5 +137,12 @@ public class CleanCountrySparkJob implements Serializable {
.json(inputPath);
}
private static boolean pidInParam(String value, String[] verifyParam) {
for (String s : verifyParam )
if (value.startsWith(s))
return true;
return false;
}
}

View File

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.oa.graph.clean.country;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 22/07/22
*/
public class GetDatasourceFromCountry implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetDatasourceFromCountry.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GetDatasourceFromCountry.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_clean_country_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String country = parser.get("country");
log.info("country: {}", country);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
getDatasourceFromCountry(spark, country, inputPath, workingPath);
});
}
private static void getDatasourceFromCountry(SparkSession spark, String country, String inputPath, String workingPath) {
Dataset<Organization> organization = spark.read().textFile(inputPath + "/organization")
.map(
(MapFunction<String, Organization>) value -> OBJECT_MAPPER.readValue(value, Organization.class),
Encoders.bean(Organization.class))
.filter(
(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() &&
o.getCountry().getClassid().length() > 0 &&
o.getCountry().getClassid().equals(country));;
// filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass
Dataset<Relation> relation = spark.read().textFile( inputPath + "/relation")
.map(
(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
Encoders.bean(Relation.class))
.filter(
(FilterFunction<Relation>) rel -> rel.getRelClass().equalsIgnoreCase(ModelConstants.IS_PROVIDED_BY) &&
!rel.getDataInfo().getDeletedbyinference());
organization.joinWith(relation, organization.col("id").equalTo(relation.col("target")), "left")
.map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING() )
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(workingPath);
}
}

View File

@ -5,9 +5,9 @@ package eu.dnetlib.dhp.oa.graph.clean;
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
public class CleanCountryTest {