forked from D-Net/dnet-hadoop
Merge pull request 'removeTaggingCondition' (#297) from removeTaggingCondition into beta
Reviewed-on: D-Net/dnet-hadoop#297
commit eb2fa8556b
@@ -15,6 +15,7 @@ public class Community implements Serializable {
 	private List<Provider> providers = new ArrayList<>();
 	private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
 	private SelectionConstraints constraints = new SelectionConstraints();
+	private SelectionConstraints removeConstraints = new SelectionConstraints();

 	public String toJson() {
 		final Gson g = new Gson();
@@ -67,4 +68,12 @@ public class Community implements Serializable {
 	public void setConstraints(SelectionConstraints constraints) {
 		this.constraints = constraints;
 	}
+
+	public SelectionConstraints getRemoveConstraints() {
+		return removeConstraints;
+	}
+
+	public void setRemoveConstraints(SelectionConstraints removeConstraints) {
+		this.removeConstraints = removeConstraints;
+	}
 }
@@ -28,6 +28,8 @@ public class CommunityConfiguration implements Serializable {
 	private Map<String, SelectionConstraints> selectionConstraintsMap = new HashMap<>();
 	// map eosc datasource -> communityid
 	private Map<String, List<Pair<String, SelectionConstraints>>> eoscDatasourceMap = new HashMap<>();
+	// map communityid -> remove constraints
+	private Map<String, SelectionConstraints> removeConstraintsMap = new HashMap<>();

 	public Map<String, List<Pair<String, SelectionConstraints>>> getEoscDatasourceMap() {
 		return eoscDatasourceMap;
@@ -71,6 +73,14 @@ public class CommunityConfiguration implements Serializable {
 		this.selectionConstraintsMap = selectionConstraintsMap;
 	}

+	public Map<String, SelectionConstraints> getRemoveConstraintsMap() {
+		return removeConstraintsMap;
+	}
+
+	public void setRemoveConstraintsMap(Map<String, SelectionConstraints> removeConstraintsMap) {
+		this.removeConstraintsMap = removeConstraintsMap;
+	}
+
 	CommunityConfiguration(final Map<String, Community> communities) {
 		this.communities = communities;
 		init();
@@ -90,6 +100,9 @@ public class CommunityConfiguration implements Serializable {
 		if (selectionConstraintsMap == null) {
 			selectionConstraintsMap = Maps.newHashMap();
 		}
+		if (removeConstraintsMap == null) {
+			removeConstraintsMap = Maps.newHashMap();
+		}

 		for (Community c : getCommunities().values()) {
 			// get subjects
@@ -111,6 +124,8 @@ public class CommunityConfiguration implements Serializable {
 					zenodocommunityMap);
 			}
 			selectionConstraintsMap.put(id, c.getConstraints());
+
+			removeConstraintsMap.put(id, c.getRemoveConstraints());
 		}
 	}

@@ -86,6 +86,7 @@ public class CommunityConfigurationFactory {
 		c.setProviders(parseDatasources(node));
 		c.setZenodoCommunities(parseZenodoCommunities(node));
 		c.setConstraints(parseConstrains(node));
+		c.setRemoveConstraints(parseRemoveConstrains(node));
 		return c;
 	}

@@ -102,6 +103,19 @@ public class CommunityConfigurationFactory {
 		return selectionConstraints;
 	}

+	private static SelectionConstraints parseRemoveConstrains(Node node) {
+		Node constsNode = node.selectSingleNode("./removeConstraints");
+		if (constsNode == null || StringUtils.isBlank(StringUtils.trim(constsNode.getText()))) {
+			return new SelectionConstraints();
+		}
+		SelectionConstraints selectionConstraints = new Gson()
+			.fromJson(constsNode.getText(), SelectionConstraints.class);
+
+		selectionConstraints.setSelection(resolver);
+		log.info("number of selection constraints set " + selectionConstraints.getCriteria().size());
+		return selectionConstraints;
+	}
+
 	private static List<String> parseSubjects(final Node node) {

 		final List<String> subjects = Lists.newArrayList();
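For context, the removeConstraints parameter is expected to carry the same JSON shape as the existing selection constraints (a list of criteria, each holding a list of verb/field/value constraints). The following standalone sketch is not part of this commit: it mirrors that assumed shape with local stand-in classes (ConstraintSketch, ConstraintsSketch, SelectionConstraintsSketch are illustrative names, not the real bulktag types) and shows how Gson would deserialize such a payload.

    import java.util.List;
    import com.google.gson.Gson;

    // Stand-in classes mirroring the ASSUMED JSON shape of SelectionConstraints;
    // the real classes live in the bulktag module and may differ.
    class ConstraintSketch {
    	String verb;
    	String field;
    	String value;
    }

    class ConstraintsSketch {
    	List<ConstraintSketch> constraint;
    }

    class SelectionConstraintsSketch {
    	List<ConstraintsSketch> criteria;
    }

    public class RemoveConstraintsExample {
    	public static void main(String[] args) {
    		// hypothetical payload for a community's removeConstraints parameter
    		String json = "{\"criteria\":[{\"constraint\":["
    			+ "{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}]}";
    		SelectionConstraintsSketch sc = new Gson().fromJson(json, SelectionConstraintsSketch.class);
    		// a result matching any of these criteria would be excluded from the community
    		System.out.println("criteria parsed: " + sc.criteria.size());
    	}
    }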
@@ -79,6 +79,23 @@ public class ResultTagger implements Serializable {
 				break;
 		}

+		// removeCommunities contains all the communities that must not be added to the context
+		final Set<String> removeCommunities = new HashSet<>();
+
+		conf
+			.getRemoveConstraintsMap()
+			.keySet()
+			.forEach(communityId -> {
+				if (conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
+					conf
+						.getRemoveConstraintsMap()
+						.get(communityId)
+						.getCriteria()
+						.stream()
+						.anyMatch(crit -> crit.verifyCriteria(param)))
+					removeCommunities.add(communityId);
+			});
+
 		// communities contains all the communities to be added as context for the result
 		final Set<String> communities = new HashSet<>();

@@ -164,7 +181,8 @@ public class ResultTagger implements Serializable {
 			.getSelectionConstraintsMap()
 			.keySet()
 			.forEach(communityId -> {
-				if (conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null &&
+				if (!removeCommunities.contains(communityId) &&
+					conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null &&
 					conf
 						.getSelectionConstraintsMap()
 						.get(communityId)
@@ -175,6 +193,9 @@ public class ResultTagger implements Serializable {
 			});

 		communities.addAll(aconstraints);
+
+		communities.removeAll(removeCommunities);
+
 		if (aconstraints.size() > 0)
 			log.info("Found {} for advancedConstraints ", aconstraints.size());

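Net effect of the ResultTagger changes: a community whose remove constraints match the result is vetoed outright; it is skipped when the selection constraints are evaluated, and it is stripped again after the advanced-constraint communities (aconstraints) have been merged in. The following generic, standalone sketch of that decision is not part of this commit; the Predicate stand-ins and names are illustrative, not the actual dnet-hadoop types.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Predicate;

    // Sketch of the tagging decision: "param" stands for the result's extracted
    // field map, and the two maps stand in for the configuration's selection and
    // remove constraint maps keyed by community id.
    public class TaggingDecisionSketch {
    	static <P> Set<String> communitiesToTag(
    		P param,
    		Map<String, List<Predicate<P>>> selectionCriteria,
    		Map<String, List<Predicate<P>>> removeCriteria) {

    		// communities whose remove constraints match must never be tagged
    		Set<String> removeCommunities = new HashSet<>();
    		removeCriteria.forEach((communityId, criteria) -> {
    			if (criteria != null && criteria.stream().anyMatch(c -> c.test(param)))
    				removeCommunities.add(communityId);
    		});

    		// selection constraints are only evaluated for communities not vetoed above
    		Set<String> communities = new HashSet<>();
    		selectionCriteria.forEach((communityId, criteria) -> {
    			if (!removeCommunities.contains(communityId)
    				&& criteria != null
    				&& criteria.stream().anyMatch(c -> c.test(param)))
    				communities.add(communityId);
    		});

    		// anything contributed by other rules (e.g. advanced constraints) is filtered too
    		communities.removeAll(removeCommunities);
    		return communities;
    	}
    }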
@@ -10,6 +10,9 @@ where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] and $x//con
 return
 <community>
 	{ $x//CONFIGURATION/context/@id}
+	<removeConstraints>
+		{$x//CONFIGURATION/context/param[./@name='removeConstraints']/text() }
+	</removeConstraints>
 	<advancedConstraints>
 		{$x//CONFIGURATION/context/param[./@name='advancedConstraints']/text() }
 	</advancedConstraints>
@@ -39,8 +39,10 @@ public class BulkTagJobTest {
 		+ " \"contributor\" : \"$['contributor'][*]['value']\","
 		+ " \"description\" : \"$['description'][*]['value']\", "
 		+ " \"subject\" :\"$['subject'][*]['value']\" , " +
-		"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"" +
-		"} ";
+		"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"," +
+		"\"sdg\" : \"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"," +
+		"\"hostedby\" : \"$['instance'][*]['hostedby']['key']\" , " +
+		"\"collectedfrom\" : \"$['instance'][*]['collectedfrom']['key']\"} ";

 	private static SparkSession spark;

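The test's pathMap now also resolves fos, sdg, hostedby and collectedfrom. As a rough illustration (not part of this commit), this is how one of the newly added JSONPath expressions behaves against a minimal, made-up result fragment, assuming the Jayway JsonPath library (the pathMap entries are JSONPath expressions):

    import java.util.List;
    import com.jayway.jsonpath.JsonPath;

    public class PathMapSketch {
    	public static void main(String[] args) {
    		// minimal, invented result fragment with one FOS subject and one plain keyword
    		String json = "{\"subject\":["
    			+ "{\"value\":\"0101 mathematics\",\"qualifier\":{\"classid\":\"FOS\"}},"
    			+ "{\"value\":\"algebra\",\"qualifier\":{\"classid\":\"keyword\"}}]}";

    		// same expression the test adds for the "fos" entry of the pathMap
    		List<String> fos = JsonPath.read(json, "$['subject'][?(@['qualifier']['classid']=='FOS')].value");
    		System.out.println(fos); // prints ["0101 mathematics"]
    	}
    }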
@@ -56,7 +58,7 @@ public class BulkTagJobTest {
 				.toString(
 					BulkTagJobTest.class
 						.getResourceAsStream(
-							"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_dth.xml"));
+							"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_remove.xml"));
 		} catch (IOException e) {
 			e.printStackTrace();
 		}
@@ -1525,4 +1527,45 @@ public class BulkTagJobTest {
 				.count());
 	}

+	@Test
+	void removeTest() throws Exception {
+		final String pathMap = BulkTagJobTest.pathMap;
+		SparkBulkTagJob
+			.main(
+				new String[] {
+					"-isTest", Boolean.TRUE.toString(),
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath",
+					getClass()
+						.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
+						.getPath(),
+					"-taggingConf", taggingConf,
+					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
+					"-outputPath", workingDir.toString() + "/dataset",
+					"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
+					"-pathMap", pathMap
+				});
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Dataset> tmp = sc
+			.textFile(workingDir.toString() + "/dataset")
+			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+		Assertions.assertEquals(12, tmp.count());
+		org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
+			.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+		verificationDataset.createOrReplaceTempView("dataset");
+		String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+			+ "from dataset "
+			+ "lateral view explode(context) c as MyT "
+			+ "lateral view explode(MyT.datainfo) d as MyD "
+			+ "where MyD.inferenceprovenance = 'bulktagging'";
+
+		org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+
+		Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'dth'").count());
+
+	}

 }
File diff suppressed because it is too large
File diff suppressed because one or more lines are too long