forked from D-Net/dnet-hadoop
[Enrichment - result to community through organization] reimplementation of the data preparation step using spark
This commit is contained in:
parent
4d8339614b
commit
a653e1b3ea
|
@ -9,7 +9,9 @@ import java.util.*;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
@ -70,28 +72,33 @@ public class PrepareResultCommunitySet {
|
||||||
String outputPath,
|
String outputPath,
|
||||||
OrganizationMap organizationMap) {
|
OrganizationMap organizationMap) {
|
||||||
|
|
||||||
Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
|
Dataset<Relation> relationAffiliation = readPath(spark, inputPath, Relation.class)
|
||||||
relation.createOrReplaceTempView("relation");
|
.filter(
|
||||||
|
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
|
||||||
|
r.getRelClass().equalsIgnoreCase(ModelConstants.HAS_AUTHOR_INSTITUTION));
|
||||||
|
|
||||||
String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges "
|
Dataset<Relation> relationOrganization = readPath(spark, inputPath, Relation.class)
|
||||||
+ "FROM (SELECT source, target "
|
.filter(
|
||||||
+ " FROM relation "
|
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
|
||||||
+ " WHERE datainfo.deletedbyinference = false "
|
r.getRelClass().equalsIgnoreCase(ModelConstants.MERGES));
|
||||||
+ " AND lower(relClass) = '"
|
|
||||||
+ ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase()
|
|
||||||
+ "') result_organization "
|
|
||||||
+ "LEFT JOIN (SELECT source, collect_set(target) org_set "
|
|
||||||
+ " FROM relation "
|
|
||||||
+ " WHERE datainfo.deletedbyinference = false "
|
|
||||||
+ " AND lower(relClass) = '"
|
|
||||||
+ ModelConstants.MERGES.toLowerCase()
|
|
||||||
+ "' "
|
|
||||||
+ " GROUP BY source) organization_organization "
|
|
||||||
+ "ON result_organization.target = organization_organization.source ";
|
|
||||||
|
|
||||||
Dataset<ResultOrganizations> result_organizationset = spark
|
Dataset<ResultOrganizations> result_organizationset = relationAffiliation
|
||||||
.sql(query)
|
.joinWith(
|
||||||
.as(Encoders.bean(ResultOrganizations.class));
|
relationOrganization,
|
||||||
|
relationAffiliation.col("target").equalTo(relationOrganization.col("source")),
|
||||||
|
"left")
|
||||||
|
.groupByKey((MapFunction<Tuple2<Relation, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, Tuple2<Relation, Relation>, ResultOrganizations>) (k, it) -> {
|
||||||
|
ResultOrganizations rOrgs = new ResultOrganizations();
|
||||||
|
rOrgs.setOrgId(k);
|
||||||
|
Tuple2<Relation, Relation> first = it.next();
|
||||||
|
rOrgs.setResultId(first._1().getSource());
|
||||||
|
ArrayList<String> merges = new ArrayList<>();
|
||||||
|
merges.add(first._2().getTarget());
|
||||||
|
it.forEachRemaining(t -> merges.add(t._2().getTarget()));
|
||||||
|
rOrgs.setMerges(merges);
|
||||||
|
return rOrgs;
|
||||||
|
}, Encoders.bean(ResultOrganizations.class));
|
||||||
|
|
||||||
result_organizationset
|
result_organizationset
|
||||||
.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
|
.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
|
||||||
|
|
Loading…
Reference in New Issue