diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
index 9828ad907..b5aed6ea2 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
@@ -8,7 +8,7 @@ import java.util.*;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -112,7 +112,7 @@ public class CreateActionSetFromWebEntries implements Serializable {
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
 					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
 
 	}
 
@@ -148,8 +148,7 @@ public class CreateActionSetFromWebEntries implements Serializable {
 
 		return spark
 			.read()
-			.option("header", true)
-			.csv(inputPath)
+			.json(inputPath)
 			.select("OpenAlexId");
 	}
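Editor's note: two independent changes land in this file. The output codec switches from Gzip to BZip2, and the blacklist is now read as newline-delimited JSON rather than headered CSV. A minimal sketch of the blacklist format the new `json()` call expects, mirroring the `not_irish.json` test resource added at the end of this diff (the path and session setup are illustrative, not from the patch):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// A sketch of reading the new blacklist format: one JSON object per line,
// each carrying at least an "OpenAlexId" field (see blackListRemove/not_irish.json below).
public class BlackListFormatSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		// each line: {"doi":"https://doi.org/...","OpenAlexId":"https://openalex.org/W..."}
		Dataset<Row> blackList = spark
			.read()
			.json("/path/to/openalex-blacklist"); // hypothetical location
		blackList.select("OpenAlexId").show(false);
		spark.stop();
	}
}
```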
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java
new file mode 100644
index 000000000..08d543218
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java
@@ -0,0 +1,158 @@
+
+package eu.dnetlib.dhp.actionmanager.webcrawl;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.*;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import scala.Tuple2;
+
+public class RemoveRelationFromActionSet
+	implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(RemoveRelationFromActionSet.class);
+
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+
+	// shape of the action-set sequence file rows: key = class name, value = serialized AtomicAction
+	private static final StructType KV_SCHEMA = StructType$.MODULE$
+		.apply(
+			Arrays
+				.asList(
+					StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()),
+					StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())));
+
+	// shape of a serialized AtomicAction: the payload class plus the payload itself as JSON text
+	private static final StructType ATOMIC_ACTION_SCHEMA = StructType$.MODULE$
+		.apply(
+			Arrays
+				.asList(
+					StructField$.MODULE$.apply("clazz", DataTypes.StringType, false, Metadata.empty()),
+					StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())));
+
+	public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	public static void main(String[] args) throws Exception {
+		// the removal job takes the same parameters as the creation job
+		String jsonConfiguration = IOUtils
+			.toString(
+				CreateActionSetFromWebEntries.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		// the path of the action set to prune
+		final String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		final String blackListInputPath = parser.get("blackListPath");
+		log.info("blackListInputPath: {}", blackListInputPath);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> removeFromActionSet(spark, inputPath, outputPath, blackListInputPath));
+	}
+
+	private static void removeFromActionSet(SparkSession spark, String inputPath, String outputPath,
+		String blackListInputPath) {
+		// read the blacklist and turn each DOI into an OpenAIRE result id;
+		// substring(16) strips the leading "https://doi.org/" (16 characters) from the DOI URL
+		Dataset<String> blackList = readBlackList(spark, blackListInputPath)
+			.map(
+				(MapFunction<Row, String>) r -> IdentifierFactory
+					.idFromPid("50", "doi", ((String) r.getAs("doi")).substring(16), true),
+				Encoders.STRING());
+
+		// read the old action set and get the relations in the payload
+		JavaPairRDD<Text, Text> seq = JavaSparkContext
+			.fromSparkContext(spark.sparkContext())
+			.sequenceFile(inputPath, Text.class, Text.class);
+
+		JavaRDD<Row> rdd = seq
+			.map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
+
+		Dataset<Row> actionSet = spark
+			.createDataFrame(rdd, KV_SCHEMA)
+			.withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
+			.select(expr("atomic_action.*"));
+
+		Dataset<Relation> relation = actionSet
+			.map(
+				(MapFunction<Row, Relation>) r -> MAPPER.readValue((String) r.getAs("payload"), Relation.class),
+				Encoders.bean(Relation.class));
+
+		// keep only the relations whose source matches no pid in the blacklist
+		Dataset<Relation> relNoSource = relation
+			.joinWith(blackList, relation.col("source").equalTo(blackList.col("value")), "left")
+			.filter((FilterFunction<Tuple2<Relation, String>>) t2 -> t2._2() == null)
+			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
+
+		// keep only the relations whose target matches no pid in the blacklist
+		relNoSource
+			.joinWith(blackList, relNoSource.col("target").equalTo(blackList.col("value")), "left")
+			.filter((FilterFunction<Tuple2<Relation, String>>) t2 -> t2._2() == null)
+			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
+			.toJavaRDD()
+			.map(p -> new AtomicAction(p.getClass(), p))
+			.mapToPair(
+				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
+	}
+
+	private static Dataset<Row> readBlackList(SparkSession spark, String inputPath) {
+		return spark
+			.read()
+			.json(inputPath)
+			.select("doi");
+	}
+
+}
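Editor's note: the two `joinWith(...)`/`filter(t2._2() == null)` passes above are a hand-rolled left anti join: keep only relations whose source, and then target, has no counterpart in the blacklist. A sketch of the same filtering with Spark's built-in `left_anti` join type, assuming it is acceptable to drop the `Relation` bean encoding to plain rows at this point (names here are illustrative, not from the patch):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical alternative to the joinWith/filter pairs above: "left_anti"
// keeps exactly the left-side rows with no match on the right.
public class BlackListFilterSketch {

	// rel: relations as rows (with "source"/"target" columns);
	// black: a single "value" column of blacklisted OpenAIRE ids
	static Dataset<Row> dropBlacklisted(Dataset<Row> rel, Dataset<Row> black) {
		Dataset<Row> noBlackSource = rel
			.join(black, rel.col("source").equalTo(black.col("value")), "left_anti");
		return noBlackSource
			.join(black, noBlackSource.col("target").equalTo(black.col("value")), "left_anti");
	}
}
```

The `joinWith` form in the patch keeps the `Relation` bean encoding end to end, which is probably why it was preferred; the anti-join variant would need a round-trip through `Row`.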
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties
index d7bd709fc..641e72610 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties
@@ -1,3 +1,11 @@
-sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/
-outputPath=/tmp/miriam/webcrawlComplete/
-blackListPath=/user/miriam.baglioni/openalex-blackList
+#PROPERTIES TO CREATE THE ACTION SET
+#sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/
+#outputPath=/tmp/miriam/webcrawlComplete/
+#blackListPath=/user/miriam.baglioni/openalex-blackList
+#resumeFrom=create
+
+#PROPERTIES TO REMOVE FROM THE ACTION SET
+sourcePath=/var/lib/dnet/actionManager_PROD/webcrawl/rawset_28247629-468b-478e-9a42-bc540877125d_1718121542061/
+outputPath=/tmp/miriam/webcrawlRemoved/
+blackListPath=/user/miriam.baglioni/oalexBlackListNormalized
+resumeFrom=remove
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml
index b9394c7e6..ccf34c557 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml
@@ -20,12 +20,19 @@
-    <start to="create_actionset"/>
+    <start to="resumeFrom"/>
 
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
+    <decision name="resumeFrom">
+        <switch>
+            <case to="create_actionset">${wf:conf('resumeFrom') eq 'create'}</case>
+            <default to="remove_from_actionset"/>
+        </switch>
+    </decision>
+
     <action name="create_actionset">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
@@ -50,5 +57,30 @@
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
     </action>
+
+    <action name="remove_from_actionset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Removes some relations found to be wrong from the AS</name>
+            <class>eu.dnetlib.dhp.actionmanager.webcrawl.RemoveRelationFromActionSet</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--outputPath</arg><arg>${outputPath}</arg>
+            <arg>--blackListPath</arg><arg>${blackListPath}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
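Editor's note: the new decision node routes on the `resumeFrom` property: `create` runs the original action-set creation, anything else falls through to the removal job, matching the two property blocks in job.properties. A hypothetical local equivalent of that routing, offered only as a reading aid (not part of the patch):

```java
import java.io.InputStream;
import java.util.Properties;

// Hypothetical stand-in for the workflow's resumeFrom decision node.
public class WebCrawlDispatcherSketch {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		try (InputStream in = WebCrawlDispatcherSketch.class
			.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties")) {
			props.load(in);
		}
		if ("create".equals(props.getProperty("resumeFrom", "create"))) {
			CreateActionSetFromWebEntries.main(args); // build a fresh action set
		} else {
			RemoveRelationFromActionSet.main(args); // prune an existing one
		}
	}
}
```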
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java
index e9291f93c..d23b7faa2 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java
@@ -2,6 +2,7 @@
 package eu.dnetlib.dhp.actionmanager.webcrawl;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -101,7 +102,10 @@ public class CreateASTest {
 			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
 			.map(aa -> ((Relation) aa.getPayload()));
 
-		Assertions.assertEquals(58, tmp.count());
+		tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+		tmp.foreach(r -> assertTrue(r.getSource().startsWith("20|ror") || r.getSource().startsWith("50|doi")));
+		tmp.foreach(r -> assertTrue(r.getTarget().startsWith("20|ror") || r.getTarget().startsWith("50|doi")));
+		Assertions.assertEquals(24, tmp.count());
 
 	}
 
@@ -112,7 +116,7 @@ public class CreateASTest {
 
 		String inputPath = getClass()
 			.getResource(
-				"/eu/dnetlib/dhp/actionmanager/webcrawl/")
+				"/eu/dnetlib/dhp/actionmanager/webcrawl/input/")
 			.getPath();
 		String blackListPath = getClass()
 			.getResource(
@@ -194,7 +198,7 @@
 		Assertions
 			.assertEquals(
-				2, tmp
+				1, tmp
 					.filter(
 						r -> r
 							.getSource()
@@ -207,7 +211,7 @@
 		Assertions
 			.assertEquals(
-				2, tmp
+				1, tmp
 					.filter(
 						r -> r
 							.getTarget()
@@ -228,13 +232,13 @@
 						"20|ror_________::" + IdentifierFactory
 							.md5(
 								PidCleaner
-									.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
+									.normalizePidValue("ROR", "https://ror.org/03265fv13")))
 					&& r.getSource().startsWith("50|doi"))
 				.count());
 
 		Assertions
 			.assertEquals(
-				1, tmp
+				0, tmp
 					.filter(
 						r -> r
 							.getTarget()
@@ -268,6 +272,10 @@
 			.getResource(
 				"/eu/dnetlib/dhp/actionmanager/webcrawl")
 			.getPath();
+		String blackListPath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/")
+			.getPath();
 
 		CreateActionSetFromWebEntries
 			.main(
@@ -277,7 +285,8 @@
 				"-sourcePath",
 				inputPath,
 				"-outputPath",
-				workingDir.toString() + "/actionSet1"
+				workingDir.toString() + "/actionSet1",
+				"-blackListPath", blackListPath
 			});
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
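Editor's note: the reworked assertions also pin down how the organization end of each affiliation relation is keyed: the "20|ror_________::" prefix plus the MD5 of the normalized ROR URL, now normalized under the "ROR" pid type rather than `PidType.doi`. A small helper mirroring the exact calls the test makes (the wrapper class is hypothetical):

```java
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;

// Hypothetical helper showing the organization id the assertions compare against.
public class RorIdSketch {

	// e.g. rorOrganizationId("https://ror.org/03265fv13")
	static String rorOrganizationId(String rorUrl) {
		return "20|ror_________::" + IdentifierFactory.md5(
			PidCleaner.normalizePidValue("ROR", rorUrl));
	}
}
```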
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveFromASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveFromASTest.java
new file mode 100644
index 000000000..bc78804f2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveFromASTest.java
@@ -0,0 +1,108 @@
+
+package eu.dnetlib.dhp.actionmanager.webcrawl;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+/**
+ * @author miriam.baglioni
+ * @Date 22/04/24
+ */
+public class RemoveFromASTest {
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private static SparkSession spark;
+
+	private static Path workingDir;
+	private static final Logger log = LoggerFactory.getLogger(RemoveFromASTest.class);
+
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(RemoveFromASTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
+
+		SparkConf conf = new SparkConf();
+		conf.setAppName(RemoveFromASTest.class.getSimpleName());
+
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+		spark = SparkSession
+			.builder()
+			.appName(RemoveFromASTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
+
+	@Test
+	void testNumberofRelations() throws Exception {
+
+		String inputPath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/actionmanager/webcrawl/actionSet/")
+			.getPath();
+		String blackListPath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/")
+			.getPath();
+
+		RemoveRelationFromActionSet
+			.main(
+				new String[] {
+					"-isSparkSessionManaged",
+					Boolean.FALSE.toString(),
+					"-sourcePath",
+					inputPath,
+					"-outputPath",
+					workingDir.toString() + "/actionSet1",
+					"-blackListPath", blackListPath
+				});
+
+		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+		JavaRDD<Relation> tmp = sc
+			.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
+			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+			.map(aa -> ((Relation) aa.getPayload()));
+
+		// 24 relations before the removal (see CreateASTest), 22 after:
+		// the single blacklisted DOI accounts for two
+		Assertions.assertEquals(22, tmp.count());
+
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/not_irish.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/not_irish.json
new file mode 100644
index 000000000..2c470c555
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/not_irish.json
@@ -0,0 +1 @@
+{"doi":"https://doi.org/10.1098/rstl.1684.0023","OpenAlexId":"https://openalex.org/W2124362779"}
\ No newline at end of file
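Editor's note: the single entry above is what RemoveFromASTest feeds to the removal job. Its DOI loses the 16-character "https://doi.org/" prefix, becomes an OpenAIRE result id, and every relation with that id as source or target is dropped, consistent with the counts falling from 24 in CreateASTest to 22 here, assuming the blacklisted work carries one relation in each direction. A sketch of the derivation, reusing the same `IdentifierFactory` call as the job (the harness class is hypothetical):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;

// Walk-through of how the blacklist entry above is resolved by removeFromActionSet.
public class BlackListEntrySketch {
	public static void main(String[] args) throws Exception {
		String line = "{\"doi\":\"https://doi.org/10.1098/rstl.1684.0023\","
			+ "\"OpenAlexId\":\"https://openalex.org/W2124362779\"}";
		JsonNode node = new ObjectMapper().readTree(line);
		// substring(16) strips "https://doi.org/", exactly as the job does
		String doi = node.get("doi").asText().substring(16); // -> "10.1098/rstl.1684.0023"
		String openaireId = IdentifierFactory.idFromPid("50", "doi", doi, true);
		System.out.println(openaireId); // relations touching this id are removed
	}
}
```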