forked from D-Net/dnet-hadoop
[Transformative Agreement] Added code to extract relations from the transformative agreement file for the IE products obtained from OpenAPC
This commit is contained in:
parent 01ce0b9c76
commit b00771c7cc
@@ -0,0 +1,148 @@

package eu.dnetlib.dhp.actionmanager.transformativeagreement;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;

import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.*;
import scala.Tuple2;

public class CreateActionSetSparkJob implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private static final String IREL_PROJECT = "40|100018998___::1e5e62235d094afd01cd56e65112fc63";
	private static final String TRANSFORMATIVE_AGREEMENT = "openapc::transformativeagreement";

	public static void main(final String[] args) throws IOException, ParseException {

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					Objects
						.requireNonNull(
							CreateActionSetSparkJob.class
								.getResourceAsStream(
									"/eu/dnetlib/dhp/actionmanager/transformativeagreement/as_parameters.json"))));

		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);

		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("inputPath");
		log.info("inputPath {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath {}", outputPath);

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> getRelations(spark, inputPath, outputPath));

	}

	private static void getRelations(SparkSession spark, String inputPath, String outputPath) {
		spark
			.read()
			.textFile(inputPath)
			.map(
				(MapFunction<String, TransformativeAgreementModel>) value -> OBJECT_MAPPER
					.readValue(value, TransformativeAgreementModel.class),
				Encoders.bean(TransformativeAgreementModel.class))
			.flatMap(
				(FlatMapFunction<TransformativeAgreementModel, Relation>) value -> createRelation(
					value)
						.iterator(),
				Encoders.bean(Relation.class))
			.filter((FilterFunction<Relation>) Objects::nonNull)
			.toJavaRDD()
			.map(p -> new AtomicAction(p.getClass(), p))
			.mapToPair(
				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
	}

	private static List<Relation> createRelation(TransformativeAgreementModel value) {

		List<Relation> relationList = new ArrayList<>();

		String paper;

		paper = "50|doi_________::"
			+ IdentifierFactory
				.md5(PidCleaner.normalizePidValue(PidType.doi.toString(), value.getDoi()));

		relationList
			.add(
				getRelation(
					paper,
					IREL_PROJECT, ModelConstants.IS_PRODUCED_BY));

		relationList.add(getRelation(IREL_PROJECT, paper, ModelConstants.PRODUCES));

		return relationList;
	}

	public static Relation getRelation(
		String source,
		String target,
		String relClass) {

		return OafMapperUtils
			.getRelation(
				source,
				target,
				ModelConstants.RESULT_PROJECT,
				ModelConstants.OUTCOME,
				relClass,
				Arrays
					.asList(
						OafMapperUtils.keyValue(ModelConstants.OPEN_APC_ID, ModelConstants.OPEN_APC_NAME)),
				OafMapperUtils
					.dataInfo(
						false, null, false, false,
						OafMapperUtils
							.qualifier(
								TRANSFORMATIVE_AGREEMENT, "Transformative Agreement",
								ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
						"0.9"),
				null);
	}

}
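For reference, the action set job above reads one JSON object per line from inputPath and deserializes each into TransformativeAgreementModel; for every record, createRelation emits a result-project relation pair between the DOI-based result identifier and the hard-coded IReL project. The following is a minimal sketch of such an input line, not part of the commit: the class name and field values are illustrative, and it only assumes Jackson and the model class from this commit on the classpath.

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;

public class TransformativeAgreementInputLineSketch {

	public static void main(String[] args) throws Exception {
		// hypothetical input line; the job reads one such JSON object per line from inputPath
		String line = "{\"institution\":\"Some Irish University\",\"doi\":\"10.1000/xyz123\",\"agreement\":\"TA-2023\"}";

		TransformativeAgreementModel model = new ObjectMapper()
			.readValue(line, TransformativeAgreementModel.class);

		// for this record, createRelation(model) would produce two relations:
		//   paper        -> IREL_PROJECT with relClass ModelConstants.IS_PRODUCED_BY
		//   IREL_PROJECT -> paper        with relClass ModelConstants.PRODUCES
		System.out.println(model.getDoi() + " / " + model.getInstitution());
	}
}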
@@ -0,0 +1,90 @@

package eu.dnetlib.dhp.actionmanager.transformativeagreement;

import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.actionmanager.transformativeagreement.model.TransformativeAgreementModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class ReadTransformativeAgreement implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(ReadTransformativeAgreement.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				ReadTransformativeAgreement.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/actionmanager/transformativeagreement/input_read_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String inputFile = parser.get("inputFile");
		log.info("inputFile {}", inputFile);
		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		SparkConf sconf = new SparkConf();

		final String delimiter = Optional
			.ofNullable(parser.get("delimiter"))
			.orElse(DEFAULT_DELIMITER);

		runWithSparkSession(
			sconf,
			isSparkSessionManaged,
			spark -> {
				doRead(
					spark,
					inputFile,
					outputPath,
					delimiter);
			});
	}

	private static void doRead(SparkSession spark, String inputFile,
		String outputPath,
		String delimiter) {

		Dataset<Row> data = spark
			.read()
			.format("csv")
			.option("sep", delimiter)
			.option("inferSchema", "true")
			.option("header", "true")
			.load(inputFile)
			.repartition(100);

		data.map((MapFunction<Row, TransformativeAgreementModel>) row -> {
			TransformativeAgreementModel trm = new TransformativeAgreementModel();

			trm.setInstitution(row.getString(2));
			trm.setDoi(row.getString(7));

			return trm;
		}, Encoders.bean(TransformativeAgreementModel.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + inputFile);
	}

}
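ReadTransformativeAgreement assumes a fixed column layout in the delimited input: doRead takes the institution from column index 2 and the DOI from column index 7. The following is a minimal local sketch of that assumption, not part of the commit; the sample path, local master setting and class name are hypothetical, and only spark-sql is assumed on the classpath.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadTransformativeAgreementLocalSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("ta-read-sketch")
			.master("local[*]") // local master used only for this illustration
			.getOrCreate();

		// hypothetical sample file with a header row and at least eight columns,
		// mirroring the csv options used in doRead()
		Dataset<Row> rows = spark
			.read()
			.option("sep", ",")
			.option("header", "true")
			.csv("/tmp/transformative-agreement-sample.csv");

		// same positional access as doRead(): institution at index 2, DOI at index 7
		for (Row row : rows.collectAsList()) {
			System.out.println("institution=" + row.getString(2) + ", doi=" + row.getString(7));
		}

		spark.stop();
	}
}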
@@ -0,0 +1,42 @@

package eu.dnetlib.dhp.actionmanager.transformativeagreement.model;

import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author miriam.baglioni
 * @Date 18/12/23
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class TransformativeAgreementModel implements Serializable {
	private String institution;
	private String doi;
	private String agreement;

	public String getInstitution() {
		return institution;
	}

	public void setInstitution(String institution) {
		this.institution = institution;
	}

	public String getDoi() {
		return doi;
	}

	public void setDoi(String doi) {
		this.doi = doi;
	}

	public String getAgreement() {
		return agreement;
	}

	public void setAgreement(String agreement) {
		this.agreement = agreement;
	}
}
@@ -0,0 +1,26 @@
[
	{
		"paramName": "ip",
		"paramLongName": "inputPath",
		"paramDescription": "the path of the transformative agreement file",
		"paramRequired": true
	},
	{
		"paramName": "op",
		"paramLongName": "outputPath",
		"paramDescription": "the working path",
		"paramRequired": true
	},
	{
		"paramName": "issm",
		"paramLongName": "isSparkSessionManaged",
		"paramDescription": "true if the spark session is managed, false otherwise",
		"paramRequired": false
	},
	{
		"paramName": "sdr",
		"paramLongName": "shouldDuplicateRels",
		"paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy",
		"paramRequired": false
	}
]
@@ -0,0 +1,30 @@
[
	{
		"paramName": "issm",
		"paramLongName": "isSparkSessionManaged",
		"paramDescription": "true if the spark session is managed, false otherwise",
		"paramRequired": false
	},
	{
		"paramName": "d",
		"paramLongName": "delimiter",
		"paramDescription": "the delimiter of the input csv file",
		"paramRequired": false
	},
	{
		"paramName": "op",
		"paramLongName": "outputPath",
		"paramDescription": "the output path",
		"paramRequired": true
	},
	{
		"paramName": "if",
		"paramLongName": "inputFile",
		"paramDescription": "the path of the input csv file",
		"paramRequired": true
	}
]
@@ -0,0 +1,58 @@
<configuration>
	<property>
		<name>jobTracker</name>
		<value>yarnRM</value>
	</property>
	<property>
		<name>nameNode</name>
		<value>hdfs://nameservice1</value>
	</property>
	<property>
		<name>oozie.use.system.libpath</name>
		<value>true</value>
	</property>
	<property>
		<name>oozie.action.sharelib.for.spark</name>
		<value>spark2</value>
	</property>
	<property>
		<name>hive_metastore_uris</name>
		<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
	</property>
	<property>
		<name>spark2YarnHistoryServerAddress</name>
		<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
	</property>
	<property>
		<name>spark2ExtraListeners</name>
		<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
	</property>
	<property>
		<name>spark2SqlQueryExecutionListeners</name>
		<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
	</property>
	<property>
		<name>oozie.launcher.mapreduce.user.classpath.first</name>
		<value>true</value>
	</property>
	<property>
		<name>sparkExecutorNumber</name>
		<value>4</value>
	</property>
	<property>
		<name>spark2EventLogDir</name>
		<value>/user/spark/spark2ApplicationHistory</value>
	</property>
	<property>
		<name>sparkDriverMemory</name>
		<value>15G</value>
	</property>
	<property>
		<name>sparkExecutorMemory</name>
		<value>6G</value>
	</property>
	<property>
		<name>sparkExecutorCores</name>
		<value>1</value>
	</property>
</configuration>
@@ -0,0 +1,2 @@
#!/bin/bash
curl -L $1 | hdfs dfs -put - $2
@@ -0,0 +1,82 @@
<workflow-app name="Transformative Agreement Integration" xmlns="uri:oozie:workflow:0.5">

	<global>
		<job-tracker>${jobTracker}</job-tracker>
		<name-node>${nameNode}</name-node>
		<configuration>
			<property>
				<name>mapreduce.job.queuename</name>
				<value>${queueName}</value>
			</property>
			<property>
				<name>oozie.launcher.mapred.job.queue.name</name>
				<value>${oozieLauncherQueueName}</value>
			</property>
			<property>
				<name>oozie.action.sharelib.for.spark</name>
				<value>${oozieActionShareLibForSpark2}</value>
			</property>
		</configuration>
	</global>

	<start to="resume_from"/>

	<decision name="resume_from">
		<switch>
			<case to="download">${wf:conf('resumeFrom') eq 'DownloadDump'}</case>
			<default to="create_actionset"/> <!-- first action to be done when downloadDump is to be performed -->
		</switch>
	</decision>

	<kill name="Kill">
		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>

	<action name="download">
		<shell xmlns="uri:oozie:shell-action:0.2">
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<configuration>
				<property>
					<name>mapred.job.queue.name</name>
					<value>${queueName}</value>
				</property>
			</configuration>
			<exec>download.sh</exec>
			<argument>${inputFile}</argument>
			<argument>${workingDir}/transformativeagreement/transformativeAgreement.json</argument>
			<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
			<file>download.sh</file>
			<capture-output/>
		</shell>
		<ok to="create_actionset"/>
		<error to="Kill"/>
	</action>

	<action name="create_actionset">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
			<name>Produces the AS for the Transformative Agreement</name>
			<class>eu.dnetlib.dhp.actionmanager.transformativeagreement.CreateActionSetSparkJob</class>
			<jar>dhp-aggregation-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-memory=${sparkExecutorMemory}
				--executor-cores=${sparkExecutorCores}
				--driver-memory=${sparkDriverMemory}
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
				--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
			</spark-opts>
			<arg>--inputPath</arg><arg>${workingDir}/transformativeagreement/</arg>
			<arg>--outputPath</arg><arg>${outputPath}</arg>
		</spark>
		<ok to="End"/>
		<error to="Kill"/>
	</action>

	<end name="End"/>
</workflow-app>