diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java index 541ed8e10..27970f2c3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java @@ -95,26 +95,27 @@ public class CreateActionSetFromWebEntries implements Serializable { final Dataset blackList = readBlackList(spark, blackListInputPath); - dataset.join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left") - .filter((FilterFunction) r -> r.getAs("OpenAlexId") == null) - .drop("OpenAlexId") - .flatMap((FlatMapFunction) row -> { - List ret = new ArrayList<>(); - final String ror = ROR_PREFIX - + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); - ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); - ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); - ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); + dataset + .join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left") + .filter((FilterFunction) r -> r.getAs("OpenAlexId") == null) + .drop("OpenAlexId") + .flatMap((FlatMapFunction) row -> { + List ret = new ArrayList<>(); + final String ror = ROR_PREFIX + + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); + ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); + ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); + ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror)); - return ret - .iterator(); - }, Encoders.bean(Relation.class)) - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);//, GzipCodec.class); + return ret + .iterator(); + }, Encoders.bean(Relation.class)) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } @@ -145,13 +146,13 @@ public class CreateActionSetFromWebEntries implements Serializable { } - private static Dataset readBlackList(SparkSession spark, String inputPath){ + private static Dataset readBlackList(SparkSession spark, String inputPath) { return spark - .read() - .option("header", true) - .csv(inputPath) - .select("OpenAlexId"); + .read() + .option("header", true) + .csv(inputPath) + .select("OpenAlexId"); } private static List createAffiliationRelationPairPMCID(String pmcid, String ror) { diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java index c574a5812..e9291f93c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java @@ -78,9 +78,9 @@ public class CreateASTest { "/eu/dnetlib/dhp/actionmanager/webcrawl/input/") .getPath(); String blackListPath = getClass() - .getResource( - "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") + .getPath(); CreateActionSetFromWebEntries .main( @@ -91,7 +91,7 @@ public class CreateASTest { inputPath, "-outputPath", workingDir.toString() + "/actionSet1", - "-blackListPath", blackListPath + "-blackListPath", blackListPath }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -115,9 +115,9 @@ public class CreateASTest { "/eu/dnetlib/dhp/actionmanager/webcrawl/") .getPath(); String blackListPath = getClass() - .getResource( - "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") + .getPath(); CreateActionSetFromWebEntries .main( @@ -128,7 +128,7 @@ public class CreateASTest { inputPath, "-outputPath", workingDir.toString() + "/actionSet1", - "-blackListPath", blackListPath + "-blackListPath", blackListPath }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());