Add workflow for loading DBLP data

This commit is contained in:
Serafeim Chatzopoulos 2024-03-19 18:11:36 +02:00
parent cb29b9773c
commit b6e4d58817
5 changed files with 303 additions and 0 deletions

View File

@ -0,0 +1,111 @@
package eu.dnetlib.dhp.actionmanager.dblp;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/**
* Creates action sets for DBLP data
*/
public class PrepareDblpActionSets implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ID_PREFIX = "50|doi_________::";
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareAffiliationRelations.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/dblp/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String dblpInputPath = parser.get("dblpInputPath");
log.info("dblpInputPath: {}", dblpInputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Constants.removeOutputDir(spark, outputPath);
// TODO: add DBLP ID in ModelConstants
List<KeyValue> collectedFromDBLP = OafMapperUtils
.listKeyValues(ModelConstants.CROSSREF_ID, "DBLP");
JavaPairRDD<Text, Text> dblpData = prepareDblpData(
spark, dblpInputPath, collectedFromDBLP);
dblpData
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
});
}
private static <I extends Result> JavaPairRDD<Text, Text> prepareDblpData(SparkSession spark,
String inputPath,
List<KeyValue> collectedFrom) {
// TODO: load DBLP data into a Dataset
Dataset<Row> df = spark
.read()
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
.json(inputPath);
return df.map((MapFunction<Row, Result>) bs -> {
Result result = new Result();
// TODO: map DBLP data to Result objects
return result;
}, Encoders.bean(Result.class))
.toJavaRDD()
.map(p -> new AtomicAction(Result.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "dip",
"paramLongName": "dblpInputPath",
"paramDescription": "the path to get the input data from DBLP",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -0,0 +1,35 @@
# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties ---
# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
# dhp.hadoop.frontend.user.name=ilias.kanellos
# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
# dhp.hadoop.frontend.port.ssh=22
# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
# jobTracker=yarnRM
# nameNode=hdfs://nameservice1
# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
# maven.executable=mvn
# Some memory and driver settings for more demanding tasks
sparkDriverMemory=10G
sparkExecutorMemory=10G
sparkExecutorCores=4
sparkShufflePartitions=7680
# The above is given differently in an example I found online
oozie.action.sharelib.for.spark=spark2
oozieActionShareLibForSpark2=spark2
spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
spark2EventLogDir=/user/spark/spark2ApplicationHistory
sparkSqlWarehouseDir=/user/hive/warehouse
hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
# This MAY avoid the no library used error
oozie.use.system.libpath=true
# Some stuff copied from openaire's jobs
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# The following is needed as a property of a workflow
oozie.wf.application.path=${oozieTopWfApplicationPath}
dblpInputPath=/data/dblp/dblp.xml.gz
outputPath=/tmp/dblp-actionsets

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,107 @@
<workflow-app name="BipAffiliations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>dblpInputPath</name>
<description>the path where to find the input data from DBLP</description>
</property>
<property>
<name>outputPath</name>
<description>the path where to store the actionset</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
<delete path="${workingDir}"/>
<mkdir path="${workingDir}"/>
</fs>
<ok to="atomicactions"/>
<error to="Kill"/>
</action>
<action name="atomicactions">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action sets with the DBLP data</name>
<class>eu.dnetlib.dhp.actionmanager.dblp.PrepareDblpActionSets</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--dblpInputPath</arg><arg>${dblpInputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>