update for new propagation implementation and moving of updateResult for community business logic since the same can be used for result to community from organization and result to community from semrel

This commit is contained in:
Miriam Baglioni 2020-03-02 16:40:17 +01:00
parent 3d63f35dcb
commit 95f8c3092f
1 changed files with 47 additions and 5 deletions

View File

@ -8,10 +8,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2; import scala.Tuple2;
import java.util.ArrayList; import java.util.*;
import java.util.HashSet; import java.util.stream.Collectors;
import java.util.List;
import java.util.Set;
public class PropagationConstant { public class PropagationConstant {
public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional"; public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional";
@ -32,9 +30,12 @@ public class PropagationConstant {
public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel"; public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel";
public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation"; public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation";
public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID = "propagation:community:productsthroughsemrel"; public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID = "result:community:semrel";
public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME = " Propagation of result belonging to community through semantic relation"; public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME = " Propagation of result belonging to community through semantic relation";
public final static String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization";
public final static String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization";
public final static String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "propagation:orcid:result"; public final static String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "propagation:orcid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of ORCID through result linked by isSupplementedBy or isSupplementTo semantic relations"; public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of ORCID through result linked by isSupplementedBy or isSupplementTo semantic relations";
@ -56,6 +57,15 @@ public class PropagationConstant {
public static final String RELATION_RESULT_PROJECT_REL_CLASS = "isProducedBy"; public static final String RELATION_RESULT_PROJECT_REL_CLASS = "isProducedBy";
public static final String RELATION_PROJECT_RESULT_REL_CLASS = "produces"; public static final String RELATION_PROJECT_RESULT_REL_CLASS = "produces";
public static final String RELATION_RESULT_REPRESENTATIVERESULT_REL_CLASS = "isMergedIn";
public static final String RELATION_REPRESENTATIVERESULT_RESULT_CLASS = "merges";
public static final String RELATION_ORGANIZATIONORGANIZATION_REL_TYPE = "organizationOrganization";
public static final String RELATION_DEDUPORGANIZATION_SUBREL_TYPE = "dedup";
public static final String PROPAGATION_AUTHOR_PID = "ORCID"; public static final String PROPAGATION_AUTHOR_PID = "ORCID";
public static Country getCountry(String country){ public static Country getCountry(String country){
@ -146,4 +156,36 @@ public class PropagationConstant {
} }
return lst; return lst;
} }
public static void updateResultForCommunity(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type, String class_id, String class_name) {
results.leftOuterJoin(toupdateresult)
.map(p -> {
Result r = p._2()._1();
if (p._2()._2().isPresent()){
Set<String> communityList = p._2()._2().get().getAccumulator();
for(Context c: r.getContext()){
if (communityList.contains(c.getId())){
//verify if the datainfo for this context contains propagation
if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
//community id already in the context of the result. Remove it from the set that has to be added
communityList.remove(c.getId());
}
}
}
List<Context> cc = r.getContext();
for(String cId: communityList){
Context context = new Context();
context.setId(cId);
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
cc.add(context);
}
r.setContext(cc);
}
return r;
})
.map(p -> new ObjectMapper().writeValueAsString(p))
.saveAsTextFile(outputPath+"/"+type);
}
} }