Fix for issue that duplicated results

This commit is contained in:
Miriam Baglioni 2020-05-25 10:26:48 +02:00
parent 8f6ce970f9
commit b258f99ece
2 changed files with 76 additions and 27 deletions

View File

@ -77,9 +77,15 @@ public class PrepareDatasourceCountryAssociation {
List<String> allowedtypes, List<String> allowedtypes,
String inputPath, String inputPath,
String outputPath) { String outputPath) {
String whitelisted = ""; String whitelisted = " d.id = '" + whitelist.get(0) + "'";
for (String i : whitelist) { for (int i = 1; i < whitelist.size(); i++) {
whitelisted += " OR id = '" + i + "'"; whitelisted += " OR d.id = '" + whitelist.get(i) + "'";
}
String allowed = "d.datasourcetype.classid = '" + allowedtypes.get(0) + "'";
for (int i = 1; i < allowedtypes.size(); i++) {
allowed += " OR d.datasourcetype.classid = '" + allowedtypes.get(i) + "'";
} }
Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class); Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class);
@ -90,26 +96,39 @@ public class PrepareDatasourceCountryAssociation {
relation.createOrReplaceTempView("relation"); relation.createOrReplaceTempView("relation");
organization.createOrReplaceTempView("organization"); organization.createOrReplaceTempView("organization");
String query = "SELECT source dataSourceId, named_struct('classid', country.classid, 'classname', country.classname) country " // String query = "SELECT source dataSourceId, named_struct('classid', country.classid, 'classname', country.classname) country "
+ "FROM ( SELECT id " // + "FROM ( SELECT id "
+ " FROM datasource " // + " FROM datasource "
+ " WHERE (datainfo.deletedbyinference = false " // + " WHERE (datainfo.deletedbyinference = false "
+ whitelisted // + whitelisted
+ ") " // + ") "
+ getConstraintList("datasourcetype.classid = '", allowedtypes) // + getConstraintList("datasourcetype.classid = '", allowedtypes)
+ ") d " // + ") d "
+ "JOIN ( SELECT source, target " // + "JOIN ( SELECT source, target "
+ " FROM relation " // + " FROM relation "
+ " WHERE relclass = '" // + " WHERE relclass = '"
+ ModelConstants.IS_PROVIDED_BY // + ModelConstants.IS_PROVIDED_BY
+ "' " // + "' "
+ " AND datainfo.deletedbyinference = false ) rel " // + " AND datainfo.deletedbyinference = false ) rel "
+ "ON d.id = rel.source " // + "ON d.id = rel.source "
+ "JOIN (SELECT id, country " // + "JOIN (SELECT id, country "
+ " FROM organization " // + " FROM organization "
+ " WHERE datainfo.deletedbyinference = false " // + " WHERE datainfo.deletedbyinference = false "
+ " AND length(country.classid) > 0) o " // + " AND length(country.classid) > 0) o "
+ "ON o.id = rel.target"; // + "ON o.id = rel.target";
String query = "SELECT source dataSourceId, " +
"named_struct('classid', country.classid, 'classname', country.classname) country " +
"FROM datasource d " +
"JOIN relation rel " +
"ON d.id = rel.source " +
"JOIN organization o " +
"ON o.id = rel.target " +
"WHERE rel.datainfo.deletedbyinference = false " +
"and rel.relclass = '" + ModelConstants.IS_PROVIDED_BY + "'" +
"and o.datainfo.deletedbyinference = false " +
"and length(o.country.classid) > 0 " +
"and (" + allowed + " or " + whitelisted + ")";
spark spark
.sql(query) .sql(query)

View File

@ -4,7 +4,12 @@ package eu.dnetlib.dhp.countrypropagation;
import static eu.dnetlib.dhp.PropagationConstant.*; import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.ArrayList;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -13,6 +18,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
public class PrepareResultCountrySet { public class PrepareResultCountrySet {
private static final Logger log = LoggerFactory.getLogger(PrepareResultCountrySet.class); private static final Logger log = LoggerFactory.getLogger(PrepareResultCountrySet.class);
@ -60,6 +66,7 @@ public class PrepareResultCountrySet {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath);
getPotentialResultToUpdate( getPotentialResultToUpdate(
spark, spark,
inputPath, inputPath,
@ -89,10 +96,33 @@ public class PrepareResultCountrySet {
spark spark
.sql(RESULT_COUNTRYSET_QUERY) .sql(RESULT_COUNTRYSET_QUERY)
.as(Encoders.bean(ResultCountrySet.class)) .as(Encoders.bean(ResultCountrySet.class))
.write() .toJavaRDD()
.option("compression", "gzip") .mapToPair(value -> new Tuple2<>(value.getResultId(), value))
.mode(SaveMode.Append) .reduceByKey((a, b) -> {
.json(outputPath); ArrayList<CountrySbs> countryList = a.getCountrySet();
Set<String> countryCodes = countryList
.stream()
.map(country -> country.getClassid())
.collect(Collectors.toSet());
b
.getCountrySet()
.stream()
.forEach(c -> {
if (!countryCodes.contains(c.getClassid())) {
countryList.add(c);
countryCodes.add(c.getClassid());
}
});
a.setCountrySet(countryList);
return a;
})
.map(couple -> OBJECT_MAPPER.writeValueAsString(couple._2()))
.saveAsTextFile(outputPath, GzipCodec.class);
// .write()
// .option("compression", "gzip")
// .mode(SaveMode.Append)
// .json(outputPath);
} }
} }