diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java index de14946ccf..3116161e9e 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java @@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.TypedRow; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.orcidtoresultfromsemrel.ResultOrcidList; +import eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; @@ -13,378 +15,114 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; import java.util.*; import java.util.stream.Collectors; import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +public class SparkResultToCommunityFromOrganizationJob2 { + + private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob2.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); -public class SparkResultToCommunityFromOrganization { public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString(SparkResultToCommunityFromOrganizationJob2.class + .getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + jsonConfiguration); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToCommunityFromOrganization.class.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json"))); parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String possibleupdatespath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", possibleupdatespath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + final Boolean saveGraph = Optional + .ofNullable(parser.get("saveGraph")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("saveGraph: {}", saveGraph); + + Class resultClazz = (Class) Class.forName(resultClassName); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final SparkSession spark = SparkSession - .builder() - .appName(SparkResultToCommunityFromOrganization.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .enableHiveSupport() - .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final String inputPath = parser.get("sourcePath"); - final String outputPath = "/tmp/provision/propagation/communitytoresult"; - final OrganizationMap organizationMap = new Gson().fromJson(parser.get("organizationtoresultcommunitymap"), OrganizationMap.class); - boolean writeUpdates = TRUE.equals(parser.get("writeUpdate")); - boolean saveGraph = TRUE.equals(parser.get("saveGraph")); - System.out.println(new Gson().toJson(organizationMap)); - - createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration())); - - JavaRDD relations_rdd_all = sc.textFile(inputPath + "/relation") - .map(item -> new ObjectMapper().readValue(item, Relation.class)); - JavaRDD publications_rdd_all = sc.textFile(inputPath + "/publication") - .map(item -> new ObjectMapper().readValue(item, Publication.class)); - JavaRDD dataset_rdd_all = sc.textFile(inputPath + "/dataset") - .map(item -> new ObjectMapper().readValue(item, Dataset.class)); - JavaRDD orp_rdd_all = sc.textFile(inputPath + "/otheresearchproduct") - .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class)); - JavaRDD software_rdd_all = sc.textFile(inputPath + "/software") - .map(item -> new ObjectMapper().readValue(item, Software.class)); - - org.apache.spark.sql.Dataset relation = spark.createDataset(relations_rdd_all.rdd(), Encoders.bean(Relation.class)); - - relation.createOrReplaceTempView("relation"); -// String query = "SELECT source, target" + -// " FROM relation " + -// " WHERE datainfo.deletedbyinference = false " + -// " AND relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "'"; -// -// org.apache.spark.sql.Dataset result_organization = spark.sql(query); -// result_organization.createOrReplaceTempView("result_organization"); -// -// query = "SELECT source, target " + -// "FROM relation " + -// "WHERE datainfo.deletedbyinference = false " + -// "AND relClass = '" + RELATION_REPRESENTATIVERESULT_RESULT_CLASS + "'" ; - - String query = "SELECT result_organization.source, result_organization.target, org_set " + - "FROM (SELECT source, target " + - " FROM relation " + - " WHERE datainfo.deletedbyinference = false " + - " AND relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "') result_organization " + - "LEFT JOIN (SELECT source, collect_set(target) org_set " + - " FROM relation " + - " WHERE datainfo.deletedbyinference = false " + - " AND relClass = '" + RELATION_REPRESENTATIVERESULT_RESULT_CLASS + "' " + - " GROUP BY source) organization_organization " + - "ON result_organization.target = organization_organization.source "; - - org.apache.spark.sql.Dataset result_organizationset = spark.sql(query); - - JavaPairRDD result_communitySet = result_organizationset.toJavaRDD().map(r -> { - String rId = r.getString(0); - List orgs = r.getList(2); - String oTarget = r.getString(1); - TypedRow tp = new TypedRow(); - if (organizationMap.containsKey(oTarget)) { - tp.addAll(organizationMap.get(oTarget)); - } - try{ - for (String oId : orgs) { - if (organizationMap.containsKey(oId)) { - tp.addAll(organizationMap.get(oId)); + runWithSparkHiveSession(conf, isSparkSessionManaged, + spark -> { + if (isTest(parser)) { + removeOutputDir(spark, outputPath); } - } - } - catch(Exception e){ - //System.out.println("organizationTargetID: " + oTarget); - } - - if(tp.getAccumulator() == null ){ - return null; - } - tp.setSourceId(rId); - - - return tp; - }) - .filter(tr -> !(tr==null)) - .mapToPair(toPair()).cache(); - - if(writeUpdates){ - result_communitySet.map(c->new ObjectMapper().writeValueAsString(c)).saveAsTextFile(outputPath +"/result_communityset"); - } - - if(saveGraph){ - updatePublicationResult(publications_rdd_all, result_communitySet) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/publication"); - - updateDatasetResult(dataset_rdd_all, result_communitySet) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/dataset"); - - updateORPResult(orp_rdd_all, result_communitySet) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/otherresearchproduct"); - - updateSoftwareResult(software_rdd_all, result_communitySet) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/software"); - } - - - //relations between organziations and results -// JavaPairRDD organization_result = relations -// .filter(r -> !r.getDataInfo().getDeletedbyinference()) -// .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass()) -// && RELATION_RESULTORGANIZATION_REL_TYPE.equals(r.getRelType())) -// .map(r -> { -// TypedRow tp = new TypedRow(); -// tp.setSourceId(r.getTarget()); -// tp.setTargetId(r.getSource() ); -// return tp; -// }) -// .mapToPair(toPair()); - - //relations between representative organization and merged Id. One relation per merged organization -// JavaPairRDD organization_organization = relations -// .filter(r -> !r.getDataInfo().getDeletedbyinference()) -// .filter( r -> RELATION_ORGANIZATIONORGANIZATION_REL_TYPE.equals(r.getRelType()) -// && RELATION_REPRESENTATIVERESULT_RESULT_CLASS.equals(r.getRelClass())) -// .map(r -> { -// TypedRow tp = new TypedRow(); -// tp.setSourceId(r.getSource()); -// tp.setTargetId(r.getTarget()); -// return tp; -// }) -// .mapToPair(toPair()); - - //get the original id of the organizations to be checked against the id associated to the communities -// JavaPairRDD organizationoriginal_result = organization_result.leftOuterJoin(organization_organization) -// .map(c -> { -// if (!c._2()._2().isPresent()) -// return c._2()._1(); -// return c._2()._1().setSourceId(c._2()._2().get().getTargetId()); -// }) -// .mapToPair(toPair()); - - //associates to each result connected to an organization the list of communities related to that organization -// JavaPairRDD to_add_result_communities = organizationoriginal_result.map(o -> { -// List communityList = organizationMap.get(o._1()); -// if (communityList.size() == 0) -// return null; -// TypedRow tp = o._2(); -// tp.setAccumulator(new HashSet<>(communityList)); -// tp.setSourceId(tp.getTargetId()); -// return tp; -// }) -// .filter(r -> !(r == null)) -// .mapToPair(toPair()) -// .reduceByKey((a, b) -> { -// if (a == null) -// return b; -// if (b == null) -// return a; -// a.addAll(b.getAccumulator()); -// return a; -// }); - - -// JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) -// .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache(); -// JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) -// .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)).cache(); -// JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) -// .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)).cache(); -// JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) -// .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)).cache(); -// -// JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); -// JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); -// JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); -// JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); -// -// updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); -// updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); -// updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); -// updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); + execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath); + }); } - private static JavaRDD updatePublicationResult(JavaRDD result, JavaPairRDD result_communitySet){ - JavaRDD tmp = result.map(r -> (Result)r); - return updateResultDataset(tmp, result_communitySet).map(r -> (Publication)r); + private static void execPropagation(SparkSession spark, String inputPath, String outputPath, + Class resultClazz, String possibleUpdatesPath) { + org.apache.spark.sql.Dataset possibleUpdates = readResultCommunityList(spark, possibleUpdatesPath); + org.apache.spark.sql.Dataset result = readPathEntity(spark, inputPath, resultClazz); - } - private static JavaRDD updateDatasetResult(JavaRDD result, JavaPairRDD result_communitySet){ - JavaRDD tmp = result.map(r -> (Result)r); - return updateResultDataset(tmp, result_communitySet).map(r -> (Dataset) r); - } - private static JavaRDD updateORPResult(JavaRDD result, JavaPairRDD result_communitySet){ - JavaRDD tmp = result.map(r -> (Result)r); - return updateResultDataset(tmp, result_communitySet).map(r -> (OtherResearchProduct)r); - - } - private static JavaRDD updateSoftwareResult(JavaRDD result, JavaPairRDD result_communitySet){ - JavaRDD tmp = result.map(r -> (Result)r); - return updateResultDataset(tmp, result_communitySet).map(r -> (Software) r); - - } - private static JavaRDD updateResultDataset(JavaRDD result, JavaPairRDD result_communitySet){ - return result.mapToPair(p -> new Tuple2<>(p.getId(),p)).leftOuterJoin(result_communitySet) - .map(c -> { - Result r = c._2()._1(); - if(c._2()._2().isPresent()){ - Set communitySet = c._2()._2().get().getAccumulator(); - List contextList = r.getContext().stream().map(con -> con.getId()).collect(Collectors.toList()); + result + .joinWith(possibleUpdates, result.col("id").equalTo(possibleUpdates.col("resultId")), + "left_outer") + .map(value -> { + R ret = value._1(); + Optional rcl = Optional.ofNullable(value._2()); + if(rcl.isPresent()){ + ArrayList communitySet = rcl.get().getCommunityList(); + List contextList = ret.getContext().stream().map(con -> con.getId()).collect(Collectors.toList()); for(String cId:communitySet){ if(!contextList.contains(cId)){ Context newContext = new Context(); newContext.setId(cId); newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); - r.getContext().add(newContext); + ret.getContext().add(newContext); } } } - return r; - }); + return ret; + }, Encoders.bean(resultClazz)) + .toJSON() + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .text(outputPath); } - // private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { -// results.leftOuterJoin(toupdateresult) -// .map(p -> { -// Result r = p._2()._1(); -// if (p._2()._2().isPresent()){ -// Set communityList = p._2()._2().get().getAccumulator(); -// for(Context c: r.getContext()){ -// if (communityList.contains(c.getId())){ -// //verify if the datainfo for this context contains propagation -// if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ -// c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME)); -// //community id already in the context of the result. Remove it from the set that has to be added -// communityList.remove(c.getId()); -// } -// } -// } -// List cc = r.getContext(); -// for(String cId: communityList){ -// Context context = new Context(); -// context.setId(cId); -// context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); -// cc.add(context); -// } -// r.setContext(cc); -// } -// return r; -// }) -// .map(p -> new ObjectMapper().writeValueAsString(p)) -// .saveAsTextFile(outputPath+"/"+type); -// } - -} - - -/* -package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; - -import com.google.gson.Gson; -import eu.dnetlib.data.mapreduce.hbase.propagation.Value; -import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; -import eu.dnetlib.data.proto.OafProtos; -import eu.dnetlib.data.proto.TypeProtos; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableMapper; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Text; - -import java.io.IOException; -import java.util.Set; - -import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; -import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; -import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget; - -public class PropagationCommunityThroughOrganizationMapper extends TableMapper { - private Text valueOut; - private ImmutableBytesWritable keyOut; - private OrganizationMap organizationMap; - - //seleziono il tipo della entry: - //Result: - //se non e' deleted by inference ed ha organizzazioni a cui e' associato, - // //emetto id della relazione ed id del risultato per ogni organizzazione a cui e' associato - //ORGANIZATION: - //se non e' deleted by inference e non e' deduplicata emetto l'id della organizzazione - //se non e' deleted by inference ed e' deduplicata, emetto id della organizzazione ed id del deduplicato e lista delle organizzazioni mergiate - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - valueOut = new Text(); - keyOut = new ImmutableBytesWritable(); - organizationMap = new Gson().fromJson(context.getConfiguration().get("organizationtoresult.community.map"), OrganizationMap.class); - System.out.println("got organizationtoresult map: " + new Gson().toJson(organizationMap)) ; - } - - @Override - protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { - final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); - final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference - if (entity != null) { - switch (type) { - case organization: - DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()), - getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context); - if (communityList.size() > 0){ - valueOut.set(Value.newInstance( - new Gson().toJson( - communityList, //search for organizationtoresult it merges - DedupedList.class), - ORGANIZATION_COMMUNITY_TRUST, - Type.fromorganization).toJson()); - context.write(keyIn, valueOut); - context.getCounter(COUNTER_PROPAGATION, "emit for organizationtoresult ").increment(1); - }else{ - context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1); - } - - break; - case result: - Set result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION); - for(String org: result_organization) - emit(org, Bytes.toString(keyIn.get()), context); - break; - } - } - } - - private DedupedList getCommunityList(String organizationId, Set relationTarget, Context context) { - DedupedList communityList = new DedupedList(); - relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|")))); - communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|"))); - communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organizationtoresult for " + c).increment(1)); - return communityList; - } - - private void emit(String org, String resId, Context context) throws IOException, InterruptedException { - keyOut.set(Bytes.toBytes(org)); - valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson()); - context.write(keyOut,valueOut); - context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1); + private static org.apache.spark.sql.Dataset readResultCommunityList(SparkSession spark, String possibleUpdatesPath) { + return spark + .read() + .textFile(possibleUpdatesPath) + .map(value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), Encoders.bean(ResultCommunityList.class)); } } - */ \ No newline at end of file