1
0
Fork 0

changes to the wf configuration

This commit is contained in:
Miriam Baglioni 2020-02-17 18:04:15 +01:00
parent 3a9d723655
commit bd0e504b42
6 changed files with 366 additions and 17 deletions

View File

@ -9,16 +9,12 @@ import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import scala.Tuple2; import scala.Tuple2;
import java.io.File; import java.io.File;
import java.util.*; import java.util.*;
import java.util.stream.Stream;
public class SparkCountryPropagationJob { public class SparkCountryPropagationJob {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -42,9 +38,8 @@ public class SparkCountryPropagationJob {
directory.mkdirs(); directory.mkdirs();
} }
//TODO: add as Job Parameters List<String> whitelist = Arrays.asList(parser.get("whitelist").split(";"));
List<String> whitelist = Arrays.asList("10|opendoar____::300891a62162b960cf02ce3827bb363c"); List<String> allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";"));
List<String> allowedtypes = Arrays.asList("pubsrepository::institutional");
JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class) JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)

View File

@ -1,4 +1,330 @@
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.io.File;
public class SparkResultToOrganizationFromIstRepoJob { public class SparkResultToOrganizationFromIstRepoJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_resulttoorganizationfrominstrepo_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName())
.master(parser.get("master"))
.enableHiveSupport()
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String outputPath = "/tmp/provision/propagation/resulttoorganizationfrominstitutionalrepositories";
File directory = new File(outputPath);
if (!directory.exists()) {
directory.mkdirs();
}
}
}
/*
package eu.dnetlib.dhp.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import scala.Tuple2;
import java.io.File;
import java.util.*;
import java.util.stream.Stream;
public class SparkCountryPropagationJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_countrypropagation_parameters.json")));
parser.parseArgument(args);
final SparkSession spark = SparkSession
.builder()
.appName(SparkCountryPropagationJob.class.getSimpleName())
.master(parser.get("master"))
.enableHiveSupport()
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories";
File directory = new File(outputPath);
if(!directory.exists()){
directory.mkdirs();
}
//TODO: add as Job Parameters
List<String> whitelist = Arrays.asList("10|opendoar____::300891a62162b960cf02ce3827bb363c");
List<String> allowedtypes = Arrays.asList("pubsrepository::institutional");
JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
.filter(org -> !org.getDataInfo().getDeletedbyinference())
.map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid()))
.mapToPair(toPair());
JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
.filter(r -> !r.getDataInfo().getDeletedbyinference())
.filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType()))
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
.mapToPair(toPair());
JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
.filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid()))
.map(ds -> new TypedRow().setSourceId(ds.getId()))
.mapToPair(toPair());
JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class));
JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class));
JavaRDD<Software> software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class));
JavaRDD<OtherResearchProduct> other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
JavaPairRDD<String, TypedRow> datasource_results = publications
.map(oaf -> getTypedRows(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
})
.union(datasets
.map(oaf -> getTypedRows(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}))
.union(software
.map(oaf -> getTypedRows(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}))
.union(other
.map(oaf -> getTypedRows(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}));
JavaPairRDD<String, OafEntity> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, OafEntity> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, OafEntity> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, OafEntity> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
JavaPairRDD<String, TypedRow> datasource_country = organizations.join(organization_datasource)
.map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (OrganizationId,(TypedRow for Organization, TypedRow for Relation)
.mapToPair(toPair()); //(DatasourceId, TypedRowforOrganziation)
JavaPairRDD<String, TypedRow> alloweddatasources_country = datasources.join(datasource_country)
.mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2()));
JavaPairRDD<String,TypedRow> toupdateresult = alloweddatasources_country.join(datasource_results)
.map(u -> u._2()._2().setCountry(u._2()._1().getCountry()))
.mapToPair(toPair())
.reduceByKey((a, p) -> {
if (a == null) {
return p;
}
if (p == null) {
return a;
}
HashSet<String> countries = new HashSet();
countries.addAll(Arrays.asList(a.getCountry().split(";")));
countries.addAll(Arrays.asList(p.getCountry().split(";")));
String country = new String();
for (String c : countries) {
country += c + ";";
}
return a.setCountry(country);
});
updateResult(pubs, toupdateresult, outputPath, "publication");
updateResult(dss, toupdateresult, outputPath, "dataset");
updateResult(sfw, toupdateresult, outputPath, "software");
updateResult(orp, toupdateresult, outputPath, "otherresearchproduct");
//we use leftOuterJoin because we want to rebuild the entire structure
}
private static void updateResult(JavaPairRDD<String, OafEntity> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
results.leftOuterJoin(toupdateresult)
.map(c -> {
OafEntity oaf = c._2()._1();
List<Qualifier> qualifierList = null;
if (oaf.getClass() == Publication.class) {
qualifierList = ((Publication) oaf).getCountry();
}
if (oaf.getClass() == Dataset.class){
qualifierList = ((Dataset) oaf).getCountry();
}
if (oaf.getClass() == Software.class){
qualifierList = ((Software) oaf).getCountry();
}
if (oaf.getClass() == OtherResearchProduct.class){
qualifierList = ((OtherResearchProduct) oaf).getCountry();
}
if (c._2()._2().isPresent()) {
HashSet<String> countries = new HashSet<>();
for (Qualifier country : qualifierList) {
countries.add(country.getClassid());
}
TypedRow t = c._2()._2().get();
for (String country : t.getCountry().split(";")) {
if (!countries.contains(country)) {
Qualifier q = new Qualifier();
q.setClassid(country);
qualifierList.add(q);
}
}
if (oaf.getClass() == Publication.class) {
((Publication) oaf).setCountry(qualifierList);
return (Publication) oaf;
}
if (oaf.getClass() == Dataset.class){
((Dataset) oaf).setCountry(qualifierList);
return (Dataset) oaf;
}
if (oaf.getClass() == Software.class){
((Software) oaf).setCountry(qualifierList);
return (Software) oaf;
}
if (oaf.getClass() == OtherResearchProduct.class){
((OtherResearchProduct) oaf).setCountry(qualifierList);
return (OtherResearchProduct) oaf;
}
}
return null;
})
.map(p -> new ObjectMapper().writeValueAsString(p))
.saveAsTextFile(outputPath+"/"+type);
}
private static List<TypedRow> getTypedRows(OafEntity oaf) {
List<TypedRow> lst = new ArrayList<>();
Set<String> datasources_provenance = new HashSet<>();
List<Instance> instanceList = null;
String type = "";
if (oaf.getClass() == Publication.class) {
instanceList = ((Publication) oaf).getInstance();
type = "publication";
}
if (oaf.getClass() == Dataset.class){
instanceList = ((Dataset)oaf).getInstance();
type = "dataset";
}
if (oaf.getClass() == Software.class){
instanceList = ((Software)oaf).getInstance();
type = "software";
}
if (oaf.getClass() == OtherResearchProduct.class){
instanceList = ((OtherResearchProduct)oaf).getInstance();
type = "otherresearchproduct";
}
for (Instance i : instanceList) {
datasources_provenance.add(i.getCollectedfrom().getKey());
datasources_provenance.add(i.getHostedby().getKey());
}
for (String dsId : datasources_provenance) {
lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type));
}
return lst;
}
private static JavaPairRDD<String, TypedRow> getResults(JavaSparkContext sc , String inputPath){
return
sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
.filter(ds -> !ds.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId()))
.mapToPair(toPair())
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
.filter(o -> !o.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId()))
.mapToPair(toPair()))
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
.filter(s -> !s.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId()))
.mapToPair(toPair()))
.union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
.filter(p -> !p.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId()))
.mapToPair(toPair()));
}
private static PairFunction<TypedRow, String, TypedRow> toPair() {
return e -> new Tuple2<>( e.getSourceId(), e);
};
} }
*/

View File

@ -1,4 +0,0 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}
]

View File

@ -0,0 +1,26 @@
[
{
"paramName":"mt",
"paramLongName":"master",
"paramDescription": "should be local or yarn",
"paramRequired": true,
},
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true,
},
{
"paramName":"wl",
"paramLongName":"whitelist",
"paramDescription": "datasource id that will be considered even if not in the allowed typology list. Split by ;",
"paramRequired": true
},
{
"paramName":"at",
"paramLongName":"allowedtypes",
"paramDescription": "the types of the allowed datasources. Split by ;",
"paramRequired": true
}
]

View File

@ -1,12 +1,16 @@
<workflow-app name="import_infospace_graph" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="country_propagation" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>sourcePath</name> <name>sourcePath</name>
<description>the source path</description> <description>the source path</description>
</property> </property>
<property> <property>
<name>outputPath</name> <name>whitelist</name>
<description>the output path</description> <description>the white list</description>
</property>
<property>
<name>allowedtypes</name>
<description>the allowed types</description>
</property> </property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
@ -22,7 +26,7 @@
</property> </property>
</parameters> </parameters>
<start to="MapGraphIntoDataFrame"/> <start to="CountryPropagation"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -36,15 +40,17 @@
<mode>cluster</mode> <mode>cluster</mode>
<name>CountryPropagation</name> <name>CountryPropagation</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class> <class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-graph-countrypropagation-${projectVersion}.jar</jar> <jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} <spark-opts>--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores} --executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts> </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg> <arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--withelist</arg><arg>${whitelist}</arg>
<arg>--allowedtypes</arg><arg>${allowedtypes}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>