diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java new file mode 100644 index 0000000000..fe77344f5a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java @@ -0,0 +1,148 @@ + +package eu.dnetlib.dhp.actionmanager.transformativeagreement; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.*; +import scala.Tuple2; + +public class CreateActionSetSparkJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String IREL_PROJECT = "40|100018998___::1e5e62235d094afd01cd56e65112fc63"; + private static final String TRANSFORMATIVE_AGREEMENT = "openapc::transformativeagreement"; + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + CreateActionSetSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/transformativeagreement/as_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> getRelations(spark, inputPath, outputPath)); + + } + + private static void getRelations(SparkSession spark, String inputPath, String outputPath) { + spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, TransformativeAgreementModel.class), + Encoders.bean(TransformativeAgreementModel.class)) + .flatMap( + (FlatMapFunction) value -> createRelation( + value) + .iterator(), + Encoders.bean(Relation.class)) + .filter((FilterFunction) Objects::nonNull) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } + + private static List createRelation(TransformativeAgreementModel value) { + + List relationList = new ArrayList<>(); + + String paper; + + paper = "50|doi_________::" + + IdentifierFactory + .md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getDoi())); + + relationList + .add( + getRelation( + paper, + IREL_PROJECT, ModelConstants.IS_PRODUCED_BY)); + + relationList.add(getRelation(IREL_PROJECT, paper, ModelConstants.PRODUCES)); + + return relationList; + } + + public static Relation getRelation( + String source, + String target, + String relClass) { + + return OafMapperUtils + .getRelation( + source, + target, + ModelConstants.RESULT_PROJECT, + ModelConstants.OUTCOME, + relClass, + Arrays + .asList( + OafMapperUtils.keyValue(ModelConstants.OPEN_APC_ID, ModelConstants.OPEN_APC_NAME)), + OafMapperUtils + .dataInfo( + false, null, false, false, + OafMapperUtils + .qualifier( + TRANSFORMATIVE_AGREEMENT, "Transformative Agreement", + ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.9"), + null); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/ReadTransformativeAgreement.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/ReadTransformativeAgreement.java new file mode 100644 index 0000000000..ec921c89d8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/ReadTransformativeAgreement.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.actionmanager.transformativeagreement; + +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; +import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class ReadTransformativeAgreement implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(ReadTransformativeAgreement.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ReadTransformativeAgreement.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/transformativeagreement/input_read_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String inputFile = parser.get("inputFile"); + log.info("inputFile {}", inputFile); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + SparkConf sconf = new SparkConf(); + + final String delimiter = Optional + .ofNullable(parser.get("delimiter")) + .orElse(DEFAULT_DELIMITER); + + runWithSparkSession( + sconf, + isSparkSessionManaged, + spark -> { + doRead( + spark, + inputFile, + outputPath, + delimiter); + }); + } + + private static void doRead(SparkSession spark, String inputFile, + String outputPath, + String delimiter) { + + Dataset data = spark + .read() + .format("csv") + .option("sep", delimiter) + .option("inferSchema", "true") + .option("header", "true") + .load(inputFile) + .repartition(100); + + data.map((MapFunction) row -> { + TransformativeAgreementModel trm = new TransformativeAgreementModel(); + + trm.setInstitution(row.getString(2)); + trm.setDoi(row.getString(7)); + + return trm; + }, Encoders.bean(TransformativeAgreementModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + inputFile); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/model/TransformativeAgreementModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/model/TransformativeAgreementModel.java new file mode 100644 index 0000000000..b427a09cc0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/model/TransformativeAgreementModel.java @@ -0,0 +1,42 @@ + +package eu.dnetlib.dhp.actionmanager.transformativeagreement.model; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** + * @author miriam.baglioni + * @Date 18/12/23 + */ +@JsonIgnoreProperties(ignoreUnknown = true) + +public class TransformativeAgreementModel implements Serializable { + private String institution; + private String doi; + private String agreement; + + public String getInstitution() { + return institution; + } + + public void setInstitution(String institution) { + this.institution = institution; + } + + public String getDoi() { + return doi; + } + + public void setDoi(String doi) { + this.doi = doi; + } + + public String getAgreement() { + return agreement; + } + + public void setAgreement(String agreement) { + this.agreement = agreement; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/as_parameters.json new file mode 100644 index 0000000000..5244a6fe4d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/as_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "sdr", + "paramLongName": "shouldDuplicateRels", + "paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy", + "paramRequired": false + } +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/input_read_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/input_read_parameters.json new file mode 100644 index 0000000000..5986abd550 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/input_read_parameters.json @@ -0,0 +1,30 @@ +[ + + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "d", + "paramLongName": "delimiter", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the hdfs name node", + "paramRequired": true + }, + { + "paramName": "if", + "paramLongName": "inputFile", + "paramDescription": "the hdfs name node", + "paramRequired": true + } +] + + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/config-default.xml new file mode 100644 index 0000000000..a1755f329b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/config-default.xml @@ -0,0 +1,58 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + oozie.launcher.mapreduce.user.classpath.first + true + + + sparkExecutorNumber + 4 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 6G + + + sparkExecutorCores + 1 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/download.sh b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/download.sh new file mode 100644 index 0000000000..66b1a1bf68 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/download.sh @@ -0,0 +1,2 @@ +#!/bin/bash +curl -L $1 | hdfs dfs -put - $2 \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/workflow.xml new file mode 100644 index 0000000000..0c5b1c119b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/transformativeagreement/oozie_app/workflow.xml @@ -0,0 +1,82 @@ + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + + + ${wf:conf('resumeFrom') eq 'DownloadDump'} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + + + mapred.job.queue.name + ${queueName} + + + download.sh + ${inputFile} + ${workingDir}/transformativeagreement/transformativeAgreement.json + HADOOP_USER_NAME=${wf:user()} + download.sh + + + + + + + + + + yarn + cluster + Produces the AS for the Transformative Agreement + eu.dnetlib.dhp.actionmanager.transformativeagreement.CreateActionSetSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${workingDir}/transformativeagreement/ + --outputPath${outputPath} + + + + + + \ No newline at end of file