forked from D-Net/dnet-hadoop
Update to include a blackList that filters out the results we know are wrongly associated with IE - refactoring
This commit is contained in:
parent
b55fed09f8
commit
87c9c61b41
|
@ -95,26 +95,27 @@ public class CreateActionSetFromWebEntries implements Serializable {
|
||||||
|
|
||||||
final Dataset<Row> blackList = readBlackList(spark, blackListInputPath);
|
final Dataset<Row> blackList = readBlackList(spark, blackListInputPath);
|
||||||
|
|
||||||
dataset.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
|
dataset
|
||||||
.filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
|
.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left")
|
||||||
.drop("OpenAlexId")
|
.filter((FilterFunction<Row>) r -> r.getAs("OpenAlexId") == null)
|
||||||
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
.drop("OpenAlexId")
|
||||||
List<Relation> ret = new ArrayList<>();
|
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
||||||
final String ror = ROR_PREFIX
|
List<Relation> ret = new ArrayList<>();
|
||||||
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
|
final String ror = ROR_PREFIX
|
||||||
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
|
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
|
||||||
ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
|
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
|
||||||
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
|
ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
|
||||||
|
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
.iterator();
|
.iterator();
|
||||||
}, Encoders.bean(Relation.class))
|
}, Encoders.bean(Relation.class))
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.map(p -> new AtomicAction(p.getClass(), p))
|
.map(p -> new AtomicAction(p.getClass(), p))
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class);
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,13 +146,13 @@ public class CreateActionSetFromWebEntries implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath){
|
private static Dataset<Row> readBlackList(SparkSession spark, String inputPath) {
|
||||||
|
|
||||||
return spark
|
return spark
|
||||||
.read()
|
.read()
|
||||||
.option("header", true)
|
.option("header", true)
|
||||||
.csv(inputPath)
|
.csv(inputPath)
|
||||||
.select("OpenAlexId");
|
.select("OpenAlexId");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
|
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
|
||||||
|
|
|
@ -78,9 +78,9 @@ public class CreateASTest {
|
||||||
"/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
|
||||||
.getPath();
|
.getPath();
|
||||||
String blackListPath = getClass()
|
String blackListPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetFromWebEntries
|
CreateActionSetFromWebEntries
|
||||||
.main(
|
.main(
|
||||||
|
@ -91,7 +91,7 @@ public class CreateASTest {
|
||||||
inputPath,
|
inputPath,
|
||||||
"-outputPath",
|
"-outputPath",
|
||||||
workingDir.toString() + "/actionSet1",
|
workingDir.toString() + "/actionSet1",
|
||||||
"-blackListPath", blackListPath
|
"-blackListPath", blackListPath
|
||||||
});
|
});
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
@ -115,9 +115,9 @@ public class CreateASTest {
|
||||||
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
|
||||||
.getPath();
|
.getPath();
|
||||||
String blackListPath = getClass()
|
String blackListPath = getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
CreateActionSetFromWebEntries
|
CreateActionSetFromWebEntries
|
||||||
.main(
|
.main(
|
||||||
|
@ -128,7 +128,7 @@ public class CreateASTest {
|
||||||
inputPath,
|
inputPath,
|
||||||
"-outputPath",
|
"-outputPath",
|
||||||
workingDir.toString() + "/actionSet1",
|
workingDir.toString() + "/actionSet1",
|
||||||
"-blackListPath", blackListPath
|
"-blackListPath", blackListPath
|
||||||
});
|
});
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
Loading…
Reference in New Issue