diff --git a/dhp-workflows/dhp-bulktag/pom.xml b/dhp-workflows/dhp-bulktag/pom.xml
new file mode 100644
index 000000000..5b6a21027
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/pom.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dhp-bulktag</artifactId>
+
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/pom.xml b/dhp-workflows/dhp-propagation/pom.xml
new file mode 100644
index 000000000..10fcd38bb
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dhp-propagation</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
new file mode 100644
index 000000000..abcec9dbb
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java
@@ -0,0 +1,308 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.*;
+import scala.Tuple2;
+
+import java.util.*;
+import java.util.stream.Stream;
+
+public class SparkCountryPropagationJob {
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCountryPropagationJob.class.getResourceAsStream("/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json")));
+        parser.parseArgument(args);
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName(SparkCountryPropagationJob.class.getSimpleName())
+                .master(parser.get("master"))
+                .enableHiveSupport()
+                .getOrCreate();
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final String inputPath = parser.get("sourcePath");
+        final String outputPath = parser.get("outputPath");
+
+        List<String> whitelist = new ArrayList<>();
+        List<String> allowedtypes = new ArrayList<>();
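+        // NOTE: whitelist and allowedtypes are empty placeholders; until they are populated
+        // (presumably from job parameters), the datasource filter below selects nothing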
+
+
+
+        // JavaPairRDD<String, TypedRow> results = getResults(sc, inputPath);
+// sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
+// .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
+// .map(oaf -> new TypedRow().setType("dataset").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId()))
+// .mapToPair(toPair())
+// .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
+// .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
+// .map(oaf -> new TypedRow().setType("otherresearchproduct").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId()))
+// .mapToPair(toPair()))
+// .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
+// .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
+// .map(oaf -> new TypedRow().setType("software").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId()))
+// .mapToPair(toPair()))
+// .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
+// .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
+// .map(oaf -> new TypedRow().setType("publication").setDeleted(oaf.getDataInfo().getDeletedbyinference()).setOaf(oaf.toString()).setSourceId(oaf.getId()))
+// .mapToPair(toPair()));
+//
+//
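+        // organizations not deleted by inference, keyed by id and carrying the country classid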
+        JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
+                .filter(org -> !org.getDataInfo().getDeletedbyinference())
+                .map(org -> new TypedRow().setSourceId(org.getId()).setCountry(org.getCountry().getClassid()))
+                .mapToPair(toPair());
+
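+        // datasourceOrganization relations; the join on organizations below assumes
+        // source = organizationId and target = datasourceId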
+        JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
+                .filter(r -> !r.getDataInfo().getDeletedbyinference())
+                .filter(r -> "datasourceOrganization".equals(r.getRelClass()) && "isProvidedBy".equals(r.getRelType()))
+                .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
+                .mapToPair(toPair());
+
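+        // datasources eligible for country propagation: whitelisted ids or allowed datasource types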
+        JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
+                .filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid()))
+                .map(ds -> new TypedRow().setSourceId(ds.getId()))
+                .mapToPair(toPair());
+
+
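+        // load the four result types from their sequence files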
+        JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class));
+        JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class));
+        JavaRDD<Software> software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class));
+        JavaRDD<OtherResearchProduct> other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
+
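+        // (datasourceId, TypedRow pointing at the result): one pair per collectedfrom/hostedby
+        // datasource of each result (see getTypedRows)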
+        JavaPairRDD<String, TypedRow> datasource_results = publications
+                .map(oaf -> getTypedRows(oaf))
+                .flatMapToPair(f -> {
+                    ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
+                    for (TypedRow t : f) {
+                        ret.add(new Tuple2<>(t.getSourceId(), t));
+                    }
+                    return ret.iterator();
+                })
+                .union(datasets
+                        .map(oaf -> getTypedRows(oaf))
+                        .flatMapToPair(f -> {
+                            ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
+                            for (TypedRow t : f) {
+                                ret.add(new Tuple2<>(t.getSourceId(), t));
+                            }
+                            return ret.iterator();
+                        }))
+                .union(software
+                        .map(oaf -> getTypedRows(oaf))
+                        .flatMapToPair(f -> {
+                            ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
+                            for (TypedRow t : f) {
+                                ret.add(new Tuple2<>(t.getSourceId(), t));
+                            }
+                            return ret.iterator();
+                        }))
+                .union(other
+                        .map(oaf -> getTypedRows(oaf))
+                        .flatMapToPair(f -> {
+                            ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
+                            for (TypedRow t : f) {
+                                ret.add(new Tuple2<>(t.getSourceId(), t));
+                            }
+                            return ret.iterator();
+                        }));
+
+        JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(), p));
+        JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(), p));
+        JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(), p));
+        JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(), p));
+
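+        // chain the propagation: organization --country--> datasource --collectedfrom/hostedby--> result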
+        JavaPairRDD<String, TypedRow> datasource_country = organizations.join(organization_datasource)
+                .map(x -> x._2()._1().setSourceId(x._2()._2().getTargetId())) // (organizationId, (TypedRow for organization, TypedRow for relation))
+                .mapToPair(toPair()); // (datasourceId, TypedRow carrying the country of the organization)
+
+
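+        // keep only the (datasourceId, country) pairs of eligible datasources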
+        JavaPairRDD<String, TypedRow> alloweddatasources_country = datasources.join(datasource_country)
+                .mapToPair(ds -> new Tuple2<>(ds._1(), ds._2()._2()));
+
+
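+        // give each result the country of every eligible datasource it was collected from or
+        // hosted by, merging multiple countries into a single ';'-separated string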
+        JavaPairRDD<String, TypedRow> toupdateresult = alloweddatasources_country.join(datasource_results)
+                .map(u -> u._2()._2().setCountry(u._2()._1().getCountry()))
+                .mapToPair(toPair())
+                .reduceByKey((a, p) -> {
+                    if (a == null) {
+                        return p;
+                    }
+                    if (p == null) {
+                        return a;
+                    }
+                    HashSet<String> countries = new HashSet<>();
+                    countries.addAll(Arrays.asList(a.getCountry().split(";")));
+                    countries.addAll(Arrays.asList(p.getCountry().split(";")));
+                    return a.setCountry(String.join(";", countries));
+                });
+
+        // updateResult uses leftOuterJoin so that every result is kept and the entire
+        // structure is rebuilt, whether or not a country was propagated to it
+        updateResult(pubs, toupdateresult, outputPath, "publication");
+        updateResult(dss, toupdateresult, outputPath, "dataset");
+        updateResult(sfw, toupdateresult, outputPath, "software");
+        updateResult(orp, toupdateresult, outputPath, "otherresearchproduct");
+
+ }
+
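+    // rebuilds every record of the given result type: countries propagated to a result are
+    // merged into its country list, skipping classids already present; results that received
+    // no update are written out unchanged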
+    private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
+        results.leftOuterJoin(toupdateresult)
+                .map(c -> {
+                    OafEntity oaf = c._2()._1();
+                    List<Qualifier> qualifierList = null;
+                    if (oaf.getClass() == Publication.class) {
+                        qualifierList = ((Publication) oaf).getCountry();
+                    }
+                    if (oaf.getClass() == Dataset.class) {
+                        qualifierList = ((Dataset) oaf).getCountry();
+                    }
+                    if (oaf.getClass() == Software.class) {
+                        qualifierList = ((Software) oaf).getCountry();
+                    }
+                    if (oaf.getClass() == OtherResearchProduct.class) {
+                        qualifierList = ((OtherResearchProduct) oaf).getCountry();
+                    }
+
+                    if (c._2()._2().isPresent()) {
+                        HashSet<String> countries = new HashSet<>();
+                        for (Qualifier country : qualifierList) {
+                            countries.add(country.getClassid());
+                        }
+                        TypedRow t = c._2()._2().get();
+
+                        for (String country : t.getCountry().split(";")) {
+                            if (!countries.contains(country)) {
+                                Qualifier q = new Qualifier();
+                                q.setClassid(country);
+                                qualifierList.add(q);
+                            }
+                        }
+                        if (oaf.getClass() == Publication.class) {
+                            ((Publication) oaf).setCountry(qualifierList);
+                            return (Publication) oaf;
+                        }
+                        if (oaf.getClass() == Dataset.class) {
+                            ((Dataset) oaf).setCountry(qualifierList);
+                            return (Dataset) oaf;
+                        }
+                        if (oaf.getClass() == Software.class) {
+                            ((Software) oaf).setCountry(qualifierList);
+                            return (Software) oaf;
+                        }
+                        if (oaf.getClass() == OtherResearchProduct.class) {
+                            ((OtherResearchProduct) oaf).setCountry(qualifierList);
+                            return (OtherResearchProduct) oaf;
+                        }
+                    }
+
+                    // no propagated country: keep the record unchanged so the entire structure is preserved
+                    return oaf;
+                })
+                .map(p -> new ObjectMapper().writeValueAsString(p))
+                .saveAsTextFile(outputPath + "/" + type);
+    }
+
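+    // emits one TypedRow per distinct datasource (collectedfrom or hostedby) of the given
+    // result, so results can later be joined to the datasources they were gathered from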
+    private static List<TypedRow> getTypedRows(OafEntity oaf) {
+        List<TypedRow> lst = new ArrayList<>();
+        Set<String> datasources_provenance = new HashSet<>();
+        List<Instance> instanceList = null;
+        String type = "";
+        if (oaf.getClass() == Publication.class) {
+            instanceList = ((Publication) oaf).getInstance();
+            type = "publication";
+        }
+        if (oaf.getClass() == Dataset.class) {
+            instanceList = ((Dataset) oaf).getInstance();
+            type = "dataset";
+        }
+        if (oaf.getClass() == Software.class) {
+            instanceList = ((Software) oaf).getInstance();
+            type = "software";
+        }
+        if (oaf.getClass() == OtherResearchProduct.class) {
+            instanceList = ((OtherResearchProduct) oaf).getInstance();
+            type = "otherresearchproduct";
+        }
+
+        for (Instance i : instanceList) {
+            datasources_provenance.add(i.getCollectedfrom().getKey());
+            datasources_provenance.add(i.getHostedby().getKey());
+        }
+        for (String dsId : datasources_provenance) {
+            lst.add(new TypedRow().setSourceId(dsId).setTargetId(oaf.getId()).setType(type));
+        }
+        return lst;
+    }
+
+
+    private static JavaPairRDD<String, TypedRow> getResults(JavaSparkContext sc, String inputPath) {
+
+        return sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
+                .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
+                .filter(ds -> !ds.getDataInfo().getDeletedbyinference())
+                .map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId()))
+                .mapToPair(toPair())
+                .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
+                        .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
+                        .filter(o -> !o.getDataInfo().getDeletedbyinference())
+                        .map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId()))
+                        .mapToPair(toPair()))
+                .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
+                        .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
+                        .filter(s -> !s.getDataInfo().getDeletedbyinference())
+                        .map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId()))
+                        .mapToPair(toPair()))
+                .union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
+                        .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
+                        .filter(p -> !p.getDataInfo().getDeletedbyinference())
+                        .map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId()))
+                        .mapToPair(toPair()));
+    }
+
+
+
+    private static PairFunction<TypedRow, String, TypedRow> toPair() {
+        return e -> new Tuple2<>(e.getSourceId(), e);
+    }
+
+
+}
+
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java
new file mode 100644
index 000000000..f006a320b
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/TypedRow.java
@@ -0,0 +1,70 @@
+package eu.dnetlib.dhp.countrypropagation;
+
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
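+/**
+ * Serializable carrier for the (sourceId, targetId, type, country) tuples, plus an optional
+ * string accumulator, exchanged between the Spark joins of the propagation jobs.
+ */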
+public class TypedRow implements Serializable {
+    private String sourceId;
+    private String targetId;
+    private String type;
+    private String country;
+    private List<String> accumulator;
+
+    public List<String> getAccumulator() {
+        return accumulator;
+    }
+
+    public TypedRow setAccumulator(List<String> accumulator) {
+        this.accumulator = accumulator;
+        return this;
+    }
+
+    public void add(String a) {
+        if (accumulator == null) {
+            accumulator = new ArrayList<>();
+        }
+        accumulator.add(a);
+    }
+
+    public Iterator<String> getAccumulatorIterator() {
+        return accumulator.iterator();
+    }
+
+    public String getCountry() {
+        return country;
+    }
+
+    public TypedRow setCountry(String country) {
+        this.country = country;
+        return this;
+    }
+
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public TypedRow setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+        return this;
+    }
+
+    public String getTargetId() {
+        return targetId;
+    }
+
+    public TypedRow setTargetId(String targetId) {
+        this.targetId = targetId;
+        return this;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public TypedRow setType(String type) {
+        this.type = type;
+        return this;
+    }
+
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties
new file mode 100644
index 000000000..7754151e8
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/countrypropagation-ovverride.properties
@@ -0,0 +1,3 @@
+sourcePath=/tmp/db_openaireplus_services.export_dhp.2020.02.03
+sparkDriverMemory=15G
+sparkExecutorMemory=15G
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json
new file mode 100644
index 000000000..03f3dd8ff
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
+ {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path of the sequential file to write", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml
new file mode 100644
index 000000000..2e0ed9aee
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
new file mode 100644
index 000000000..1fef8b17c
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
@@ -0,0 +1,55 @@
+<workflow-app name="country_propagation" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the source path</description>
+        </property>
+        <property>
+            <name>outputPath</name>
+            <description>the output path</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+    </parameters>
+
+    <start to="CountryPropagation"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="CountryPropagation">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>CountryPropagation</name>
+            <class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
+            <jar>dhp-graph-countrypropagation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory}
+                --executor-cores ${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
+                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
+                --conf spark.sql.warehouse.dir="/user/hive/warehouse"
+            </spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--outputPath</arg><arg>${outputPath}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index cf71190a4..2a9a72f30 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -18,6 +18,8 @@
         <module>dhp-distcp</module>
         <module>dhp-graph-mapper</module>
         <module>dhp-dedup</module>
+        <module>dhp-bulktag</module>
+        <module>dhp-propagation</module>