1
0
Fork 0

Compare commits

...

2 Commits

Author SHA1 Message Date
Serafeim Chatzopoulos 842b309922 Add test class for ingesting DBLP data 2024-03-21 13:22:48 +02:00
Serafeim Chatzopoulos b6e4d58817 Add workflow for loading DBLP data 2024-03-19 18:11:36 +02:00
7 changed files with 1727 additions and 0 deletions

View File

@ -0,0 +1,113 @@
package eu.dnetlib.dhp.actionmanager.dblp;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/**
* Creates action sets for DBLP data
*/
public class PrepareDblpActionSets implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareDblpActionSets.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ID_PREFIX = "50|doi_________::";
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareDblpActionSets.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/dblp/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String dblpInputPath = parser.get("dblpInputPath");
log.info("dblpInputPath: {}", dblpInputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Constants.removeOutputDir(spark, outputPath);
// TODO: add DBLP ID in ModelConstants
List<KeyValue> collectedFromDBLP = OafMapperUtils
.listKeyValues(ModelConstants.CROSSREF_ID, "DBLP");
JavaPairRDD<Text, Text> dblpData = prepareDblpData(
spark, dblpInputPath, collectedFromDBLP);
dblpData
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
});
}
private static <I extends Result> JavaPairRDD<Text, Text> prepareDblpData(SparkSession spark,
String inputPath,
List<KeyValue> collectedFrom) {
log.info("Reading DBLP XML data");
//
// TODO: load DBLP data into a Dataset
Dataset<Row> df = spark
.read()
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
.json(inputPath);
return df.map((MapFunction<Row, Result>) bs -> {
Result result = new Result();
// TODO: map DBLP data to Result objects
return result;
}, Encoders.bean(Result.class))
.toJavaRDD()
.map(p -> new AtomicAction(Result.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "dip",
"paramLongName": "dblpInputPath",
"paramDescription": "the path to get the input data from DBLP",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -0,0 +1,35 @@
# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties ---
# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
# dhp.hadoop.frontend.user.name=ilias.kanellos
# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
# dhp.hadoop.frontend.port.ssh=22
# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
# jobTracker=yarnRM
# nameNode=hdfs://nameservice1
# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
# maven.executable=mvn
# Some memory and driver settings for more demanding tasks
sparkDriverMemory=10G
sparkExecutorMemory=10G
sparkExecutorCores=4
sparkShufflePartitions=7680
# The above is given differently in an example I found online
oozie.action.sharelib.for.spark=spark2
oozieActionShareLibForSpark2=spark2
spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
spark2EventLogDir=/user/spark/spark2ApplicationHistory
sparkSqlWarehouseDir=/user/hive/warehouse
hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
# This MAY avoid the no library used error
oozie.use.system.libpath=true
# Some stuff copied from openaire's jobs
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# The following is needed as a property of a workflow
oozie.wf.application.path=${oozieTopWfApplicationPath}
dblpInputPath=/data/dblp/dblp.xml.gz
outputPath=/tmp/dblp-actionsets

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,107 @@
<workflow-app name="BipAffiliations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>dblpInputPath</name>
<description>the path where to find the input data from DBLP</description>
</property>
<property>
<name>outputPath</name>
<description>the path where to store the actionset</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
<delete path="${workingDir}"/>
<mkdir path="${workingDir}"/>
</fs>
<ok to="atomicactions"/>
<error to="Kill"/>
</action>
<action name="atomicactions">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action sets with the DBLP data</name>
<class>eu.dnetlib.dhp.actionmanager.dblp.PrepareDblpActionSets</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--dblpInputPath</arg><arg>${dblpInputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,202 @@
package eu.dnetlib.dhp.actionmanager.dblp;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.xml.crypto.Data;
import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob;
import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJobTest;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareDblpActionSetsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(SparkAtomicActionScoreJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(SparkAtomicActionScoreJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(SparkAtomicActionScoreJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
private void runJob(String dblpInputPath, String outputPath) throws Exception {
PrepareDblpActionSets
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-dblpInputPath", dblpInputPath,
"-outputPath", outputPath,
});
}
@Test
void testXmlParsing() throws Exception {
String dblpInputPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/dblp/dblp_dump_sample.xml")
.getPath();
String outputPath = workingDir.toString() + "/actionSet";
// execute the job to generate the action sets for result scores
runJob(dblpInputPath, outputPath);
// TODO: use the data written in `outputPath` to perform tests
// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//
// JavaRDD<OafEntity> tmp = sc
// .sequenceFile(outputPath, Text.class, Text.class)
// .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
// .map(aa -> ((OafEntity) aa.getPayload()));
//
// assertEquals(8, tmp.count());
//
// Dataset<OafEntity> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(OafEntity.class));
// verificationDataset.createOrReplaceTempView("result");
//
// Dataset<Row> testDataset = spark
// .sql(
// "Select p.id oaid, mes.id, mUnit.value from result p " +
// "lateral view explode(measures) m as mes " +
// "lateral view explode(mes.unit) u as mUnit ");
//
// Assertions.assertEquals(28, testDataset.count());
//
// assertResultImpactScores(testDataset);
// assertProjectImpactScores(testDataset);
}
void assertResultImpactScores(Dataset<Row> testDataset) {
Assertions
.assertEquals(
"6.63451994567e-09", testDataset
.filter(
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
"and id = 'influence'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"0.348694533145", testDataset
.filter(
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
"and id = 'popularity_alt'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"2.16094680115e-09", testDataset
.filter(
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
"and id = 'popularity'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
}
void assertProjectImpactScores(Dataset<Row> testDataset) throws Exception {
Assertions
.assertEquals(
"0", testDataset
.filter(
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
"and id = 'numOfInfluentialResults'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"1", testDataset
.filter(
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
"and id = 'numOfPopularResults'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"25", testDataset
.filter(
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
"and id = 'totalImpulse'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
Assertions
.assertEquals(
"43", testDataset
.filter(
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
"and id = 'totalCitationCount'")
.select("value")
.collectAsList()
.get(0)
.getString(0));
}
}