Update to include a blackList that filters out the results we know are wrongly associated to IE

This commit is contained in:
Miriam Baglioni 2024-05-24 12:28:24 +02:00
parent c3fe59bc78
commit 12ffde023f
6 changed files with 62 additions and 29 deletions

View File

@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType;
@ -70,6 +71,9 @@ public class CreateActionSetFromWebEntries implements Serializable {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final String blackListInputPath = parser.get("blackListPath");
log.info("blackListInputPath: {}", blackListInputPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -77,19 +81,24 @@ public class CreateActionSetFromWebEntries implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
createActionSet(spark, inputPath, outputPath); createActionSet(spark, inputPath, outputPath, blackListInputPath);
}); });
} }
public static void createActionSet(SparkSession spark, String inputPath, public static void createActionSet(SparkSession spark, String inputPath,
String outputPath) { String outputPath, String blackListInputPath) {
final Dataset<Row> dataset = readWebCrawl(spark, inputPath) final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
.filter("publication_year <= 2020 or country_code=='IE'") .filter("country_code=='IE'")
.drop("publication_year"); .drop("publication_year");
dataset.flatMap((FlatMapFunction<Row, Relation>) row -> { final Dataset<Row> blackList = readBlackList(spark, blackListInputPath);
dataset.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
.filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
.drop("OpenAlexId")
.flatMap((FlatMapFunction<Row, Relation>) row -> {
List<Relation> ret = new ArrayList<>(); List<Relation> ret = new ArrayList<>();
final String ror = ROR_PREFIX final String ror = ROR_PREFIX
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
@ -105,7 +114,7 @@ public class CreateActionSetFromWebEntries implements Serializable {
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class);
} }
@ -136,6 +145,15 @@ public class CreateActionSetFromWebEntries implements Serializable {
} }
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath){
return spark
.read()
.option("header", true)
.csv(inputPath)
.select("OpenAlexId");
}
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) { private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
if (pmcid == null) if (pmcid == null)
return new ArrayList<>(); return new ArrayList<>();

View File

@ -16,5 +16,10 @@
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": false "paramRequired": false
},{
"paramName": "bl",
"paramLongName": "blackListPath",
"paramDescription": "the working path",
"paramRequired": true
} }
] ]

View File

@ -75,7 +75,11 @@ public class CreateASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/") "/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
.getPath();
String blackListPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
.getPath(); .getPath();
CreateActionSetFromWebEntries CreateActionSetFromWebEntries
@ -86,7 +90,8 @@ public class CreateASTest {
"-sourcePath", "-sourcePath",
inputPath, inputPath,
"-outputPath", "-outputPath",
workingDir.toString() + "/actionSet1" workingDir.toString() + "/actionSet1",
"-blackListPath", blackListPath
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -96,7 +101,7 @@ public class CreateASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
Assertions.assertEquals(64, tmp.count()); Assertions.assertEquals(58, tmp.count());
} }
@ -109,6 +114,10 @@ public class CreateASTest {
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/") "/eu/dnetlib/dhp/actionmanager/webcrawl/")
.getPath(); .getPath();
String blackListPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
.getPath();
CreateActionSetFromWebEntries CreateActionSetFromWebEntries
.main( .main(
@ -118,7 +127,8 @@ public class CreateASTest {
"-sourcePath", "-sourcePath",
inputPath, inputPath,
"-outputPath", "-outputPath",
workingDir.toString() + "/actionSet1" workingDir.toString() + "/actionSet1",
"-blackListPath", blackListPath
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -184,7 +194,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
5, tmp 2, tmp
.filter( .filter(
r -> r r -> r
.getSource() .getSource()
@ -197,7 +207,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
5, tmp 2, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
@ -210,7 +220,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
2, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
@ -224,7 +234,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
2, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
@ -238,7 +248,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 0, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()