diff --git a/dhp-build/dhp-shaded-libs/pom.xml b/dhp-build/dhp-shaded-libs/pom.xml
new file mode 100644
index 0000000000..c091c46f0e
--- /dev/null
+++ b/dhp-build/dhp-shaded-libs/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+ dhp-build
+ eu.dnetlib.dhp
+ 1.1.7-SNAPSHOT
+
+ 4.0.0
+
+ dhp-shaded-libs
+
+
+ com.google.guava
+ guava
+ 23.3-jre
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.3
+
+
+ package
+
+ shade
+
+
+
+
+ com.google.guava
+ shaded.com.google.guava
+
+
+
+
+ *:*
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java
index 13540cefad..140e28f3db 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java
@@ -1,7 +1,5 @@
package eu.dnetlib.dhp;
-import java.io.File;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@@ -10,32 +8,36 @@ import eu.dnetlib.dhp.community.ProtoMap;
import eu.dnetlib.dhp.community.QueryInformationSystem;
import eu.dnetlib.dhp.community.ResultTagger;
import eu.dnetlib.dhp.schema.oaf.*;
+import java.io.File;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-
-
public class SparkBulkTagJob {
public static void main(String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkBulkTagJob.class.getResourceAsStream("/eu/dnetlib/dhp/input_bulktag_parameters.json")));
+ final ArgumentApplicationParser parser =
+ new ArgumentApplicationParser(
+ IOUtils.toString(
+ SparkBulkTagJob.class.getResourceAsStream(
+ "/eu/dnetlib/dhp/input_bulktag_parameters.json")));
parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkBulkTagJob.class.getSimpleName())
- .master(parser.get("master"))
- .enableHiveSupport()
- .getOrCreate();
+ final SparkSession spark =
+ SparkSession.builder()
+ .appName(SparkBulkTagJob.class.getSimpleName())
+ .master(parser.get("master"))
+ .enableHiveSupport()
+ .getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String outputPath = "/tmp/provision/bulktagging";
final ResultTagger resultTagger = new ResultTagger();
- ProtoMap protoMappingParams = new Gson().fromJson(parser.get("mappingProto"),ProtoMap.class);;
+ ProtoMap protoMappingParams =
+ new Gson().fromJson(parser.get("mappingProto"), ProtoMap.class);
+ ;
File directory = new File(outputPath);
@@ -43,31 +45,28 @@ public class SparkBulkTagJob {
directory.mkdirs();
}
- CommunityConfiguration cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookupUrl"));
+ CommunityConfiguration cc =
+ QueryInformationSystem.getCommunityConfiguration(parser.get("isLookupUrl"));
-
- sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
+ sc.textFile(inputPath + "/publication")
+ .map(item -> new ObjectMapper().readValue(item, Publication.class))
.map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
.map(p -> new ObjectMapper().writeValueAsString(p))
- .saveAsTextFile(outputPath+"/publication");
- sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
+ .saveAsTextFile(outputPath + "/publication");
+ sc.textFile(inputPath + "/dataset")
+ .map(item -> new ObjectMapper().readValue(item, Dataset.class))
.map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
.map(p -> new ObjectMapper().writeValueAsString(p))
- .saveAsTextFile(outputPath+"/dataset");
- sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
+ .saveAsTextFile(outputPath + "/dataset");
+ sc.textFile(inputPath + "/software")
+ .map(item -> new ObjectMapper().readValue(item, Software.class))
.map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
.map(p -> new ObjectMapper().writeValueAsString(p))
- .saveAsTextFile(outputPath+"/software");
- sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
- .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
+ .saveAsTextFile(outputPath + "/software");
+ sc.textFile(inputPath + "/otherresearchproduct")
+ .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class))
.map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
.map(p -> new ObjectMapper().writeValueAsString(p))
- .saveAsTextFile(outputPath+"/otherresearchproduct");
-
-
-
+ .saveAsTextFile(outputPath + "/otherresearchproduct");
}
}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob2.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob2.java
new file mode 100644
index 0000000000..04897dd4ac
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob2.java
@@ -0,0 +1,161 @@
+package eu.dnetlib.dhp;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.community.*;
+import eu.dnetlib.dhp.schema.oaf.*;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkBulkTagJob2 {
+
+ private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob2.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+ String jsonConfiguration =
+ IOUtils.toString(
+ SparkBulkTagJob2.class.getResourceAsStream(
+ "/eu/dnetlib/dhp/input_bulktag_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged =
+ Optional.ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ Boolean isTest =
+ Optional.ofNullable(parser.get("isTest"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.FALSE);
+ log.info("isTest: {} ", isTest);
+
+ final String inputPath = parser.get("sourcePath");
+ log.info("inputPath: {}", inputPath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ ProtoMap protoMappingParams = new Gson().fromJson(parser.get("protoMap"), ProtoMap.class);
+ ;
+ log.info("protoMap: {}", new Gson().toJson(protoMappingParams));
+
+ final String resultClassName = parser.get("resultTableName");
+ log.info("resultTableName: {}", resultClassName);
+
+ final Boolean saveGraph =
+ Optional.ofNullable(parser.get("saveGraph"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("saveGraph: {}", saveGraph);
+
+ Class extends Result> resultClazz =
+ (Class extends Result>) Class.forName(resultClassName);
+
+ SparkConf conf = new SparkConf();
+ CommunityConfiguration cc;
+
+ String taggingConf = parser.get("taggingConf");
+
+ if (isTest) {
+ cc = CommunityConfigurationFactory.fromJson(taggingConf);
+ } else {
+ cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookupUrl"));
+ }
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
+ });
+
+ // runWithSparkSession(conf, isSparkSessionManaged,
+ // spark -> {
+ // if(isTest(parser)) {
+ // removeOutputDir(spark, outputPath);
+ // }
+ // if(saveGraph)
+ // execPropagation(spark, possibleUpdates, inputPath, outputPath,
+ // resultClazz);
+ // });
+ //
+ //
+ //
+ //
+ //
+ //
+ // sc.textFile(inputPath + "/publication")
+ // .map(item -> new ObjectMapper().readValue(item, Publication.class))
+ // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ // .map(p -> new ObjectMapper().writeValueAsString(p))
+ // .saveAsTextFile(outputPath+"/publication");
+ // sc.textFile(inputPath + "/dataset")
+ // .map(item -> new ObjectMapper().readValue(item, Dataset.class))
+ // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ // .map(p -> new ObjectMapper().writeValueAsString(p))
+ // .saveAsTextFile(outputPath+"/dataset");
+ // sc.textFile(inputPath + "/software")
+ // .map(item -> new ObjectMapper().readValue(item, Software.class))
+ // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ // .map(p -> new ObjectMapper().writeValueAsString(p))
+ // .saveAsTextFile(outputPath+"/software");
+ // sc.textFile(inputPath + "/otherresearchproduct")
+ // .map(item -> new ObjectMapper().readValue(item,
+ // OtherResearchProduct.class))
+ // .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ // .map(p -> new ObjectMapper().writeValueAsString(p))
+ // .saveAsTextFile(outputPath+"/otherresearchproduct");
+ //
+
+ }
+
+ private static void execBulkTag(
+ SparkSession spark,
+ String inputPath,
+ String outputPath,
+ ProtoMap protoMappingParams,
+ Class resultClazz,
+ CommunityConfiguration communityConfiguration) {
+
+ ResultTagger resultTagger = new ResultTagger();
+ Dataset result = readPathEntity(spark, inputPath, resultClazz);
+ result.map(
+ value ->
+ resultTagger.enrichContextCriteria(
+ value, communityConfiguration, protoMappingParams),
+ Encoders.bean(resultClazz))
+ .toJSON()
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .text(outputPath);
+ }
+
+ private static org.apache.spark.sql.Dataset readPathEntity(
+ SparkSession spark, String inputEntityPath, Class resultClazz) {
+
+ return spark.read()
+ .textFile(inputEntityPath)
+ .map(
+ (MapFunction)
+ value -> OBJECT_MAPPER.readValue(value, resultClazz),
+ Encoders.bean(resultClazz));
+ }
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Community.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Community.java
index 47e4b7a124..b0c213c127 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Community.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Community.java
@@ -1,16 +1,14 @@
package eu.dnetlib.dhp.community;
import com.google.gson.Gson;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Created by miriam on 01/08/2018.
- */
-public class Community {
+/** Created by miriam on 01/08/2018. */
+public class Community implements Serializable {
private static final Log log = LogFactory.getLog(Community.class);
@@ -19,14 +17,15 @@ public class Community {
private List datasources = new ArrayList<>();
private List zenodoCommunities = new ArrayList<>();
-
public String toJson() {
final Gson g = new Gson();
return g.toJson(this);
}
public boolean isValid() {
- return !getSubjects().isEmpty() || !getDatasources().isEmpty() || !getZenodoCommunities().isEmpty();
+ return !getSubjects().isEmpty()
+ || !getDatasources().isEmpty()
+ || !getZenodoCommunities().isEmpty();
}
public String getId() {
@@ -60,5 +59,4 @@ public class Community {
public void setZenodoCommunities(List zenodoCommunities) {
this.zenodoCommunities = zenodoCommunities;
}
-
}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/CommunityConfiguration.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/CommunityConfiguration.java
index aa1a9bb8cf..1fd5bedd41 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/CommunityConfiguration.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/CommunityConfiguration.java
@@ -3,38 +3,58 @@ package eu.dnetlib.dhp.community;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
-
import com.google.gson.GsonBuilder;
-
import eu.dnetlib.dhp.selectioncriteria.InterfaceAdapter;
import eu.dnetlib.dhp.selectioncriteria.Selection;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
-/**
- * Created by miriam on 02/08/2018.
- */
-public class CommunityConfiguration {
+/** Created by miriam on 02/08/2018. */
+public class CommunityConfiguration implements Serializable {
private static final Log log = LogFactory.getLog(CommunityConfiguration.class);
+ private Map communities;
- private Map communities;
+ // map subject -> communityid
+ private Map>> subjectMap = new HashMap<>();
+ // map datasourceid -> communityid
+ private Map>> datasourceMap = new HashMap<>();
+ // map zenodocommunityid -> communityid
+ private Map>> zenodocommunityMap =
+ new HashMap<>();
+ public Map>> getSubjectMap() {
+ return subjectMap;
+ }
- //map subject -> communityid
- private transient Map>> subjectMap = new HashMap<>();
- //map datasourceid -> communityid
- private transient Map>> datasourceMap = new HashMap<>();
- //map zenodocommunityid -> communityid
- private transient Map>> zenodocommunityMap = new HashMap<>();
+ public void setSubjectMap(Map>> subjectMap) {
+ this.subjectMap = subjectMap;
+ }
+
+ public Map>> getDatasourceMap() {
+ return datasourceMap;
+ }
+
+ public void setDatasourceMap(
+ Map>> datasourceMap) {
+ this.datasourceMap = datasourceMap;
+ }
+
+ public Map>> getZenodocommunityMap() {
+ return zenodocommunityMap;
+ }
+
+ public void setZenodocommunityMap(
+ Map>> zenodocommunityMap) {
+ this.zenodocommunityMap = zenodocommunityMap;
+ }
CommunityConfiguration(final Map communities) {
this.communities = communities;
@@ -53,65 +73,67 @@ public class CommunityConfiguration {
zenodocommunityMap = Maps.newHashMap();
}
-
- for(Community c : getCommunities().values()) {
- //get subjects
+ for (Community c : getCommunities().values()) {
+ // get subjects
final String id = c.getId();
- for(String sbj : c.getSubjects()){
- Pair p = new Pair<>(id,new SelectionConstraints());
- add(sbj.toLowerCase().trim() , p, subjectMap);
+ for (String sbj : c.getSubjects()) {
+ Pair p = new Pair<>(id, new SelectionConstraints());
+ add(sbj.toLowerCase().trim(), p, subjectMap);
}
- //get datasources
- for(Datasource d: c.getDatasources()){
+ // get datasources
+ for (Datasource d : c.getDatasources()) {
- add(d.getOpenaireId(),new Pair<>(id,d.getSelectionConstraints()),datasourceMap);
+ add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap);
}
- //get zenodo communities
- for(ZenodoCommunity zc : c.getZenodoCommunities()){
- add(zc.getZenodoCommunityId(),new Pair<>(id,zc.getSelCriteria()),zenodocommunityMap);
+ // get zenodo communities
+ for (ZenodoCommunity zc : c.getZenodoCommunities()) {
+ add(
+ zc.getZenodoCommunityId(),
+ new Pair<>(id, zc.getSelCriteria()),
+ zenodocommunityMap);
}
-
-
}
}
- private void add(String key, Pair value, Map>> map){
- List> values = map.get(key);
+ private void add(
+ String key,
+ Pair value,
+ Map>> map) {
+ List> values = map.get(key);
- if (values == null){
+ if (values == null) {
values = new ArrayList<>();
- map.put(key,values);
+ map.put(key, values);
}
values.add(value);
}
- public List> getCommunityForSubject(String sbj){
+ public List> getCommunityForSubject(String sbj) {
return subjectMap.get(sbj);
}
- public List> getCommunityForDatasource(String dts){
+ public List> getCommunityForDatasource(String dts) {
return datasourceMap.get(dts);
}
+ public List getCommunityForDatasource(
+ final String dts, final Map> param) {
+ List> lp = datasourceMap.get(dts);
+ if (lp == null) return Lists.newArrayList();
- public List getCommunityForDatasource(final String dts, final Map> param) {
- List> lp = datasourceMap.get(dts);
- if (lp==null)
- return Lists.newArrayList();
-
- return lp.stream().map(p -> {
- if (p.getSnd() == null)
- return p.getFst();
- if (((SelectionConstraints) p.getSnd()).verifyCriteria(param))
- return p.getFst();
- else
- return null;
- }).filter(st->(st!=null)).collect(Collectors.toList());
-
-
+ return lp.stream()
+ .map(
+ p -> {
+ if (p.getSnd() == null) return p.getFst();
+ if (((SelectionConstraints) p.getSnd()).verifyCriteria(param))
+ return p.getFst();
+ else return null;
+ })
+ .filter(st -> (st != null))
+ .collect(Collectors.toList());
}
- public List> getCommunityForZenodoCommunity(String zc){
+ public List> getCommunityForZenodoCommunity(String zc) {
return zenodocommunityMap.get(zc);
}
@@ -125,7 +147,7 @@ public class CommunityConfiguration {
return getContextIds(datasourceMap.get(value.toLowerCase()));
}
- public List getCommunityForZenodoCommunityValue(String value){
+ public List getCommunityForZenodoCommunityValue(String value) {
return getContextIds(zenodocommunityMap.get(value.toLowerCase()));
}
@@ -137,7 +159,6 @@ public class CommunityConfiguration {
return Lists.newArrayList();
}
-
public Map getCommunities() {
return communities;
}
@@ -158,11 +179,11 @@ public class CommunityConfiguration {
return communities.keySet().size();
}
- public Community getCommunityById(String id){
+ public Community getCommunityById(String id) {
return communities.get(id);
}
public List getCommunityList() {
return Lists.newLinkedList(communities.values());
}
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Datasource.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Datasource.java
index c0047a07e2..5acba31d6f 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Datasource.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Datasource.java
@@ -1,24 +1,20 @@
package eu.dnetlib.dhp.community;
-
import com.google.gson.Gson;
-
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+import java.io.Serializable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Node;
-/**
- * Created by miriam on 01/08/2018.
- */
-public class Datasource {
+/** Created by miriam on 01/08/2018. */
+public class Datasource implements Serializable {
private static final Log log = LogFactory.getLog(Datasource.class);
private String openaireId;
private SelectionConstraints selectionConstraints;
-
public SelectionConstraints getSelCriteria() {
return selectionConstraints;
}
@@ -43,23 +39,19 @@ public class Datasource {
this.openaireId = openaireId;
}
- private void setSelCriteria(String json, VerbResolver resolver){
+ private void setSelCriteria(String json, VerbResolver resolver) {
log.info("Selection constraints for datasource = " + json);
selectionConstraints = new Gson().fromJson(json, SelectionConstraints.class);
selectionConstraints.setSelection(resolver);
}
- public void setSelCriteria(Node n, VerbResolver resolver){
- try{
- setSelCriteria(n.getText(),resolver);
- }catch(Exception e) {
+ public void setSelCriteria(Node n, VerbResolver resolver) {
+ try {
+ setSelCriteria(n.getText(), resolver);
+ } catch (Exception e) {
log.info("not set selection criteria... ");
- selectionConstraints =null;
+ selectionConstraints = null;
}
-
}
-
-
-
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Pair.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Pair.java
index 1cff8bfef6..78ffe860de 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Pair.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/Pair.java
@@ -1,11 +1,10 @@
package eu.dnetlib.dhp.community;
import com.google.gson.Gson;
+import java.io.Serializable;
-/**
- * Created by miriam on 03/08/2018.
- */
-public class Pair {
+/** Created by miriam on 03/08/2018. */
+public class Pair implements Serializable {
private A fst;
private B snd;
@@ -27,12 +26,12 @@ public class Pair {
return this;
}
- public Pair(A a, B b){
+ public Pair(A a, B b) {
fst = a;
snd = b;
}
- public String toJson(){
+ public String toJson() {
return new Gson().toJson(this);
}
}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ProtoMap.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ProtoMap.java
index 6bbe50c86d..773955d4ad 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ProtoMap.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ProtoMap.java
@@ -1,10 +1,11 @@
package eu.dnetlib.dhp.community;
+import java.io.Serializable;
import java.util.HashMap;
-public class ProtoMap extends HashMap {
+public class ProtoMap extends HashMap implements Serializable {
- public ProtoMap(){
+ public ProtoMap() {
super();
}
}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ResultTagger.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ResultTagger.java
index b9962050cd..abedab476f 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ResultTagger.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ResultTagger.java
@@ -1,67 +1,68 @@
package eu.dnetlib.dhp.community;
+
+import static eu.dnetlib.dhp.community.TagginConstants.*;
+
import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.commons.lang3.StringUtils;
-
+import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
-import static eu.dnetlib.dhp.community.TagginConstants.*;
-
-
-/**
- * Created by miriam on 02/08/2018.
- */
-public class ResultTagger {
-
+/** Created by miriam on 02/08/2018. */
+public class ResultTagger implements Serializable {
private String trust = "0.8";
-
- private boolean clearContext(Result result){
+ private boolean clearContext(Result result) {
int tmp = result.getContext().size();
- List clist = result.getContext().stream()
- .filter(c -> (!c.getId().contains(ZENODO_COMMUNITY_INDICATOR))).collect(Collectors.toList());
+ List clist =
+ result.getContext().stream()
+ .filter(c -> (!c.getId().contains(ZENODO_COMMUNITY_INDICATOR)))
+ .collect(Collectors.toList());
result.setContext(clist);
return (tmp != clist.size());
}
- private Map> getParamMap(final Result result, Map params) {
- Map> param = new HashMap<>();
- String json = new Gson().toJson(result,Result.class);
+ private Map> getParamMap(final Result result, Map params) {
+ Map> param = new HashMap<>();
+ String json = new Gson().toJson(result, Result.class);
DocumentContext jsonContext = JsonPath.parse(json);
- if (params == null){
+ if (params == null) {
params = new HashMap<>();
}
- for(String key : params.keySet()) {
+ for (String key : params.keySet()) {
try {
param.put(key, jsonContext.read(params.get(key)));
} catch (com.jayway.jsonpath.PathNotFoundException e) {
param.put(key, new ArrayList<>());
- // throw e;
+ // throw e;
}
}
return param;
-
}
+ public R enrichContextCriteria(
+ final R result, final CommunityConfiguration conf, final Map criteria) {
- public Result enrichContextCriteria(final Result result, final CommunityConfiguration conf, final Map criteria) {
+ // }
+ // public Result enrichContextCriteria(final Result result, final CommunityConfiguration
+ // conf, final Map criteria) {
final Map> param = getParamMap(result, criteria);
- //Verify if the entity is deletedbyinference. In case verify if to clean the context list from all the zenodo communities
- if(result.getDataInfo().getDeletedbyinference()){
- return result;
+ // Verify if the entity is deletedbyinference. In case verify if to clean the context list
+ // from all the zenodo communities
+ if (result.getDataInfo().getDeletedbyinference()) {
+ if (clearContext(result)) return result;
}
- //communities contains all the communities to be added as context for the result
+ // communities contains all the communities to be added as context for the result
final Set communities = new HashSet<>();
-
- //tagging for Subject
+ // tagging for Subject
final Set subjects = new HashSet<>();
result.getSubject().stream()
.map(subject -> subject.getValue())
@@ -73,89 +74,115 @@ public class ResultTagger {
communities.addAll(subjects);
-
- //Tagging for datasource
+ // Tagging for datasource
final Set datasources = new HashSet<>();
final Set tmp = new HashSet<>();
- for(Instance i : result.getInstance()){
- tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(),"|"));
- tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(),"|"));
+ for (Instance i : result.getInstance()) {
+ tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(), "|"));
+ tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(), "|"));
}
- result.getInstance()
- .stream()
+ result.getInstance().stream()
.map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
.flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
.map(s -> StringUtils.substringAfter(s, "|"))
.collect(Collectors.toCollection(HashSet::new))
- .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasource(dsId,param)));
+ .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasource(dsId, param)));
communities.addAll(datasources);
/*Tagging for Zenodo Communities*/
final Set czenodo = new HashSet<>();
- //final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
- result.getContext()
- .stream()
+ result.getContext().stream()
.filter(c -> c.getId().contains(ZENODO_COMMUNITY_INDICATOR))
.collect(Collectors.toList())
- .forEach(c->czenodo.addAll(conf.getCommunityForZenodoCommunityValue(c.getId().substring(c.getId().lastIndexOf("/")+1).trim())));
+ .forEach(
+ c ->
+ czenodo.addAll(
+ conf.getCommunityForZenodoCommunityValue(
+ c.getId()
+ .substring(c.getId().lastIndexOf("/") + 1)
+ .trim())));
communities.addAll(czenodo);
clearContext(result);
/*Verify if there is something to bulktag*/
- if(communities.isEmpty()){
+ if (communities.isEmpty()) {
return result;
-
}
- result.getContext()
- .stream()
- .map(c -> {
- if(communities.contains(c.getId())){
- List dataInfoList = c.getDataInfo();
- if (subjects.contains(c.getId()))
- dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
- if (datasources.contains(c.getId()))
- dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
- if (czenodo.contains(c.getId()))
- dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
- }
- return c;
- })
+ result.getContext().stream()
+ .map(
+ c -> {
+ if (communities.contains(c.getId())) {
+ List dataInfoList = c.getDataInfo();
+ if (subjects.contains(c.getId()))
+ dataInfoList.add(
+ getDataInfo(
+ BULKTAG_DATA_INFO_TYPE,
+ CLASS_ID_SUBJECT,
+ CLASS_NAME_BULKTAG_SUBJECT));
+ if (datasources.contains(c.getId()))
+ dataInfoList.add(
+ getDataInfo(
+ BULKTAG_DATA_INFO_TYPE,
+ CLASS_ID_DATASOURCE,
+ CLASS_NAME_BULKTAG_DATASOURCE));
+ if (czenodo.contains(c.getId()))
+ dataInfoList.add(
+ getDataInfo(
+ BULKTAG_DATA_INFO_TYPE,
+ CLASS_ID_CZENODO,
+ CLASS_NAME_BULKTAG_ZENODO));
+ }
+ return c;
+ })
.collect(Collectors.toList());
+ communities.removeAll(
+ result.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()));
- communities.removeAll(result.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()));
+ if (communities.isEmpty()) return result;
- if(communities.isEmpty())
- return result;
-
- List toaddcontext = communities
- .stream()
- .map(c -> {
- Context context = new Context();
- context.setId(c);
- List dataInfoList = Arrays.asList();
- if (subjects.contains(c))
- dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
- if (datasources.contains(c))
- dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
- if (czenodo.contains(c))
- dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
- context.setDataInfo(dataInfoList);
- return context;
- })
- .collect(Collectors.toList());
+ List toaddcontext =
+ communities.stream()
+ .map(
+ c -> {
+ Context context = new Context();
+ context.setId(c);
+ List dataInfoList = Arrays.asList();
+ if (subjects.contains(c))
+ dataInfoList.add(
+ getDataInfo(
+ BULKTAG_DATA_INFO_TYPE,
+ CLASS_ID_SUBJECT,
+ CLASS_NAME_BULKTAG_SUBJECT));
+ if (datasources.contains(c))
+ dataInfoList.add(
+ getDataInfo(
+ BULKTAG_DATA_INFO_TYPE,
+ CLASS_ID_DATASOURCE,
+ CLASS_NAME_BULKTAG_DATASOURCE));
+ if (czenodo.contains(c))
+ dataInfoList.add(
+ getDataInfo(
+ BULKTAG_DATA_INFO_TYPE,
+ CLASS_ID_CZENODO,
+ CLASS_NAME_BULKTAG_ZENODO));
+ context.setDataInfo(dataInfoList);
+ return context;
+ })
+ .collect(Collectors.toList());
result.getContext().addAll(toaddcontext);
return result;
}
- public static DataInfo getDataInfo(String inference_provenance, String inference_class_id, String inference_class_name){
+ public static DataInfo getDataInfo(
+ String inference_provenance, String inference_class_id, String inference_class_name) {
DataInfo di = new DataInfo();
di.setInferred(true);
di.setInferenceprovenance(inference_provenance);
@@ -171,5 +198,4 @@ public class ResultTagger {
pa.setSchemename(DNET_SCHEMA_NAME);
return pa;
}
-
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/TagginConstants.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/TagginConstants.java
index 494d955c80..9f681472a3 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/TagginConstants.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/TagginConstants.java
@@ -1,27 +1,23 @@
package eu.dnetlib.dhp.community;
-
public class TagginConstants {
+ public static final String BULKTAG_DATA_INFO_TYPE = "bulktagging";
- public final static String BULKTAG_DATA_INFO_TYPE = "bulktagging";
+ public static final String DNET_SCHEMA_NAME = "dnet:provenanceActions";
+ public static final String DNET_SCHEMA_ID = "dnet:provenanceActions";
- public final static String DNET_SCHEMA_NAME = "dnet:provenanceActions";
- public final static String DNET_SCHEMA_ID = "dnet:provenanceActions";
-
- public final static String CLASS_ID_SUBJECT = "bulktagging:community:subject";
- public final static String CLASS_ID_DATASOURCE = "bulktagging:community:datasource";
- public final static String CLASS_ID_CZENODO = "bulktagging:community:zenodocommunity";
-
- public final static String SCHEMA_ID = "dnet:provenanceActions";
- public final static String COUNTER_GROUP = "Bulk Tagging";
-
- public final static String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
-
- public final static String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
- public final static String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource";
- public final static String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
+ public static final String CLASS_ID_SUBJECT = "community:subject";
+ public static final String CLASS_ID_DATASOURCE = "community:datasource";
+ public static final String CLASS_ID_CZENODO = "community:zenodocommunity";
+ public static final String SCHEMA_ID = "dnet:provenanceActions";
+ public static final String COUNTER_GROUP = "Bulk Tagging";
+ public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
+ public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
+ public static final String CLASS_NAME_BULKTAG_DATASOURCE =
+ "Bulktagging for Community - Datasource";
+ public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ZenodoCommunity.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ZenodoCommunity.java
index d785a3d94c..19d97d2217 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ZenodoCommunity.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/community/ZenodoCommunity.java
@@ -1,13 +1,11 @@
package eu.dnetlib.dhp.community;
import com.google.gson.Gson;
+import java.io.Serializable;
import org.dom4j.Node;
-
-/**
- * Created by miriam on 01/08/2018.
- */
-public class ZenodoCommunity {
+/** Created by miriam on 01/08/2018. */
+public class ZenodoCommunity implements Serializable {
private String zenodoCommunityId;
@@ -29,18 +27,16 @@ public class ZenodoCommunity {
this.selCriteria = selCriteria;
}
- private void setSelCriteria(String json){
- //Type collectionType = new TypeToken>(){}.getType();
+ private void setSelCriteria(String json) {
+ // Type collectionType = new TypeToken>(){}.getType();
selCriteria = new Gson().fromJson(json, SelectionConstraints.class);
-
}
- public void setSelCriteria(Node n){
- if (n==null){
+ public void setSelCriteria(Node n) {
+ if (n == null) {
selCriteria = null;
- }else{
+ } else {
setSelCriteria(n.getText());
}
}
-
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java
index 9f519f0915..9a5fe4e8aa 100644
--- a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java
@@ -7,7 +7,7 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
-public @interface VerbClass {
+@interface VerbClass {
- public String value();
-}
\ No newline at end of file
+ String value();
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json
index 3221924bf3..b370467aec 100644
--- a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json
+++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/input_bulkTag_parameters.json
@@ -5,12 +5,6 @@
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
- {
- "paramName":"mt",
- "paramLongName":"master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
{
"paramName":"s",
"paramLongName":"sourcePath",
@@ -22,6 +16,36 @@
"paramLongName":"protoMap",
"paramDescription": "the json path associated to each selection field",
"paramRequired": true
+ },
+ {
+ "paramName":"tn",
+ "paramLongName":"resultTableName",
+ "paramDescription": "the name of the result table we are currently working on",
+ "paramRequired": true
+ },
+ {
+ "paramName": "out",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path used to store temporary output files",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ssm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "true if the spark session is managed, false otherwise",
+ "paramRequired": false
+ },
+ {
+ "paramName": "test",
+ "paramLongName": "isTest",
+ "paramDescription": "true if the spark session is managed, false otherwise",
+ "paramRequired": false
+ },
+ {
+ "paramName": "tg",
+ "paramLongName": "taggingConf",
+ "paramDescription": "true if the spark session is managed, false otherwise",
+ "paramRequired": false
}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml
index ea3a4d9223..73bfe9ae7a 100644
--- a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/config-default.xml
@@ -19,4 +19,28 @@
hive_metastore_uris
thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+ spark2YarnHistoryServerAddress
+ http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+
+
+ spark2EventLogDir
+ /user/spark/spark2ApplicationHistory
+
+
+ sparkExecutorNumber
+ 1
+
+
+ sparkDriverMemory
+ 15G
+
+
+ sparkExecutorMemory
+ 6G
+
+
+ sparkExecutorCores
+ 1
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml
index 1866bb0a01..b4f918bbd0 100644
--- a/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-bulktag/src/main/resources/eu/dnetlib/dhp/oozie_app/workflow.xml
@@ -1,13 +1,9 @@
-
+
sourcePath
the source path
-
- allowedsemrels
- the semantic relationships allowed for propagation
-
sparkDriverMemory
memory for driver process
@@ -24,38 +20,163 @@
isLookupUrl
the isLookup service endpoint
+
+ protoMap
+ the json path associated to each selection field
+
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- ResultToCommunitySemRelPropagation
- eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob
- dhp-propagation-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory}
- --executor-cores ${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
- --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
-
- -mt yarn-cluster
- --sourcePath${sourcePath}
-
- --hive_metastore_uris${hive_metastore_uris}
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ ${nameNode}/${sourcePath}/relation
+ ${nameNode}/${workingDir}/relation
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn-cluster
+ cluster
+ bulkTagging
+ eu.dnetlib.dhp.SparkBulkTagJob
+ dhp-bulktag-${projectVersion}.jar
+
+ --num-executors=${sparkExecutorNumber}
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}/publication
+
+ --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication
+ --outputPath${workingDir}/publication
+ --proto_map${protoMap}
+ --isLookupUrl${isLookupUrl}
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn-cluster
+ cluster
+ bulkTagging
+ eu.dnetlib.dhp.SparkBulkTagJob
+ dhp-bulktag-${projectVersion}.jar
+
+ --num-executors=${sparkExecutorNumber}
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}/dataset
+
+ --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset
+ --outputPath${workingDir}/dataset
+ --proto_map${protoMap}
+ --isLookupUrl${isLookupUrl}
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn-cluster
+ cluster
+ bulkTagging
+ eu.dnetlib.dhp.SparkBulkTagJob
+ dhp-bulktag-${projectVersion}.jar
+
+ --num-executors=${sparkExecutorNumber}
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}/otherresearchproduct
+
+ --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+ --outputPath${workingDir}/otherresearchproduct
+ --proto_map${protoMap}
+ --isLookupUrl${isLookupUrl}
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn-cluster
+ cluster
+ bulkTagging
+ eu.dnetlib.dhp.SparkBulkTagJob
+ dhp-bulktag-${projectVersion}.jar
+
+ --num-executors=${sparkExecutorNumber}
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${sourcePath}/software
+
+ --resultTableNameeu.dnetlib.dhp.schema.oaf.Software
+ --outputPath${workingDir}/software
+ --proto_map${protoMap}
+ --isLookupUrl${isLookupUrl}
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java b/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java
new file mode 100644
index 0000000000..4a45d234c4
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/BulkTagJobTest.java
@@ -0,0 +1,233 @@
+package eu.dnetlib.dhp;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mortbay.util.IO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BulkTagJobTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final ClassLoader cl = eu.dnetlib.dhp.BulkTagJobTest.class.getClassLoader();
+
+ private static SparkSession spark;
+
+ private static Path workingDir;
+ private static final Logger log = LoggerFactory.getLogger(eu.dnetlib.dhp.BulkTagJobTest.class);
+
+ private static String taggingConf = "";
+
+ static {
+ try {
+ taggingConf =
+ IO.toString(
+ BulkTagJobTest.class.getResourceAsStream(
+ "/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files.createTempDirectory(eu.dnetlib.dhp.BulkTagJobTest.class.getSimpleName());
+ log.info("using work dir {}", workingDir);
+
+ SparkConf conf = new SparkConf();
+ conf.setAppName(eu.dnetlib.dhp.BulkTagJobTest.class.getSimpleName());
+
+ conf.setMaster("local[*]");
+ conf.set("spark.driver.host", "localhost");
+ conf.set("hive.metastore.local", "true");
+ conf.set("spark.ui.enabled", "false");
+ conf.set("spark.sql.warehouse.dir", workingDir.toString());
+ conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+ spark =
+ SparkSession.builder()
+ .appName(BulkTagJobTest.class.getSimpleName())
+ .config(conf)
+ .getOrCreate();
+ }
+
+ @AfterAll
+ public static void afterAll() throws IOException {
+ FileUtils.deleteDirectory(workingDir.toFile());
+ spark.stop();
+ }
+
+ @Test
+ public void test1() throws Exception {
+ SparkBulkTagJob2.main(
+ new String[] {
+ "-isTest",
+ Boolean.TRUE.toString(),
+ "-isSparkSessionManaged",
+ Boolean.FALSE.toString(),
+ "-sourcePath",
+ getClass().getResource("/eu/dnetlib/dhp/sample/dataset").getPath(),
+ "-taggingConf",
+ taggingConf,
+ "-resultTableName",
+ "eu.dnetlib.dhp.schema.oaf.Dataset",
+ "-outputPath",
+ workingDir.toString() + "/dataset",
+ "-isLookupUrl",
+ "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+ "-protoMap",
+ "{ \"author\" : \"$['author'][*]['fullname']\","
+ + " \"title\" : \"$['title'][*]['value']\","
+ + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+ + " \"contributor\" : \"$['contributor'][*]['value']\","
+ + " \"description\" : \"$['description'][*]['value']\"}"
+ // "-preparedInfoPath",
+ // getClass().getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/preparedInfo").getPath()
+ });
+ }
+}
+
+/*
+
+
+import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
+import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.spark.sql.functions.desc;
+
+
+
+
+
+ @Test
+ public void test1() throws Exception {
+ SparkResultToCommunityThroughSemRelJob4.main(new String[]{
+ "-isTest", Boolean.TRUE.toString(),
+ "-isSparkSessionManaged", Boolean.FALSE.toString(),
+ "-sourcePath", getClass().getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/sample").getPath(),
+ "-hive_metastore_uris", "",
+ "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
+ "-outputPath", workingDir.toString() + "/dataset",
+ "-preparedInfoPath", getClass().getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/preparedInfo").getPath()
+ });
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+ JavaRDD tmp = sc.textFile(workingDir.toString() + "/dataset")
+ .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+ Assertions.assertEquals(10, tmp.count());
+ org.apache.spark.sql.Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+ verificationDataset.createOrReplaceTempView("dataset");
+
+ String query = "select id, MyT.id community " +
+ "from dataset " +
+ "lateral view explode(context) c as MyT " +
+ "lateral view explode(MyT.datainfo) d as MyD " +
+ "where MyD.inferenceprovenance = 'propagation'";
+
+ org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query);
+ Assertions.assertEquals(5, resultExplodedProvenance.count());
+
+ Assertions.assertEquals(0, resultExplodedProvenance.filter("id = '50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b'").count());
+
+ Assertions.assertEquals(1, resultExplodedProvenance.filter("id = '50|dedup_wf_001::0489ae524201eedaa775da282dce35e7'").count());
+ Assertions.assertEquals("dh-ch",resultExplodedProvenance.select("community")
+ .where(resultExplodedProvenance.col("id").equalTo("50|dedup_wf_001::0489ae524201eedaa775da282dce35e7"))
+ .collectAsList().get(0).getString(0));
+
+ Assertions.assertEquals(3, resultExplodedProvenance.filter("id = '50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28'").count());
+ List rowList = resultExplodedProvenance.select("community")
+ .where(resultExplodedProvenance.col("id")
+ .equalTo("50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28"))
+ .sort(desc("community")).collectAsList();
+ Assertions.assertEquals("mes", rowList.get(0).getString(0));
+ Assertions.assertEquals("fam", rowList.get(1).getString(0));
+ Assertions.assertEquals("ee", rowList.get(2).getString(0));
+
+
+ Assertions.assertEquals(1, resultExplodedProvenance.filter("id = '50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc'").count());
+ Assertions.assertEquals("aginfra", resultExplodedProvenance.select("community")
+ .where(resultExplodedProvenance.col("id")
+ .equalTo("50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc"))
+ .collectAsList().get(0).getString(0));
+
+
+ query = "select id, MyT.id community " +
+ "from dataset " +
+ "lateral view explode(context) c as MyT " +
+ "lateral view explode(MyT.datainfo) d as MyD ";
+
+ org.apache.spark.sql.Dataset resultCommunityId = spark.sql(query);
+
+ Assertions.assertEquals(10, resultCommunityId.count());
+
+ Assertions.assertEquals(2, resultCommunityId.filter("id = '50|dedup_wf_001::0489ae524201eedaa775da282dce35e7'").count());
+ rowList = resultCommunityId.select("community")
+ .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::0489ae524201eedaa775da282dce35e7"))
+ .sort(desc("community"))
+ .collectAsList();
+ Assertions.assertEquals("dh-ch", rowList.get(0).getString(0));
+ Assertions.assertEquals("beopen", rowList.get(1).getString(0));
+
+ Assertions.assertEquals(3, resultCommunityId.filter("id = '50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28'").count());
+ rowList = resultCommunityId.select("community")
+ .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28"))
+ .sort(desc("community"))
+ .collectAsList();
+ Assertions.assertEquals("mes", rowList.get(0).getString(0));
+ Assertions.assertEquals("fam", rowList.get(1).getString(0));
+ Assertions.assertEquals("ee", rowList.get(2).getString(0));
+
+ Assertions.assertEquals(2, resultCommunityId.filter("id = '50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc'").count());
+ rowList = resultCommunityId.select("community")
+ .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc"))
+ .sort(desc("community"))
+ .collectAsList();
+ Assertions.assertEquals("beopen", rowList.get(0).getString(0));
+ Assertions.assertEquals("aginfra", rowList.get(1).getString(0));
+
+ Assertions.assertEquals(2, resultCommunityId.filter("id = '50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b'").count());
+ rowList = resultCommunityId.select("community")
+ .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b"))
+ .sort(desc("community"))
+ .collectAsList();
+ Assertions.assertEquals("euromarine", rowList.get(1).getString(0));
+ Assertions.assertEquals("ni", rowList.get(0).getString(0));
+
+ Assertions.assertEquals(1, resultCommunityId.filter("id = '50|doajarticles::8d817039a63710fcf97e30f14662c6c8'").count());
+ Assertions.assertEquals("euromarine", resultCommunityId.select("community")
+ .where(resultCommunityId.col("id")
+ .equalTo("50|doajarticles::8d817039a63710fcf97e30f14662c6c8"))
+ .collectAsList().get(0).getString(0));
+
+
+ }
+ */
diff --git a/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/CommunityConfigurationFactoryTest.java b/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/CommunityConfigurationFactoryTest.java
new file mode 100644
index 0000000000..77c4482652
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/java/eu/dnetlib/dhp/CommunityConfigurationFactoryTest.java
@@ -0,0 +1,147 @@
+package eu.dnetlib.dhp;
+
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.community.CommunityConfiguration;
+import eu.dnetlib.dhp.community.CommunityConfigurationFactory;
+import eu.dnetlib.dhp.community.Constraint;
+import eu.dnetlib.dhp.community.SelectionConstraints;
+import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.dom4j.DocumentException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Created by miriam on 03/08/2018. */
+public class CommunityConfigurationFactoryTest {
+
+ private static String xml;
+ private static String xml1;
+
+ private final VerbResolver resolver = new VerbResolver();
+
+ @Test
+ public void parseTest() throws DocumentException, IOException {
+ String xml =
+ IOUtils.toString(
+ getClass()
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/communityconfiguration/community_configuration.xml"));
+ final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
+ Assertions.assertEquals(5, cc.size());
+ cc.getCommunityList()
+ .forEach(c -> Assertions.assertTrue(StringUtils.isNoneBlank(c.getId())));
+ }
+
+ @Test
+ public void applyVerb()
+ throws InvocationTargetException, IllegalAccessException, NoSuchMethodException,
+ InstantiationException {
+ Constraint sc = new Constraint();
+ sc.setVerb("not_contains");
+ sc.setField("contributor");
+ sc.setValue("DARIAH");
+ sc.setSelection(resolver.getSelectionCriteria(sc.getVerb(), sc.getValue()));
+ String metadata = "This work has been partially supported by DARIAH-EU infrastructure";
+ Assertions.assertFalse(sc.verifyCriteria(metadata));
+ }
+
+ @Test
+ public void loadSelCriteriaTest() throws DocumentException, IOException {
+ String xml =
+ IOUtils.toString(
+ getClass()
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.xml"));
+ final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
+ Map> param = new HashMap<>();
+ param.put("author", new ArrayList<>(Collections.singletonList("Pippo Pippi")));
+ param.put(
+ "description",
+ new ArrayList<>(
+ Collections.singletonList(
+ "This work has been partially supported by DARIAH-EU infrastructure")));
+ param.put(
+ "contributor",
+ new ArrayList<>(
+ Collections.singletonList(
+ "Pallino ha aiutato a scrivere il paper. Pallino lavora per DARIAH")));
+ List comm =
+ cc.getCommunityForDatasource(
+ "openaire____::1cfdb2e14977f31a98e0118283401f32", param);
+ Assertions.assertEquals(1, comm.size());
+ Assertions.assertEquals("dariah", comm.get(0));
+ }
+
+ @Test
+ public void test4() throws DocumentException, IOException {
+ final CommunityConfiguration cc =
+ CommunityConfigurationFactory.fromJson(
+ IOUtils.toString(
+ getClass()
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.json")));
+ cc.toString();
+ }
+
+ @Test
+ public void test5() throws IOException, DocumentException {
+
+ // final CommunityConfiguration cc =
+ // CommunityConfigurationFactory.newInstance(IOUtils.toString(getClass().getResourceAsStream("test.xml")));
+ final CommunityConfiguration cc =
+ CommunityConfigurationFactory.fromJson(
+ IOUtils.toString(
+ getClass()
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/communityconfiguration/community_configuration.json")));
+
+ System.out.println(cc.toJson());
+ }
+
+ @Test
+ public void test6() {
+ String json =
+ "{\"criteria\":[{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}]}";
+
+ String step1 = "{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}";
+
+ Constraint c = new Gson().fromJson(step1, Constraint.class);
+ //
+ // String step2 =
+ // "{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}";
+ //
+ // ConstraintEncapsulator ce = new
+ // Gson().fromJson(step2,ConstraintEncapsulator.class);
+ //
+ //
+ // String step3 =
+ // "{\"ce\":{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}}";
+ //
+ // Constraints cons = new Gson().fromJson(step3,Constraints.class);
+ //
+ // String step4 =
+ // "{\"criteria\":[{\"ce\":{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}}]}";
+ //
+ // ConstraintsList cl = new Gson().fromJson(step4,ConstraintsList.class);
+ //
+ // String step5 =
+ // "{\"cl\":{\"criteria\":[{\"ce\":{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}}]}}";
+ SelectionConstraints sl = new Gson().fromJson(json, SelectionConstraints.class);
+ }
+
+ @Test
+ public void test7() throws IOException {
+ final CommunityConfiguration cc =
+ CommunityConfigurationFactory.fromJson(
+ IOUtils.toString(
+ getClass()
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json")));
+
+ System.out.println(cc.toJson());
+ }
+}
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration.json b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration.json
new file mode 100644
index 0000000000..d21dc4ceda
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration.json
@@ -0,0 +1,694 @@
+{"communities": {
+ "clarin": {
+ "id": "clarin",
+ "subjects": [],
+ "datasources": [
+ {
+ "openaireId": "re3data_____::a507cdacc5bbcc08761c92185dee5cab"
+ }
+ ],
+ "zenodoCommunities": [
+
+ ]
+ },
+ "ee": {
+ "id": "ee",
+ "subjects": [
+ "SDG13 - Climate action",
+ "SDG8 - Decent work and economic\n\t\t\t\t\tgrowth",
+ "SDG15 - Life on land",
+ "SDG2 - Zero hunger",
+ "SDG17 - Partnerships for the\n\t\t\t\t\tgoals",
+ "SDG10 - Reduced inequalities",
+ "SDG5 - Gender equality",
+ "SDG12 - Responsible\n\t\t\t\t\tconsumption and production",
+ "SDG14 - Life below water",
+ "SDG6 - Clean water and\n\t\t\t\t\tsanitation",
+ "SDG11 - Sustainable cities and communities",
+ "SDG1 - No poverty",
+ "SDG3 -\n\t\t\t\t\tGood health and well being",
+ "SDG7 - Affordable and clean energy",
+ "SDG4 - Quality\n\t\t\t\t\teducation",
+ "SDG9 - Industry innovation and infrastructure",
+ "SDG16 - Peace justice\n\t\t\t\t\tand strong institutions"
+ ],
+ "datasources": [
+
+ ],
+ "zenodoCommunities": [
+
+ ]
+ },
+ "aginfra": {
+ "id": "aginfra",
+ "subjects": [
+ "animal production and health",
+ "fisheries and aquaculture",
+ "food safety and human nutrition",
+ "information management",
+ "food technology",
+ "agri-food education and extension",
+ "natural resources and environment",
+ "food system",
+ "engineering technology and Research",
+ "agriculture",
+ "food safety risk assessment",
+ "food security",
+ "farming practices and systems",
+ "plant production and protection",
+ "agri-food economics and policy",
+ "food distribution",
+ "forestry"
+ ],
+ "datasources": [
+ {
+ "openaireId": "opendoar____::1a551829d50f1400b0dab21fdd969c04"
+ },
+ {
+ "openaireId": "opendoar____::49af6c4e558a7569d80eee2e035e2bd7"
+ },
+ {
+ "openaireId": "opendoar____::0266e33d3f546cb5436a10798e657d97"
+ },
+ {
+ "openaireId": "opendoar____::fd4c2dc64ccb8496e6f1f94c85f30d06"
+ },
+ {
+ "openaireId": "opendoar____::41bfd20a38bb1b0bec75acf0845530a7"
+ },
+ {
+ "openaireId": "opendoar____::87ae6fb631f7c8a627e8e28785d9992d"
+ }
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "edenis"
+ },
+ {
+ "zenodoCommunityId": "efsa-pilot"
+ },
+ {
+ "zenodoCommunityId": "egene3"
+ },
+ {
+ "zenodoCommunityId": "efsa-kj"
+ },
+ {
+ "zenodoCommunityId": "euromixproject"
+ },
+ {
+ "zenodoCommunityId": "discardless"
+ },
+ {
+ "zenodoCommunityId": "sedinstcjfst"
+ },
+ {
+ "zenodoCommunityId": "afinet-kc"
+ },
+ {
+ "zenodoCommunityId": "2231-4784"
+ },
+ {
+ "zenodoCommunityId": "2231-0606"
+ },
+ {
+ "zenodoCommunityId": "solace"
+ },
+ {
+ "zenodoCommunityId": "pa17"
+ },
+ {
+ "zenodoCommunityId": "smartakis"
+ },
+ {
+ "zenodoCommunityId": "sedinstcjae"
+ },
+ {
+ "zenodoCommunityId": "phenology_camera"
+ },
+ {
+ "zenodoCommunityId": "aginfra"
+ },
+ {
+ "zenodoCommunityId": "erosa"
+ },
+ {
+ "zenodoCommunityId": "bigdatagrapes"
+ }
+ ]
+ },
+ "fam": {
+ "id": "fam",
+ "subjects": [
+ "Stock Assessment",
+ "pelagic",
+ "Fish farming",
+ "EMFF",
+ "Fisheries",
+ "Fishermen",
+ "maximum sustainable yield",
+ "trawler",
+ "Fishing vessel",
+ "Fisherman",
+ "Fishing gear",
+ "RFMO",
+ "Fish Aggregating Device",
+ "Bycatch",
+ "Fishery",
+ "common fisheries policy",
+ "Fishing fleet",
+ "Aquaculture"
+ ],
+ "datasources": [
+ {
+ "openaireId": "doajarticles::8cec81178926caaca531afbd8eb5d64c"
+ },
+ {
+ "openaireId": "doajarticles::0f7a7f30b5400615cae1829f3e743982"
+ },
+ {
+ "openaireId": "doajarticles::9740f7f5af3e506d2ad2c215cdccd51a"
+ },
+ {
+ "openaireId": "doajarticles::9f3fbaae044fa33cb7069b72935a3254"
+ },
+ {
+ "openaireId": "doajarticles::cb67f33eb9819f5c624ce0313957f6b3"
+ },
+ {
+ "openaireId": "doajarticles::e21c97cbb7a209afc75703681c462906"
+ },
+ {
+ "openaireId": "doajarticles::554cde3be9e5c4588b4c4f9f503120cb"
+ },
+ {
+ "openaireId": "tubitakulakb::11e22f49e65b9fd11d5b144b93861a1b"
+ },
+ {
+ "openaireId": "doajarticles::57c5d3837da943e93b28ec4db82ec7a5"
+ },
+ {
+ "openaireId": "doajarticles::a186f5ddb8e8c7ecc992ef51cf3315b1"
+ },
+ {
+ "openaireId": "doajarticles::e21c97cbb7a209afc75703681c462906"
+ },
+ {
+ "openaireId": "doajarticles::dca64612dfe0963fffc119098a319957"
+ },
+ {
+ "openaireId": "doajarticles::dd70e44479f0ade25aa106aef3e87a0a"
+ }
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "discardless"
+ },
+ {
+ "zenodoCommunityId": "farfish2020"
+ },
+ {
+ "zenodoCommunityId": "facts"
+ },
+ {
+ "zenodoCommunityId": "climefish"
+ },
+ {
+ "zenodoCommunityId": "proeel"
+ },
+ {
+ "zenodoCommunityId": "primefish"
+ },
+ {
+ "zenodoCommunityId": "h2020_vicinaqua"
+ },
+ {
+ "zenodoCommunityId": "meece"
+ },
+ {
+ "zenodoCommunityId": "rlsadb"
+ }
+ ]
+ },
+ "instruct": {
+ "id": "instruct",
+ "subjects": [
+
+ ],
+ "datasources": [
+
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "instruct"
+ },
+ {
+ "zenodoCommunityId": "west-life"
+ }
+ ]
+ },
+ "mes": {
+ "id": "mes",
+ "subjects": [
+ "marine",
+ "ocean",
+ "fish",
+ "aqua",
+ "sea"
+ ],
+ "datasources": [
+
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "adriplan"
+ },
+ {
+ "zenodoCommunityId": "devotes-project"
+ },
+ {
+ "zenodoCommunityId": "euro-basin"
+ },
+ {
+ "zenodoCommunityId": "naclim"
+ },
+ {
+ "zenodoCommunityId": "discardless"
+ },
+ {
+ "zenodoCommunityId": "assisibf"
+ },
+ {
+ "zenodoCommunityId": "meece"
+ },
+ {
+ "zenodoCommunityId": "facts"
+ },
+ {
+ "zenodoCommunityId": "proeel"
+ },
+ {
+ "zenodoCommunityId": "aquatrace"
+ },
+ {
+ "zenodoCommunityId": "myfish"
+ },
+ {
+ "zenodoCommunityId": "atlas"
+ },
+ {
+ "zenodoCommunityId": "blue-actionh2020"
+ },
+ {
+ "zenodoCommunityId": "sponges"
+ },
+ {
+ "zenodoCommunityId": "merces_project"
+ },
+ {
+ "zenodoCommunityId": "bigdataocean"
+ },
+ {
+ "zenodoCommunityId": "columbus"
+ },
+ {
+ "zenodoCommunityId": "h2020-aquainvad-ed"
+ },
+ {
+ "zenodoCommunityId": "aquarius"
+ },
+ {
+ "zenodoCommunityId": "southern-ocean-observing-system"
+ },
+ {
+ "zenodoCommunityId": "eawag"
+ },
+ {
+ "zenodoCommunityId": "mossco"
+ },
+ {
+ "zenodoCommunityId": "onc"
+ },
+ {
+ "zenodoCommunityId": "oceanbiogeochemistry"
+ },
+ {
+ "zenodoCommunityId": "oceanliteracy"
+ },
+ {
+ "zenodoCommunityId": "openearth"
+ },
+ {
+ "zenodoCommunityId": "ocean"
+ },
+ {
+ "zenodoCommunityId": "calcifierraman"
+ },
+ {
+ "zenodoCommunityId": "bermudabream"
+ },
+ {
+ "zenodoCommunityId": "brcorp1"
+ },
+ {
+ "zenodoCommunityId": "mce"
+ },
+ {
+ "zenodoCommunityId": "biogeochem"
+ },
+ {
+ "zenodoCommunityId": "ecc2014"
+ },
+ {
+ "zenodoCommunityId": "fisheries"
+ },
+ {
+ "zenodoCommunityId": "sedinstcjfas"
+ },
+ {
+ "zenodoCommunityId": "narmada"
+ },
+ {
+ "zenodoCommunityId": "umr-entropie"
+ },
+ {
+ "zenodoCommunityId": "farfish2020"
+ },
+ {
+ "zenodoCommunityId": "primefish"
+ },
+ {
+ "zenodoCommunityId": "zf-ilcs"
+ },
+ {
+ "zenodoCommunityId": "climefish"
+ },
+ {
+ "zenodoCommunityId": "afrimed_eu"
+ },
+ {
+ "zenodoCommunityId": "spi-ace"
+ },
+ {
+ "zenodoCommunityId": "cice-consortium"
+ },
+ {
+ "zenodoCommunityId": "nemo-ocean"
+ },
+ {
+ "zenodoCommunityId": "mesopp-h2020"
+ },
+ {
+ "zenodoCommunityId": "marxiv"
+ }
+ ]
+ },
+ "ni": {
+ "id": "ni",
+ "subjects": [
+ "brain mapping",
+ "brain imaging",
+ "electroencephalography",
+ "arterial spin labelling",
+ "brain fingerprinting",
+ "brain",
+ "neuroimaging",
+ "Multimodal Brain Image Analysis",
+ "fMRI",
+ "neuroinformatics",
+ "fetal brain",
+ "brain ultrasonic imaging",
+ "topographic brain mapping",
+ "diffusion tensor imaging",
+ "computerized knowledge assessment",
+ "connectome mapping",
+ "brain magnetic resonance imaging",
+ "brain abnormalities"
+ ],
+ "datasources": [
+ {
+ "openaireId": "re3data_____::5b9bf9171d92df854cf3c520692e9122"
+ },
+ {
+ "openaireId": "doajarticles::c7d3de67dc77af72f6747157441252ec"
+ },
+ {
+ "openaireId": "re3data_____::8515794670370f49c1d176c399c714f5"
+ },
+ {
+ "openaireId": "doajarticles::d640648c84b10d425f96f11c3de468f3"
+ },
+ {
+ "openaireId": "doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a"
+ },
+ {
+ "openaireId": "rest________::fb1a3d4523c95e63496e3bc7ba36244b"
+ }
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "neuroinformatics"
+ },
+ {
+ "zenodoCommunityId": "hbp"
+ },
+ {
+ "zenodoCommunityId": "from_neuroscience_to_machine_learning"
+ },
+ {
+ "zenodoCommunityId": "ci2c"
+ },
+ {
+ "zenodoCommunityId": "opensourcebrain"
+ },
+ {
+ "zenodoCommunityId": "brainspeak"
+ },
+ {
+ "zenodoCommunityId": "braincom"
+ },
+ {
+ "zenodoCommunityId": "nextgenvis"
+ },
+ {
+ "zenodoCommunityId": "meso-brain"
+ },
+ {
+ "zenodoCommunityId": "neuroplasticity-workshop"
+ },
+ {
+ "zenodoCommunityId": "bionics"
+ },
+ {
+ "zenodoCommunityId": "brainmattrain-676408"
+ },
+ {
+ "zenodoCommunityId": "repronim"
+ },
+ {
+ "zenodoCommunityId": "affectiveneuro"
+ },
+ {
+ "zenodoCommunityId": "con"
+ },
+ {
+ "zenodoCommunityId": "lab_neurol_sperim_irfmn_irccs_milano_it"
+ }
+ ]
+ },
+ "dariah": {
+ "id": "dariah",
+ "subjects": [
+
+ ],
+ "datasources": [
+ {
+ "openaireId": "opendoar____::7e7757b1e12abcb736ab9a754ffb617a",
+ "sc": {
+ "cl": {
+ "criteria": [
+ {
+ "ce": {
+ "constraint": [
+ {
+ "verb": "contains",
+ "field": "contributor",
+ "value": "DARIAH"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "dimpo"
+ }
+ ]
+ },
+ "rda": {
+ "id": "rda",
+ "subjects": [
+
+ ],
+ "datasources": [
+
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "rda"
+ }
+ ]
+ },
+ "dh-ch": {
+ "id": "dh-ch",
+ "subjects": [
+ "modern art",
+ "metadata",
+ "monuments",
+ "sites",
+ "field walking",
+ "frescoes",
+ "excavation",
+ "ontologies",
+ "mapping",
+ "cities",
+ "temples",
+ "lithics",
+ "roads",
+ "digital cultural heritage",
+ "interoperability",
+ "archaeological reports",
+ "churches",
+ "standards",
+ "archaeological stratigraphy",
+ "buidings",
+ "digital humanities",
+ "survey",
+ "archaeological sites",
+ "CIDOC CRM",
+ "decorations",
+ "classic art",
+ "stratigraphy",
+ "digital archaeology",
+ "walls",
+ "data science",
+ "chapels",
+ "paintings",
+ "archaeology",
+ "fair data",
+ "mosaics",
+ "data visualization",
+ "burials",
+ "medieval art",
+ "castles",
+ "statues",
+ "natural language processing",
+ "inscriptions",
+ "vaults",
+ "open data",
+ "contemporary art",
+ "3D",
+ "pottery",
+ "site",
+ "metadata schema",
+ "architectural",
+ "vessels"
+ ],
+ "datasources": [
+ {
+ "openaireId": "re3data_____::9ebe127e5f3a0bf401875690f3bb6b81"
+ },
+ {
+ "openaireId": "doajarticles::c6cd4b532e12868c1d760a8d7cda6815"
+ },
+ {
+ "openaireId": "doajarticles::a6de4499bb87bf3c01add0a9e2c9ed0b"
+ },
+ {
+ "openaireId": "doajarticles::6eb31d13b12bc06bbac06aef63cf33c9"
+ },
+ {
+ "openaireId": "doajarticles::0da84e9dfdc8419576169e027baa8028"
+ },
+ {
+ "openaireId": "re3data_____::84e123776089ce3c7a33db98d9cd15a8"
+ },
+ {
+ "openaireId": "openaire____::c5502a43e76feab55dd00cf50f519125"
+ },
+ {
+ "openaireId": "re3data_____::a48f09c562b247a9919acfe195549b47"
+ },
+ {
+ "openaireId": "opendoar____::97275a23ca44226c9964043c8462be96"
+ }
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "storm"
+ },
+ {
+ "zenodoCommunityId": "crosscult"
+ },
+ {
+ "zenodoCommunityId": "wholodance_eu"
+ },
+ {
+ "zenodoCommunityId": "digcur2013"
+ },
+ {
+ "zenodoCommunityId": "gravitate"
+ },
+ {
+ "zenodoCommunityId": "dipp2014"
+ },
+ {
+ "zenodoCommunityId": "digitalhumanities"
+ },
+ {
+ "zenodoCommunityId": "dimpo"
+ },
+ {
+ "zenodoCommunityId": "adho"
+ },
+ {
+ "zenodoCommunityId": "chc"
+ },
+ {
+ "zenodoCommunityId": "wahr"
+ },
+ {
+ "zenodoCommunityId": "ibe"
+ },
+ {
+ "zenodoCommunityId": "ariadne"
+ },
+ {
+ "zenodoCommunityId": "parthenos-hub"
+ },
+ {
+ "zenodoCommunityId": "parthenos-training"
+ },
+ {
+ "zenodoCommunityId": "gandhara"
+ },
+ {
+ "zenodoCommunityId": "cmsouthasia"
+ },
+ {
+ "zenodoCommunityId": "nilgirihills"
+ },
+ {
+ "zenodoCommunityId": "shamsa_mustecio"
+ },
+ {
+ "zenodoCommunityId": "bodhgaya"
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration.xml b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration.xml
new file mode 100644
index 0000000000..8fec18593f
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration.xml
@@ -0,0 +1,176 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ SDG13 - Climate action
+ SDG8 - Decent work and economic growth
+ SDG15 - Life on land
+ SDG2 - Zero hunger
+ SDG17 - Partnerships for the goals
+ SDG10 - Reduced inequalities
+ SDG5 - Gender equality
+ SDG12 - Responsible consumption and production
+ SDG14 - Life below water
+ SDG6 - Clean water and sanitation
+ SDG11 - Sustainable cities and communities
+ SDG1 - No poverty
+ SDG3 - Good health and well being
+ SDG7 - Affordable and clean energy
+ SDG4 - Quality education
+ SDG9 - Industry innovation and infrastructure
+ SDG16 - Peace justice and strong institutions
+
+
+
+
+ 123
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ brain mapping
+ brain imaging
+ electroencephalography
+ arterial spin labelling
+ brain fingerprinting
+ brain
+ neuroimaging
+ Multimodal Brain Image Analysis
+ fMRI
+ neuroinformatics
+ fetal brain
+ brain ultrasonic imaging
+ topographic brain mapping
+ diffusion tensor imaging
+ computerized knowledge assessment
+ connectome mapping
+ brain magnetic resonance imaging
+ brain abnormalities
+
+
+
+ re3data_____::5b9bf9171d92df854cf3c520692e9122
+
+
+
+ doajarticles::c7d3de67dc77af72f6747157441252ec
+
+
+
+ re3data_____::8515794670370f49c1d176c399c714f5
+
+
+
+ doajarticles::d640648c84b10d425f96f11c3de468f3
+
+
+
+ doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a
+
+
+
+
+
+
+
+ marine
+ ocean
+ fish
+ aqua
+ sea
+
+
+
+ re3data_____::9633d1e8c4309c833c2c442abeb0cfeb
+
+
+
+
+
+
+
+ animal production and health
+ fisheries and aquaculture
+ food safety and human nutrition
+ information management
+ food technology
+ agri-food education and extension
+ natural resources and environment
+ food system
+ engineering technology and Research
+ agriculture
+ food safety risk assessment
+ food security
+ farming practices and systems
+ plant production and protection
+ agri-food economics and policy
+ food distribution
+ forestry
+
+
+
+ opendoar____::1a551829d50f1400b0dab21fdd969c04
+
+
+
+ opendoar____::49af6c4e558a7569d80eee2e035e2bd7
+
+
+
+ opendoar____::0266e33d3f546cb5436a10798e657d97
+
+
+
+ opendoar____::fd4c2dc64ccb8496e6f1f94c85f30d06
+
+
+
+ opendoar____::41bfd20a38bb1b0bec75acf0845530a7
+
+
+
+ opendoar____::87ae6fb631f7c8a627e8e28785d9992d
+
+
+
+
+
+
+ oac_clarin
+
+
+
+ re3data_____::a507cdacc5bbcc08761c92185dee5cab
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.json b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.json
new file mode 100644
index 0000000000..6aa4275d6d
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.json
@@ -0,0 +1,37 @@
+{
+ "communities": {
+ "dariah": {
+ "id": "dariah",
+ "subjects": [
+
+ ],
+ "datasources": [
+ {
+ "openaireId": "opendoar____::7e7757b1e12abcb736ab9a754ffb617a",
+ "sc": {
+ "cl": {
+ "criteria": [
+ {
+ "ce": {
+ "constraint": [
+ {
+ "verb": "contains",
+ "field": "contributor",
+ "value": "DARIAH"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "zenodoCommunities": [
+ {
+ "zenodoCommunityId": "dimpo"
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.xml b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.xml
new file mode 100644
index 0000000000..ad31e17633
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.xml
@@ -0,0 +1,193 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ SDG13 - Climate action
+ SDG8 - Decent work and economic growth
+ SDG15 - Life on land
+ SDG2 - Zero hunger
+ SDG17 - Partnerships for the goals
+ SDG10 - Reduced inequalities
+ SDG5 - Gender equality
+ SDG12 - Responsible consumption and production
+ SDG14 - Life below water
+ SDG6 - Clean water and sanitation
+ SDG11 - Sustainable cities and communities
+ SDG1 - No poverty
+ SDG3 - Good health and well being
+ SDG7 - Affordable and clean energy
+ SDG4 - Quality education
+ SDG9 - Industry innovation and infrastructure
+ SDG16 - Peace justice and strong institutions
+
+
+
+
+ 123
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ brain mapping
+ brain imaging
+ electroencephalography
+ arterial spin labelling
+ brain fingerprinting
+ brain
+ neuroimaging
+ Multimodal Brain Image Analysis
+ fMRI
+ neuroinformatics
+ fetal brain
+ brain ultrasonic imaging
+ topographic brain mapping
+ diffusion tensor imaging
+ computerized knowledge assessment
+ connectome mapping
+ brain magnetic resonance imaging
+ brain abnormalities
+
+
+
+ re3data_____::5b9bf9171d92df854cf3c520692e9122
+
+
+
+ doajarticles::c7d3de67dc77af72f6747157441252ec
+
+
+
+ re3data_____::8515794670370f49c1d176c399c714f5
+
+
+
+ doajarticles::d640648c84b10d425f96f11c3de468f3
+
+
+
+ doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a
+
+
+
+
+
+
+
+ marine
+ ocean
+ fish
+ aqua
+ sea
+
+
+
+ re3data_____::9633d1e8c4309c833c2c442abeb0cfeb
+
+
+
+
+
+
+
+ animal production and health
+ fisheries and aquaculture
+ food safety and human nutrition
+ information management
+ food technology
+ agri-food education and extension
+ natural resources and environment
+ food system
+ engineering technology and Research
+ agriculture
+ food safety risk assessment
+ food security
+ farming practices and systems
+ plant production and protection
+ agri-food economics and policy
+ food distribution
+ forestry
+
+
+
+ opendoar____::1a551829d50f1400b0dab21fdd969c04
+
+
+
+ opendoar____::49af6c4e558a7569d80eee2e035e2bd7
+
+
+
+ opendoar____::0266e33d3f546cb5436a10798e657d97
+
+
+
+ opendoar____::fd4c2dc64ccb8496e6f1f94c85f30d06
+
+
+
+ opendoar____::41bfd20a38bb1b0bec75acf0845530a7
+
+
+
+ opendoar____::87ae6fb631f7c8a627e8e28785d9992d
+
+
+
+
+
+
+ oac_clarin
+
+
+
+ re3data_____::a507cdacc5bbcc08761c92185dee5cab
+
+
+
+
+
+
+ oaa_dariah
+
+
+
+ openaire____::1cfdb2e14977f31a98e0118283401f32
+ {"criteria":[{"constraint":[{"verb":"contains","field":"contributor","value":"DARIAH"}]}]}
+
+
+
+
+
+ dimpo
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json
new file mode 100644
index 0000000000..c150e43861
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json
@@ -0,0 +1,20 @@
+{"communities":
+ {"ee":
+ {"id":"ee",
+ "subjects":["SDG13 - Climate action","SDG8 - Decent work and economic growth","SDG15 - Life on land","SDG2 - Zero hunger","SDG17 - Partnerships for the goals","SDG10 - Reduced inequalities","SDG5 - Gender equality","SDG12 - Responsible consumption and production","SDG14 - Life below water","SDG6 - Clean water and sanitation","SDG11 - Sustainable cities and communities","SDG1 - No poverty","SDG3 - Good health and well being","SDG7 - Affordable and clean energy","SDG4 - Quality education","SDG9 - Industry innovation and infrastructure","SDG16 - Peace justice and strong institutions"],
+ "datasources":[],
+ "zenodoCommunities":[],
+ "organizationCommunity":[]
+ },
+ "instruct":
+ {"id":"instruct",
+ "subjects":[],
+ "datasources":[],
+ "zenodoCommunities":[{"zenodoCommunityId":"instruct"},{"zenodoCommunityId":"west-life"}],"organizationCommunity":[]},
+ "egi":{"id":"egi","subjects":[],"datasources":[],"zenodoCommunities":[{"zenodoCommunityId":"zenodo"}],"organizationCommunity":[]},
+ "covid-19":{"id":"covid-19","subjects":["COVID-19","SARS-CoV-2","2019-nCoV","Severe acute respiratory syndrome coronavirus 2","2019 novel coronavirus","coronavirus disease 2019","coronavirus disease-19","HCoV-19","mesh:COVID-19","mesh:C000657245"],
+ "datasources":[{"openaireId":"opendoar____::358aee4cc897452c00244351e4d91f69","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"re3data_____::7b0ad08687b2c960d5aeef06f811d5e6","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}
+ ]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"driver______::bee53aa31dc2cbb538c10c2b65fa5824","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"openaire____::437f4b072b1aa198adcbc35910ff3b98","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"openaire____::081b82f96300b6a6e3d282bad31cb6e2","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"openaire____::9e3be59865b2c1c335d32dae2fe7b254","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]}
+ ,{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"opendoar____::8b6dd7db9af49e67306feb59a8bdc52c","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"share_______::4719356ec8d7d55d3feb384ce879ad6c","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"share_______::bbd802baad85d1fd440f32a7a3a2c2b1","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},{"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}},{"openaireId":"opendoar____::6f4922f45568161a8cdf4ad2299f6d23","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"title","value":"COVID-19"}]},
+ {"constraint":[{"verb":"contains","field":"title","value":"SARS-CoV-2"}]},{"constraint":[{"verb":"contains","field":"title","value":"2019-nCoV"}]}]}}],"zenodoCommunities":[{"zenodoCommunityId":"chicago-covid-19"},{"zenodoCommunityId":"covid-19-senacyt-panama-sample"},{"zenodoCommunityId":"covid-19-tx-rct-stats-review"},{"zenodoCommunityId":"covid_19_senacyt_abc_panama"}],"organizationCommunity":[]},
+ "dariah":{"id":"dariah","subjects":[],"datasources":[{"openaireId":"opendoar____::7e7757b1e12abcb736ab9a754ffb617a","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"contributor","value":"DARIAH"}]}]}},{"openaireId":"opendoar____::96da2f590cd7246bbde0051047b0d6f7","selectionConstraints":{"criteria":[{"constraint":[{"verb":"contains","field":"contributor","value":"DARIAH"}]}]}}],"zenodoCommunities":[{"zenodoCommunityId":"dimpo"}],"organizationCommunity":[]},"rda":{"id":"rda","subjects":[],"datasources":[],"zenodoCommunities":[{"zenodoCommunityId":"rda"}],"organizationCommunity":[]},"clarin":{"id":"clarin","subjects":[],"datasources":[{"openaireId":"re3data_____::a507cdacc5bbcc08761c92185dee5cab"}],"zenodoCommunities":[],"organizationCommunity":[]},"aginfra":{"id":"aginfra","subjects":["animal production and health","fisheries and aquaculture","food safety and human nutrition","information management","food technology","agri-food education and extension","natural resources and environment","food system","engineering technology and Research","agriculture","food safety risk assessment","food security","farming practices and systems","plant production and protection","agri-food economics and policy","Agri-food","food distribution","forestry"],"datasources":[{"openaireId":"opendoar____::1a551829d50f1400b0dab21fdd969c04"},{"openaireId":"opendoar____::49af6c4e558a7569d80eee2e035e2bd7"},{"openaireId":"opendoar____::0266e33d3f546cb5436a10798e657d97"},{"openaireId":"opendoar____::fd4c2dc64ccb8496e6f1f94c85f30d06"},{"openaireId":"opendoar____::41bfd20a38bb1b0bec75acf0845530a7"},{"openaireId":"opendoar____::87ae6fb631f7c8a627e8e28785d9992d"}],"zenodoCommunities":[{"zenodoCommunityId":"edenis"},{"zenodoCommunityId":"efsa-pilot"},{"zenodoCommunityId":"egene3"},{"zenodoCommunityId":"efsa-kj"},{"zenodoCommunityId":"euromixproject"},{"zenodoCommunityId":"discardless"},{"zenodoCommunityId":"sedinstcjfst"},{"zenodoCommunityId":"afinet-kc"},{"zenodoCommunityId":"2231-4784"},{"zenodoCommunityId":"2231-0606"},{"zenodoCommunityId":"solace"},{"zenodoCommunityId":"pa17"},{"zenodoCommunityId":"smartakis"},{"zenodoCommunityId":"sedinstcjae"},{"zenodoCommunityId":"phenology_camera"},{"zenodoCommunityId":"aginfra"},{"zenodoCommunityId":"erosa"},{"zenodoCommunityId":"bigdatagrapes"}],"organizationCommunity":[]},"fam":{"id":"fam","subjects":["Stock Assessment","pelagic","Acoustic","Fish farming","Fisheries","Fishermen","maximum sustainable yield","trawler","Fishing vessel","Fisherman","Fishing gear","mackerel","RFMO","Fish Aggregating Device","Bycatch","Fishery","common fisheries policy","Fishing fleet","Aquaculture"],"datasources":[{"openaireId":"doajarticles::8cec81178926caaca531afbd8eb5d64c"},{"openaireId":"doajarticles::0f7a7f30b5400615cae1829f3e743982"},{"openaireId":"doajarticles::9740f7f5af3e506d2ad2c215cdccd51a"},{"openaireId":"doajarticles::9f3fbaae044fa33cb7069b72935a3254"},{"openaireId":"doajarticles::cb67f33eb9819f5c624ce0313957f6b3"},{"openaireId":"doajarticles::e21c97cbb7a209afc75703681c462906"},{"openaireId":"doajarticles::554cde3be9e5c4588b4c4f9f503120cb"},{"openaireId":"tubitakulakb::11e22f49e65b9fd11d5b144b93861a1b"},{"openaireId":"doajarticles::57c5d3837da943e93b28ec4db82ec7a5"},{"openaireId":"doajarticles::a186f5ddb8e8c7ecc992ef51cf3315b1"},{"openaireId":"doajarticles::e21c97cbb7a209afc75703681c462906"},{"openaireId":"doajarticles::dca64612dfe0963fffc119098a319957"},{"openaireId":"doajarticles::dd70e44479f0ade25aa106aef3e87a0a"}],"zenodoCommunities":[{"zenodoCommunityId":"discardless"},{"zenodoCommunityId":"farfish2020"},{"zenodoCommunityId":"facts"},{"zenodoCommunityId":"climefish"},{"zenodoCommunityId":"proeel"},{"zenodoCommunityId":"primefish"},{"zenodoCommunityId":"h2020_vicinaqua"},{"zenodoCommunityId":"meece"},{"zenodoCommunityId":"rlsadb"},{"zenodoCommunityId":"iotc_ctoi"}],"organizationCommunity":[]},"beopen":{"id":"beopen","subjects":["Green Transport","City mobility systems","Vulnerable road users","Traffic engineering","Transport electrification","Mobility","Intermodal freight transport","Clean vehicle fleets","Intelligent mobility","Inflight refueling","District mobility systems","Navigation and control systems for optimised planning and routing","European Space Technology Platform","European Transport networks","Green cars","Inter-modality infrastructures","Advanced Take Off and Landing Ideas","Sustainable urban systems","port-area railway networks","Innovative forms of urban transport","Alliance for Logistics Innovation through Collaboration in Europe","Advisory Council for Aeronautics Research in Europe","Mobility services for people and goods","Guidance and traffic management","Passenger mobility","Smart mobility and services","transport innovation","high-speed railway","Vehicle design","Inland shipping","public transportation","aviation’s climate impact","Road transport","On-demand public transport","Personal Air Transport","Transport","transport vulnerability","Pipeline transport","European Association of Aviation Training and Education Organisations","Defrosting of railway infrastructure","Inclusive and affordable transport","River Information Services","jel:L92","Increased use of public transport","Seamless mobility","STRIA","trolleybus transport","Intelligent Transport System","Low-emission alternative energy for transport","Shared mobility for people and goods","Business model for urban mobility","Interoperability of transport systems","Cross-border train slot booking","Air transport","Transport pricing","Sustainable transport","European Rail Transport Research Advisory Council","Alternative aircraft configurations","Transport and Mobility","Railways applications","urban transport","Environmental impact of transport","urban freight delivery systems","Automated Road Transport","Alternative fuels in public transport","Active LIDAR-sensor for GHG-measurements","Autonomous logistics operations","Rational use of motorised transport","Network and traffic management systems","electrification of railway wagons","Single European Sky","Electrified road systems","transportation planning","Railway dynamics","Motorway of the Sea","smart railway communications","Maritime transport","Environmental- friendly transport","Combined transport","Connected automated driving technology","Innovative freight logistics services","automated and shared vehicles","Alternative Aircraft Systems","Land-use and transport interaction","Public transport system","Business plan for shared mobility","Shared mobility","Growing of mobility demand","European Road Transport Research Advisory Council","WATERBORNE ETP","Effective transport management system","Short Sea Shipping","air traffic management","Sea hubs and the motorways of the sea","Urban mobility solutions","Smart city planning","Maritime spatial planning","EUropean rail Research Network of Excellence","Transport governance","ENERGY CONSUMPTION BY THE TRANSPORT SECTOR","Integrated urban plan","inland waterway services","European Conference of Transport Research Institutes","air vehicles","E-freight","Automated Driving","Automated ships","pricing for cross-border passenger transport","Vehicle efficiency","Railway transport","Electric vehicles","Road traffic monitoring","Deep sea shipping","Circular economy in transport","Traffic congestion","air transport system","Urban logistics","Rail transport","OpenStreetMap","high speed rail","Transportation engineering","Intermodal travel information","Flight Data Recorders","Advanced driver assistance systems","long distance freight transport","Inland waterway transport","Smart mobility","Mobility integration","Personal Rapid Transit system","Safety measures \\u0026 requirements for roads","Green rail transport","Electrical","Vehicle manufacturing","Future Airport Layout","Rail technologies","European Intermodal Research Advisory Council","inland navigation","Automated urban vehicles","ECSS-standards","Traveller services","Polluting transport","Air Traffic Control","Cooperative and connected and automated transport","Innovative powertrains","Quality of transport system and services","door-to- door logistics chain","Inter-modal aspects of urban mobility","travel (and mobility)","Innovative freight delivery systems","urban freight delivery infrastructures"],"datasources":[{"openaireId":"doajarticles::1c5bdf8fca58937894ad1441cca99b76"},{"openaireId":"doajarticles::b37a634324a45c821687e6e80e6f53b4"},{"openaireId":"doajarticles::4bf64f2a104040e4e055cd9594b2d77c"},{"openaireId":"doajarticles::479ca537c12755d1868bbf02938a900c"},{"openaireId":"doajarticles::55f31df96a60e2309f45b7c265fcf7a2"},{"openaireId":"doajarticles::c52a09891a5301f9986ebbfe3761810c"},{"openaireId":"doajarticles::379807bc7f6c71a227ef1651462c414c"},{"openaireId":"doajarticles::36069db531a00b85a2e8fb301f4bdc19"},{"openaireId":"doajarticles::b6a898da311ded96fabf49c520b80d5d"},{"openaireId":"doajarticles::d0753d9180b35a271d8b4a31f449749f"},{"openaireId":"doajarticles::172050a92511838393a3fe237ae47e31"},{"openaireId":"doajarticles::301ed96c62abb160a3e29796efe5c95c"},{"openaireId":"doajarticles::0f4f805b3d842f2c7f1b077c3426fa59"},{"openaireId":"doajarticles::ba73728b84437b8d48ae287b867c7215"},{"openaireId":"doajarticles::86faef424d804309ccf45f692523aa48"},{"openaireId":"doajarticles::73bd758fa41671de70964c3ecba013af"},{"openaireId":"doajarticles::e661fc0bdb24af42b740a08f0ddc6cf4"},{"openaireId":"doajarticles::a6d3052047d5dbfbd43d95b4afb0f3d7"},{"openaireId":"doajarticles::ca61df07089acc53a1569bde6673d82a"},{"openaireId":"doajarticles::237dd6f1606600459d0297abd8ed9976"},{"openaireId":"doajarticles::fba6191177ede7c51ea1cdf58eae7f8b"}],"zenodoCommunities":[{"zenodoCommunityId":"jsdtl"},{"zenodoCommunityId":"utc-martrec"},{"zenodoCommunityId":"utc-uti"},{"zenodoCommunityId":"stp"},{"zenodoCommunityId":"c2smart"},{"zenodoCommunityId":"stride-utc"},{"zenodoCommunityId":"crowd4roads"},{"zenodoCommunityId":"lemo"},{"zenodoCommunityId":"imov3d"},{"zenodoCommunityId":"tra2018"},{"zenodoCommunityId":"optimum"},{"zenodoCommunityId":"stars"},{"zenodoCommunityId":"iecteim"},{"zenodoCommunityId":"iccpt2019"}],"organizationCommunity":[]},"science-innovation-policy":{"id":"science-innovation-policy","subjects":[],"datasources":[],"zenodoCommunities":[{"zenodoCommunityId":"risis"}],"organizationCommunity":[]},"mes":{"id":"mes","subjects":["marine","ocean","fish","aqua","sea"],"datasources":[],"zenodoCommunities":[{"zenodoCommunityId":"adriplan"},{"zenodoCommunityId":"devotes-project"},{"zenodoCommunityId":"euro-basin"},{"zenodoCommunityId":"naclim"},{"zenodoCommunityId":"discardless"},{"zenodoCommunityId":"assisibf"},{"zenodoCommunityId":"meece"},{"zenodoCommunityId":"facts"},{"zenodoCommunityId":"proeel"},{"zenodoCommunityId":"aquatrace"},{"zenodoCommunityId":"myfish"},{"zenodoCommunityId":"atlas"},{"zenodoCommunityId":"blue-actionh2020"},{"zenodoCommunityId":"sponges"},{"zenodoCommunityId":"merces_project"},{"zenodoCommunityId":"bigdataocean"},{"zenodoCommunityId":"columbus"},{"zenodoCommunityId":"h2020-aquainvad-ed"},{"zenodoCommunityId":"aquarius"},{"zenodoCommunityId":"southern-ocean-observing-system"},{"zenodoCommunityId":"eawag"},{"zenodoCommunityId":"mossco"},{"zenodoCommunityId":"onc"},{"zenodoCommunityId":"oceanbiogeochemistry"},{"zenodoCommunityId":"oceanliteracy"},{"zenodoCommunityId":"openearth"},{"zenodoCommunityId":"ocean"},{"zenodoCommunityId":"calcifierraman"},{"zenodoCommunityId":"bermudabream"},{"zenodoCommunityId":"brcorp1"},{"zenodoCommunityId":"mce"},{"zenodoCommunityId":"biogeochem"},{"zenodoCommunityId":"ecc2014"},{"zenodoCommunityId":"fisheries"},{"zenodoCommunityId":"sedinstcjfas"},{"zenodoCommunityId":"narmada"},{"zenodoCommunityId":"umr-entropie"},{"zenodoCommunityId":"farfish2020"},{"zenodoCommunityId":"primefish"},{"zenodoCommunityId":"zf-ilcs"},{"zenodoCommunityId":"climefish"},{"zenodoCommunityId":"afrimed_eu"},{"zenodoCommunityId":"spi-ace"},{"zenodoCommunityId":"cice-consortium"},{"zenodoCommunityId":"nemo-ocean"},{"zenodoCommunityId":"mesopp-h2020"},{"zenodoCommunityId":"marxiv"}],"organizationCommunity":[]},"ni":{"id":"ni","subjects":["brain mapping","brain imaging","electroencephalography","arterial spin labelling","brain fingerprinting","brain","neuroimaging","Multimodal Brain Image Analysis","fMRI","neuroinformatics","fetal brain","brain ultrasonic imaging","topographic brain mapping","diffusion tensor imaging","computerized knowledge assessment","connectome mapping","brain magnetic resonance imaging","brain abnormalities"],"datasources":[{"openaireId":"re3data_____::5b9bf9171d92df854cf3c520692e9122"},{"openaireId":"doajarticles::c7d3de67dc77af72f6747157441252ec"},{"openaireId":"re3data_____::8515794670370f49c1d176c399c714f5"},{"openaireId":"doajarticles::d640648c84b10d425f96f11c3de468f3"},{"openaireId":"doajarticles::0c0e74daa5d95504eade9c81ebbd5b8a"},{"openaireId":"rest________::fb1a3d4523c95e63496e3bc7ba36244b"}],"zenodoCommunities":[{"zenodoCommunityId":"neuroinformatics"},{"zenodoCommunityId":"hbp"},{"zenodoCommunityId":"from_neuroscience_to_machine_learning"},{"zenodoCommunityId":"ci2c"},{"zenodoCommunityId":"opensourcebrain"},{"zenodoCommunityId":"brainspeak"},{"zenodoCommunityId":"braincom"},{"zenodoCommunityId":"nextgenvis"},{"zenodoCommunityId":"meso-brain"},{"zenodoCommunityId":"neuroplasticity-workshop"},{"zenodoCommunityId":"bionics"},{"zenodoCommunityId":"brainmattrain-676408"},{"zenodoCommunityId":"repronim"},{"zenodoCommunityId":"affectiveneuro"},{"zenodoCommunityId":"con"},{"zenodoCommunityId":"lab_neurol_sperim_irfmn_irccs_milano_it"}],"organizationCommunity":[]},"dh-ch":{"id":"dh-ch","subjects":["modern art","monuments","europeana data model","sites","field walking","frescoes","LIDO metadata schema","art history","excavation","Arts and Humanities General","cities","coins","temples","numismatics","lithics","roads","environmental archaeology","digital cultural heritage","archaeological reports","history","CRMba","churches","cultural heritage","archaeological stratigraphy","religious art","buidings","digital humanities","survey","archaeological sites","linguistic studies","bioarchaeology","architectural orders","palaeoanthropology","fine arts","europeana","CIDOC CRM","decorations","classic art","stratigraphy","digital archaeology","intangible cultural heritage","walls","humanities","chapels","CRMtex","Language and Literature","paintings","archaeology","fair data","mosaics","burials","architecture","medieval art","castles","CARARE metadata schema","statues","natural language processing","inscriptions","CRMsci","vaults","contemporary art","Arts and Humanities","CRMarchaeo","pottery","site","architectural","vessels"],"datasources":[{"openaireId":"re3data_____::9ebe127e5f3a0bf401875690f3bb6b81"},{"openaireId":"doajarticles::c6cd4b532e12868c1d760a8d7cda6815"},{"openaireId":"doajarticles::a6de4499bb87bf3c01add0a9e2c9ed0b"},{"openaireId":"doajarticles::6eb31d13b12bc06bbac06aef63cf33c9"},{"openaireId":"doajarticles::0da84e9dfdc8419576169e027baa8028"},{"openaireId":"re3data_____::84e123776089ce3c7a33db98d9cd15a8"},{"openaireId":"openaire____::c5502a43e76feab55dd00cf50f519125"},{"openaireId":"re3data_____::a48f09c562b247a9919acfe195549b47"},{"openaireId":"opendoar____::97275a23ca44226c9964043c8462be96"}],"zenodoCommunities":[{"zenodoCommunityId":"storm"},{"zenodoCommunityId":"crosscult"},{"zenodoCommunityId":"wholodance_eu"},{"zenodoCommunityId":"digcur2013"},{"zenodoCommunityId":"gravitate"},{"zenodoCommunityId":"dipp2014"},{"zenodoCommunityId":"digitalhumanities"},{"zenodoCommunityId":"dimpo"},{"zenodoCommunityId":"adho"},{"zenodoCommunityId":"chc"},{"zenodoCommunityId":"wahr"},{"zenodoCommunityId":"ibe"},{"zenodoCommunityId":"ariadne"},{"zenodoCommunityId":"parthenos-hub"},{"zenodoCommunityId":"parthenos-training"},{"zenodoCommunityId":"gandhara"},{"zenodoCommunityId":"cmsouthasia"},{"zenodoCommunityId":"nilgirihills"},{"zenodoCommunityId":"shamsa_mustecio"},{"zenodoCommunityId":"bodhgaya"}],"organizationCommunity":[]}}}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/dataset/dataset_10.json.gz b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/dataset/dataset_10.json.gz
new file mode 100644
index 0000000000..bd29d59ae9
Binary files /dev/null and b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/dataset/dataset_10.json.gz differ
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/otherresearchproduct/otherresearchproduct_10.json.gz b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/otherresearchproduct/otherresearchproduct_10.json.gz
new file mode 100644
index 0000000000..20b6a4dba3
Binary files /dev/null and b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/otherresearchproduct/otherresearchproduct_10.json.gz differ
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/publication/publication_10.json.gz b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/publication/publication_10.json.gz
new file mode 100644
index 0000000000..257e0db3a4
Binary files /dev/null and b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/publication/publication_10.json.gz differ
diff --git a/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/software/software_10.json.gz b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/software/software_10.json.gz
new file mode 100644
index 0000000000..a5b8c8774c
Binary files /dev/null and b/dhp-workflows/dhp-bulktag/src/test/resources/eu/dnetlib/dhp/sample/software/software_10.json.gz differ
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java
new file mode 100644
index 0000000000..0e39090dd7
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob4.java
@@ -0,0 +1,473 @@
+package eu.dnetlib.dhp.resulttocommunityfromsemrel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.QueryInformationSystem;
+import eu.dnetlib.dhp.TypedRow;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+
+public class SparkResultToCommunityThroughSemRelJob3 {
+ public static void main(String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils
+ .toString(SparkResultToCommunityThroughSemRelJob3.class
+ .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
+ parser.parseArgument(args);
+
+ SparkConf conf = new SparkConf();
+ conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(SparkResultToCommunityThroughSemRelJob3.class.getSimpleName())
+ .master(parser.get("master"))
+ .config(conf)
+ .enableHiveSupport()
+ .getOrCreate();
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ final String inputPath = parser.get("sourcePath");
+ final String outputPath = "/tmp/provision/propagation/resulttocommunityfromsemrel";
+
+ final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
+
+ final List communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
+
+ createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+
+
+ JavaRDD publication_rdd = sc.textFile(inputPath + "/publication")
+ .map(item -> new ObjectMapper().readValue(item, Publication.class));
+
+ JavaRDD dataset_rdd = sc.textFile(inputPath + "/dataset")
+ .map(item -> new ObjectMapper().readValue(item, Dataset.class));
+
+ JavaRDD orp_rdd = sc.textFile(inputPath + "/otherresearchproduct")
+ .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
+
+ JavaRDD software_rdd = sc.textFile(inputPath + "/software")
+ .map(item -> new ObjectMapper().readValue(item, Software.class));
+
+ JavaRDD relation_rdd = sc.textFile(inputPath + "/relation")
+ .map(item -> new ObjectMapper().readValue(item, Relation.class));
+
+
+ org.apache.spark.sql.Dataset publication = spark.createDataset(publication_rdd.rdd(),
+ Encoders.bean(Publication.class));
+
+ org.apache.spark.sql.Dataset relation = spark.createDataset(relation_rdd.rdd(),
+ Encoders.bean(Relation.class));
+
+ org.apache.spark.sql.Dataset dataset = spark.createDataset(dataset_rdd.rdd(),
+ Encoders.bean(Dataset.class));
+
+ org.apache.spark.sql.Dataset other = spark.createDataset(orp_rdd.rdd(),
+ Encoders.bean(OtherResearchProduct.class));
+
+ org.apache.spark.sql.Dataset software = spark.createDataset(software_rdd.rdd(),
+ Encoders.bean(Software.class));
+
+
+ publication.createOrReplaceTempView("publication");
+ relation.createOrReplaceTempView("relation");
+ dataset.createOrReplaceTempView("dataset");
+ software.createOrReplaceTempView("software");
+ other.createOrReplaceTempView("other");
+
+ String communitylist = getConstraintList(" co.id = '", communityIdList);
+
+ String semrellist = getConstraintList(" relClass = '", allowedsemrel );
+
+
+ String query = "Select source, community_context, target " +
+ "from (select id, collect_set(co.id) community_context " +
+ "from publication " +
+ "lateral view explode (context) c as co " +
+ "where datainfo.deletedbyinference = false "+ communitylist +
+ " group by id) p " +
+ "JOIN " +
+ "(select * " +
+ "from relation " +
+ "where datainfo.deletedbyinference = false " + semrellist + ") r " +
+ "ON p.id = r.source";
+
+
+ org.apache.spark.sql.Dataset publication_context = spark.sql( query);
+ publication_context.createOrReplaceTempView("publication_context");
+
+ //( source, (mes, dh-ch-, ni), target )
+ query = "select target , collect_set(co) " +
+ "from (select target, community_context " +
+ "from publication_context pc join publication p on " +
+ "p.id = pc.source) tmp " +
+ "lateral view explode (community_context) c as co " +
+ "group by target";
+
+
+
+ org.apache.spark.sql.Dataset toupdatepublicationreresult = spark.sql(query);
+ org.apache.spark.sql.Dataset toupdatesoftwareresult = getUpdateCommunitiesForTable(spark, "software");
+ org.apache.spark.sql.Dataset toupdatedatasetresult = getUpdateCommunitiesForTable(spark, "dataset");
+ org.apache.spark.sql.Dataset toupdateotherresult = getUpdateCommunitiesForTable(spark, "other");
+
+ createUpdateForResultDatasetWrite(toupdatesoftwareresult.toJavaRDD(), outputPath, "software_update",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ createUpdateForResultDatasetWrite(toupdatedatasetresult.toJavaRDD(), outputPath, "dataset_update",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ createUpdateForResultDatasetWrite(toupdatepublicationreresult.toJavaRDD(), outputPath, "publication_update",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ createUpdateForResultDatasetWrite(toupdateotherresult.toJavaRDD(), outputPath, "other_update",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ updateForDatasetDataset(toupdatedatasetresult.toJavaRDD(), dataset.toJavaRDD(), outputPath, "dataset",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ updateForOtherDataset(toupdateotherresult.toJavaRDD(), other.toJavaRDD(), outputPath, "otherresearchproduct",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ updateForSoftwareDataset(toupdatesoftwareresult.toJavaRDD(), software.toJavaRDD(), outputPath, "software",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+
+ updateForPublicationDataset(toupdatepublicationreresult.toJavaRDD(), publication.toJavaRDD(), outputPath, "publication",
+ PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, communityIdList);
+//
+
+/*
+ JavaPairRDD resultLinkedToCommunities = publication
+ .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication"))
+ .filter(p -> !(p == null))
+ .mapToPair(toPair())
+ .union(datasets
+ .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset"))
+ .filter(p -> !(p == null))
+ .mapToPair(toPair())
+ )
+ .union(software
+ .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software"))
+ .filter(p -> !(p == null))
+ .mapToPair(toPair())
+ )
+ .union(other
+ .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct"))
+ .filter(p -> !(p == null))
+ .mapToPair(toPair())
+ );
+
+ JavaPairRDD to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId()))
+ .mapToPair(toPair());
+
+ JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
+ JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
+ JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
+ JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
+
+ updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME);
+ //leftouterjoin result.to_add_result_communities (result = java pair rdd result) [left outer join perche' li voglio tutti anche quelli che non ho aggiornato]
+ //per quelli che matchano cercare nel risultato se i context da aggiungere sono gia' presenti. Se non ci sono aggiungerli, altrimenti nulla
+*/
+ }
+
+ private static org.apache.spark.sql.Dataset getUpdateCommunitiesForTable(SparkSession spark, String table){
+ String query = "SELECT target_id, collect_set(co.id) context_id " +
+ " FROM (SELECT t.id target_id, s.context source_context " +
+ " FROM context_software s " +
+ " JOIN " + table + " t " +
+ " ON s.target = t.id " +
+ " UNION ALL " +
+ " SELECT t.id target_id, d.context source_context " +
+ " FROM dataset_context d " +
+ " JOIN " + table + " t" +
+ " ON s.target = t.id " +
+ " UNION ALL " +
+ " SELECT t.id target_id, p.context source_context " +
+ " FROM publication_context p" +
+ " JOIN " + table +" t " +
+ " on p.target = t.id " +
+ " UNION ALL " +
+ " SELECT t.id target_id, o.context source_context " +
+ " FROM other_context o " +
+ " JOIN " + table + " t " +
+ " ON o.target = t.id) TMP " +
+ " LATERAL VIEW EXPLODE(source_context) MyT as co " +
+ " GROUP BY target_id" ;
+
+ return spark.sql(query);
+ }
+
+ private static JavaRDD createUpdateForResultDatasetWrite(JavaRDD toupdateresult, String outputPath, String type, String class_id, String class_name, List communityIdList){
+ return toupdateresult.map(r -> {
+ List contextList = new ArrayList();
+ List toAddContext = r.getList(1);
+ for (String cId : toAddContext) {
+ if (communityIdList.contains(cId)) {
+ Context newContext = new Context();
+ newContext.setId(cId);
+ newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
+ contextList.add(newContext);
+ }
+
+ }
+
+ if (contextList.size() > 0) {
+ Result ret = new Result();
+ ret.setId(r.getString(0));
+ ret.setContext(contextList);
+ return ret;
+ }
+ return null;
+ }).filter(r -> r != null);
+ }
+
+ private static void updateForSoftwareDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){
+ JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
+ getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
+ .map(r -> (Software) r)
+ .map(s -> new ObjectMapper().writeValueAsString(s))
+ .saveAsTextFile(outputPath + "/" + type);
+ }
+
+ private static void updateForDatasetDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){
+ JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
+ getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
+ .map( r-> (Dataset)r)
+ .map(d -> new ObjectMapper().writeValueAsString(d))
+ .saveAsTextFile(outputPath + "/" + type);
+ }
+
+ private static void updateForPublicationDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){
+ JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
+ getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
+ .map(r -> (Publication)r)
+ .map(p -> new ObjectMapper().writeValueAsString(p))
+ .saveAsTextFile(outputPath + "/" + type);
+ }
+
+ private static void updateForOtherDataset(JavaRDD toupdateresult, JavaRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){
+ JavaPairRDD tmp = result.mapToPair(r -> new Tuple2(r.getId(), r));
+ getUpdateForResultDataset(toupdateresult, tmp, outputPath, type, class_id, class_name, communityIdList)
+ .map( r -> (OtherResearchProduct)r)
+ .map( o -> new ObjectMapper().writeValueAsString(o))
+ .saveAsTextFile(outputPath + "/" + type);
+ }
+
+
+
+ private static JavaRDD getUpdateForResultDataset(JavaRDD toupdateresult, JavaPairRDD result, String outputPath, String type, String class_id, String class_name, List communityIdList){
+ return result.leftOuterJoin(toupdateresult.mapToPair(r -> new Tuple2<>(r.getString(0), r.getList(1))))
+ .map(c -> {
+ if(! c._2()._2().isPresent()){
+ return c._2()._1();
+ }
+
+ List