diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java index 9828ad9076..7607cfc76c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java @@ -8,7 +8,7 @@ import java.util.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -112,7 +112,7 @@ public class CreateActionSetFromWebEntries implements Serializable { .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java index 074311f1f1..33dfbacf16 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveRelationFromActionSet.java @@ -1,244 +1,159 @@ + package eu.dnetlib.dhp.actionmanager.webcrawl; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.Constants; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; -import eu.dnetlib.dhp.schema.oaf.utils.PidType; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static org.apache.spark.sql.functions.*; + +import java.io.File; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Optional; + +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.filefilter.DirectoryFileFilter; +import org.apache.commons.io.filefilter.FileFileFilter; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import scala.Tuple2; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - public class RemoveRelationFromActionSet - implements Serializable { - private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class); - private static final String DOI_PREFIX = "50|doi_________::"; + implements Serializable { + private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final StructType KV_SCHEMA = StructType$.MODULE$ + .apply( + Arrays + .asList( + StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty()))); - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final StructType ATOMIC_ACTION_SCHEMA = StructType$.MODULE$ + .apply( + Arrays + .asList( + StructField$.MODULE$.apply("clazz", DataTypes.StringType, false, Metadata.empty()), + StructField$.MODULE$ + .apply( + "payload", DataTypes.StringType, false, Metadata.empty()))); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - CreateActionSetFromWebEntries.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json")); + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + CreateActionSetFromWebEntries.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json")); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); - final String inputPath = parser.get("actionSetPath"); - log.info("inputPath: {}", inputPath); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + // the actionSet path + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); - final String blackListInputPath = parser.get("blackListPath"); - log.info("blackListInputPath: {}", blackListInputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - SparkConf conf = new SparkConf(); + final String blackListInputPath = parser.get("blackListPath"); + log.info("blackListInputPath: {}", blackListInputPath); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { + SparkConf conf = new SparkConf(); - removeFromActionSet(spark, inputPath, outputPath, blackListInputPath); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { - }); - } + removeFromActionSet(spark, inputPath, outputPath, blackListInputPath); - private static void removeFromActionSet(SparkSession spark, String inputPath, String outputPath, String blackListInputPath) { + }); + } - } + private static void removeFromActionSet(SparkSession spark, String inputPath, String outputPath, + String blackListInputPath) { + // read the blacklist + Dataset blackList = readBlackList(spark, blackListInputPath) + .map( + (MapFunction) r -> IdentifierFactory + .idFromPid("50", "doi", ((String) r.getAs("DOI / PMID")).substring(16), true), + Encoders.STRING()); - public static void createActionSet(SparkSession spark, String inputPath, - String outputPath, String blackListInputPath) { + // read the old actionset and get the relations in the payload + JavaPairRDD seq = JavaSparkContext + .fromSparkContext(spark.sparkContext()) + .sequenceFile(inputPath, Text.class, Text.class); - final Dataset dataset = readWebCrawl(spark, inputPath) - .filter("country_code=='IE'") - .drop("publication_year"); + JavaRDD rdd = seq + .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); - final Dataset blackList = readBlackList(spark, blackListInputPath); + Dataset actionSet = spark + .createDataFrame(rdd, KV_SCHEMA) + .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA)) + .select(expr("atomic_action.*")); - dataset - .join(blackList, dataset.col("id").equalTo(blackList.col("OpenAlexId")), "left") - .filter((FilterFunction) r -> r.getAs("OpenAlexId") == null) - .drop("OpenAlexId") - .flatMap((FlatMapFunction) row -> { - List ret = new ArrayList<>(); - final String ror = ROR_PREFIX - + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); - ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); + Dataset relation = actionSet + .map( + (MapFunction) r -> MAPPER.readValue((String) r.getAs("payload"), Relation.class), + Encoders.bean(Relation.class)); - return ret - .iterator(); - }, Encoders.bean(Relation.class)) - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + // select only the relation not matching any pid in the blacklist as source for the relation + Dataset relNoSource = relation + .joinWith(blackList, relation.col("source").equalTo(blackList.col("value")), "left") + .filter((FilterFunction>) t2 -> t2._2() == null) + .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)); - } + // select only the relation not matching any pid in the blacklist as target of the relation + relNoSource + .joinWith(blackList, relNoSource.col("target").equalTo(blackList.col("value")), "left") + .filter((FilterFunction>) t2 -> t2._2() == null) + .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + ; - private static Dataset readWebCrawl(SparkSession spark, String inputPath) { - StructType webInfo = StructType - .fromDDL( - "`id` STRING , `doi` STRING, `ids` STRUCT<`pmid` :STRING, `pmcid`: STRING >, `publication_year` STRING, " - + - "`authorships` ARRAY>>>"); + } - return spark - .read() - .schema(webInfo) - .json(inputPath) - .withColumn( - "authors", functions - .explode( - functions.col("authorships"))) - .selectExpr("id", "doi", "ids", "publication_year", "authors.institutions as institutions") - .withColumn( - "institution", functions - .explode( - functions.col("institutions"))) + private static Dataset readBlackList(SparkSession spark, String inputPath) { - .selectExpr( - "id", "doi", "institution.ror as ror", - "institution.country_code as country_code", "publication_year") - .distinct(); + return spark + .read() + .option("header", true) + .csv(inputPath) + .select("DOI / PMID"); + } - } - - private static Dataset readBlackList(SparkSession spark, String inputPath) { - - return spark - .read() - .option("header", true) - .csv(inputPath) - .select("OpenAlexId"); - } - - private static List createAffiliationRelationPairPMCID(String pmcid, String ror) { - if (pmcid == null) - return new ArrayList<>(); - - return createAffiliatioRelationPair( - PMCID_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), removeResolver("PMC", pmcid))), - ror); - } - - private static List createAffiliationRelationPairPMID(String pmid, String ror) { - if (pmid == null) - return new ArrayList<>(); - - return createAffiliatioRelationPair( - PMID_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), removeResolver("PMID", pmid))), - ror); - } - - private static String removeResolver(String pidType, String pid) { - switch (pidType) { - case "PMID": - return pid.substring(33); - case "PMC": - return "PMC" + pid.substring(43); - case "DOI": - return pid.substring(16); - } - - throw new RuntimeException(); - - } - - private static List createAffiliationRelationPairDOI(String doi, String ror) { - if (doi == null) - return new ArrayList<>(); - - return createAffiliatioRelationPair( - DOI_PREFIX - + IdentifierFactory - .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), removeResolver("DOI", doi))), - ror); - - } - - private static List createAffiliatioRelationPair(String resultId, String orgId) { - ArrayList newRelations = new ArrayList(); - - newRelations - .add( - OafMapperUtils - .getRelation( - orgId, resultId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION, - ModelConstants.IS_AUTHOR_INSTITUTION_OF, - Arrays - .asList( - OafMapperUtils.keyValue(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME)), - OafMapperUtils - .dataInfo( - false, null, false, false, - OafMapperUtils - .qualifier( - "sysimport:crasswalk:webcrawl", "Imported from Webcrawl", - ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), - "0.9"), - null)); - - newRelations - .add( - OafMapperUtils - .getRelation( - resultId, orgId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION, - ModelConstants.HAS_AUTHOR_INSTITUTION, - Arrays - .asList( - OafMapperUtils.keyValue(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME)), - OafMapperUtils - .dataInfo( - false, null, false, false, - OafMapperUtils - .qualifier( - "sysimport:crasswalk:webcrawl", "Imported from Webcrawl", - ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), - "0.9"), - null)); - - return newRelations; - - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties index d7bd709fca..641e72610e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/job.properties @@ -1,3 +1,11 @@ -sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/ -outputPath=/tmp/miriam/webcrawlComplete/ -blackListPath=/user/miriam.baglioni/openalex-blackList +#PROPERTIES TO CREATE THE ACTION SET +#sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/ +#outputPath=/tmp/miriam/webcrawlComplete/ +#blackListPath=/user/miriam.baglioni/openalex-blackList +#resumeFrom=create + +#PROPERTIES TO REMOVE FROM THE ACTION SET +sourcePath=/var/lib/dnet/actionManager_PROD/webcrawl/rawset_28247629-468b-478e-9a42-bc540877125d_1718121542061/ +outputPath=/tmp/miriam/webcrawlRemoved/ +blackListPath=/user/miriam.baglioni/oalexBlackListNormalized +resumeFrom=remove \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml index b9394c7e69..ccf34c5577 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/webcrawl/oozie_app/workflow.xml @@ -20,12 +20,19 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + ${wf:conf('resumeFrom') eq 'create'} + + + + yarn @@ -50,5 +57,30 @@ + + + + yarn + cluster + Removes some relations found to be wrong from the AS + eu.dnetlib.dhp.actionmanager.webcrawl.RemoveRelationFromActionSet + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sourcePath} + --outputPath${outputPath} + --blackListPath${blackListPath} + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java index e9291f93c5..d23b7faa26 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.actionmanager.webcrawl; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.nio.file.Files; @@ -101,7 +102,10 @@ public class CreateASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - Assertions.assertEquals(58, tmp.count()); + tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r))); + tmp.foreach(r -> assertTrue(r.getSource().startsWith("20|ror") || r.getSource().startsWith("50|doi"))); + tmp.foreach(r -> assertTrue(r.getTarget().startsWith("20|ror") || r.getTarget().startsWith("50|doi"))); + Assertions.assertEquals(24, tmp.count()); } @@ -112,7 +116,7 @@ public class CreateASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/webcrawl/") + "/eu/dnetlib/dhp/actionmanager/webcrawl/input/") .getPath(); String blackListPath = getClass() .getResource( @@ -194,7 +198,7 @@ public class CreateASTest { Assertions .assertEquals( - 2, tmp + 1, tmp .filter( r -> r .getSource() @@ -207,7 +211,7 @@ public class CreateASTest { Assertions .assertEquals( - 2, tmp + 1, tmp .filter( r -> r .getTarget() @@ -228,13 +232,13 @@ public class CreateASTest { "20|ror_________::" + IdentifierFactory .md5( PidCleaner - .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13"))) + .normalizePidValue("ROR", "https://ror.org/03265fv13"))) && r.getSource().startsWith("50|doi")) .count()); Assertions .assertEquals( - 1, tmp + 0, tmp .filter( r -> r .getTarget() @@ -268,6 +272,10 @@ public class CreateASTest { .getResource( "/eu/dnetlib/dhp/actionmanager/webcrawl") .getPath(); + String blackListPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/blackList/") + .getPath(); CreateActionSetFromWebEntries .main( @@ -277,7 +285,8 @@ public class CreateASTest { "-sourcePath", inputPath, "-outputPath", - workingDir.toString() + "/actionSet1" + workingDir.toString() + "/actionSet1", + "-blackListPath", blackListPath }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveFromASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveFromASTest.java new file mode 100644 index 0000000000..bc78804f2e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/RemoveFromASTest.java @@ -0,0 +1,108 @@ + +package eu.dnetlib.dhp.actionmanager.webcrawl; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; + +/** + * @author miriam.baglioni + * @Date 22/04/24 + */ +public class RemoveFromASTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(RemoveFromASTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(RemoveFromASTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(RemoveFromASTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(RemoveFromASTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testNumberofRelations() throws Exception { + + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/actionSet/") + .getPath(); + String blackListPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/") + .getPath(); + + RemoveRelationFromActionSet + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-sourcePath", + inputPath, + "-outputPath", + workingDir.toString() + "/actionSet1", + "-blackListPath", blackListPath + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); + + Assertions.assertEquals(22, tmp.count()); + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/not_irish.csv b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/not_irish.csv new file mode 100644 index 0000000000..009925839d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/webcrawl/blackListRemove/not_irish.csv @@ -0,0 +1,2 @@ +DOI / PMID,OpenAlexId,Comments, +https://doi.org/10.1098/rstl.1684.0023,https://openalex.org/W2124362779,, \ No newline at end of file