package eu.dnetlib.dhp.resulttocommunityfromorganization;

import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 16/10/23
 */
public class PrepareResultCommunity2 {

	private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunity2.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				PrepareResultCommunity2.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final boolean production = Boolean.parseBoolean(parser.get("production"));
		log.info("production: {}", production);

		final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(production);
		log.info("organizationMap: {}", new Gson().toJson(organizationMap));

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				removeOutputDir(spark, outputPath);
				prepareInfo(spark, inputPath, outputPath, organizationMap);
			});
	}

	private static void prepareInfo(
		SparkSession spark,
		String inputPath,
		String outputPath,
		CommunityEntityMap organizationMap) {

		// minimal schema for reading the relation dump: only the fields used below
		final StructType structureSchema = new StructType()
			.add(
				"dataInfo", new StructType()
					.add("deletedbyinference", DataTypes.BooleanType)
					.add("invisible", DataTypes.BooleanType))
			.add("source", DataTypes.StringType)
			.add("target", DataTypes.StringType)
			.add("relClass", DataTypes.StringType);
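		// For orientation: the input is a JSON dump of Relation objects. A record matching the
		// schema above would look like the sketch below (the identifier values are illustrative
		// assumptions, not taken from this codebase):
		// {"source":"50|doi_________::...","target":"20|openorgs____::...",
		//  "relClass":"hasAuthorInstitution",
		//  "dataInfo":{"deletedbyinference":false,"invisible":false}}
		// The job extracts result->organization affiliations and organization->organization
		// MERGES relations, left-joins them on the organization id, and maps every organization
		// (master and merged duplicates) to its communities through organizationMap.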
		// debug output: dumps every input relation as JSON
		readPath(spark, inputPath, Relation.class)
			.foreach((ForeachFunction<Relation>) r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));

		// result -> organization affiliations
		Dataset<Row> resultOrganization = spark
			.read()
			.schema(Encoders.bean(Relation.class).schema())
			.json(inputPath)
			.filter(
				"dataInfo.deletedbyinference != true "
					+ "and relClass == '" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
			.select(
				new Column("source").as("resultId"),
				new Column("target").as("organizationId"));

		// debug output: dumps the affiliation pairs
		resultOrganization
			.foreach((ForeachFunction<Row>) r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));

		// master organization -> deduplicated (merged) organization
		Dataset<Row> organizationOrganization = spark
			.read()
			.schema(structureSchema)
			.json(inputPath)
			.filter(
				"dataInfo.deletedbyinference != true "
					+ "and relClass == '" + ModelConstants.MERGES + "'")
			.select(
				new Column("source").as("masterOrganization"),
				new Column("target").as("duplicateOrganization"));

		resultOrganization
			.joinWith(
				organizationOrganization,
				resultOrganization
					.col("organizationId")
					.equalTo(organizationOrganization.col("masterOrganization")),
				"left")
			.groupByKey(
				(MapFunction<Tuple2<Row, Row>, String>) t2 -> (String) t2._1().getAs("resultId"),
				Encoders.STRING())
			.mapGroups((MapGroupsFunction<String, Tuple2<Row, Row>, ResultCommunityList>) (k, v) -> {
				ResultCommunityList rcl = new ResultCommunityList();
				rcl.setResultId(k);
				ArrayList<String> cl = new ArrayList<>();
				Tuple2<Row, Row> first = v.next();
				// organizations that are not mapped to any community are skipped
				String orgId = first._1().getAs("organizationId");
				if (organizationMap.containsKey(orgId))
					cl.addAll(organizationMap.get(orgId));
				if (Optional.ofNullable(first._2()).isPresent()) {
					String dupId = first._2().getAs("duplicateOrganization");
					if (organizationMap.containsKey(dupId))
						cl.addAll(organizationMap.get(dupId));
				}
				v.forEachRemaining(o -> {
					// a result can be affiliated to more than one organization
					String oId = o._1().getAs("organizationId");
					if (organizationMap.containsKey(oId))
						cl.addAll(organizationMap.get(oId));
					// the right side of the left join can be null
					if (o._2() != null) {
						String dId = o._2().getAs("duplicateOrganization");
						if (organizationMap.containsKey(dId))
							cl.addAll(organizationMap.get(dId));
					}
				});
				if (cl.isEmpty())
					return null;
				rcl.setCommunityList(cl.stream().distinct().collect(Collectors.toCollection(ArrayList::new)));
				return rcl;
			}, Encoders.bean(ResultCommunityList.class))
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);

		// alternative SQL-based implementation, kept for reference
//		Dataset<Relation> relation = readPath(spark, inputPath, Relation.class)
//			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
//
//		relation.createOrReplaceTempView("relation");
//
//		String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges "
//			+ "FROM (SELECT source, target "
//			+ "      FROM relation "
//			+ "      WHERE lower(relClass) = '"
//			+ ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase()
//			+ "') result_organization "
//			+ "LEFT JOIN (SELECT source, collect_set(target) org_set "
//			+ "      FROM relation "
//			+ "      WHERE lower(relClass) = '"
//			+ ModelConstants.MERGES.toLowerCase()
//			+ "' "
//			+ "      GROUP BY source) organization_organization "
//			+ "ON result_organization.target = organization_organization.source ";
//
//		Dataset<ResultOrganizations> result_organizationset = spark
//			.sql(query)
//			.as(Encoders.bean(ResultOrganizations.class));
//
//		result_organizationset
//			.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
//			.filter(Objects::nonNull)
//			.toJavaRDD()
//			.mapToPair(value -> new Tuple2<>(value.getResultId(), value))
//			.reduceByKey((a, b) -> {
//				ArrayList<String> cl = a.getCommunityList();
//				b.getCommunityList().forEach(s -> {
//					if (!cl.contains(s)) {
//						cl.add(s);
//					}
//				});
//				a.setCommunityList(cl);
//				return a;
//			})
//			.map(value -> OBJECT_MAPPER.writeValueAsString(value._2()))
//			.saveAsTextFile(outputPath, GzipCodec.class);
	}
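	/**
	 * Maps a {@link ResultOrganizations} row (resultId, orgId, merges) onto a
	 * {@link ResultCommunityList}, collecting the communities associated with the
	 * organization and with each of its merged duplicates. Note that this helper is
	 * referenced only by the commented-out SQL-based implementation above.
	 */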
	private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(
		CommunityEntityMap organizationMap) {
		return value -> {
			String rId = value.getResultId();
			Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());
			String oTarget = value.getOrgId();
			Set<String> communitySet = new HashSet<>();
			if (organizationMap.containsKey(oTarget)) {
				communitySet.addAll(organizationMap.get(oTarget));
			}
			if (orgs.isPresent())
				for (String oId : orgs.get()) {
					if (organizationMap.containsKey(oId)) {
						communitySet.addAll(organizationMap.get(oId));
					}
				}
			if (!communitySet.isEmpty()) {
				ResultCommunityList rcl = new ResultCommunityList();
				rcl.setResultId(rId);
				rcl.setCommunityList(new ArrayList<>(communitySet));
				return rcl;
			}
			return null;
		};
	}
}