Update to include a blackList that filters out the results we know are wrongly associated with IE - refactoring

Miriam Baglioni 2024-05-24 15:23:42 +02:00
parent 12ffde023f
commit 7a44869d87
2 changed files with 33 additions and 32 deletions

CreateActionSetFromWebEntries.java

@@ -95,26 +95,27 @@ public class CreateActionSetFromWebEntries implements Serializable {
  final Dataset<Row> blackList = readBlackList(spark, blackListInputPath);
- dataset.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
- .filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
- .drop("OpenAlexId")
- .flatMap((FlatMapFunction<Row, Relation>) row -> {
- List<Relation> ret = new ArrayList<>();
- final String ror = ROR_PREFIX
- + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
- ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
- ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
- ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
+ dataset
+ .join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
+ .filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
+ .drop("OpenAlexId")
+ .flatMap((FlatMapFunction<Row, Relation>) row -> {
+ List<Relation> ret = new ArrayList<>();
+ final String ror = ROR_PREFIX
+ + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
+ ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
+ ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
+ ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
- return ret
- .iterator();
- }, Encoders.bean(Relation.class))
- .toJavaRDD()
- .map(p -> new AtomicAction(p.getClass(), p))
- .mapToPair(
- aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
- new Text(OBJECT_MAPPER.writeValueAsString(aa))))
- .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class);
+ return ret
+ .iterator();
+ }, Encoders.bean(Relation.class))
+ .toJavaRDD()
+ .map(p -> new AtomicAction(p.getClass(), p))
+ .mapToPair(
+ aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+ new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+ .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
  }
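Side note on the pattern above: the left join on OpenAlexId followed by the null filter and the drop implements an anti join, i.e. only records whose id does not appear in the blacklist survive. A minimal sketch of an equivalent formulation using Spark's built-in anti-join type (illustrative only, not the committed code; it reuses the dataset and blackList variables shown above, and the name filtered is hypothetical):

// Sketch: a "left_anti" join keeps only left-side rows with no match in blackList
// and returns only left-side columns, so the explicit null filter and drop("OpenAlexId")
// are not needed in this variant.
Dataset<Row> filtered = dataset
    .join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left_anti");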
@@ -145,13 +146,13 @@ public class CreateActionSetFromWebEntries implements Serializable {
  }
- private static Dataset<Row> readBlackList(SparkSession spark, String inputPath){
+ private static Dataset<Row> readBlackList(SparkSession spark, String inputPath) {
  return spark
- .read()
- .option("header", true)
- .csv(inputPath)
- .select("OpenAlexId");
+ .read()
+ .option("header", true)
+ .csv(inputPath)
+ .select("OpenAlexId");
  }
  private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
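readBlackList loads the blacklist as a CSV with a header row and keeps only the OpenAlexId column, so any additional columns in the file are ignored. A hypothetical example of what such a file could look like (the identifiers below are made up for illustration and assume full OpenAlex work URLs; the actual expected format is the one used by the test resources under /eu/dnetlib/dhp/actionmanager/webcrawl/blackList/):

OpenAlexId
https://openalex.org/W0000000001
https://openalex.org/W0000000002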

CreateASTest.java

@@ -78,9 +78,9 @@ public class CreateASTest {
  "/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
  .getPath();
  String blackListPath = getClass()
- .getResource(
- "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
- .getPath();
+ .getResource(
+ "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
+ .getPath();
  CreateActionSetFromWebEntries
  .main(
@@ -91,7 +91,7 @@ public class CreateASTest {
  inputPath,
  "-outputPath",
  workingDir.toString() + "/actionSet1",
- "-blackListPath", blackListPath
+ "-blackListPath", blackListPath
  });
  final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -115,9 +115,9 @@ public class CreateASTest {
  "/eu/dnetlib/dhp/actionmanager/webcrawl/")
  .getPath();
  String blackListPath = getClass()
- .getResource(
- "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
- .getPath();
+ .getResource(
+ "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
+ .getPath();
  CreateActionSetFromWebEntries
  .main(
@@ -128,7 +128,7 @@ public class CreateASTest {
  inputPath,
  "-outputPath",
  workingDir.toString() + "/actionSet1",
- "-blackListPath", blackListPath
+ "-blackListPath", blackListPath
  });
  final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
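The assertions that follow in the test (outside the hunks shown here) typically read the written action set back from the sequence file and inspect the Relation payloads. A sketch of that read-back pattern, assuming the test's OBJECT_MAPPER constant and the output path used above (a hypothetical reconstruction, not part of this diff):

// Read the action set back: each value is an AtomicAction serialized as JSON,
// whose payload is the Relation produced by the job.
JavaRDD<Relation> tmp = sc
    .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
    .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
    .map(aa -> ((Relation) aa.getPayload()));
// tmp.count() can then be compared against the expected number of relations,
// e.g. to verify that blacklisted OpenAlex ids no longer contribute any relation.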