From 6afbd542ca2464471e5b5470d9cb7a844b893901 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 16 Apr 2020 18:40:14 +0200
Subject: [PATCH] Changed the save mode to avoid the NegativeArraySize...
 error; this also required modifying PrepareResultOrcidAssociationStep2.

---
 .../PrepareResultOrcidAssociationStep1.java   |  22 +-
 .../PrepareResultOrcidAssociationStep2.java   |   7 +-
 .../PrepareResultCommunitySet.java            |   4 +
 .../ResultCommunityList.java                  |   4 +
 .../ResultOrganizations.java                  |   4 +
 ...ResultToCommunityFromOrganizationJob2.java | 390 ++++++++++++++++++
 .../OrcidPropagationJobTest.java              |   4 +
 7 files changed, 424 insertions(+), 11 deletions(-)
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultCommunityList.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultOrganizations.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java
 create mode 100644 dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java
index 4af652ef0d..8f4ecb6495 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java
@@ -6,6 +6,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -47,7 +48,7 @@ public class PrepareResultOrcidAssociationStep1 {
         final String resultClassName = parser.get("resultTableName");
         log.info("resultTableName: {}", resultClassName);
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrel").split(";"));
+        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
         log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
         final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
@@ -76,7 +77,7 @@ public class PrepareResultOrcidAssociationStep1 {
         //read the relation table and the table related to the result it is using
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(sc.textFile(inputPath + "/relation")
-                .map(item -> new ObjectMapper().readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
+                .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).rdd(), Encoders.bean(Relation.class));
         relation.createOrReplaceTempView("relation");
         log.info("Reading Graph table from: {}", inputPath + "/" + resultType);
@@ -84,7 +85,7 @@
         result.createOrReplaceTempView("result");
-        getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath);
+        getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath + "/" + resultType);
     }
@@ -107,12 +108,15 @@ public class PrepareResultOrcidAssociationStep1 {
         spark.sql(query)
                 .as(Encoders.bean(ResultOrcidList.class))
-                .toJSON()
-                .write()
-                .mode(SaveMode.Append)
-                .option("compression","gzip")
-                .text(outputPath)
-                ;
+                .toJavaRDD()
+                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+                .saveAsTextFile(outputPath, GzipCodec.class);
+//                .toJSON()
+//                .write()
+//                .mode(SaveMode.Append)
+//                .option("compression","gzip")
+//                .text(outputPath)
+//                ;
     }
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
index 660d0b87a9..4ed911c424 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java
@@ -21,7 +21,7 @@ public class PrepareResultOrcidAssociationStep2 {
     public static void main(String[] args) throws Exception {
         String jsonConfiguration = IOUtils.toString(SparkOrcidToResultFromSemRelJob3.class
-                .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult2_parameters.json"));
+                .getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters2.json"));
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 jsonConfiguration);
@@ -50,7 +50,10 @@ public class PrepareResultOrcidAssociationStep2 {
     private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
-        Dataset<ResultOrcidList> resultOrcidAssoc = readAssocResultOrcidList(spark, inputPath);
+        Dataset<ResultOrcidList> resultOrcidAssoc = readAssocResultOrcidList(spark, inputPath + "/publication")
+                .union(readAssocResultOrcidList(spark, inputPath + "/dataset"))
+                .union(readAssocResultOrcidList(spark, inputPath + "/otherresearchproduct"))
+                .union(readAssocResultOrcidList(spark, inputPath + "/software"));
         resultOrcidAssoc
                 .toJavaRDD()
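Note: the PrepareResultOrcidAssociationStep1 hunk above replaces the Dataset-writer save (toJSON().write().mode(SaveMode.Append).option("compression","gzip").text(...)) with an RDD-based saveAsTextFile using GzipCodec, which the commit message credits with avoiding the NegativeArraySize... error; each result type is now written under outputPath + "/" + resultType, and PrepareResultOrcidAssociationStep2 unions the four per-type outputs. Below is a minimal side-by-side sketch of the two write paths; the generic bean type T and the class name WritePathSketch are illustrative assumptions, not code from this repository.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

class WritePathSketch {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Old path (removed above): the Dataset writer serializes the rows to JSON
    // and appends gzip-compressed text to a single output directory.
    static <T> void writeWithDatasetWriter(Dataset<T> assoc, String outputPath) {
        assoc.toJSON()
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .text(outputPath);
    }

    // New path (added above): drop to the RDD API, serialize each bean with Jackson
    // and save gzip-compressed text files, one directory per result type.
    static <T> void writeWithRdd(Dataset<T> assoc, String outputPath) {
        assoc.toJavaRDD()
                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
                .saveAsTextFile(outputPath, GzipCodec.class);
    }
}

Writing one directory per result type is what lets the mergeInfo hunk above read publication, dataset, otherresearchproduct and software independently and union them.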
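Note: the three classes added below (PrepareResultCommunitySet, ResultCommunityList, ResultOrganizations) are empty placeholders at this point. By analogy with the ResultOrcidList bean used by the ORCID preparation steps, ResultCommunityList will presumably become a plain serializable bean usable with Encoders.bean(...); the following is a hypothetical sketch under that assumption, not the project's implementation.

package eu.dnetlib.dhp.resulttocommunityfromorganization;

import java.io.Serializable;
import java.util.ArrayList;

public class ResultCommunityList implements Serializable {
    private String resultId;                 // identifier of the result
    private ArrayList<String> communityList; // community ids to associate with the result

    public String getResultId() { return resultId; }
    public void setResultId(String resultId) { this.resultId = resultId; }

    public ArrayList<String> getCommunityList() { return communityList; }
    public void setCommunityList(ArrayList<String> communityList) { this.communityList = communityList; }
}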
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java
new file mode 100644
index 0000000000..255ef73502
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttocommunityfromorganization;
+
+public class PrepareResultCommunitySet {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultCommunityList.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultCommunityList.java
new file mode 100644
index 0000000000..82a93662da
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultCommunityList.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttocommunityfromorganization;
+
+public class ResultCommunityList {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultOrganizations.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultOrganizations.java
new file mode 100644
index 0000000000..b3ddcc3a47
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultOrganizations.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.resulttocommunityfromorganization;
+
+public class ResultOrganizations {
+}
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java
new file mode 100644
index 0000000000..de14946ccf
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob2.java
@@ -0,0 +1,390 @@
+package eu.dnetlib.dhp.resulttocommunityfromorganization;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.TypedRow;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.PropagationConstant.*;
+
+public class SparkResultToCommunityFromOrganizationJob2 {
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToCommunityFromOrganizationJob2.class.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json")));
+        parser.parseArgument(args);
+        SparkConf conf = new SparkConf();
+        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName(SparkResultToCommunityFromOrganizationJob2.class.getSimpleName())
+                .master(parser.get("master"))
+                .config(conf)
+                .enableHiveSupport()
+                .getOrCreate();
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        final String inputPath = parser.get("sourcePath");
+        final String outputPath = "/tmp/provision/propagation/communitytoresult";
+        final OrganizationMap organizationMap = new Gson().fromJson(parser.get("organizationtoresultcommunitymap"), OrganizationMap.class);
+        boolean writeUpdates = TRUE.equals(parser.get("writeUpdate"));
+        boolean saveGraph = TRUE.equals(parser.get("saveGraph"));
+
+        System.out.println(new Gson().toJson(organizationMap));
+
+        createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
+
+        JavaRDD<Relation> relations_rdd_all = sc.textFile(inputPath + "/relation")
+                .map(item -> new ObjectMapper().readValue(item, Relation.class));
+        JavaRDD<Publication> publications_rdd_all = sc.textFile(inputPath + "/publication")
+                .map(item -> new ObjectMapper().readValue(item, Publication.class));
+        JavaRDD<Dataset> dataset_rdd_all = sc.textFile(inputPath + "/dataset")
+                .map(item -> new ObjectMapper().readValue(item, Dataset.class));
+        JavaRDD<OtherResearchProduct> orp_rdd_all = sc.textFile(inputPath + "/otherresearchproduct")
+                .map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
+        JavaRDD<Software> software_rdd_all = sc.textFile(inputPath + "/software")
+                .map(item -> new ObjectMapper().readValue(item, Software.class));
+
+        org.apache.spark.sql.Dataset<Relation> relation = spark.createDataset(relations_rdd_all.rdd(), Encoders.bean(Relation.class));
+
+        relation.createOrReplaceTempView("relation");
+//        String query = "SELECT source, target" +
+//                " FROM relation " +
+//                " WHERE datainfo.deletedbyinference = false " +
+//                " AND relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "'";
+//
+//        org.apache.spark.sql.Dataset result_organization = spark.sql(query);
+//        result_organization.createOrReplaceTempView("result_organization");
+//
+//        query = "SELECT source, target " +
+//                "FROM relation " +
+//                "WHERE datainfo.deletedbyinference = false " +
+//                "AND relClass = '" + RELATION_REPRESENTATIVERESULT_RESULT_CLASS + "'" ;
+
+        String query = "SELECT result_organization.source, result_organization.target, org_set " +
+                "FROM (SELECT source, target " +
+                "      FROM relation " +
+                "      WHERE datainfo.deletedbyinference = false " +
+                "      AND relClass = '" + RELATION_RESULT_ORGANIZATION_REL_CLASS + "') result_organization " +
+                "LEFT JOIN (SELECT source, collect_set(target) org_set " +
+                "      FROM relation " +
+                "      WHERE datainfo.deletedbyinference = false " +
+                "      AND relClass = '" + RELATION_REPRESENTATIVERESULT_RESULT_CLASS + "' " +
+                "      GROUP BY source) organization_organization " +
+                "ON result_organization.target = organization_organization.source ";
+
+        org.apache.spark.sql.Dataset<Row> result_organizationset = spark.sql(query);
+
+        JavaPairRDD<String, TypedRow> result_communitySet = result_organizationset.toJavaRDD().map(r -> {
+            String rId = r.getString(0);
+            List<String> orgs = r.getList(2);
+            String oTarget = r.getString(1);
+            TypedRow tp = new TypedRow();
+            if (organizationMap.containsKey(oTarget)) {
+                tp.addAll(organizationMap.get(oTarget));
+            }
+            try{
+                for (String oId : orgs) {
+                    if (organizationMap.containsKey(oId)) {
+                        tp.addAll(organizationMap.get(oId));
+                    }
+                }
+            }
+            catch(Exception e){
+                //System.out.println("organizationTargetID: " + oTarget);
+            }
+
+            if(tp.getAccumulator() == null ){
+                return null;
+            }
+            tp.setSourceId(rId);
+
+            return tp;
+        })
+                .filter(tr -> !(tr==null))
+                .mapToPair(toPair()).cache();
+
+        if(writeUpdates){
+            result_communitySet.map(c->new ObjectMapper().writeValueAsString(c)).saveAsTextFile(outputPath +"/result_communityset");
+        }
+
+        if(saveGraph){
+            updatePublicationResult(publications_rdd_all, result_communitySet)
+                    .map(p -> new ObjectMapper().writeValueAsString(p))
+                    .saveAsTextFile(outputPath + "/publication");
+
+            updateDatasetResult(dataset_rdd_all, result_communitySet)
+                    .map(p -> new ObjectMapper().writeValueAsString(p))
+                    .saveAsTextFile(outputPath + "/dataset");
+
+            updateORPResult(orp_rdd_all, result_communitySet)
+                    .map(p -> new ObjectMapper().writeValueAsString(p))
+                    .saveAsTextFile(outputPath + "/otherresearchproduct");
+
+            updateSoftwareResult(software_rdd_all, result_communitySet)
+                    .map(p -> new ObjectMapper().writeValueAsString(p))
+                    .saveAsTextFile(outputPath + "/software");
+        }
+
+
+        //relations between organizations and results
+//        JavaPairRDD organization_result = relations
+//                .filter(r -> !r.getDataInfo().getDeletedbyinference())
+//                .filter(r -> RELATION_RESULT_ORGANIZATION_REL_CLASS.equals(r.getRelClass())
+//                        && RELATION_RESULTORGANIZATION_REL_TYPE.equals(r.getRelType()))
+//                .map(r -> {
+//                    TypedRow tp = new TypedRow();
+//                    tp.setSourceId(r.getTarget());
+//
tp.setTargetId(r.getSource() ); +// return tp; +// }) +// .mapToPair(toPair()); + + //relations between representative organization and merged Id. One relation per merged organization +// JavaPairRDD organization_organization = relations +// .filter(r -> !r.getDataInfo().getDeletedbyinference()) +// .filter( r -> RELATION_ORGANIZATIONORGANIZATION_REL_TYPE.equals(r.getRelType()) +// && RELATION_REPRESENTATIVERESULT_RESULT_CLASS.equals(r.getRelClass())) +// .map(r -> { +// TypedRow tp = new TypedRow(); +// tp.setSourceId(r.getSource()); +// tp.setTargetId(r.getTarget()); +// return tp; +// }) +// .mapToPair(toPair()); + + //get the original id of the organizations to be checked against the id associated to the communities +// JavaPairRDD organizationoriginal_result = organization_result.leftOuterJoin(organization_organization) +// .map(c -> { +// if (!c._2()._2().isPresent()) +// return c._2()._1(); +// return c._2()._1().setSourceId(c._2()._2().get().getTargetId()); +// }) +// .mapToPair(toPair()); + + //associates to each result connected to an organization the list of communities related to that organization +// JavaPairRDD to_add_result_communities = organizationoriginal_result.map(o -> { +// List communityList = organizationMap.get(o._1()); +// if (communityList.size() == 0) +// return null; +// TypedRow tp = o._2(); +// tp.setAccumulator(new HashSet<>(communityList)); +// tp.setSourceId(tp.getTargetId()); +// return tp; +// }) +// .filter(r -> !(r == null)) +// .mapToPair(toPair()) +// .reduceByKey((a, b) -> { +// if (a == null) +// return b; +// if (b == null) +// return a; +// a.addAll(b.getAccumulator()); +// return a; +// }); + + +// JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache(); +// JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)).cache(); +// JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)).cache(); +// JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) +// .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)).cache(); +// +// JavaPairRDD pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// JavaPairRDD dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// JavaPairRDD sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// JavaPairRDD orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p)); +// +// updateResultForCommunity(pubs, to_add_result_communities, outputPath, "publication", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// updateResultForCommunity(dss, to_add_result_communities, outputPath, "dataset", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// updateResultForCommunity(sfw, to_add_result_communities, outputPath, "software", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); +// updateResultForCommunity(orp, to_add_result_communities, outputPath, "otherresearchproduct", PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME); + + } + + private static JavaRDD 
updatePublicationResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (Publication)r); + + } + private static JavaRDD updateDatasetResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (Dataset) r); + + } + private static JavaRDD updateORPResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (OtherResearchProduct)r); + + } + private static JavaRDD updateSoftwareResult(JavaRDD result, JavaPairRDD result_communitySet){ + JavaRDD tmp = result.map(r -> (Result)r); + return updateResultDataset(tmp, result_communitySet).map(r -> (Software) r); + + } + private static JavaRDD updateResultDataset(JavaRDD result, JavaPairRDD result_communitySet){ + return result.mapToPair(p -> new Tuple2<>(p.getId(),p)).leftOuterJoin(result_communitySet) + .map(c -> { + Result r = c._2()._1(); + if(c._2()._2().isPresent()){ + Set communitySet = c._2()._2().get().getAccumulator(); + List contextList = r.getContext().stream().map(con -> con.getId()).collect(Collectors.toList()); + for(String cId:communitySet){ + if(!contextList.contains(cId)){ + Context newContext = new Context(); + newContext.setId(cId); + newContext.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); + r.getContext().add(newContext); + } + } + } + return r; + }); + + } + // private static void updateResult(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type) { +// results.leftOuterJoin(toupdateresult) +// .map(p -> { +// Result r = p._2()._1(); +// if (p._2()._2().isPresent()){ +// Set communityList = p._2()._2().get().getAccumulator(); +// for(Context c: r.getContext()){ +// if (communityList.contains(c.getId())){ +// //verify if the datainfo for this context contains propagation +// if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ +// c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME)); +// //community id already in the context of the result. 
Remove it from the set that has to be added +// communityList.remove(c.getId()); +// } +// } +// } +// List cc = r.getContext(); +// for(String cId: communityList){ +// Context context = new Context(); +// context.setId(cId); +// context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME))); +// cc.add(context); +// } +// r.setContext(cc); +// } +// return r; +// }) +// .map(p -> new ObjectMapper().writeValueAsString(p)) +// .saveAsTextFile(outputPath+"/"+type); +// } + + +} + + +/* +package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; + +import com.google.gson.Gson; +import eu.dnetlib.data.mapreduce.hbase.propagation.Value; +import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; +import eu.dnetlib.data.proto.OafProtos; +import eu.dnetlib.data.proto.TypeProtos; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; + +import java.io.IOException; +import java.util.Set; + +import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; +import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; +import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget; + +public class PropagationCommunityThroughOrganizationMapper extends TableMapper { + private Text valueOut; + private ImmutableBytesWritable keyOut; + private OrganizationMap organizationMap; + + //seleziono il tipo della entry: + //Result: + //se non e' deleted by inference ed ha organizzazioni a cui e' associato, + // //emetto id della relazione ed id del risultato per ogni organizzazione a cui e' associato + //ORGANIZATION: + //se non e' deleted by inference e non e' deduplicata emetto l'id della organizzazione + //se non e' deleted by inference ed e' deduplicata, emetto id della organizzazione ed id del deduplicato e lista delle organizzazioni mergiate + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + valueOut = new Text(); + keyOut = new ImmutableBytesWritable(); + organizationMap = new Gson().fromJson(context.getConfiguration().get("organizationtoresult.community.map"), OrganizationMap.class); + System.out.println("got organizationtoresult map: " + new Gson().toJson(organizationMap)) ; + } + + @Override + protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { + final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); + final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference + if (entity != null) { + switch (type) { + case organization: + DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()), + getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context); + if (communityList.size() > 0){ + valueOut.set(Value.newInstance( + new Gson().toJson( + communityList, //search for organizationtoresult it merges + DedupedList.class), + ORGANIZATION_COMMUNITY_TRUST, + Type.fromorganization).toJson()); + context.write(keyIn, valueOut); + context.getCounter(COUNTER_PROPAGATION, "emit for 
organizationtoresult ").increment(1); + }else{ + context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1); + } + + break; + case result: + Set result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION); + for(String org: result_organization) + emit(org, Bytes.toString(keyIn.get()), context); + break; + } + } + } + + private DedupedList getCommunityList(String organizationId, Set relationTarget, Context context) { + DedupedList communityList = new DedupedList(); + relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|")))); + communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|"))); + communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organizationtoresult for " + c).increment(1)); + return communityList; + } + + private void emit(String org, String resId, Context context) throws IOException, InterruptedException { + keyOut.set(Bytes.toBytes(org)); + valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson()); + context.write(keyOut,valueOut); + context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1); + } + +} + */ \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java new file mode 100644 index 0000000000..a01b6d2dd4 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/OrcidPropagationJobTest.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.orcidtoresultfromsemrel; + +public class OrcidPropagationJobTest { +}