diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
index 5a0be98d3..4035eb33a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateActionSetFromWebEntries.java
@@ -35,238 +35,198 @@ import scala.Tuple2;
  * @Date 18/04/24
  */
 public class CreateActionSetFromWebEntries implements Serializable {
-    private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class);
-    private static final String DOI_PREFIX = "50|doi_________::";
+    private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class);
+    private static final String DOI_PREFIX = "50|doi_________::";
 
-    private static final String ROR_PREFIX = "20|ror_________::";
+    private static final String ROR_PREFIX = "20|ror_________::";
 
-    private static final String PMID_PREFIX = "50|pmid________::";
+    private static final String PMID_PREFIX = "50|pmid________::";
 
-    private static final String PMCID_PREFIX = "50|pmc_________::";
-    private static final String WEB_CRAWL_ID = "10|openaire____::fb98a192f6a055ba495ef414c330834b";
-    private static final String WEB_CRAWL_NAME = "Web Crawl";
-    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String PMCID_PREFIX = "50|pmc_________::";
+    private static final String WEB_CRAWL_ID = "10|openaire____::fb98a192f6a055ba495ef414c330834b";
+    private static final String WEB_CRAWL_NAME = "Web Crawl";
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static void main(String[] args) throws Exception {
-        String jsonConfiguration = IOUtils
-            .toString(
-                CreateActionSetFromWebEntries.class
-                    .getResourceAsStream(
-                        "/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json"));
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                CreateActionSetFromWebEntries.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json"));
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-        parser.parseArgument(args);
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
 
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
 
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-        final String inputPath = parser.get("sourcePath");
-        log.info("inputPath: {}", inputPath);
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
 
-        final String outputPath = parser.get("outputPath");
-        log.info("outputPath: {}", outputPath);
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
 
-        SparkConf conf = new SparkConf();
+        SparkConf conf = new SparkConf();
 
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
-                createActionSet(spark, inputPath, outputPath + "actionSet");
-                createPlainRelations(spark, inputPath, outputPath + "relations");
-            });
-    }
+                createActionSet(spark, inputPath, outputPath);
+            });
+    }
 
-    private static void createPlainRelations(SparkSession spark, String inputPath, String outputPath) {
-        final Dataset<Row> dataset = readWebCrawl(spark, inputPath);
+    public static void createActionSet(SparkSession spark, String inputPath,
+        String outputPath) {
 
-        dataset.flatMap((FlatMapFunction<Row, Tuple2<String, Relation>>) row -> {
-            List<Tuple2<String, Relation>> ret = new ArrayList<>();
+        // keep publications up to 2020, plus anything with an Irish (IE) affiliation
+        final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
+            .filter("publication_year <= 2020 or country_code=='IE'")
+            .drop("publication_year");
 
-            final String ror = row.getAs("ror");
-            ret.addAll(createAffiliationRelationPairDOI(row.getAs("publication_year"), row.getAs("doi"), ror));
-            ret.addAll(createAffiliationRelationPairPMID(row.getAs("publication_year"), row.getAs("pmid"), ror));
-            ret.addAll(createAffiliationRelationPairPMCID(row.getAs("publication_year"), row.getAs("pmcid"), ror));
+        dataset.flatMap((FlatMapFunction<Row, Relation>) row -> {
+            List<Relation> ret = new ArrayList<>();
+            final String ror = ROR_PREFIX
+                + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
+            ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
+            ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
+            ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
 
-            return ret
-                .iterator();
-        }, Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
-    }
+            return ret
+                .iterator();
+        }, Encoders.bean(Relation.class))
+            .toJavaRDD()
+            .map(p -> new AtomicAction(p.getClass(), p))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
 
-    private static Collection<Tuple2<String, Relation>> createAffiliationRelationPairPMCID(
-        String publication_year, String pmcid, String ror) {
-        if (pmcid == null)
-            return new ArrayList<>();
+    }
 
-        return createAffiliatioRelationPair("PMC" + pmcid, ror)
-            .stream()
-            .map(r -> new Tuple2<String, Relation>(publication_year, r))
-            .collect(Collectors.toList());
-    }
+    private static Dataset<Row> readWebCrawl(SparkSession spark, String inputPath) {
+        StructType webInfo = StructType
+            .fromDDL(
+                "`id` STRING , `doi` STRING, `ids` STRUCT<`pmid` :STRING, `pmcid`: STRING >, `publication_year` STRING, "
+                    +
+                    "`authorships` ARRAY<STRUCT<`institutions`: ARRAY<STRUCT<`ror`: STRING, `country_code`: STRING>>>>");
 
-    private static Collection<Tuple2<String, Relation>> createAffiliationRelationPairPMID(
-        String publication_year, String pmid, String ror) {
-        if (pmid == null)
-            return new ArrayList<>();
+        return spark
+            .read()
+            .schema(webInfo)
+            .json(inputPath)
+            .withColumn(
+                "authors", functions
+                    .explode(
+                        functions.col("authorships")))
+            .selectExpr("id", "doi", "ids", "publication_year", "authors.institutions as institutions")
+            .withColumn(
+                "institution", functions
+                    .explode(
+                        functions.col("institutions")))
+            .selectExpr(
+                "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
+                "institution.country_code as country_code", "publication_year")
+            .distinct();
 
-        return createAffiliatioRelationPair(pmid, ror)
-            .stream()
-            .map(r -> new Tuple2<String, Relation>(publication_year, r))
-            .collect(Collectors.toList());
-    }
 
-    private static Collection<Tuple2<String, Relation>> createAffiliationRelationPairDOI(
-        String publication_year, String doi, String ror) {
-        if (doi == null)
-            return new ArrayList<>();
+    }
 
-        return createAffiliatioRelationPair(doi, ror)
-            .stream()
-            .map(r -> new Tuple2<String, Relation>(publication_year, r))
-            .collect(Collectors.toList());
-    }
 
+    private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
+        if (pmcid == null)
+            return new ArrayList<>();
 
-    public static void createActionSet(SparkSession spark, String inputPath,
-        String outputPath) {
+        return createAffiliationRelationPair(
+            PMCID_PREFIX
+                + IdentifierFactory
+                    .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), removeResolver("PMC", pmcid))),
+            ror);
+    }
 
-        final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
-            .filter("publication_year <= 2020 or country_code=='IE'")
-            .drop("publication_year");
+    private static List<Relation> createAffiliationRelationPairPMID(String pmid, String ror) {
+        if (pmid == null)
+            return new ArrayList<>();
 
-        dataset.flatMap((FlatMapFunction<Row, Relation>) row -> {
-            List<Relation> ret = new ArrayList<>();
-            final String ror = ROR_PREFIX
-                + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
-            ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
-            ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
-            ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
+        return createAffiliationRelationPair(
+            PMID_PREFIX
+                + IdentifierFactory
+                    .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), removeResolver("PMID", pmid))),
+            ror);
+    }
 
-            return ret
-                .iterator();
-        }, Encoders.bean(Relation.class))
-            .toJavaRDD()
-            .map(p -> new AtomicAction(p.getClass(), p))
-            .mapToPair(
-                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+    // strips the resolver prefix from a pid supplied as a full resolver URL;
+    // the DOI offset equals "https://doi.org/".length() == 16, and the PMID/PMC
+    // offsets likewise assume the pubmed / PMC article URL forms present in the input
+    private static String removeResolver(String pidType, String pid) {
+        switch (pidType) {
+            case "PMID":
+                return pid.substring(33);
+            case "PMC":
+                return "PMC" + pid.substring(43);
+            case "DOI":
+                return pid.substring(16);
+        }
 
-    }
+        throw new RuntimeException("unsupported pid type: " + pidType);
 
-    private static Dataset<Row> readWebCrawl(SparkSession spark, String inputPath) {
-        StructType webInfo = StructType
-            .fromDDL(
-                "`id` STRING , `doi` STRING, `ids` STRUCT<`pmid` :STRING, `pmcid`: STRING >, `publication_year` STRING, "
-                    +
-                    "`authorships` ARRAY<STRUCT<`institutions`: ARRAY<STRUCT<`ror`: STRING, `country_code`: STRING>>>>");
+    }
 
-        return spark
-            .read()
-            .schema(webInfo)
-            .json(inputPath)
-            .withColumn(
-                "authors", functions
-                    .explode(
-                        functions.col("authorships")))
-            .selectExpr("id", "doi", "ids", "publication_year", "authors.institutions as institutions")
-            .withColumn(
-                "institution", functions
-                    .explode(
-                        functions.col("institutions")))
-            .selectExpr(
-                "id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
-                "institution.country_code as country_code", "publication_year")
-            // .where("country_code == 'IE'")
-            .distinct();
+    private static List<Relation> createAffiliationRelationPairDOI(String doi, String ror) {
+        if (doi == null)
+            return new ArrayList<>();
 
-    }
+        return createAffiliationRelationPair(
+            DOI_PREFIX
+                + IdentifierFactory
+                    .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), removeResolver("DOI", doi))),
+            ror);
 
-    private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
-        if (pmcid == null)
-            return new ArrayList<>();
+    }
 
-        return createAffiliatioRelationPair(
-            PMCID_PREFIX
-                + IdentifierFactory
-                    .md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), "PMC" + pmcid.substring(43))),
-            ror);
-    }
+    private static List<Relation> createAffiliationRelationPair(String resultId, String orgId) {
+        ArrayList<Relation> newRelations = new ArrayList<>();
 
-    private static List<Relation> createAffiliationRelationPairPMID(String pmid, String ror) {
-        if (pmid == null)
-            return new ArrayList<>();
 
+        newRelations
+            .add(
+                OafMapperUtils
+                    .getRelation(
+                        orgId, resultId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
+                        ModelConstants.IS_AUTHOR_INSTITUTION_OF,
+                        Arrays
+                            .asList(
+                                OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
+                        OafMapperUtils
+                            .dataInfo(
+                                false, null, false, false,
+                                OafMapperUtils
+                                    .qualifier(
+                                        "sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
+                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                                "0.9"),
+                        null));
 
-        return createAffiliatioRelationPair(
-            PMID_PREFIX
-                + IdentifierFactory
-                    .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pmid.substring(33))),
-            ror);
-    }
 
+        newRelations
+            .add(
+                OafMapperUtils
+                    .getRelation(
+                        resultId, orgId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
+                        ModelConstants.HAS_AUTHOR_INSTITUTION,
+                        Arrays
+                            .asList(
+                                OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
+                        OafMapperUtils
+                            .dataInfo(
+                                false, null, false, false,
+                                OafMapperUtils
+                                    .qualifier(
+                                        "sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
+                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
+                                "0.9"),
+                        null));
 
-    private static List<Relation> createAffiliationRelationPairDOI(String doi, String ror) {
-        if (doi == null)
-            return new ArrayList<>();
+        return newRelations;
 
-        return createAffiliatioRelationPair(
-            DOI_PREFIX
-                + IdentifierFactory
-                    .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), doi.substring(16))),
-            ror);
-
-    }
-
-    private static List<Relation> createAffiliatioRelationPair(String resultId, String orgId) {
-        ArrayList<Relation> newRelations = new ArrayList<>();
-
-        newRelations
-            .add(
-                OafMapperUtils
-                    .getRelation(
-                        orgId, resultId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
-                        ModelConstants.IS_AUTHOR_INSTITUTION_OF,
-                        Arrays
-                            .asList(
-                                OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
-                        OafMapperUtils
-                            .dataInfo(
-                                false, null, false, false,
-                                OafMapperUtils
-                                    .qualifier(
-                                        "sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
-                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
-                                "0.9"),
-                        null));
-
-        newRelations
-            .add(
-                OafMapperUtils
-                    .getRelation(
-                        resultId, orgId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
-                        ModelConstants.HAS_AUTHOR_INSTITUTION,
-                        Arrays
-                            .asList(
-                                OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
-                        OafMapperUtils
-                            .dataInfo(
-                                false, null, false, false,
-                                OafMapperUtils
-                                    .qualifier(
-                                        "sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
-                                        ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
-                                "0.9"),
-                        null));
-
-        return newRelations;
-
-    }
+    }
 }
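Note on the substring offsets in removeResolver above: each pid is expected to arrive as a full resolver URL, and the offset is the length of the resolver prefix being stripped. A minimal, self-contained sketch (illustrative only, not part of the patch; the DOI prefix length is verifiable, while the PMID/PMC URL forms are assumptions inferred from the fixture comment quoted in CreateASTest below):

public class ResolverOffsetSketch {
    public static void main(String[] args) {
        // "https://doi.org/".length() == 16, matching pid.substring(16) in the DOI branch
        String doi = "https://doi.org/10.1098/rstl.1684.0023";
        System.out.println(doi.substring("https://doi.org/".length())); // 10.1098/rstl.1684.0023
    }
}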
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java
index a1cd69dcc..402f07d4d 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/webcrawl/CreateASTest.java
@@ -1,13 +1,12 @@
+
 package eu.dnetlib.dhp.actionmanager.webcrawl;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
-import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
-import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -25,261 +24,264 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 
 /**
  * @author miriam.baglioni
  * @Date 22/04/24
  */
 public class CreateASTest {
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    private static SparkSession spark;
+    private static SparkSession spark;
 
-    private static Path workingDir;
-    private static final Logger log = LoggerFactory
-        .getLogger(CreateASTest.class);
+    private static Path workingDir;
+    private static final Logger log = LoggerFactory
+        .getLogger(CreateASTest.class);
 
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        workingDir = Files
-            .createTempDirectory(CreateASTest.class.getSimpleName());
-        log.info("using work dir {}", workingDir);
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        workingDir = Files
+            .createTempDirectory(CreateASTest.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
 
-        SparkConf conf = new SparkConf();
-        conf.setAppName(CreateASTest.class.getSimpleName());
+        SparkConf conf = new SparkConf();
+        conf.setAppName(CreateASTest.class.getSimpleName());
 
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("spark.ui.enabled", "false");
-        conf.set("spark.sql.warehouse.dir", workingDir.toString());
-        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
 
-        spark = SparkSession
-            .builder()
-            .appName(CreateASTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
+        spark = SparkSession
+            .builder()
+            .appName(CreateASTest.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(workingDir.toFile());
-        spark.stop();
-    }
-    @Test
-    void testNumberofRelations() throws Exception {
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
 
-        String inputPath = getClass()
-            .getResource(
-                "/eu/dnetlib/dhp/actionmanager/webcrawl/")
-            .getPath();
+    @Test
+    void testNumberofRelations() throws Exception {
 
-        CreateActionSetFromWebEntries
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged",
-                    Boolean.FALSE.toString(),
-                    "-sourcePath",
-                    inputPath,
-                    "-outputPath",
-                    workingDir.toString() + "/actionSet1"
-                });
+        String inputPath = getClass()
+            .getResource(
+                "/eu/dnetlib/dhp/actionmanager/webcrawl/")
+            .getPath();
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        CreateActionSetFromWebEntries
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    inputPath,
+                    "-outputPath",
+                    workingDir.toString() + "/actionSet1"
+                });
 
-        JavaRDD<Relation> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
-            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-            .map(aa -> ((Relation) aa.getPayload()));
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        Assertions.assertEquals(64, tmp.count());
+        JavaRDD<Relation> tmp = sc
+            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Relation) aa.getPayload()));
 
-    }
-    @Test
-    void testRelations() throws Exception {
+        Assertions.assertEquals(64, tmp.count());
+
+    }
+
+    @Test
+    void testRelations() throws Exception {
 
 // , "doi":"https://doi.org/10.1126/science.1188021", "pmid":"https://pubmed.ncbi.nlm.nih.gov/20448178", https://www.ncbi.nlm.nih.gov/pmc/articles/5100745
 
-        String inputPath = getClass()
-            .getResource(
-                "/eu/dnetlib/dhp/actionmanager/webcrawl/")
-            .getPath();
+        String inputPath = getClass()
+            .getResource(
+                "/eu/dnetlib/dhp/actionmanager/webcrawl/")
+            .getPath();
 
-        CreateActionSetFromWebEntries
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged",
-                    Boolean.FALSE.toString(),
-                    "-sourcePath",
-                    inputPath,
-                    "-outputPath",
-                    workingDir.toString() + "/actionSet1"
-                });
+        CreateActionSetFromWebEntries
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    inputPath,
+                    "-outputPath",
+                    workingDir.toString() + "/actionSet1"
+                });
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        JavaRDD<Relation> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
-            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-            .map(aa -> ((Relation) aa.getPayload()));
+        JavaRDD<Relation> tmp = sc
+            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Relation) aa.getPayload()));
 
-        tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+        tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
 
-        Assertions
-            .assertEquals(
-                1, tmp
-                    .filter(
-                        r -> r
-                            .getSource()
-                            .equals(
-                                "50|doi_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
-                    .count());
+        Assertions
+            .assertEquals(
+                1, tmp
+                    .filter(
+                        r -> r
+                            .getSource()
+                            .equals(
+                                "50|doi_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                1, tmp
-                    .filter(
-                        r -> r
-                            .getTarget()
-                            .equals(
-                                "50|doi_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
-                    .count());
+        Assertions
+            .assertEquals(
+                1, tmp
+                    .filter(
+                        r -> r
+                            .getTarget()
+                            .equals(
+                                "50|doi_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                1, tmp
-                    .filter(
-                        r -> r
-                            .getSource()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue("ROR", "https://ror.org/03argrj65"))))
-                    .count());
+        Assertions
+            .assertEquals(
+                1, tmp
+                    .filter(
+                        r -> r
+                            .getSource()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue("ROR", "https://ror.org/03argrj65"))))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                1, tmp
-                    .filter(
-                        r -> r
-                            .getTarget()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue("ROR", "https://ror.org/03argrj65"))))
-                    .count());
+        Assertions
+            .assertEquals(
+                1, tmp
+                    .filter(
+                        r -> r
+                            .getTarget()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue("ROR", "https://ror.org/03argrj65"))))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                5, tmp
-                    .filter(
-                        r -> r
-                            .getSource()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue("ROR", "https://ror.org/03265fv13"))))
-                    .count());
+        Assertions
+            .assertEquals(
+                5, tmp
+                    .filter(
+                        r -> r
+                            .getSource()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue("ROR", "https://ror.org/03265fv13"))))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                5, tmp
-                    .filter(
-                        r -> r
-                            .getTarget()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue("ROR", "https://ror.org/03265fv13"))))
-                    .count());
+        Assertions
+            .assertEquals(
+                5, tmp
+                    .filter(
+                        r -> r
+                            .getTarget()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue("ROR", "https://ror.org/03265fv13"))))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                2, tmp
-                    .filter(
-                        r -> r
-                            .getTarget()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
-                            && r.getSource().startsWith("50|doi"))
-                    .count());
+        Assertions
+            .assertEquals(
+                2, tmp
+                    .filter(
+                        r -> r
+                            .getTarget()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
+                            && r.getSource().startsWith("50|doi"))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                2, tmp
-                    .filter(
-                        r -> r
-                            .getTarget()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
-                            && r.getSource().startsWith("50|pmid"))
-                    .count());
+        Assertions
+            .assertEquals(
+                2, tmp
+                    .filter(
+                        r -> r
+                            .getTarget()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
+                            && r.getSource().startsWith("50|pmid"))
+                    .count());
 
-        Assertions
-            .assertEquals(
-                1, tmp
-                    .filter(
-                        r -> r
-                            .getTarget()
-                            .equals(
-                                "20|ror_________::" + IdentifierFactory
-                                    .md5(
-                                        PidCleaner
-                                            .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
-                            && r.getSource().startsWith("50|pmc"))
-                    .count());
-    }
+        Assertions
+            .assertEquals(
+                1, tmp
+                    .filter(
+                        r -> r
+                            .getTarget()
+                            .equals(
+                                "20|ror_________::" + IdentifierFactory
+                                    .md5(
+                                        PidCleaner
+                                            .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
+                            && r.getSource().startsWith("50|pmc"))
+                    .count());
+    }
 
-    @Test
-    void testRelationsCollectedFrom() throws Exception {
+    @Test
+    void testRelationsCollectedFrom() throws Exception {
 
-        String inputPath = getClass()
-            .getResource(
-                "/eu/dnetlib/dhp/actionmanager/webcrawl")
-            .getPath();
+        String inputPath = getClass()
+            .getResource(
+                "/eu/dnetlib/dhp/actionmanager/webcrawl")
+            .getPath();
 
-        CreateActionSetFromWebEntries
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged",
-                    Boolean.FALSE.toString(),
-                    "-sourcePath",
-                    inputPath,
-                    "-outputPath",
-                    workingDir.toString() + "/actionSet1"
-                });
+        CreateActionSetFromWebEntries
+            .main(
+                new String[] {
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    inputPath,
+                    "-outputPath",
+                    workingDir.toString() + "/actionSet1"
+                });
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        JavaRDD<Relation> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
-            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-            .map(aa -> ((Relation) aa.getPayload()));
-
-        tmp.foreach(r -> {
-            assertEquals("Web Crawl", r.getCollectedfrom().get(0).getValue());
-            assertEquals("10|openaire____::fb98a192f6a055ba495ef414c330834b", r.getCollectedfrom().get(0).getKey());
-        });
-
-    }
+        JavaRDD<Relation> tmp = sc
+            .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Relation) aa.getPayload()));
 
+        tmp.foreach(r -> {
+            assertEquals("Web Crawl", r.getCollectedfrom().get(0).getValue());
+            assertEquals("10|openaire____::fb98a192f6a055ba495ef414c330834b", r.getCollectedfrom().get(0).getKey());
+        });
+    }
 }
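For reference, a minimal consumer sketch mirroring the reads performed by the tests above: the generated action set is a SequenceFile whose values are JSON-serialised AtomicAction objects wrapping Relation payloads (the path argument is a placeholder, not something defined by this patch):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class ReadActionSetSketch {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static JavaRDD<Relation> read(JavaSparkContext sc, String path) {
        // each record is (class name, JSON-serialised AtomicAction); unwrap the Relation payload
        return sc
            .sequenceFile(path, Text.class, Text.class)
            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
            .map(aa -> (Relation) aa.getPayload());
    }
}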