diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java
deleted file mode 100644
index 96523c502..000000000
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunity2.java
+++ /dev/null
@@ -1,225 +0,0 @@
-
-package eu.dnetlib.dhp.resulttocommunityfromorganization;
-
-/**
- * @author miriam.baglioni
- * @Date 16/10/23
- */
-import static eu.dnetlib.dhp.PropagationConstant.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-
-import eu.dnetlib.dhp.api.Utils;
-import eu.dnetlib.dhp.api.model.CommunityEntityMap;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import scala.Tuple2;
-
-public class PrepareResultCommunity2 {
-
-	private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunity2.class);
-
-	public static void main(String[] args) throws Exception {
-		String jsonConfiguration = IOUtils
-			.toString(
-				PrepareResultCommunity2.class
-					.getResourceAsStream(
-						"/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json"));
-
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-		parser.parseArgument(args);
-
-		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-		String inputPath = parser.get("sourcePath");
-		log.info("inputPath: {}", inputPath);
-
-		final String outputPath = parser.get("outputPath");
-		log.info("outputPath: {}", outputPath);
-
-		final boolean production = Boolean.parseBoolean(parser.get("production"));
-		log.info("production: {}", production);
-
-		final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(production);
-		log.info("organizationMap: {}", new Gson().toJson(organizationMap));
-
-		SparkConf conf = new SparkConf();
-
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				removeOutputDir(spark, outputPath);
-				prepareInfo(spark, inputPath, outputPath, organizationMap);
-			});
-	}
-
-	private static void prepareInfo(
-		SparkSession spark,
-		String inputPath,
-		String outputPath,
-		CommunityEntityMap organizationMap) {
-
-		final StructType structureSchema = new StructType()
-			.add(
-				"dataInfo", new StructType()
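-					// provenance flags used to filter out deleted-by-inference and invisible records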
.add("deletedbyinference", DataTypes.BooleanType) - .add("invisible", DataTypes.BooleanType)) - .add("source", DataTypes.StringType) - .add("target", DataTypes.StringType) - .add("relClass", DataTypes.StringType); - - readPath(spark, inputPath, Relation.class) - .foreach((ForeachFunction) r -> System.out.println(new ObjectMapper().writeValueAsString(r))); - - Dataset resultOrganization = spark - .read() - .schema(Encoders.bean(Relation.class).schema()) - .json(inputPath) - .filter( - "dataInfo.deletedbyinference != true " + - "and relClass == '" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") - .select( - new Column("source").as("resultId"), - new Column("target").as("organizationId")); - - resultOrganization - .foreach((ForeachFunction) r -> System.out.println(new ObjectMapper().writeValueAsString(r))); - - Dataset organizationOrganization = spark - .read() - .schema(structureSchema) - .json(inputPath) - .filter( - "dataInfo.deletedbyinference != true " + - "and relClass == '" + ModelConstants.MERGES + "'") - .select( - new Column("source").as("masterOrganization"), - new Column("target").as("duplicateOrganization")); - - resultOrganization - .joinWith( - organizationOrganization, resultOrganization - .col("organizationId") - .equalTo(organizationOrganization.col("masterOrganization")), - "left") - .groupByKey( - (MapFunction, String>) t2 -> (String) t2._1().getAs("resultId"), Encoders.STRING()) - .mapGroups((MapGroupsFunction, ResultCommunityList>) (k, v) -> { - ResultCommunityList rcl = new ResultCommunityList(); - rcl.setResultId(k); - ArrayList cl = new ArrayList<>(); - Tuple2 first = v.next(); - cl.addAll(organizationMap.get(first._1().getAs("organizationId"))); - if (Optional.ofNullable(first._2()).isPresent()) { - cl.addAll(organizationMap.get(first._2().getAs(("duplicateOrganization")))); - } - v.forEachRemaining(o -> cl.addAll(organizationMap.get(o._2().getAs("duplicateOrganization")))); - if (cl.size() == 0) - return null; - rcl.setCommunityList(new ArrayList<>(cl.stream().distinct().collect(Collectors.toList()))); - return rcl; - }, Encoders.bean(ResultCommunityList.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - -// Dataset relation = readPath(spark, inputPath, Relation.class) -// .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() ); -// -// relation.createOrReplaceTempView("relation"); -// -// String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges " -// + "FROM (SELECT source, target " -// + " FROM relation " -// + " AND lower(relClass) = '" -// + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() -// + "') result_organization " -// + "LEFT JOIN (SELECT source, collect_set(target) org_set " -// + " FROM relation " -// + " AND lower(relClass) = '" -// + ModelConstants.MERGES.toLowerCase() -// + "' " -// + " GROUP BY source) organization_organization " -// + "ON result_organization.target = organization_organization.source "; -// -// Dataset result_organizationset = spark -// .sql(query) -// .as(Encoders.bean(ResultOrganizations.class)); -// -// result_organizationset -// .map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class)) -// .filter(Objects::nonNull) -// .toJavaRDD() -// .mapToPair(value -> new Tuple2<>(value.getResultId(), value)) -// .reduceByKey((a, b) -> { -// ArrayList cl = a.getCommunityList(); -// b.getCommunityList().stream().forEach(s -> { -// if (!cl.contains(s)) { -// 
-//		Dataset<Relation> relation = readPath(spark, inputPath, Relation.class)
-//			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
-//
-//		relation.createOrReplaceTempView("relation");
-//
-//		String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges "
-//			+ "FROM (SELECT source, target "
-//			+ "      FROM relation "
-//			+ "      WHERE lower(relClass) = '"
-//			+ ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase()
-//			+ "') result_organization "
-//			+ "LEFT JOIN (SELECT source, collect_set(target) org_set "
-//			+ "           FROM relation "
-//			+ "           WHERE lower(relClass) = '"
-//			+ ModelConstants.MERGES.toLowerCase()
-//			+ "' "
-//			+ "           GROUP BY source) organization_organization "
-//			+ "ON result_organization.target = organization_organization.source ";
-//
-//		Dataset<ResultOrganizations> result_organizationset = spark
-//			.sql(query)
-//			.as(Encoders.bean(ResultOrganizations.class));
-//
-//		result_organizationset
-//			.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
-//			.filter(Objects::nonNull)
-//			.toJavaRDD()
-//			.mapToPair(value -> new Tuple2<>(value.getResultId(), value))
-//			.reduceByKey((a, b) -> {
-//				ArrayList<String> cl = a.getCommunityList();
-//				b.getCommunityList().forEach(s -> {
-//					if (!cl.contains(s)) {
-//						cl.add(s);
-//					}
-//				});
-//				a.setCommunityList(cl);
-//				return a;
-//			})
-//			.map(value -> OBJECT_MAPPER.writeValueAsString(value._2()))
-//			.saveAsTextFile(outputPath, GzipCodec.class);
-	}
-
-	private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(
-		CommunityEntityMap organizationMap) {
-		return value -> {
-			String rId = value.getResultId();
-			Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());
-			String oTarget = value.getOrgId();
-			Set<String> communitySet = new HashSet<>();
-			if (organizationMap.containsKey(oTarget)) {
-				communitySet.addAll(organizationMap.get(oTarget));
-			}
-			if (orgs.isPresent())
-				for (String oId : orgs.get()) {
-					if (organizationMap.containsKey(oId)) {
-						communitySet.addAll(organizationMap.get(oId));
-					}
-				}
-			if (!communitySet.isEmpty()) {
-				ResultCommunityList rcl = new ResultCommunityList();
-				rcl.setResultId(rId);
-				rcl.setCommunityList(new ArrayList<>(communitySet));
-				return rcl;
-			}
-			return null;
-		};
-	}
-}