New classes for the external dump: only the classes needed to dump products.

Miriam Baglioni 2020-06-09 15:37:46 +02:00
parent f232db84e9
commit 5121cbaf6a
31 changed files with 1094 additions and 73 deletions

View File

@ -1,5 +1,7 @@
package eu.dnetlib.dhp.schema.dump.oaf;
+import com.fasterxml.jackson.annotation.JsonProperty;
public class AccessRight extends Qualifier{
private String schema;
@ -11,4 +13,6 @@ public class AccessRight extends Qualifier{
public void setSchema(String schema) {
this.schema = schema;
}
}

View File

@ -0,0 +1,120 @@
package eu.dnetlib.dhp.schema.dump.oaf;
import java.io.Serializable;
import java.util.Objects;
public class Container implements Serializable {
private String name;
private String issnPrinted;
private String issnOnline;
private String issnLinking;
private String ep;
private String iss;
private String sp;
private String vol;
private String edition;
private String conferenceplace;
private String conferencedate;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getIssnPrinted() {
return issnPrinted;
}
public void setIssnPrinted(String issnPrinted) {
this.issnPrinted = issnPrinted;
}
public String getIssnOnline() {
return issnOnline;
}
public void setIssnOnline(String issnOnline) {
this.issnOnline = issnOnline;
}
public String getIssnLinking() {
return issnLinking;
}
public void setIssnLinking(String issnLinking) {
this.issnLinking = issnLinking;
}
public String getEp() {
return ep;
}
public void setEp(String ep) {
this.ep = ep;
}
public String getIss() {
return iss;
}
public void setIss(String iss) {
this.iss = iss;
}
public String getSp() {
return sp;
}
public void setSp(String sp) {
this.sp = sp;
}
public String getVol() {
return vol;
}
public void setVol(String vol) {
this.vol = vol;
}
public String getEdition() {
return edition;
}
public void setEdition(String edition) {
this.edition = edition;
}
public String getConferenceplace() {
return conferenceplace;
}
public void setConferenceplace(String conferenceplace) {
this.conferenceplace = conferenceplace;
}
public String getConferencedate() {
return conferencedate;
}
public void setConferencedate(String conferencedate) {
this.conferencedate = conferencedate;
}
}

View File

@ -2,14 +2,16 @@
package eu.dnetlib.dhp.schema.dump.oaf;
+import java.util.List;
public class Context extends Qualifier {
-private String provenance;
+private List<String> provenance;
-public String getProvenance() {
+public List<String> getProvenance() {
return provenance;
}
-public void setProvenance(String provenance) {
+public void setProvenance(List<String> provenance) {
this.provenance = provenance;
}
}

View File

@ -1,5 +1,7 @@
package eu.dnetlib.dhp.schema.dump.oaf;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import java.io.Serializable;
public class ControlledField implements Serializable {
@ -21,4 +23,13 @@ public class ControlledField implements Serializable {
public void setValue(String value) {
this.value = value;
}
+public static ControlledField newInstance(StructuredProperty pid){
+ControlledField cf = new ControlledField();
+cf.scheme = pid.getQualifier().getClassid();
+cf.value = pid.getValue();
+return cf;
+}
}

View File

@ -17,7 +17,7 @@ public class Dataset extends Result implements Serializable {
private List<GeoLocation> geolocation;
public Dataset() {
-setResulttype(ModelConstants.DATASET_DEFAULT_RESULTTYPE.getClassname());
+setType(ModelConstants.DATASET_DEFAULT_RESULTTYPE.getClassname());
}

View File

@ -1,6 +1,8 @@
package eu.dnetlib.dhp.schema.dump.oaf;
+import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import java.io.Serializable;
import java.util.Objects;
//ExtraInfo
@ -56,22 +58,14 @@ public class ExternalReference implements Serializable {
this.value = value;
}
-@Override
-public boolean equals(Object o) {
-if (this == o)
-return true;
-if (o == null || getClass() != o.getClass())
-return false;
-ExternalReference extraInfo = (ExternalReference) o;
-return Objects.equals(name, extraInfo.name)
-&& Objects.equals(typology, extraInfo.typology)
-&& Objects.equals(provenance, extraInfo.provenance)
-&& Objects.equals(trust, extraInfo.trust)
-&& Objects.equals(value, extraInfo.value);
-}
-@Override
-public int hashCode() {
-return Objects.hash(name, typology, provenance, trust, value);
+public static ExternalReference newInstance(ExtraInfo ei){
+ExternalReference er = new ExternalReference();
+er.name = ei.getName();
+er.typology = ei.getTypology();
+er.provenance = ei.getProvenance();
+er.trust = ei.getTrust();
+er.value = ei.getValue();
+return er;
}
}

View File

@ -31,6 +31,12 @@ public class KeyValue implements Serializable {
}
+public static KeyValue newInstance(String key, String value){
+KeyValue inst = new KeyValue();
+inst.key = key;
+inst.value = value;
+return inst;
+}
@JsonIgnore
public boolean isBlank() {

View File

@ -5,6 +5,7 @@ package eu.dnetlib.dhp.schema.dump.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
public abstract class Oaf implements Serializable {
@ -32,4 +33,11 @@ public abstract class Oaf implements Serializable {
this.lastupdatetimestamp = lastupdatetimestamp;
}
+// public void setAllowedValues(eu.dnetlib.dhp.schema.oaf.Oaf o){
+// collectedfrom = o.getCollectedfrom().stream().map(cf -> KeyValue.newInstance(cf)).collect(Collectors.toList());
+//
+// lastupdatetimestamp = o.getLastupdatetimestamp();
+//
+// }
}

View File

@ -6,6 +6,7 @@ import java.io.Serializable;
import java.util.List;
public abstract class OafEntity extends Oaf implements Serializable {
private String id;
@ -16,9 +17,7 @@ public abstract class OafEntity extends Oaf implements Serializable {
private String dateofcollection;
-private List<ExternalReference> externalReferences; //extraInfo
+private List<Projects> projects;
public String getId() {
return id;
@ -52,11 +51,13 @@ public abstract class OafEntity extends Oaf implements Serializable {
this.dateofcollection = dateofcollection;
}
-public List<ExternalReference> getExternalReferences() {
+public List<Projects> getProjects() {
-return externalReferences;
+return projects;
}
-public void setExternalReferences(List<ExternalReference> externalReferences) {
+public void setProjects(List<Projects> projects) {
-this.externalReferences = externalReferences;
+this.projects = projects;
}
}

View File

@ -16,7 +16,7 @@ public class OtherResearchProduct extends Result implements Serializable {
private List<String> tool;
public OtherResearchProduct() {
-setResulttype(ModelConstants.ORP_DEFAULT_RESULTTYPE.getClassname());
+setType(ModelConstants.ORP_DEFAULT_RESULTTYPE.getClassname());
}
public List<String> getContactperson() {

View File

@ -0,0 +1,67 @@
package eu.dnetlib.dhp.schema.dump.oaf;
import eu.dnetlib.dhp.schema.oaf.Project;
import java.util.List;
public class Projects {
private String id; // OpenAIRE id
private String code;
private String acronym;
private String title;
private List<String> funding_tree;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getAcronym() {
return acronym;
}
public void setAcronym(String acronym) {
this.acronym = acronym;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public List<String> getFunding_tree() {
return funding_tree;
}
public void setFunding_tree(List<String> funding_tree) {
this.funding_tree = funding_tree;
}
public static Projects newInstance(String id, String code, String acronym, String title, List<String> funding_tree){
Projects projects = new Projects();
projects.setAcronym(acronym);
projects.setCode(code);
projects.setFunding_tree(funding_tree);
projects.setId(id);
projects.setTitle(title);
return projects;
}
}

View File

@ -0,0 +1,17 @@
package eu.dnetlib.dhp.schema.dump.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
public class Publication extends Result implements Serializable {
public Publication() {
setType(ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE.getClassname());
}
}

View File

@ -27,40 +27,10 @@ public class Qualifier implements Serializable {
this.label = label;
}
-public String toComparableString() {
-return isBlank()
-? ""
-: String
-.format(
-"%s::%s::%s::%s",
-code != null ? code : "",
-label != null ? label : "");
-}
-@JsonIgnore
-public boolean isBlank() {
-return StringUtils.isBlank(code)
-&& StringUtils.isBlank(label);
-}
-@Override
-public int hashCode() {
-return toComparableString().hashCode();
-}
-@Override
-public boolean equals(Object obj) {
-if (this == obj)
-return true;
-if (obj == null)
-return false;
-if (getClass() != obj.getClass())
-return false;
-Qualifier other = (Qualifier) obj;
-return toComparableString().equals(other.toComparableString());
+public static Qualifier newInstance(String code, String value){
+Qualifier qualifier = new Qualifier();
+qualifier.setCode(code);
+qualifier.setLabel(value);
+return qualifier;
}
}

View File

@ -24,7 +24,6 @@ public class Result extends OafEntity implements Serializable {
private String subtitle;
private List<String> description;
private String publicationdata; // dateofacceptance;
@ -49,21 +48,32 @@ public class Result extends OafEntity implements Serializable {
private List<Instance> instance;
+private Container container;//Journal
public List<Author> getAuthor() {
return author;
}
+public String getType() {
+return type;
+}
+public void setType(String type) {
+this.type = type;
+}
+public Container getContainer() {
+return container;
+}
+public void setContainer(Container container) {
+this.container = container;
+}
public void setAuthor(List<Author> author) {
this.author = author;
}
-public String getResulttype() {
-return type;
-}
-public void setResulttype(String resulttype) {
-this.type = resulttype;
-}
public Qualifier getLanguage() {
return language;
@ -200,4 +210,6 @@ public class Result extends OafEntity implements Serializable {
public void setInstance(List<Instance> instance) {
this.instance = instance;
}
}

View File

@ -16,7 +16,7 @@ public class Software extends Result implements Serializable {
private String programmingLanguage;
public Software() {
-setResulttype(ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE.getClassname());
+setType(ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE.getClassname());
}
public List<String> getDocumentationUrl() {
@ -42,4 +42,6 @@ public class Software extends Result implements Serializable {
public void setProgrammingLanguage(String programmingLanguage) {
this.programmingLanguage = programmingLanguage;
}
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class CommunityMap {
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class Constants {
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class Mapper {
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.oa.graph.dump;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PrepareCommunityMap {
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " +
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
" return " +
"<community> " +
"{$x//CONFIGURATION/context/@id}" +
"{$x//CONFIGURATION/context/@label}" +
"</community>";
public static Map<String,String> getCommunityMap(final String isLookupUrl)
throws ISLookUpException, DocumentException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
final List<String> res = isLookUp.quickSearchProfile(XQUERY);
final Map<String, String> communityMap = new HashMap<>();
res.stream().forEach(xml -> {
final Document doc;
try {
doc = new SAXReader().read(new StringReader(xml));
Element root = doc.getRootElement();
communityMap.put(root.attribute("id").getValue(), root.attribute("label").getValue());
} catch (DocumentException e) {
e.printStackTrace();
}
});
return communityMap;
}
}
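A minimal usage sketch of the community map lookup above (not part of this commit; the wrapper class name and the argument handling are assumptions, and the isLookUp URL is whatever value is normally passed as isLookUpUrl):
package eu.dnetlib.dhp.oa.graph.dump;
import java.util.Map;
public class PrepareCommunityMapExample {
public static void main(String[] args) throws Exception {
String isLookUpUrl = args[0]; // the same value normally passed as --isLookUpUrl
// each entry maps a community/RI context id to its label, as selected by the XQuery above
Map<String, String> communityMap = PrepareCommunityMap.getCommunityMap(isLookUpUrl);
communityMap.forEach((id, label) -> System.out.println(id + " -> " + label));
}
}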

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class ResultProject {
}

View File

@ -0,0 +1,122 @@
package eu.dnetlib.dhp.oa.graph.dump;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
public class DumpCommunityProducts implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DumpCommunityProducts.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
DumpCommunityProducts.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String dumpClassName = parser.get("dumpClassName");
log.info("dumpClassName: {}", dumpClassName);
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> dumpClazz =
(Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result>) Class.forName(dumpClassName);
SparkConf conf = new SparkConf();
Map<String,String>
communityMap = QueryInformationSystem.getCommunityMap(isLookUpUrl);
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath + "/" + resultType, communityMap, inputClazz, dumpClazz);
});
}
private static <I extends Result, O extends eu.dnetlib.dhp.schema.dump.oaf.Result > void execDump(
SparkSession spark,
String inputPath,
String outputPath,
Map<String,String> communityMap,
Class<I> inputClazz,
Class<O> dumpClazz) {
Set<String> communities = communityMap.keySet();
Dataset<I> tmp = Utils.readPath(spark, inputPath, inputClazz);
tmp.map(value -> {
Optional<List<Context>> inputContext = Optional.ofNullable(value.getContext());
if(!inputContext.isPresent()){
return null;
}
List<String> toDumpFor = inputContext.get().stream().map(c -> {
if (communities.contains(c.getId())) {
return c.getId();
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
if(toDumpFor.size() == 0){
return null;
}
return Mapper.map(value, communityMap);
},Encoders.bean(dumpClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(outputPath);
}
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class SparkPrepareResultProject {
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class SparkSplitForCommunity {
}

View File

@ -0,0 +1,105 @@
package eu.dnetlib.dhp.oa.graph.dump;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.dump.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class UpdateProjectInfo implements Serializable {
private static final Logger log = LoggerFactory.getLogger(UpdateProjectInfo.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
UpdateProjectInfo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/project_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extend(spark, inputPath, outputPath, resultType, inputClazz);
});
}
private static <R extends Result > void extend(
SparkSession spark,
String inputPath,
String outputPath,
String resultType,
Class<R> inputClazz) {
Dataset<R> result = Utils.readPath(spark, inputPath + "/" + resultType, inputClazz);
Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class)
// merge duplicate project records sharing the same id into a single Project
.groupByKey((MapFunction<Project, String>) p -> p.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
Project first = it.next();
it.forEachRemaining(p -> {
first.mergeFrom(p);
});
return first;
}, Encoders.bean(Project.class));
// associate each 'produces' relation with its project (not yet propagated to the results below)
Dataset<Tuple2<Relation, Project>> relationProject = relation
.joinWith(project, relation.col("source").equalTo(project.col("id")));
result.joinWith(relation, result.col("id").equalTo(relation.col("target")), "left")
.groupByKey(
(MapFunction<Tuple2<R, Relation>, String>) p -> p._1().getId(),
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<R, Relation>, R>) (c, it) -> {
Tuple2<R, Relation> first = it.next();
// TODO: enrich the result with the project information before returning it
return first._1();
}, Encoders.bean(inputClazz));
}
}

View File

@ -0,0 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump;
public class Utils {
}

View File

@ -0,0 +1,20 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]
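For reference, a minimal sketch (not part of this commit) of how a parameter file like the one above is consumed through ArgumentApplicationParser; the class name, resource name, and paths are placeholder assumptions:
package eu.dnetlib.dhp.oa.graph.dump;
import org.apache.commons.io.IOUtils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class InputParametersExample {
public static void main(String[] args) throws Exception {
// read the JSON parameter definition from the classpath (placeholder resource name)
String jsonConfiguration = IOUtils
.toString(InputParametersExample.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/example_parameters.json"));
ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
// arguments use the paramLongName values defined above; the paths are placeholders
parser.parseArgument(new String[] {
"--sourcePath", "/tmp/graph",
"--outputPath", "/tmp/dump",
"--isSparkSessionManaged", "false"
});
System.out.println(parser.get("sourcePath")); // prints /tmp/graph
}
}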

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
</configuration>

View File

@ -0,0 +1,336 @@
<workflow-app name="import_graph_as_hive_DB" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the source path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_DB"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_DB">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
</configuration>
<jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
<script>lib/scripts/reset_db.sql</script>
<param>hiveDbName=${hiveDbName}</param>
</hive2>
<ok to="fork_import"/>
<error to="Kill"/>
</action>
<fork name="fork_import">
<path start="import_publication"/>
<path start="import_dataset"/>
<path start="import_orp"/>
<path start="import_software"/>
<path start="import_datasource"/>
<path start="import_organization"/>
<path start="import_project"/>
<path start="import_relation"/>
</fork>
<action name="import_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table publication</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table dataset</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table software</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table datasource</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/datasource</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table organization</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/organization</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table project</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/project</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table relation</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/relation</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<join name="join_import" to="PostProcessing"/>
<action name="PostProcessing">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
</configuration>
<jdbc-url>${hiveJdbcUrl}/${hiveDbName}</jdbc-url>
<script>lib/scripts/postprocessing.sql</script>
<param>hiveDbName=${hiveDbName}</param>
</hive2>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,48 @@
[
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"dn",
"paramLongName":"dumpTableName",
"paramDescription": "the name of the corresondent dump element ",
"paramRequired": true
},
{
"paramName":"rt",
"paramLongName":"resultType",
"paramDescription": "the name of the corresondent dump element ",
"paramRequired": true
}
]

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "g",
"paramLongName": "graphRawPath",
"paramDescription": "the path of the graph Raw in hdfs",
"paramRequired": true
}
]

View File

@ -0,0 +1,48 @@
[
{
"paramName":"is",
"paramLongName":"isLookUpUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"dn",
"paramLongName":"dumpTableName",
"paramDescription": "the name of the corresondent dump element ",
"paramRequired": true
},
{
"paramName":"rt",
"paramLongName":"resultType",
"paramDescription": "the name of the corresondent dump element ",
"paramRequired": true
}
]