forked from D-Net/dnet-hadoop
[WebCrawl] adding affiliation relations from web information
This commit is contained in:
parent
5857fd38c1
commit
776c898c4b
|
@ -0,0 +1,272 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.webcrawl;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.types.StructType;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 18/04/24
|
||||||
|
*/
|
||||||
|
public class CreateActionSetFromWebEntries implements Serializable {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(CreateActionSetFromWebEntries.class);
|
||||||
|
private static final String DOI_PREFIX = "50|doi_________::";
|
||||||
|
|
||||||
|
private static final String ROR_PREFIX = "20|ror_________::";
|
||||||
|
|
||||||
|
private static final String PMID_PREFIX = "50|pmid________::";
|
||||||
|
|
||||||
|
private static final String PMCID_PREFIX = "50|pmc_________::";
|
||||||
|
private static final String WEB_CRAWL_ID = "10|openaire____::fb98a192f6a055ba495ef414c330834b";
|
||||||
|
private static final String WEB_CRAWL_NAME = "Web Crawl";
|
||||||
|
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
CreateActionSetFromWebEntries.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/as_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String inputPath = parser.get("sourcePath");
|
||||||
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
|
||||||
|
createActionSet(spark, inputPath, outputPath + "actionSet");
|
||||||
|
createPlainRelations(spark, inputPath, outputPath + "relations");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createPlainRelations(SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
final Dataset<Row> dataset = readWebCrawl(spark, inputPath);
|
||||||
|
|
||||||
|
dataset.flatMap((FlatMapFunction<Row, Tuple2<String, Relation>>) row -> {
|
||||||
|
List<Tuple2<String, Relation>> ret = new ArrayList<>();
|
||||||
|
|
||||||
|
final String ror = row.getAs("ror");
|
||||||
|
ret.addAll(createAffiliationRelationPairDOI(row.getAs("publication_year"), row.getAs("doi"), ror));
|
||||||
|
ret.addAll(createAffiliationRelationPairPMID(row.getAs("publication_year"), row.getAs("pmid"), ror));
|
||||||
|
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("publication_year"), row.getAs("pmcid"), ror));
|
||||||
|
|
||||||
|
return ret
|
||||||
|
.iterator();
|
||||||
|
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Collection<? extends Tuple2<String, Relation>> createAffiliationRelationPairPMCID(
|
||||||
|
String publication_year, String pmcid, String ror) {
|
||||||
|
if (pmcid == null)
|
||||||
|
return new ArrayList<>();
|
||||||
|
|
||||||
|
return createAffiliatioRelationPair("PMC" + pmcid, ror)
|
||||||
|
.stream()
|
||||||
|
.map(r -> new Tuple2<String, Relation>(publication_year, r))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Collection<? extends Tuple2<String, Relation>> createAffiliationRelationPairPMID(
|
||||||
|
String publication_year, String pmid, String ror) {
|
||||||
|
if (pmid == null)
|
||||||
|
return new ArrayList<>();
|
||||||
|
|
||||||
|
return createAffiliatioRelationPair(pmid, ror)
|
||||||
|
.stream()
|
||||||
|
.map(r -> new Tuple2<String, Relation>(publication_year, r))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Collection<? extends Tuple2<String, Relation>> createAffiliationRelationPairDOI(
|
||||||
|
String publication_year, String doi, String ror) {
|
||||||
|
if (doi == null)
|
||||||
|
return new ArrayList<>();
|
||||||
|
|
||||||
|
return createAffiliatioRelationPair(doi, ror)
|
||||||
|
.stream()
|
||||||
|
.map(r -> new Tuple2<String, Relation>(publication_year, r))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void createActionSet(SparkSession spark, String inputPath,
|
||||||
|
String outputPath) {
|
||||||
|
|
||||||
|
final Dataset<Row> dataset = readWebCrawl(spark, inputPath)
|
||||||
|
.filter("publication_year <= 2020 or country_code=='IE'")
|
||||||
|
.drop("publication_year");
|
||||||
|
|
||||||
|
dataset.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
||||||
|
List<Relation> ret = new ArrayList<>();
|
||||||
|
final String ror = ROR_PREFIX
|
||||||
|
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAs("ror")));
|
||||||
|
ret.addAll(createAffiliationRelationPairDOI(row.getAs("doi"), ror));
|
||||||
|
ret.addAll(createAffiliationRelationPairPMID(row.getAs("pmid"), ror));
|
||||||
|
ret.addAll(createAffiliationRelationPairPMCID(row.getAs("pmcid"), ror));
|
||||||
|
|
||||||
|
return ret
|
||||||
|
.iterator();
|
||||||
|
}, Encoders.bean(Relation.class))
|
||||||
|
.toJavaRDD()
|
||||||
|
.map(p -> new AtomicAction(p.getClass(), p))
|
||||||
|
.mapToPair(
|
||||||
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||||
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||||
|
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Dataset<Row> readWebCrawl(SparkSession spark, String inputPath) {
|
||||||
|
StructType webInfo = StructType
|
||||||
|
.fromDDL(
|
||||||
|
"`id` STRING , `doi` STRING, `ids` STRUCT<`pmid` :STRING, `pmcid`: STRING >, `publication_year` STRING, "
|
||||||
|
+
|
||||||
|
"`authorships` ARRAY<STRUCT <`institutions`: ARRAY <STRUCT <`ror`: STRING, `country_code` :STRING>>>>");
|
||||||
|
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.schema(webInfo)
|
||||||
|
.json(inputPath)
|
||||||
|
.withColumn(
|
||||||
|
"authors", functions
|
||||||
|
.explode(
|
||||||
|
functions.col("authorships")))
|
||||||
|
.selectExpr("id", "doi", "ids", "publication_year", "authors.institutions as institutions")
|
||||||
|
.withColumn(
|
||||||
|
"institution", functions
|
||||||
|
.explode(
|
||||||
|
functions.col("institutions")))
|
||||||
|
.selectExpr(
|
||||||
|
"id", "doi", "ids.pmcid as pmcid", "ids.pmid as pmid", "institution.ror as ror",
|
||||||
|
"institution.country_code as country_code", "publication_year")
|
||||||
|
// .where("country_code == 'IE'")
|
||||||
|
.distinct();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Relation> createAffiliationRelationPairPMCID(String pmcid, String ror) {
|
||||||
|
if (pmcid == null)
|
||||||
|
return new ArrayList<>();
|
||||||
|
|
||||||
|
return createAffiliatioRelationPair(
|
||||||
|
PMCID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmc.toString(), "PMC" + pmcid.substring(43))),
|
||||||
|
ror);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Relation> createAffiliationRelationPairPMID(String pmid, String ror) {
|
||||||
|
if (pmid == null)
|
||||||
|
return new ArrayList<>();
|
||||||
|
|
||||||
|
return createAffiliatioRelationPair(
|
||||||
|
PMID_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.pmid.toString(), pmid.substring(33))),
|
||||||
|
ror);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Relation> createAffiliationRelationPairDOI(String doi, String ror) {
|
||||||
|
if (doi == null)
|
||||||
|
return new ArrayList<>();
|
||||||
|
|
||||||
|
return createAffiliatioRelationPair(
|
||||||
|
DOI_PREFIX
|
||||||
|
+ IdentifierFactory
|
||||||
|
.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), doi.substring(16))),
|
||||||
|
ror);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Relation> createAffiliatioRelationPair(String resultId, String orgId) {
|
||||||
|
ArrayList<Relation> newRelations = new ArrayList();
|
||||||
|
|
||||||
|
newRelations
|
||||||
|
.add(
|
||||||
|
OafMapperUtils
|
||||||
|
.getRelation(
|
||||||
|
orgId, resultId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
|
||||||
|
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
|
||||||
|
Arrays
|
||||||
|
.asList(
|
||||||
|
OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
|
||||||
|
OafMapperUtils
|
||||||
|
.dataInfo(
|
||||||
|
false, null, false, false,
|
||||||
|
OafMapperUtils
|
||||||
|
.qualifier(
|
||||||
|
"sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||||
|
"0.9"),
|
||||||
|
null));
|
||||||
|
|
||||||
|
newRelations
|
||||||
|
.add(
|
||||||
|
OafMapperUtils
|
||||||
|
.getRelation(
|
||||||
|
resultId, orgId, ModelConstants.RESULT_ORGANIZATION, ModelConstants.AFFILIATION,
|
||||||
|
ModelConstants.HAS_AUTHOR_INSTITUTION,
|
||||||
|
Arrays
|
||||||
|
.asList(
|
||||||
|
OafMapperUtils.keyValue(WEB_CRAWL_ID, WEB_CRAWL_NAME)),
|
||||||
|
OafMapperUtils
|
||||||
|
.dataInfo(
|
||||||
|
false, null, false, false,
|
||||||
|
OafMapperUtils
|
||||||
|
.qualifier(
|
||||||
|
"sysimport:crasswalk:webcrawl", "Imported from Webcrawl",
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||||
|
"0.9"),
|
||||||
|
null));
|
||||||
|
|
||||||
|
return newRelations;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "sp",
|
||||||
|
"paramLongName": "sourcePath",
|
||||||
|
"paramDescription": "the zipped opencitations file",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "op",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the working path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "the hdfs name node",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,2 @@
|
||||||
|
sourcePath=/user/miriam.baglioni/openalex-snapshot/data/works/
|
||||||
|
outputPath=/tmp/miriam/webcrawlComplete/
|
|
@ -0,0 +1,58 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_metastore_uris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorNumber</name>
|
||||||
|
<value>4</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<value>/user/spark/spark2ApplicationHistory</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<value>15G</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<value>6G</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<value>1</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,53 @@
|
||||||
|
<workflow-app name="WebCrawl Integration" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.queuename</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||||
|
<value>${oozieLauncherQueueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
|
||||||
|
<start to="create_actionset"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="create_actionset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Produces the AS for WC</name>
|
||||||
|
<class>eu.dnetlib.dhp.actionmanager.webcrawl.CreateActionSetFromWebEntries</class>
|
||||||
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -0,0 +1,285 @@
|
||||||
|
package eu.dnetlib.dhp.actionmanager.webcrawl;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 22/04/24
|
||||||
|
*/
|
||||||
|
public class CreateASTest {
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
private static final Logger log = LoggerFactory
|
||||||
|
.getLogger(CreateASTest.class);
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(CreateASTest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(CreateASTest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(CreateASTest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
void testNumberofRelations() throws Exception {
|
||||||
|
|
||||||
|
String inputPath = getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
CreateActionSetFromWebEntries
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath",
|
||||||
|
inputPath,
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/actionSet1"
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Relation> tmp = sc
|
||||||
|
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
|
||||||
|
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||||
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
|
Assertions.assertEquals(64, tmp.count());
|
||||||
|
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
void testRelations() throws Exception {
|
||||||
|
|
||||||
|
// , "doi":"https://doi.org/10.1126/science.1188021", "pmid":"https://pubmed.ncbi.nlm.nih.gov/20448178", https://www.ncbi.nlm.nih.gov/pmc/articles/5100745
|
||||||
|
|
||||||
|
String inputPath = getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/webcrawl/")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
CreateActionSetFromWebEntries
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath",
|
||||||
|
inputPath,
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/actionSet1"
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Relation> tmp = sc
|
||||||
|
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
|
||||||
|
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||||
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
|
tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getSource()
|
||||||
|
.equals(
|
||||||
|
"50|doi_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getTarget()
|
||||||
|
.equals(
|
||||||
|
"50|doi_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue(PidType.doi.toString(), "10.1098/rstl.1684.0023"))))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getSource()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue("ROR", "https://ror.org/03argrj65"))))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getTarget()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue("ROR", "https://ror.org/03argrj65"))))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
5, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getSource()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue("ROR", "https://ror.org/03265fv13"))))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
5, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getTarget()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue("ROR", "https://ror.org/03265fv13"))))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
2, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getTarget()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
|
||||||
|
&& r.getSource().startsWith("50|doi"))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
2, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getTarget()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
|
||||||
|
&& r.getSource().startsWith("50|pmid"))
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, tmp
|
||||||
|
.filter(
|
||||||
|
r -> r
|
||||||
|
.getTarget()
|
||||||
|
.equals(
|
||||||
|
"20|ror_________::" + IdentifierFactory
|
||||||
|
.md5(
|
||||||
|
PidCleaner
|
||||||
|
.normalizePidValue(PidType.doi.toString(), "https://ror.org/03265fv13")))
|
||||||
|
&& r.getSource().startsWith("50|pmc"))
|
||||||
|
.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRelationsCollectedFrom() throws Exception {
|
||||||
|
|
||||||
|
String inputPath = getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/webcrawl")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
CreateActionSetFromWebEntries
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath",
|
||||||
|
inputPath,
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/actionSet1"
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Relation> tmp = sc
|
||||||
|
.sequenceFile(workingDir.toString() + "/actionSet1", Text.class, Text.class)
|
||||||
|
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||||
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
|
tmp.foreach(r -> {
|
||||||
|
assertEquals("Web Crawl", r.getCollectedfrom().get(0).getValue());
|
||||||
|
assertEquals("10|openaire____::fb98a192f6a055ba495ef414c330834b", r.getCollectedfrom().get(0).getKey());
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue