diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java index eb370e981..27970f2c3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java @@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.types.StructType; @@ -70,6 +71,9 @@ public class CreateActionSetFromWebEntries implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final String blackListInputPath = parser.get("blackListPath"); + log.info("blackListInputPath: {}", blackListInputPath); + SparkConf conf = new SparkConf(); runWithSparkSession( @@ -77,29 +81,35 @@ public class CreateActionSetFromWebEntries implements Serializable { isSparkSessionManaged, spark -> { - createActionSet(spark, inputPath, outputPath); + createActionSet(spark, inputPath, outputPath, blackListInputPath); }); } public static void createActionSet(SparkSession spark, String inputPath, - String outputPath) { + String outputPath, String blackListInputPath) { final Dataset dataset = readWebCrawl(spark, inputPath) - .filter("publication_year <= 2020 or country_code=='IE'") + .filter("country_code=='IE'") .drop("publication_year"); - dataset.flatMap((FlatMapFunction) row -> { - List ret = new ArrayList<>(); - final String ror = ROR_PREFIX - + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); - ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); - ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); - ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); + final Dataset blackList = readBlackList(spark, blackListInputPath); - return ret - .iterator(); - }, Encoders.bean(Relation.class)) + dataset + .join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left") + .filter((FilterFunction) r -> r.getAs("OpenAlexId") == null) + .drop("OpenAlexId") + .flatMap((FlatMapFunction) row -> { + List ret = new ArrayList<>(); + final String ror = ROR_PREFIX + + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); + ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); + ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); + ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); + + return ret + .iterator(); + }, Encoders.bean(Relation.class)) .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) .mapToPair( @@ -136,6 +146,15 @@ public class CreateActionSetFromWebEntries implements Serializable { } + private static Dataset readBlackList(SparkSession spark, String inputPath) { + + return spark + .read() + .option("header", true) + .csv(inputPath) + .select("OpenAlexId"); + } + private static List createAffiliationRelationPairPMCID(String pmcid, String ror) { if (pmcid == null) return new ArrayList<>(); diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json index 3f056edf7..b79140b3a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json @@ -16,5 +16,10 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the hdfs name node", "paramRequired": false - } + },{ + "paramName": "bl", + "paramLongName": "blackListPath", + "paramDescription": "the working path", + "paramRequired": true +} ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties index f616baea7..d7bd709fc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties @@ -1,2 +1,3 @@ sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/ outputPath=/tmp/miriam/webcrawlComplete/ +blackListPath=/user/miriam.baglioni/openalex-blackList diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml index 653a7d384..b9394c7e6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml @@ -45,6 +45,7 @@ --sourcePath${sourcePath} --outputPath${outputPath} + --blackListPath${blackListPath} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java index 402f07d4d..e9291f93c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java @@ -75,7 +75,11 @@ public class CreateASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/webcrawl/") + "/eu/dnetlib/dhp/actionmanager/webcrawl/input/") + .getPath(); + String blackListPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") .getPath(); CreateActionSetFromWebEntries @@ -86,7 +90,8 @@ public class CreateASTest { "-sourcePath", inputPath, "-outputPath", - workingDir.toString() + "/actionSet1" + workingDir.toString() + "/actionSet1", + "-blackListPath", blackListPath }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -96,7 +101,7 @@ public class CreateASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - Assertions.assertEquals(64, tmp.count()); + Assertions.assertEquals(58, tmp.count()); } @@ -109,6 +114,10 @@ public class CreateASTest { .getResource( "/eu/dnetlib/dhp/actionmanager/webcrawl/") .getPath(); + String blackListPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") + .getPath(); CreateActionSetFromWebEntries .main( @@ -118,7 +127,8 @@ public class CreateASTest { "-sourcePath", inputPath, "-outputPath", - workingDir.toString() + "/actionSet1" + workingDir.toString() + "/actionSet1", + "-blackListPath", blackListPath }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -184,7 +194,7 @@ public class CreateASTest { Assertions .assertEquals( - 5, tmp + 2, tmp .filter( r -> r .getSource() @@ -197,7 +207,7 @@ public class CreateASTest { Assertions .assertEquals( - 5, tmp + 2, tmp .filter( r -> r .getTarget() @@ -210,7 +220,7 @@ public class CreateASTest { Assertions .assertEquals( - 2, tmp + 1, tmp .filter( r -> r .getTarget() @@ -224,7 +234,7 @@ public class CreateASTest { Assertions .assertEquals( - 2, tmp + 1, tmp .filter( r -> r .getTarget() @@ -238,7 +248,7 @@ public class CreateASTest { Assertions .assertEquals( - 1, tmp + 0, tmp .filter( r -> r .getTarget() diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/part-00000 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/input/part-00000 similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/part-00000 rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/input/part-00000 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/part-00001 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/input/part-00001 similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/part-00001 rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/input/part-00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/part-00002 b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/input/part-00002 similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/part-00002 rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/input/part-00002