forked from D-Net/dnet-hadoop
refactoring
This commit is contained in:
parent 34172455d1
commit 9567c13bc3
@@ -28,7 +28,7 @@ public class CommunityConfiguration implements Serializable {
 	private Map<String, SelectionConstraints> selectionConstraintsMap = new HashMap<>();
 	// map eosc datasource -> communityid
 	private Map<String, List<Pair<String, SelectionConstraints>>> eoscDatasourceMap = new HashMap<>();
-	//map communityid -> remove constraints
+	// map communityid -> remove constraints
 	private Map<String, SelectionConstraints> removeConstraintsMap = new HashMap<>();
 
 	public Map<String, List<Pair<String, SelectionConstraints>>> getEoscDatasourceMap() {
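Note on the fields in this hunk: selectionConstraintsMap and removeConstraintsMap are both keyed by community id, while eoscDatasourceMap goes from an EOSC datasource id to the list of communities (each paired with its constraints) that the datasource can feed. A minimal sketch of how such a structure might be populated; the ids are invented, a stub stands in for the real SelectionConstraints, and commons-lang3 Pair is assumed (the project's Pair may be a different type):

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;

public class CommunityConfigurationSketch {

	// Hypothetical stand-in for the real SelectionConstraints type.
	static class SelectionConstraints {
	}

	public static void main(String[] args) {
		// community id -> constraints whose match removes the community from a result
		Map<String, SelectionConstraints> removeConstraintsMap = new HashMap<>();
		removeConstraintsMap.put("dh-ch", new SelectionConstraints()); // invented id

		// eosc datasource id -> communities (with their constraints) it contributes to
		Map<String, List<Pair<String, SelectionConstraints>>> eoscDatasourceMap = new HashMap<>();
		eoscDatasourceMap
			.put(
				"eosc.datasource", // invented key
				Collections.singletonList(Pair.of("dh-ch", new SelectionConstraints())));
	}
}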
@@ -109,7 +109,7 @@ public class CommunityConfigurationFactory {
 			return new SelectionConstraints();
 		}
 		SelectionConstraints selectionConstraints = new Gson()
 			.fromJson(constsNode.getText(), SelectionConstraints.class);
 
 		selectionConstraints.setSelection(resolver);
 		log.info("number of selection constraints set " + selectionConstraints.getCriteria().size());
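For reference, the fromJson call above is the stock Gson API, Gson.fromJson(String, Class), applied to the text of the constraints XML node. A self-contained sketch of the same deserialization step, using an invented simplified JSON shape and POJO rather than the project's actual SelectionConstraints schema:

import java.util.List;

import com.google.gson.Gson;

public class GsonConstraintsSketch {

	// Simplified, hypothetical stand-in for SelectionConstraints.
	static class Constraints {
		List<Criterion> criteria;

		List<Criterion> getCriteria() {
			return criteria;
		}
	}

	static class Criterion {
		String verb;
		String field;
		String value;
	}

	public static void main(String[] args) {
		// Invented payload; the real constraints JSON comes from the community profile.
		String json = "{\"criteria\":[{\"verb\":\"contains\",\"field\":\"subject\",\"value\":\"archaeology\"}]}";
		Constraints constraints = new Gson().fromJson(json, Constraints.class);
		System.out.println(constraints.getCriteria().size()); // prints 1
	}
}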
@@ -83,18 +83,18 @@ public class ResultTagger implements Serializable {
 		final Set<String> removeCommunities = new HashSet<>();
 
 		conf
 			.getRemoveConstraintsMap()
 			.keySet()
 			.forEach(communityId -> {
 				if (conf.getRemoveConstraintsMap().get(communityId).getCriteria() != null &&
 					conf
 						.getRemoveConstraintsMap()
 						.get(communityId)
 						.getCriteria()
 						.stream()
 						.anyMatch(crit -> crit.verifyCriteria(param)))
 					removeCommunities.add(communityId);
 			});
 
 		// communities contains all the communities to be added as context for the result
 		final Set<String> communities = new HashSet<>();
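The block above iterates keySet() and then re-fetches each entry with get() several times; an equivalent formulation over entrySet() performs each lookup once. A hedged, self-contained sketch of that variant, with hypothetical stand-ins for the project's Criterion/verifyCriteria types and an invented param map:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RemoveConstraintsSketch {

	// Hypothetical stand-ins for the project's criteria types.
	interface Criterion {
		boolean verifyCriteria(Map<String, List<String>> param);
	}

	static class Constraints {
		List<Criterion> criteria;

		List<Criterion> getCriteria() {
			return criteria;
		}
	}

	public static void main(String[] args) {
		// Invented extracted-field map standing in for the result's param map.
		Map<String, List<String>> param = new HashMap<>();
		param.put("subject", Arrays.asList("archaeology"));

		Map<String, Constraints> removeConstraintsMap = new HashMap<>();
		Constraints c = new Constraints();
		c.criteria = Collections
			.singletonList(p -> p.getOrDefault("subject", Collections.emptyList()).contains("archaeology"));
		removeConstraintsMap.put("dh-ch", c); // invented community id

		final Set<String> removeCommunities = new HashSet<>();
		// Same removal check as above, iterating entries instead of keys.
		removeConstraintsMap.entrySet().forEach(e -> {
			if (e.getValue().getCriteria() != null &&
				e.getValue().getCriteria().stream().anyMatch(crit -> crit.verifyCriteria(param)))
				removeCommunities.add(e.getKey());
		});
		System.out.println(removeCommunities); // [dh-ch]
	}
}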
@@ -182,7 +182,7 @@ public class ResultTagger implements Serializable {
 			.keySet()
 			.forEach(communityId -> {
 				if (!removeCommunities.contains(communityId) &&
 					conf.getSelectionConstraintsMap().get(communityId).getCriteria() != null &&
 					conf
 						.getSelectionConstraintsMap()
 						.get(communityId)
@@ -40,9 +40,9 @@ public class BulkTagJobTest {
 		+ " \"description\" : \"$['description'][*]['value']\", "
 		+ " \"subject\" :\"$['subject'][*]['value']\" , " +
 		"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"," +
 		"\"sdg\" : \"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"," +
 		"\"hostedby\" : \"$['instance'][*]['hostedby']['key']\" , " +
 		"\"collectedfrom\" : \"$['instance'][*]['collectedfrom']['key']\"} ";
 
 	private static SparkSession spark;
 
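The string being built in this hunk is the test's pathMap: field name -> JsonPath expression used to pull values out of the result JSON. A small sketch of evaluating the fos expression with Jayway JsonPath (assuming that is the JsonPath implementation on the classpath; the input document is invented):

import java.util.List;

import com.jayway.jsonpath.JsonPath;

public class PathMapSketch {
	public static void main(String[] args) {
		// Invented record fragment with the nested shape the 'fos' expression expects.
		String json = "{\"subject\":[" +
			"{\"value\":\"04 agricultural sciences\",\"qualifier\":{\"classid\":\"FOS\"}}," +
			"{\"value\":\"some keyword\",\"qualifier\":{\"classid\":\"keyword\"}}]}";

		// Same expression as the 'fos' entry of the pathMap above.
		List<String> fos = JsonPath.read(json, "$['subject'][?(@['qualifier']['classid']=='FOS')].value");
		System.out.println(fos); // ["04 agricultural sciences"]
	}
}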
@@ -1527,43 +1527,44 @@ public class BulkTagJobTest {
 				.count());
 	}
 
-
 	@Test
 	void removeTest() throws Exception {
 		final String pathMap = BulkTagJobTest.pathMap;
 		SparkBulkTagJob
 			.main(
-				new String[]{
+				new String[] {
 					"-isTest", Boolean.TRUE.toString(),
 					"-isSparkSessionManaged", Boolean.FALSE.toString(),
 					"-sourcePath",
-					getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints").getPath(),
+					getClass()
+						.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
+						.getPath(),
 					"-taggingConf", taggingConf,
 					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
 					"-outputPath", workingDir.toString() + "/dataset",
 					"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
 					"-pathMap", pathMap
 				});
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD<Dataset> tmp = sc
 			.textFile(workingDir.toString() + "/dataset")
 			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
 
 		Assertions.assertEquals(12, tmp.count());
 		org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
 			.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
 
 		verificationDataset.createOrReplaceTempView("dataset");
 		String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
 			+ "from dataset "
 			+ "lateral view explode(context) c as MyT "
 			+ "lateral view explode(MyT.datainfo) d as MyD "
 			+ "where MyD.inferenceprovenance = 'bulktagging'";
 
 		org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
 
 		idExplodeCommunity.show(false);
 	}
 
 }
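The verification query in removeTest flattens the nested context and datainfo arrays with Spark SQL's lateral view explode before filtering on inferenceprovenance. A self-contained sketch of the same pattern on an invented one-row table with a nested array column:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LateralViewSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("lateral-view-sketch")
			.getOrCreate();

		// Toy table whose array column stands in for the result's context field.
		spark
			.sql(
				"select 'id1' as id, array(named_struct('id', 'dh-ch'), named_struct('id', 'enermaps')) as context")
			.createOrReplaceTempView("dataset");

		// Same lateral-view pattern as the verification query above.
		Dataset<Row> exploded = spark
			.sql("select id, MyT.id community from dataset lateral view explode(context) c as MyT");
		exploded.show(false); // two rows: (id1, dh-ch) and (id1, enermaps)

		spark.stop();
	}
}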