1
0
Fork 0

moved the version as it was before the change

This commit is contained in:
Miriam Baglioni 2023-05-09 10:48:34 +02:00
parent 99ac5bab46
commit 8c05f49665
1 changed files with 20 additions and 25 deletions

View File

@ -72,33 +72,28 @@ public class PrepareResultCommunitySet {
String outputPath,
OrganizationMap organizationMap) {
Dataset<Relation> relationAffiliation = readPath(spark, inputPath, Relation.class)
.filter(
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
r.getRelClass().equalsIgnoreCase(ModelConstants.HAS_AUTHOR_INSTITUTION));
Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
relation.createOrReplaceTempView("relation");
Dataset<Relation> relationOrganization = readPath(spark, inputPath, Relation.class)
.filter(
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
r.getRelClass().equalsIgnoreCase(ModelConstants.MERGES));
String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges "
+ "FROM (SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND lower(relClass) = '"
+ ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase()
+ "') result_organization "
+ "LEFT JOIN (SELECT source, collect_set(target) org_set "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND lower(relClass) = '"
+ ModelConstants.MERGES.toLowerCase()
+ "' "
+ " GROUP BY source) organization_organization "
+ "ON result_organization.target = organization_organization.source ";
Dataset<ResultOrganizations> result_organizationset = relationAffiliation
.joinWith(
relationOrganization,
relationAffiliation.col("target").equalTo(relationOrganization.col("source")),
"left")
.groupByKey((MapFunction<Tuple2<Relation, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<Relation, Relation>, ResultOrganizations>) (k, it) -> {
ResultOrganizations rOrgs = new ResultOrganizations();
rOrgs.setOrgId(k);
Tuple2<Relation, Relation> first = it.next();
rOrgs.setResultId(first._1().getSource());
ArrayList<String> merges = new ArrayList<>();
merges.add(first._2().getTarget());
it.forEachRemaining(t -> merges.add(t._2().getTarget()));
rOrgs.setMerges(merges);
return rOrgs;
}, Encoders.bean(ResultOrganizations.class));
Dataset<ResultOrganizations> result_organizationset = spark
.sql(query)
.as(Encoders.bean(ResultOrganizations.class));
result_organizationset
.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))