diff --git a/dhp-workflows/dhp-bulktag/pom.xml b/dhp-workflows/dhp-bulktag/pom.xml
index 5b6a21027..3fa311315 100644
--- a/dhp-workflows/dhp-bulktag/pom.xml
+++ b/dhp-workflows/dhp-bulktag/pom.xml
@@ -5,11 +5,46 @@
dhp-workflows
eu.dnetlib.dhp
- 1.0.5-SNAPSHOT
+ 1.1.6-SNAPSHOT
4.0.0
dhp-bulktag
+
+
+ org.apache.spark
+ spark-core_2.11
+
+
+ org.apache.spark
+ spark-sql_2.11
+
+
+
+ eu.dnetlib.dhp
+ dhp-common
+ ${project.version}
+
+
+ eu.dnetlib.dhp
+ dhp-schemas
+ ${project.version}
+
+
+ dom4j
+ dom4j
+
+
+ com.jayway.jsonpath
+ json-path
+
+
+ org.reflections
+ reflections
+ 0.9.11
+ compile
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Community.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Community.java
new file mode 100644
index 000000000..eebeb05b3
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Community.java
@@ -0,0 +1,64 @@
+package eu.dnetlib.dhp;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by miriam on 01/08/2018.
+ */
+public class Community {
+
+ private static final Log log = LogFactory.getLog(Community.class);
+
+ private String id;
+ private List subjects = new ArrayList<>();
+ private List datasources = new ArrayList<>();
+ private List zenodoCommunities = new ArrayList<>();
+
+
+ public String toJson() {
+ final Gson g = new Gson();
+ return g.toJson(this);
+ }
+
+ public boolean isValid() {
+ return !getSubjects().isEmpty() || !getDatasources().isEmpty() || !getZenodoCommunities().isEmpty();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public List getSubjects() {
+ return subjects;
+ }
+
+ public void setSubjects(List subjects) {
+ this.subjects = subjects;
+ }
+
+ public List getDatasources() {
+ return datasources;
+ }
+
+ public void setDatasources(List datasources) {
+ this.datasources = datasources;
+ }
+
+ public List getZenodoCommunities() {
+ return zenodoCommunities;
+ }
+
+ public void setZenodoCommunities(List zenodoCommunities) {
+ this.zenodoCommunities = zenodoCommunities;
+ }
+
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/CommunityConfiguration.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/CommunityConfiguration.java
new file mode 100644
index 000000000..6bf5a1a80
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/CommunityConfiguration.java
@@ -0,0 +1,170 @@
+package eu.dnetlib.dhp;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+
+import com.google.gson.GsonBuilder;
+
+import eu.dnetlib.dhp.selectioncriteria.InterfaceAdapter;
+import eu.dnetlib.dhp.selectioncriteria.Selection;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Created by miriam on 02/08/2018.
+ */
+public class CommunityConfiguration {
+
+ private static final Log log = LogFactory.getLog(CommunityConfiguration.class);
+
+
+ private Map communities;
+
+
+ //map subject -> communityid
+ private transient Map>> subjectMap = new HashMap<>();
+ //map datasourceid -> communityid
+ private transient Map>> datasourceMap = new HashMap<>();
+ //map zenodocommunityid -> communityid
+ private transient Map>> zenodocommunityMap = new HashMap<>();
+
+ CommunityConfiguration(final Map communities) {
+ this.communities = communities;
+ init();
+ }
+
+ void init() {
+
+ if (subjectMap == null) {
+ subjectMap = Maps.newHashMap();
+ }
+ if (datasourceMap == null) {
+ datasourceMap = Maps.newHashMap();
+ }
+ if (zenodocommunityMap == null) {
+ zenodocommunityMap = Maps.newHashMap();
+ }
+
+
+ for(Community c : getCommunities().values()) {
+ //get subjects
+ final String id = c.getId();
+ for(String sbj : c.getSubjects()){
+ Pair p = new Pair<>(id,new SelectionConstraints());
+ add(sbj.toLowerCase().trim() , p, subjectMap);
+ }
+ //get datasources
+ for(Datasource d: c.getDatasources()){
+
+ add(d.getOpenaireId(),new Pair<>(id,d.getSelectionConstraints()),datasourceMap);
+ }
+ //get zenodo communities
+ for(ZenodoCommunity zc : c.getZenodoCommunities()){
+ add(zc.getZenodoCommunityId(),new Pair<>(id,zc.getSelCriteria()),zenodocommunityMap);
+ }
+
+
+ }
+ }
+
+ private void add(String key, Pair value, Map>> map){
+ List> values = map.get(key);
+
+ if (values == null){
+ values = new ArrayList<>();
+ map.put(key,values);
+ }
+ values.add(value);
+ }
+
+ public List> getCommunityForSubject(String sbj){
+ return subjectMap.get(sbj);
+ }
+
+ public List> getCommunityForDatasource(String dts){
+ return datasourceMap.get(dts);
+ }
+
+
+ public List getCommunityForDatasource(final String dts, final Map> param) {
+ List> lp = datasourceMap.get(dts);
+ if (lp==null)
+ return Lists.newArrayList();
+
+ return lp.stream().map(p -> {
+ if (p.getSnd() == null)
+ return p.getFst();
+ if (((SelectionConstraints) p.getSnd()).verifyCriteria(param))
+ return p.getFst();
+ else
+ return null;
+ }).filter(st->(st!=null)).collect(Collectors.toList());
+
+
+ }
+
+ public List> getCommunityForZenodoCommunity(String zc){
+ return zenodocommunityMap.get(zc);
+ }
+
+ public List getCommunityForSubjectValue(String value) {
+
+ return getContextIds(subjectMap.get(value));
+ }
+
+ public List getCommunityForDatasourceValue(String value) {
+
+ return getContextIds(datasourceMap.get(value.toLowerCase()));
+ }
+
+ public List getCommunityForZenodoCommunityValue(String value){
+
+ return getContextIds(zenodocommunityMap.get(value.toLowerCase()));
+ }
+
+ private List getContextIds(List> list) {
+ if (list != null) {
+ return list.stream().map(p -> p.getFst()).collect(Collectors.toList());
+ }
+ return Lists.newArrayList();
+ }
+
+
+ public Map getCommunities() {
+ return communities;
+ }
+
+ public void setCommunities(Map communities) {
+ this.communities = communities;
+ }
+
+ public String toJson() {
+ GsonBuilder builder = new GsonBuilder();
+ builder.registerTypeAdapter(Selection.class, new InterfaceAdapter());
+ Gson gson = builder.create();
+
+ return gson.toJson(this);
+ }
+
+ public int size() {
+ return communities.keySet().size();
+ }
+
+ public Community getCommunityById(String id){
+ return communities.get(id);
+ }
+
+ public List getCommunityList() {
+ return Lists.newLinkedList(communities.values());
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/CommunityConfigurationFactory.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/CommunityConfigurationFactory.java
new file mode 100644
index 000000000..98a2437fb
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/CommunityConfigurationFactory.java
@@ -0,0 +1,146 @@
+package eu.dnetlib.dhp;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import eu.dnetlib.dhp.selectioncriteria.InterfaceAdapter;
+import eu.dnetlib.dhp.selectioncriteria.Selection;
+import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+import eu.dnetlib.dhp.selectioncriteria.VerbResolverFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.Node;
+import org.dom4j.io.SAXReader;
+
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by miriam on 03/08/2018.
+ */
+public class CommunityConfigurationFactory {
+
+ private static final Log log = LogFactory.getLog(CommunityConfigurationFactory.class);
+
+ private static VerbResolver resolver = VerbResolverFactory.newInstance();
+
+ public static CommunityConfiguration newInstance(final String xml) throws DocumentException {
+
+ log.debug(String.format("parsing community configuration from:\n%s", xml));
+
+ final Document doc = new SAXReader().read(new StringReader(xml));
+
+ final Map communities = Maps.newHashMap();
+
+ for(final Object o : doc.selectNodes("//community")) {
+
+ final Node node = (Node) o;
+
+ final Community community = parseCommunity(node);
+
+ if (community.isValid()) {
+ communities.put(community.getId(), community);
+ }
+ }
+
+ log.info(String.format("loaded %s community configuration profiles", communities.size()));
+ log.debug(String.format("loaded community configuration:\n%s", communities.toString()));
+
+
+ return new CommunityConfiguration(communities);
+ }
+
+ public static CommunityConfiguration fromJson(final String json) {
+ GsonBuilder builder = new GsonBuilder();
+ builder.registerTypeAdapter(Selection.class, new InterfaceAdapter());
+ Gson gson = builder.create();
+ final CommunityConfiguration conf = gson.fromJson(json, CommunityConfiguration.class);
+ log.info(String.format("loaded %s community configuration profiles", conf.size()));
+ conf.init();
+ log.info("created inverse maps");
+
+ return conf;
+ }
+
+ private static Community parseCommunity(final Node node) {
+
+ final Community c = new Community();
+
+ c.setId(node.valueOf("./@id"));
+
+ log.info(String.format("community id: %s", c.getId()));
+
+ c.setSubjects(parseSubjects(node));
+ c.setDatasources(parseDatasources(node));
+ c.setZenodoCommunities(parseZenodoCommunities(node));
+ return c;
+ }
+
+ private static List parseSubjects(final Node node) {
+
+ final List subjects = Lists.newArrayList();
+
+ final List list = node.selectNodes("./subjects/subject");
+
+ for(Node n : list){
+ log.debug("text of the node " + n.getText());
+ subjects.add(StringUtils.trim(n.getText()));
+ }
+ log.info("size of the subject list " + subjects.size());
+ return subjects;
+ }
+
+
+ private static List parseDatasources(final Node node) {
+ final List list = node.selectNodes("./datasources/datasource");
+ final List datasourceList = new ArrayList<>();
+ for(Node n : list){
+ Datasource d = new Datasource();
+ d.setOpenaireId(n.selectSingleNode("./openaireId").getText());
+ d.setSelCriteria(n.selectSingleNode("./selcriteria"), resolver);
+ datasourceList.add(d);
+ }
+ log.info("size of the datasource list " + datasourceList.size());
+ return datasourceList;
+ }
+
+ private static List parseZenodoCommunities(final Node node) {
+ final Node oacommunitynode = node.selectSingleNode("./oacommunity");
+ String oacommunity = null;
+ if (oacommunitynode != null){
+ String tmp = oacommunitynode.getText();
+ if(StringUtils.isNotBlank(tmp))
+ oacommunity = tmp;
+ }
+
+
+ final List list = node.selectNodes("./zenodocommunities/zenodocommunity");
+ final List zenodoCommunityList = new ArrayList<>();
+ for(Node n : list){
+ ZenodoCommunity zc = new ZenodoCommunity();
+ zc.setZenodoCommunityId(n.selectSingleNode("./zenodoid").getText());
+ zc.setSelCriteria(n.selectSingleNode("./selcriteria"));
+
+ zenodoCommunityList.add(zc);
+ }
+ if(oacommunity != null){
+ ZenodoCommunity zc = new ZenodoCommunity();
+ zc.setZenodoCommunityId(oacommunity);
+ zenodoCommunityList.add(zc);
+ }
+ log.info("size of the zenodo community list " + zenodoCommunityList.size());
+ return zenodoCommunityList;
+ }
+
+
+
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Constraint.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Constraint.java
new file mode 100644
index 000000000..6df127046
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Constraint.java
@@ -0,0 +1,62 @@
+package eu.dnetlib.dhp;
+
+
+
+import eu.dnetlib.dhp.selectioncriteria.Selection;
+import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+
+public class Constraint implements Serializable {
+ private String verb;
+ private String field;
+ private String value;
+ private Selection selection;
+
+ public Constraint() {
+ }
+
+ public String getVerb() {
+ return verb;
+ }
+
+ public void setVerb(String verb) {
+ this.verb = verb;
+ }
+
+ public String getField() {
+ return field;
+ }
+
+ public void setField(String field) {
+ this.field = field;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+
+
+ public void setSelection(Selection sel){
+ selection = sel;
+ }
+
+ public void setSelection(VerbResolver resolver) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
+ selection = resolver.getSelectionCriteria(verb,value);
+ }
+
+
+ public boolean verifyCriteria(String metadata){
+ return selection.apply(metadata);
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Constraints.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Constraints.java
new file mode 100644
index 000000000..27572d8ae
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Constraints.java
@@ -0,0 +1,79 @@
+package eu.dnetlib.dhp;
+
+
+import com.google.gson.Gson;
+
+import com.google.gson.reflect.TypeToken;
+import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by miriam on 02/08/2018.
+ */
+public class Constraints implements Serializable {
+ private static final Log log = LogFactory.getLog(Constraints.class);
+ //private ConstraintEncapsulator ce;
+ private List constraint;
+
+
+ public Constraints() {
+ }
+ public List getConstraint() {
+ return constraint;
+ }
+
+ public void setConstraint(List constraint) {
+ this.constraint = constraint;
+ }
+
+ public void setSc(String json){
+ Type collectionType = new TypeToken>(){}.getType();
+ constraint = new Gson().fromJson(json, collectionType);
+
+ }
+
+ void setSelection(VerbResolver resolver) {
+ for(Constraint st: constraint){
+
+ try {
+ st.setSelection(resolver);
+ } catch (NoSuchMethodException e) {
+ log.error(e.getMessage());
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ } catch (InvocationTargetException e) {
+ log.error(e.getMessage());
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ }
+ }
+
+ }
+
+
+ //Constraint in and
+ public boolean verifyCriteria(final Map> param) {
+
+ for(Constraint sc : constraint) {
+ boolean verified = false;
+ for(String value : param.get(sc.getField())){
+ if (sc.verifyCriteria(value.trim())){
+ verified = true;
+ }
+ }
+ if(!verified)
+ return verified;
+ }
+ return true;
+ }
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Datasource.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Datasource.java
new file mode 100644
index 000000000..a4a254f33
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Datasource.java
@@ -0,0 +1,65 @@
+package eu.dnetlib.dhp;
+
+
+import com.google.gson.Gson;
+
+import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.dom4j.Node;
+
+/**
+ * Created by miriam on 01/08/2018.
+ */
+public class Datasource {
+ private static final Log log = LogFactory.getLog(Datasource.class);
+
+ private String openaireId;
+
+ private SelectionConstraints selectionConstraints;
+
+
+ public SelectionConstraints getSelCriteria() {
+ return selectionConstraints;
+ }
+
+ public SelectionConstraints getSelectionConstraints() {
+ return selectionConstraints;
+ }
+
+ public void setSelectionConstraints(SelectionConstraints selectionConstraints) {
+ this.selectionConstraints = selectionConstraints;
+ }
+
+ public void setSelCriteria(SelectionConstraints selCriteria) {
+ this.selectionConstraints = selCriteria;
+ }
+
+ public String getOpenaireId() {
+ return openaireId;
+ }
+
+ public void setOpenaireId(String openaireId) {
+ this.openaireId = openaireId;
+ }
+
+ private void setSelCriteria(String json, VerbResolver resolver){
+ log.info("Selection constraints for datasource = " + json);
+ selectionConstraints = new Gson().fromJson(json, SelectionConstraints.class);
+
+ selectionConstraints.setSelection(resolver);
+ }
+
+ public void setSelCriteria(Node n, VerbResolver resolver){
+ try{
+ setSelCriteria(n.getText(),resolver);
+ }catch(Exception e) {
+ log.info("not set selection criteria... ");
+ selectionConstraints =null;
+ }
+
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Pair.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Pair.java
new file mode 100644
index 000000000..4a1fece43
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/Pair.java
@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp;
+
+import com.google.gson.Gson;
+
+/**
+ * Created by miriam on 03/08/2018.
+ */
+public class Pair {
+ private A fst;
+ private B snd;
+
+ public A getFst() {
+ return fst;
+ }
+
+ public Pair setFst(A fst) {
+ this.fst = fst;
+ return this;
+ }
+
+ public B getSnd() {
+ return snd;
+ }
+
+ public Pair setSnd(B snd) {
+ this.snd = snd;
+ return this;
+ }
+
+ public Pair(A a, B b){
+ fst = a;
+ snd = b;
+ }
+
+ public String toJson(){
+ return new Gson().toJson(this);
+ }
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ProtoMap.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ProtoMap.java
new file mode 100644
index 000000000..01f4b8a37
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ProtoMap.java
@@ -0,0 +1,10 @@
+package eu.dnetlib.dhp;
+
+import java.util.HashMap;
+
+public class ProtoMap extends HashMap {
+
+ public ProtoMap(){
+ super();
+ }
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java
new file mode 100644
index 000000000..1794e4928
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java
@@ -0,0 +1,66 @@
+package eu.dnetlib.dhp;
+
+import com.google.common.base.Joiner;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.dom4j.DocumentException;
+
+import java.util.List;
+
+
+public class QueryInformationSystem {
+ private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " +
+ " let $subj := $x//CONFIGURATION/context/param[./@name='subject']/text() " +
+ " let $datasources := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::contentproviders')]/concept " +
+ " let $organizations := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::resultorganizations')]/concept " +
+ " let $communities := $x//CONFIGURATION/context/category[./@id=concat($x//CONFIGURATION/context/@id,'::zenodocommunities')]/concept " +
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
+ " return " +
+ " " +
+ " { $x//CONFIGURATION/context/@id} " +
+ " " +
+ " {for $y in tokenize($subj,',') " +
+ " return " +
+ " {$y}} " +
+ " " +
+ " " +
+ " {for $d in $datasources " +
+ " where $d/param[./@name='enabled']/text()='true' " +
+ " return " +
+ " " +
+ " " +
+ " {$d//param[./@name='openaireId']/text()} " +
+ " " +
+ " " +
+ " {$d/param[./@name='selcriteria']/text()} " +
+ " " +
+ " } " +
+ " " +
+ " " +
+ " {for $zc in $communities " +
+ " return " +
+ " " +
+ " " +
+ " {$zc/param[./@name='zenodoid']/text()} " +
+ " " +
+ " " +
+ " {$zc/param[./@name='selcriteria']/text()} " +
+ " " +
+ " } " +
+ " " +
+ " ";
+
+ public static CommunityConfiguration getCommunityConfiguration(final String isLookupUrl) throws ISLookUpException, DocumentException {
+ ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
+ final List res = isLookUp.quickSearchProfile(XQUERY);
+
+ final String xmlConf = "" + Joiner.on(" ").join(res) + "";
+
+
+ return CommunityConfigurationFactory.newInstance(xmlConf);
+
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ResultTagger.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ResultTagger.java
new file mode 100644
index 000000000..c6c30c9d7
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ResultTagger.java
@@ -0,0 +1,175 @@
+package eu.dnetlib.dhp;
+import com.google.gson.Gson;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static eu.dnetlib.dhp.TagginConstants.*;
+
+
+/**
+ * Created by miriam on 02/08/2018.
+ */
+public class ResultTagger {
+
+
+ private String trust = "0.8";
+
+
+ private boolean clearContext(Result result){
+ int tmp = result.getContext().size();
+ List clist = result.getContext().stream()
+ .filter(c -> (!c.getId().contains(ZENODO_COMMUNITY_INDICATOR))).collect(Collectors.toList());
+ result.setContext(clist);
+ return (tmp != clist.size());
+ }
+
+ private Map> getParamMap(final Result result, Map params) {
+ Map> param = new HashMap<>();
+ String json = new Gson().toJson(result,Result.class);
+ DocumentContext jsonContext = JsonPath.parse(json);
+ if (params == null){
+ params = new HashMap<>();
+ }
+ for(String key : params.keySet()) {
+ try {
+ param.put(key, jsonContext.read(params.get(key)));
+ } catch (com.jayway.jsonpath.PathNotFoundException e) {
+ param.put(key, new ArrayList<>());
+ // throw e;
+ }
+ }
+ return param;
+
+ }
+
+
+ public Result enrichContextCriteria(final Result result, final CommunityConfiguration conf, final Map criteria) {
+ final Map> param = getParamMap(result, criteria);
+
+ //Verify if the entity is deletedbyinference. In case verify if to clean the context list from all the zenodo communities
+ if(result.getDataInfo().getDeletedbyinference()){
+ return result;
+ }
+
+ //communities contains all the communities to be added as context for the result
+ final Set communities = new HashSet<>();
+
+
+ //tagging for Subject
+ final Set subjects = new HashSet<>();
+ result.getSubject().stream()
+ .map(subject -> subject.getValue())
+ .filter(StringUtils::isNotBlank)
+ .map(String::toLowerCase)
+ .map(String::trim)
+ .collect(Collectors.toCollection(HashSet::new))
+ .forEach(s -> subjects.addAll(conf.getCommunityForSubjectValue(s)));
+
+ communities.addAll(subjects);
+
+
+ //Tagging for datasource
+ final Set datasources = new HashSet<>();
+ final Set tmp = new HashSet<>();
+
+ for(Instance i : result.getInstance()){
+ tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(),"|"));
+ tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(),"|"));
+ }
+
+ result.getInstance()
+ .stream()
+ .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
+ .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
+ .map(s -> StringUtils.substringAfter(s, "|"))
+ .collect(Collectors.toCollection(HashSet::new))
+ .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasource(dsId,param)));
+
+ communities.addAll(datasources);
+
+ /*Tagging for Zenodo Communities*/
+ final Set czenodo = new HashSet<>();
+ //final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
+ result.getContext()
+ .stream()
+ .filter(c -> c.getId().contains(ZENODO_COMMUNITY_INDICATOR))
+ .collect(Collectors.toList())
+ .forEach(c->czenodo.addAll(conf.getCommunityForZenodoCommunityValue(c.getId().substring(c.getId().lastIndexOf("/")+1).trim())));
+
+ communities.addAll(czenodo);
+
+ clearContext(result);
+
+ /*Verify if there is something to bulktag*/
+ if(communities.isEmpty()){
+ return result;
+
+ }
+
+ result.getContext()
+ .stream()
+ .map(c -> {
+ if(communities.contains(c.getId())){
+ List dataInfoList = c.getDataInfo();
+ if (subjects.contains(c.getId()))
+ dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
+ if (datasources.contains(c.getId()))
+ dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
+ if (czenodo.contains(c.getId()))
+ dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
+ }
+ return c;
+ })
+ .collect(Collectors.toList());
+
+
+ communities.removeAll(result.getContext().stream().map(c -> c.getId()).collect(Collectors.toSet()));
+
+ if(communities.isEmpty())
+ return result;
+
+ List toaddcontext = communities
+ .stream()
+ .map(c -> {
+ Context context = new Context();
+ context.setId(c);
+ List dataInfoList = Arrays.asList();
+ if (subjects.contains(c))
+ dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
+ if (datasources.contains(c))
+ dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
+ if (czenodo.contains(c))
+ dataInfoList.add(getDataInfo(BULKTAG_DATA_INFO_TYPE, CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
+ context.setDataInfo(dataInfoList);
+ return context;
+ })
+ .collect(Collectors.toList());
+
+ result.getContext().addAll(toaddcontext);
+ return result;
+ }
+
+ public static DataInfo getDataInfo(String inference_provenance, String inference_class_id, String inference_class_name){
+ DataInfo di = new DataInfo();
+ di.setInferred(true);
+ di.setInferenceprovenance(inference_provenance);
+ di.setProvenanceaction(getQualifier(inference_class_id, inference_class_name));
+ return di;
+ }
+
+ public static Qualifier getQualifier(String inference_class_id, String inference_class_name) {
+ Qualifier pa = new Qualifier();
+ pa.setClassid(inference_class_id);
+ pa.setClassname(inference_class_name);
+ pa.setSchemeid(DNET_SCHEMA_ID);
+ pa.setSchemename(DNET_SCHEMA_NAME);
+ return pa;
+ }
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SelectionConstraints.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SelectionConstraints.java
new file mode 100644
index 000000000..2890986d3
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SelectionConstraints.java
@@ -0,0 +1,49 @@
+package eu.dnetlib.dhp;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
+
+
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class SelectionConstraints implements Serializable {
+ private List criteria;
+
+ public SelectionConstraints() {
+ }
+
+
+ public List getCriteria() {
+ return criteria;
+ }
+
+ public void setCriteria(List criteria) {
+ this.criteria = criteria;
+ }
+
+ public void setSc(String json){
+ Type collectionType = new TypeToken>(){}.getType();
+ criteria = new Gson().fromJson(json, collectionType);
+ }
+
+ //Constraints in or
+ public boolean verifyCriteria(final Map> param){
+ for(Constraints selc : criteria) {
+ if(selc.verifyCriteria(param)){
+ return true;
+ }
+ }
+ return false;
+ }
+ public void setSelection(VerbResolver resolver) {
+
+ for(Constraints cs : criteria){
+ cs.setSelection(resolver);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java
new file mode 100644
index 000000000..755afd2ed
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/SparkBulkTagJob.java
@@ -0,0 +1,71 @@
+package eu.dnetlib.dhp;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+
+
+public class SparkBulkTagJob {
+
+ public static void main(String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkBulkTagJob.class.getResourceAsStream("/eu/dnetlib/dhp/input_bulktag_parameters.json")));
+ parser.parseArgument(args);
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(SparkBulkTagJob.class.getSimpleName())
+ .master(parser.get("master"))
+ .enableHiveSupport()
+ .getOrCreate();
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ final String inputPath = parser.get("sourcePath");
+ final String outputPath = "/tmp/provision/bulktagging";
+
+ final ResultTagger resultTagger = new ResultTagger();
+ ProtoMap protoMappingParams = new Gson().fromJson(parser.get("mappingProto"),ProtoMap.class);;
+
+ File directory = new File(outputPath);
+
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+
+ CommunityConfiguration cc = QueryInformationSystem.getCommunityConfiguration(parser.get("isLookupUrl"));
+
+
+ sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
+ .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ .map(p -> new ObjectMapper().writeValueAsString(p))
+ .saveAsTextFile(outputPath+"/publication");
+ sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
+ .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ .map(p -> new ObjectMapper().writeValueAsString(p))
+ .saveAsTextFile(outputPath+"/dataset");
+ sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
+ .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ .map(p -> new ObjectMapper().writeValueAsString(p))
+ .saveAsTextFile(outputPath+"/software");
+ sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
+ .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
+ .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
+ .map(p -> new ObjectMapper().writeValueAsString(p))
+ .saveAsTextFile(outputPath+"/otherresearchproduct");
+
+
+
+ }
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/TagginConstants.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/TagginConstants.java
new file mode 100644
index 000000000..b089d9fd5
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/TagginConstants.java
@@ -0,0 +1,27 @@
+package eu.dnetlib.dhp;
+
+
+public class TagginConstants {
+
+
+ public final static String BULKTAG_DATA_INFO_TYPE = "bulktagging";
+
+ public final static String DNET_SCHEMA_NAME = "dnet:provenanceActions";
+ public final static String DNET_SCHEMA_ID = "dnet:provenanceActions";
+
+ public final static String CLASS_ID_SUBJECT = "bulktagging:community:subject";
+ public final static String CLASS_ID_DATASOURCE = "bulktagging:community:datasource";
+ public final static String CLASS_ID_CZENODO = "bulktagging:community:zenodocommunity";
+
+ public final static String SCHEMA_ID = "dnet:provenanceActions";
+ public final static String COUNTER_GROUP = "Bulk Tagging";
+
+ public final static String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
+
+ public final static String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
+ public final static String CLASS_NAME_BULKTAG_DATASOURCE = "Bulktagging for Community - Datasource";
+ public final static String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
+
+
+
+}
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ZenodoCommunity.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ZenodoCommunity.java
new file mode 100644
index 000000000..af39d18e3
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/ZenodoCommunity.java
@@ -0,0 +1,46 @@
+package eu.dnetlib.dhp;
+
+import com.google.gson.Gson;
+import org.dom4j.Node;
+
+
+/**
+ * Created by miriam on 01/08/2018.
+ */
+public class ZenodoCommunity {
+
+ private String zenodoCommunityId;
+
+ private SelectionConstraints selCriteria;
+
+ public String getZenodoCommunityId() {
+ return zenodoCommunityId;
+ }
+
+ public void setZenodoCommunityId(String zenodoCommunityId) {
+ this.zenodoCommunityId = zenodoCommunityId;
+ }
+
+ public SelectionConstraints getSelCriteria() {
+ return selCriteria;
+ }
+
+ public void setSelCriteria(SelectionConstraints selCriteria) {
+ this.selCriteria = selCriteria;
+ }
+
+ private void setSelCriteria(String json){
+ //Type collectionType = new TypeToken>(){}.getType();
+ selCriteria = new Gson().fromJson(json, SelectionConstraints.class);
+
+ }
+
+ public void setSelCriteria(Node n){
+ if (n==null){
+ selCriteria = null;
+ }else{
+ setSelCriteria(n.getText());
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/ContainsVerb.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/ContainsVerb.java
new file mode 100644
index 000000000..eb7f059d8
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/ContainsVerb.java
@@ -0,0 +1,27 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+@VerbClass("contains")
+public class ContainsVerb implements Selection {
+
+ private String param;
+
+ public ContainsVerb() {
+ }
+
+ public ContainsVerb(final String param) {
+ this.param = param;
+ }
+
+ @Override
+ public boolean apply(String value) {
+ return value.contains(param);
+ }
+
+ public String getParam() {
+ return param;
+ }
+
+ public void setParam(String param) {
+ this.param = param;
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/EqualVerb.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/EqualVerb.java
new file mode 100644
index 000000000..9b35e9583
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/EqualVerb.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+
+@VerbClass("equals")
+public class EqualVerb implements Selection {
+
+ private String param;
+
+ public EqualVerb() {
+ }
+
+ public EqualVerb(final String param) {
+ this.param = param;
+ }
+
+
+ @Override
+ public boolean apply(String value) {
+ return value.equalsIgnoreCase(param);
+ }
+
+ public String getParam() {
+ return param;
+ }
+
+ public void setParam(String param) {
+ this.param = param;
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/InterfaceAdapter.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/InterfaceAdapter.java
new file mode 100644
index 000000000..7cd261b85
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/InterfaceAdapter.java
@@ -0,0 +1,37 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+import com.google.gson.*;
+
+import java.lang.reflect.Type;
+
+public class InterfaceAdapter implements JsonSerializer, JsonDeserializer {
+
+ private static final String CLASSNAME = "CLASSNAME";
+ private static final String DATA = "DATA";
+
+ public Object deserialize(JsonElement jsonElement, Type type,
+ JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+
+ JsonObject jsonObject = jsonElement.getAsJsonObject();
+ JsonPrimitive prim = (JsonPrimitive) jsonObject.get(CLASSNAME);
+ String className = prim.getAsString();
+ Class klass = getObjectClass(className);
+ return jsonDeserializationContext.deserialize(jsonObject.get(DATA), klass);
+ }
+ public JsonElement serialize(Object jsonElement, Type type, JsonSerializationContext jsonSerializationContext) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty(CLASSNAME, jsonElement.getClass().getName());
+ jsonObject.add(DATA, jsonSerializationContext.serialize(jsonElement));
+ return jsonObject;
+ }
+ /****** Helper method to get the className of the object to be deserialized *****/
+ public Class getObjectClass(String className) {
+ try {
+ return Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ //e.printStackTrace();
+ throw new JsonParseException(e.getMessage());
+ }
+ }
+}
+
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/NotContainsVerb.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/NotContainsVerb.java
new file mode 100644
index 000000000..ecfabd7de
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/NotContainsVerb.java
@@ -0,0 +1,27 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+@VerbClass("not_contains")
+public class NotContainsVerb implements Selection {
+
+ private String param;
+
+ public NotContainsVerb() {
+ }
+
+ public NotContainsVerb(final String param) {
+ this.param = param;
+ }
+
+ @Override
+ public boolean apply(String value) {
+ return !value.contains(param);
+ }
+
+ public String getParam() {
+ return param;
+ }
+
+ public void setParam(String param) {
+ this.param = param;
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/NotEqualVerb.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/NotEqualVerb.java
new file mode 100644
index 000000000..c2c9e73ad
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/NotEqualVerb.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+
+@VerbClass("not_equals")
+public class NotEqualVerb implements Selection {
+
+ private String param;
+
+
+ public NotEqualVerb(final String param) {
+ this.param = param;
+ }
+
+ public NotEqualVerb() {
+ }
+
+ public String getParam() {
+ return param;
+ }
+
+ public void setParam(String param) {
+ this.param = param;
+ }
+
+ @Override
+ public boolean apply(String value) {
+ return !value.equalsIgnoreCase(param);
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/Selection.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/Selection.java
new file mode 100644
index 000000000..cd9888a7e
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/Selection.java
@@ -0,0 +1,6 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+public interface Selection {
+
+ boolean apply(String value);
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java
new file mode 100644
index 000000000..9f519f091
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbClass.java
@@ -0,0 +1,13 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface VerbClass {
+
+ public String value();
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbResolver.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbResolver.java
new file mode 100644
index 000000000..a8df69ea6
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbResolver.java
@@ -0,0 +1,25 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+
+import org.reflections.Reflections;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class VerbResolver implements Serializable {
+ private final Map> map;
+
+ public VerbResolver(){
+ this.map = new Reflections("eu.dnetlib").getTypesAnnotatedWith(VerbClass.class).stream()
+ .collect(Collectors.toMap(v -> v.getAnnotation(VerbClass.class).value(), v->(Class)v));
+ }
+
+
+ public Selection getSelectionCriteria(String name, String param) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
+ return map.get(name).getDeclaredConstructor((String.class)).newInstance(param);
+
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbResolverFactory.java b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbResolverFactory.java
new file mode 100644
index 000000000..8879e2d3b
--- /dev/null
+++ b/dhp-workflows/dhp-bulktag/src/main/java/eu/dnetlib/dhp/selectioncriteria/VerbResolverFactory.java
@@ -0,0 +1,10 @@
+package eu.dnetlib.dhp.selectioncriteria;
+
+public class VerbResolverFactory {
+
+ public static VerbResolver newInstance(){
+
+ return new VerbResolver();
+ }
+
+}
\ No newline at end of file