Update to include a blacklist that filters out the results we know are wrongly associated with IE

This commit is contained in:
Miriam Baglioni 2024-05-24 12:28:24 +02:00 committed by Claudio Atzori
parent 107d958b89
commit b55fed09f8
6 changed files with 62 additions and 29 deletions

View File

@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType;
@ -70,6 +71,9 @@ public class CreateActionSetFromWebEntries implements Serializable {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final String blackListInputPath = parser.get("blackListPath");
log.info("blackListInputPath: {}", blackListInputPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -77,35 +81,40 @@ public class CreateActionSetFromWebEntries implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
createActionSet(spark, inputPath, outputPath); createActionSet(spark, inputPath, outputPath, blackListInputPath);
}); });
} }
public static void createActionSet(SparkSession spark, String inputPath, public static void createActionSet(SparkSession spark, String inputPath,
String outputPath) { String outputPath, String blackListInputPath) {
final Dataset<Row> dataset = readWebCrawl(spark, inputPath) final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
.filter("publication_year <= 2020 or country_code=='IE'") .filter("country_code=='IE'")
.drop("publication_year"); .drop("publication_year");
dataset.flatMap((FlatMapFunction<Row, Relation>) row -> { final Dataset<Row> blackList = readBlackList(spark, blackListInputPath);
List<Relation> ret = new ArrayList<>();
final String ror = ROR_PREFIX
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
return ret dataset.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
.iterator(); .filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
}, Encoders.bean(Relation.class)) .drop("OpenAlexId")
.toJavaRDD() .flatMap((FlatMapFunction<Row, Relation>) row -> {
.map(p -> new AtomicAction(p.getClass(), p)) List<Relation> ret = new ArrayList<>();
.mapToPair( final String ror = ROR_PREFIX
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
return ret
.iterator();
}, Encoders.bean(Relation.class))
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class);
} }
@ -136,6 +145,15 @@ public class CreateActionSetFromWebEntries implements Serializable {
} }
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath){
	// The blacklist is a CSV file with a header row; only the OpenAlexId
	// column is needed, since it is the key used to anti-join (filter out)
	// crawl results known to be wrongly associated to IE.
	final Dataset<Row> blackList = spark
		.read()
		.option("header", true)
		.csv(inputPath);
	return blackList.select("OpenAlexId");
}
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) { private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
if (pmcid == null) if (pmcid == null)
return new ArrayList<>(); return new ArrayList<>();

View File

@ -16,5 +16,10 @@
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": false "paramRequired": false
} },{
"paramName": "bl",
"paramLongName": "blackListPath",
"paramDescription": "the working path",
"paramRequired": true
}
] ]

View File

@ -75,8 +75,12 @@ public class CreateASTest {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/") "/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
.getPath(); .getPath();
String blackListPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
.getPath();
CreateActionSetFromWebEntries CreateActionSetFromWebEntries
.main( .main(
@ -86,7 +90,8 @@ public class CreateASTest {
"-sourcePath", "-sourcePath",
inputPath, inputPath,
"-outputPath", "-outputPath",
workingDir.toString() + "/actionSet1" workingDir.toString() + "/actionSet1",
"-blackListPath", blackListPath
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -96,7 +101,7 @@ public class CreateASTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
Assertions.assertEquals(64, tmp.count()); Assertions.assertEquals(58, tmp.count());
} }
@ -109,6 +114,10 @@ public class CreateASTest {
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/") "/eu/dnetlib/dhp/actionmanager/webcrawl/")
.getPath(); .getPath();
String blackListPath = getClass()
.getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
.getPath();
CreateActionSetFromWebEntries CreateActionSetFromWebEntries
.main( .main(
@ -118,7 +127,8 @@ public class CreateASTest {
"-sourcePath", "-sourcePath",
inputPath, inputPath,
"-outputPath", "-outputPath",
workingDir.toString() + "/actionSet1" workingDir.toString() + "/actionSet1",
"-blackListPath", blackListPath
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -184,7 +194,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
5, tmp 2, tmp
.filter( .filter(
r -> r r -> r
.getSource() .getSource()
@ -197,7 +207,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
5, tmp 2, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
@ -210,7 +220,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
2, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
@ -224,7 +234,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
2, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
@ -238,7 +248,7 @@ public class CreateASTest {
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 0, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()