[WebCrawl] addressing comments from PR

This commit is contained in:
Miriam Baglioni 2024-04-22 13:52:50 +02:00
parent eb4692e4ee
commit 7de114bda0
2 changed files with 376 additions and 414 deletions

View File

@ -35,238 +35,198 @@ import scala.Tuple2;
* @Date 18/04/24 * @Date 18/04/24
*/ */
public class CreateActionSetFromWebEntries implements Serializable { public class CreateActionSetFromWebEntries implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class); private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class);
private static final String DOI_PREFIX = "50|doi_________::"; private static final String DOI_PREFIX = "50|doi_________::";
private static final String ROR_PREFIX = "20|ror_________::"; private static final String ROR_PREFIX = "20|ror_________::";
private static final String PMID_PREFIX = "50|pmid________::"; private static final String PMID_PREFIX = "50|pmid________::";
private static final String PMCID_PREFIX = "50|pmc_________::"; private static final String PMCID_PREFIX = "50|pmc_________::";
private static final String WEB_CRAWL_ID = "10|openaire____::fb98a192f6a055ba495ef414c330834b"; private static final String WEB_CRAWL_ID = "10|openaire____::fb98a192f6a055ba495ef414c330834b";
private static final String WEB_CRAWL_NAME = "Web Crawl"; private static final String WEB_CRAWL_NAME = "Web Crawl";
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
CreateActionSetFromWebEntries.class CreateActionSetFromWebEntries.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json")); "/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged")) .ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf) .map(Boolean::valueOf)
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath"); final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath); log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
createActionSet(spark, inputPath, outputPath + "actionSet"); createActionSet(spark, inputPath, outputPath );
createPlainRelations(spark, inputPath, outputPath + "relations");
});
}
private static void createPlainRelations(SparkSession spark, String inputPath, String outputPath) { });
final Dataset<Row> dataset = readWebCrawl(spark, inputPath); }
dataset.flatMap((FlatMapFunction<Row, Tuple2<String, Relation>>) row -> { public static void createActionSet(SparkSession spark, String inputPath,
List<Tuple2<String, Relation>> ret = new ArrayList<>(); String outputPath) {
final String ror = row.getAs("ror"); final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
ret.addAll(createAffiliationRelationPairDOI(row.getAs("publication_year"), row.getAs("doi"), ror)); .filter("publication_year <= 2020 or country_code=='IE'")
ret.addAll(createAffiliationRelationPairPMID(row.getAs("publication_year"), row.getAs("pmid"), ror)); .drop("publication_year");
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("publication_year"), row.getAs("pmcid"), ror));
return ret dataset.flatMap((FlatMapFunction<Row, Relation>) row -> {
.iterator(); List<Relation> ret = new ArrayList<>();
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class))) final String ror = ROR_PREFIX
.write() + IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
.mode(SaveMode.Overwrite) ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
.option("compression", "gzip") ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
.json(outputPath); ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
}
private static Collection<? extends Tuple2<String, Relation>> createAffiliationRelationPairPMCID( return ret
String publication_year, String pmcid, String ror) { .iterator();
if (pmcid == null) }, Encoders.bean(Relation.class))
return new ArrayList<>(); .toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
return createAffiliatioRelationPair("PMC" + pmcid, ror) }
.stream()
.map(r -> new Tuple2<String, Relation>(publication_year, r))
.collect(Collectors.toList());
}
private static Collection<? extends Tuple2<String, Relation>> createAffiliationRelationPairPMID( private static Dataset<Row> readWebCrawl(SparkSession spark, String inputPath) {
String publication_year, String pmid, String ror) { StructType webInfo = StructType
if (pmid == null) .fromDDL(
return new ArrayList<>(); "`id` STRING , `doi` STRING, `ids` STRUCT<`pmid` :STRING, `pmcid`: STRING >, `publication_year` STRING, "
+
"`authorships` ARRAY<STRUCT <`institutions`: ARRAY <STRUCT <`ror`: STRING, `country_code` :STRING>>>>");
return createAffiliatioRelationPair(pmid, ror) return spark
.stream() .read()
.map(r -> new Tuple2<String, Relation>(publication_year, r)) .schema(webInfo)
.collect(Collectors.toList()); .json(inputPath)
} .withColumn(
"authors", functions
.explode(
functions.col("authorships")))
.selectExpr("id", "doi", "ids", "publication_year", "authors.institutions as institutions")
.withColumn(
"institution", functions
.explode(
functions.col("institutions")))
.selectExpr(
"id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
"institution.country_code as country_code", "publication_year")
.distinct();
private static Collection<? extends Tuple2<String, Relation>> createAffiliationRelationPairDOI( }
String publication_year, String doi, String ror) {
if (doi == null)
return new ArrayList<>();
return createAffiliatioRelationPair(doi, ror) private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
.stream() if (pmcid == null)
.map(r -> new Tuple2<String, Relation>(publication_year, r)) return new ArrayList<>();
.collect(Collectors.toList());
}
public static void createActionSet(SparkSession spark, String inputPath, return createAffiliatioRelationPair(
String outputPath) { PMCID_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), removeResolver("PMC" , pmcid))),
ror);
}
final Dataset<Row> dataset = readWebCrawl(spark, inputPath) private static List<Relation> createAffiliationRelationPairPMID(String pmid, String ror) {
.filter("publication_year <= 2020 or country_code=='IE'") if (pmid == null)
.drop("publication_year"); return new ArrayList<>();
dataset.flatMap((FlatMapFunction<Row, Relation>) row -> { return createAffiliatioRelationPair(
List<Relation> ret = new ArrayList<>(); PMID_PREFIX
final String ror = ROR_PREFIX + IdentifierFactory
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror"))); .md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), removeResolver("PMID", pmid))),
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror)); ror);
ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror)); }
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
return ret private static String removeResolver(String pidType, String pid) {
.iterator(); switch (pidType){
}, Encoders.bean(Relation.class)) case "PMID":
.toJavaRDD() return pid.substring(33);
.map(p -> new AtomicAction(p.getClass(), p)) case "PMC":
.mapToPair( return "PMC" + pid.substring(43);
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), case "DOI":
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) return pid.substring(16);
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); }
} throw new RuntimeException();
private static Dataset<Row> readWebCrawl(SparkSession spark, String inputPath) { }
StructType webInfo = StructType
.fromDDL(
"`id` STRING , `doi` STRING, `ids` STRUCT<`pmid` :STRING, `pmcid`: STRING >, `publication_year` STRING, "
+
"`authorships` ARRAY<STRUCT <`institutions`: ARRAY <STRUCT <`ror`: STRING, `country_code` :STRING>>>>");
return spark private static List<Relation> createAffiliationRelationPairDOI(String doi, String ror) {
.read() if (doi == null)
.schema(webInfo) return new ArrayList<>();
.json(inputPath)
.withColumn(
"authors", functions
.explode(
functions.col("authorships")))
.selectExpr("id", "doi", "ids", "publication_year", "authors.institutions as institutions")
.withColumn(
"institution", functions
.explode(
functions.col("institutions")))
.selectExpr(
"id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
"institution.country_code as country_code", "publication_year")
// .where("country_code == 'IE'")
.distinct();
} return createAffiliatioRelationPair(
DOI_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), removeResolver("DOI" ,doi))),
ror);
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) { }
if (pmcid == null)
return new ArrayList<>();
return createAffiliatioRelationPair( private static List<Relation> createAffiliatioRelationPair(String resultId, String orgId) {
PMCID_PREFIX ArrayList<Relation> newRelations = new ArrayList();
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), "PMC" + pmcid.substring(43))),
ror);
}
private static List<Relation> createAffiliationRelationPairPMID(String pmid, String ror) { newRelations
if (pmid == null) .add(
return new ArrayList<>(); OafMapperUtils
.getRelation(
orgId, resultId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
Arrays
.asList(
OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
OafMapperUtils
.dataInfo(
false, null, false, false,
OafMapperUtils
.qualifier(
"sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.9"),
null));
return createAffiliatioRelationPair( newRelations
PMID_PREFIX .add(
+ IdentifierFactory OafMapperUtils
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pmid.substring(33))), .getRelation(
ror); resultId, orgId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
} ModelConstants.HAS_AUTHOR_INSTITUTION,
Arrays
.asList(
OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
OafMapperUtils
.dataInfo(
false, null, false, false,
OafMapperUtils
.qualifier(
"sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.9"),
null));
private static List<Relation> createAffiliationRelationPairDOI(String doi, String ror) { return newRelations;
if (doi == null)
return new ArrayList<>();
return createAffiliatioRelationPair( }
DOI_PREFIX
+ IdentifierFactory
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), doi.substring(16))),
ror);
}
private static List<Relation> createAffiliatioRelationPair(String resultId, String orgId) {
ArrayList<Relation> newRelations = new ArrayList();
newRelations
.add(
OafMapperUtils
.getRelation(
orgId, resultId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
Arrays
.asList(
OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
OafMapperUtils
.dataInfo(
false, null, false, false,
OafMapperUtils
.qualifier(
"sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.9"),
null));
newRelations
.add(
OafMapperUtils
.getRelation(
resultId, orgId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION,
Arrays
.asList(
OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
OafMapperUtils
.dataInfo(
false, null, false, false,
OafMapperUtils
.qualifier(
"sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.9"),
null));
return newRelations;
}
} }

View File

@ -1,13 +1,12 @@
package eu.dnetlib.dhp.actionmanager.webcrawl; package eu.dnetlib.dhp.actionmanager.webcrawl;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -25,261 +24,264 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
/** /**
* @author miriam.baglioni * @author miriam.baglioni
* @Date 22/04/24 * @Date 22/04/24
*/ */
public class CreateASTest { public class CreateASTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark; private static SparkSession spark;
private static Path workingDir; private static Path workingDir;
private static final Logger log = LoggerFactory private static final Logger log = LoggerFactory
.getLogger(CreateASTest.class); .getLogger(CreateASTest.class);
// One-time test setup: creates a temporary working directory named after the test class
// and builds the SparkSession stored in the static `spark` field used by the tests.
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
workingDir = Files workingDir = Files
.createTempDirectory(CreateASTest.class.getSimpleName()); .createTempDirectory(CreateASTest.class.getSimpleName());
log.info("using work dir {}", workingDir); log.info("using work dir {}", workingDir);
// Spark runs in-process ("local[*]") with the UI disabled; the SQL and Hive warehouse
// locations are pointed at the temporary working directory.
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.setAppName(CreateASTest.class.getSimpleName()); conf.setAppName(CreateASTest.class.getSimpleName());
conf.setMaster("local[*]"); conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost"); conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true"); conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false"); conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString()); conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession spark = SparkSession
.builder() .builder()
.appName(CreateASTest.class.getSimpleName()) .appName(CreateASTest.class.getSimpleName())
.config(conf) .config(conf)
.getOrCreate(); .getOrCreate();
} }
// One-time teardown: deletes the temporary working directory created in beforeAll,
// then stops the SparkSession.
@AfterAll @AfterAll
public static void afterAll() throws IOException { public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile()); FileUtils.deleteDirectory(workingDir.toFile());
spark.stop(); spark.stop();
} }
@Test
void testNumberofRelations() throws Exception {
String inputPath = getClass() @Test
.getResource( void testNumberofRelations() throws Exception {
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
.getPath();
CreateActionSetFromWebEntries String inputPath = getClass()
.main( .getResource(
new String[] { "/eu/dnetlib/dhp/actionmanager/webcrawl/")
"-isSparkSessionManaged", .getPath();
Boolean.FALSE.toString(),
"-sourcePath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet1"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); CreateActionSetFromWebEntries
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
inputPath,
"-outputPath",
workingDir.toString() + "/actionSet1"
});
JavaRDD<Relation> tmp = sc final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
Assertions.assertEquals(64, tmp.count()); JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
} Assertions.assertEquals(64, tmp.count());
@Test
void testRelations() throws Exception { }
@Test
void testRelations() throws Exception {
// , "doi":"https://doi.org/10.1126/science.1188021", "pmid":"https://pubmed.ncbi.nlm.nih.gov/20448178", https://www.ncbi.nlm.nih.gov/pmc/articles/5100745 // , "doi":"https://doi.org/10.1126/science.1188021", "pmid":"https://pubmed.ncbi.nlm.nih.gov/20448178", https://www.ncbi.nlm.nih.gov/pmc/articles/5100745
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl/") "/eu/dnetlib/dhp/actionmanager/webcrawl/")
.getPath(); .getPath();
CreateActionSetFromWebEntries CreateActionSetFromWebEntries
.main( .main(
new String[] { new String[] {
"-isSparkSessionManaged", "-isSparkSessionManaged",
Boolean.FALSE.toString(), Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
inputPath, inputPath,
"-outputPath", "-outputPath",
workingDir.toString() + "/actionSet1" workingDir.toString() + "/actionSet1"
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r))); tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getSource() .getSource()
.equals( .equals(
"50|doi_________::" + IdentifierFactory "50|doi_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023")))) .normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
.equals( .equals(
"50|doi_________::" + IdentifierFactory "50|doi_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023")))) .normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getSource() .getSource()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue("ROR", "https://ror.org/03argrj65")))) .normalizePidValue("ROR", "https://ror.org/03argrj65"))))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue("ROR", "https://ror.org/03argrj65")))) .normalizePidValue("ROR", "https://ror.org/03argrj65"))))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
5, tmp 5, tmp
.filter( .filter(
r -> r r -> r
.getSource() .getSource()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue("ROR", "https://ror.org/03265fv13")))) .normalizePidValue("ROR", "https://ror.org/03265fv13"))))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
5, tmp 5, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue("ROR", "https://ror.org/03265fv13")))) .normalizePidValue("ROR", "https://ror.org/03265fv13"))))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
2, tmp 2, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13"))) .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
&& r.getSource().startsWith("50|doi")) && r.getSource().startsWith("50|doi"))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
2, tmp 2, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13"))) .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
&& r.getSource().startsWith("50|pmid")) && r.getSource().startsWith("50|pmid"))
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp 1, tmp
.filter( .filter(
r -> r r -> r
.getTarget() .getTarget()
.equals( .equals(
"20|ror_________::" + IdentifierFactory "20|ror_________::" + IdentifierFactory
.md5( .md5(
PidCleaner PidCleaner
.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13"))) .normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
&& r.getSource().startsWith("50|pmc")) && r.getSource().startsWith("50|pmc"))
.count()); .count());
} }
// Runs CreateActionSetFromWebEntries.main on the webcrawl test resources, reads the written
// sequence file back as AtomicAction payloads cast to Relation, and asserts that the first
// collectedfrom entry of every relation has value "Web Crawl" and the expected "10|openaire____::" key.
// NOTE(review): the output path "/actionSet1" under workingDir is the same one the other tests
// in this class pass to main — confirm that re-writing an existing path is intended.
@Test @Test
void testRelationsCollectedFrom() throws Exception { void testRelationsCollectedFrom() throws Exception {
String inputPath = getClass() String inputPath = getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/webcrawl") "/eu/dnetlib/dhp/actionmanager/webcrawl")
.getPath(); .getPath();
CreateActionSetFromWebEntries CreateActionSetFromWebEntries
.main( .main(
new String[] { new String[] {
"-isSparkSessionManaged", "-isSparkSessionManaged",
Boolean.FALSE.toString(), Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
inputPath, inputPath,
"-outputPath", "-outputPath",
workingDir.toString() + "/actionSet1" workingDir.toString() + "/actionSet1"
}); });
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class) .sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
tmp.foreach(r -> {
assertEquals("Web Crawl", r.getCollectedfrom().get(0).getValue());
assertEquals("10|openaire____::fb98a192f6a055ba495ef414c330834b", r.getCollectedfrom().get(0).getKey());
});
}
tmp.foreach(r -> {
assertEquals("Web Crawl", r.getCollectedfrom().get(0).getValue());
assertEquals("10|openaire____::fb98a192f6a055ba495ef414c330834b", r.getCollectedfrom().get(0).getKey());
});
}
} }