From a86426776ad034147b84cc80c926592aeff0a47f Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 28 Feb 2020 18:20:19 +0100
Subject: [PATCH] Changed the type of the updateResult method parameter from Oaf to Result, to avoid having to cast each time

---
 ...parkResultToCommunityThroughSemRelJob.java | 285 ++++++------------
 .../SparkOrcidToReseltFromSemRelJob.java      |  78 +++++
 .../SparkOrcidToResultFromSemRelJob.java      | 123 ++++++++
 3 files changed, 291 insertions(+), 195 deletions(-)
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToReseltFromSemRelJob.java
 create mode 100644 dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java
index b2178f1fb..707ac9eea 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java
@@ -15,10 +15,8 @@ import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME;
@@ -26,7 +24,7 @@ import static eu.dnetlib.dhp.PropagationConstant.PROPAGATION_RELATION_RESULT_ORG
 
 public class SparkResultToCommunityThroughSemRelJob {
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/communitytoresultthroughsemrel/input_communitytoresultthroughsemrel_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_propagationresultcommunityfromsemrel_parameters.json")));
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
                 .builder()
@@ -39,7 +37,7 @@ public class SparkResultToCommunityThroughSemRelJob {
         final String inputPath = parser.get("sourcePath");
         final String outputPath = "/tmp/provision/propagation/communitytoresultthroughsemrel";
 
-        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrel").split(";"));
+        final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
         final List<String> communityIdList = QueryInformationSystem.getCommunityList(parser.get("isLookupUrl"));
 
         File directory = new File(outputPath);
@@ -47,209 +45,106 @@ public class SparkResultToCommunityThroughSemRelJob {
         if (!directory.exists()) {
             directory.mkdirs();
         }
-/*
+        JavaPairRDD<String, TypedRow> result_result = getResultResultSemRel(allowedsemrel,
+                sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
+                        .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)));
+
+        JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication",
Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache(); + JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)).cache(); + JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)).cache(); + JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)).cache(); - JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache(); - - JavaPairRDD result_result = relations - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())) - .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) - .mapToPair(toPair()); - - JavaPairRDD result_project = relations - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) - .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) - .mapToPair(toPair()); - - //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result - JavaPairRDD project_result = result_project.join(result_result) - .map(c -> { - String projectId = c._2()._1().getTargetId(); - String resultId = c._2()._2().getTargetId(); - return new TypedRow().setSourceId(projectId).setTargetId(resultId); - }) - .mapToPair(toPair()); - - //relationships from project to result. 
One Pair for each project => project id list of results related to the project
-        JavaPairRDD<String, TypedRow> project_results = relations
-                .filter(r -> !r.getDataInfo().getDeletedbyinference())
-                .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()))
-                .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
+        JavaPairRDD<String, TypedRow> resultLinkedToCommunities = publications
+                .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"publication"))
+                .filter(p -> !(p == null))
                 .mapToPair(toPair())
-                .reduceByKey((a, b) -> {
-                    if (a == null) {
-                        return b;
+                .union(datasets
+                        .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"dataset"))
+                        .filter(p -> !(p == null))
+                        .mapToPair(toPair())
+                )
+                .union(software
+                        .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"software"))
+                        .filter(p -> !(p == null))
+                        .mapToPair(toPair())
+                )
+                .union(other
+                        .map(p -> getTypedRow(communityIdList, p.getContext(), p.getId(),"otherresearchproduct"))
+                        .filter(p -> !(p == null))
+                        .mapToPair(toPair())
+                );
+
+        JavaPairRDD<String, TypedRow> to_add_result_communities = resultLinkedToCommunities.join(result_result).map(r -> r._2()._1().setSourceId(r._2()._2().getTargetId()))
+                .mapToPair(toPair());
+
+        JavaPairRDD<String, Result> pubs = publications.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, Result> dss = datasets.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, Result> sfw = software.mapToPair(p -> new Tuple2<>(p.getId(),p));
+        JavaPairRDD<String, Result> orp = other.mapToPair(p -> new Tuple2<>(p.getId(),p));
+
+        updateResult(pubs, to_add_result_communities, outputPath, "publication");
+        updateResult(dss, to_add_result_communities, outputPath, "dataset");
+        updateResult(sfw, to_add_result_communities, outputPath, "software");
+        updateResult(orp, to_add_result_communities, outputPath, "otherresearchproduct");
+        //left outer join of result with to_add_result_communities (result = JavaPairRDD of results) [left outer join because we want all results, including the ones that were not updated]
+        //for the results that match, check whether the contexts to be added are already present in the result. If they are not, add them, otherwise do nothing
+
+    }
+
+    private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
+        results.leftOuterJoin(toupdateresult)
+                .map(p -> {
+                    Result r = p._2()._1();
+                    if (p._2()._2().isPresent()){
+                        Set<String> communityList = p._2()._2().get().getAccumulator();
+                        for(Context c: r.getContext()){
+                            if (communityList.contains(c.getId())){
+                                //verify if the datainfo for this context contains propagation
+                                if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
+                                    c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME));
+                                    //community id already in the context of the result.
Remove it from the set that has to be added + communityList.remove(c.getId()); + } + } + } + List cc = r.getContext(); + for(String cId: communityList){ + Context context = new Context(); + context.setId(cId); + context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME))); + cc.add(context); + } + r.setContext(cc); } - if (b == null) { - return a; - } - a.addAll(b.getAccumulator()); - return a; - }); - - - - JavaRDD newRels = project_result.join(project_results) - .flatMap(c -> { - String resId = c._2()._1().getTargetId(); - - if (c._2()._2().getAccumulator().contains(resId)) { - return null; - } - String progId = c._2()._1().getSourceId(); - List rels = new ArrayList(); - - rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS, - RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS, - RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); - return rels.iterator(); + return r; }) - .cache(); - - newRels.map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/relation_new"); - - newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/relation"); - + .map(p -> new ObjectMapper().writeValueAsString(p)) + .saveAsTextFile(outputPath+"/"+type); } -} -*/ - } -} -/* -package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import eu.dnetlib.data.mapreduce.hbase.propagation.Value; -import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; -import eu.dnetlib.data.proto.OafProtos; -import eu.dnetlib.data.proto.ResultProtos; -import eu.dnetlib.data.proto.TypeProtos; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableMapper; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Text; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.stream.Collectors; - -import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; -import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DEFAULT_COMMUNITY_RELATION_SET; -import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; -import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget; - - -public class CommunityToResultMapper extends TableMapper { - - private Text keyOut; - private Text valueOut; - private String[] sem_rels; - private String trust; - CommunityList idCommunityList; - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - - idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"), CommunityList.class); - keyOut = new Text(); - valueOut = new Text(); - - sem_rels = context.getConfiguration().getStrings("propagatetocommunity.semanticrelations", 
DEFAULT_COMMUNITY_RELATION_SET); - trust = context.getConfiguration().get("propagatetocommunity.trust","0.85"); - - } - - @Override - protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { - - final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); - - //If the type is not result I do not need to process it - if (!type.equals(TypeProtos.Type.result)) { - return; - } - - //verify if entity is valid - final OafProtos.OafEntity entity = getEntity(value, type); - if (entity == null) { - context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1); - return; - } - final Set toemitrelations = new HashSet<>(); - //verify if we have some relation - for (String sem_rel : sem_rels) - toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION)); - - if (toemitrelations.isEmpty()) { - context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1); - return; - } - - //verify if we have a relation to a context in the body - Set contextIds = entity.getResult().getMetadata().getContextList() + private static TypedRow getTypedRow(List communityIdList, List context, String id, String type) { + Set result_communities = context .stream() - .map(ResultProtos.Result.Context::getId) + .map(c -> c.getId()) .collect(Collectors.toSet()); - - //verify if we have a relation to a context in the update part made by the inference - NavigableMap map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString())); - - final Map stringMap = Maps.newHashMap(); - for (Map.Entry e : map.entrySet()) { - stringMap.put(Bytes.toString(e.getKey()), e.getValue()); - } - - // we fetch all the body updates - for (final String o : stringMap.keySet()) { - if (o.startsWith("update_")) { - final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(stringMap.get(o)); - contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList() - .stream() - .map(ResultProtos.Result.Context::getId) - .map(s -> s.split("::")[0]) - .collect(Collectors.toSet())); + TypedRow tp = new TypedRow(); + tp.setSourceId(id); + tp.setType(type); + for (String communityId : result_communities) { + if (communityIdList.contains(communityId)) { + tp.add(communityId); } } - - //we verify if we have something - if (contextIds.isEmpty()) { - context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1); - return; + if (tp.getAccumulator() != null) { + return tp; } - - //verify if some of the context collected for the result are associated to a community in the communityIdList - for (String id : idCommunityList) { - if (contextIds.contains(id)) { - for (String target : toemitrelations) { - keyOut.set(target); - valueOut.set(Value.newInstance(id).setTrust(trust).toJson()); - context.write(keyOut, valueOut); - context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1); - } - } - } - + return null; } - } - - - */ \ No newline at end of file diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToReseltFromSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToReseltFromSemRelJob.java new file mode 100644 index 000000000..682bf2200 --- /dev/null +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToReseltFromSemRelJob.java 
@@ -0,0 +1,78 @@
+package eu.dnetlib.dhp.orcidtoresultfromsemrel;
+
+public class SparkOrcidToReseltFromSemRelJob {
+}
+
+/*
+public class PropagationOrcidToResultMapper extends TableMapper {
+    private static final Log log = LogFactory.getLog(PropagationOrcidToResultMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
+    private Text valueOut;
+    private ImmutableBytesWritable keyOut;
+    private String[] sem_rels;
+    private String trust;
+
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        valueOut = new Text();
+        keyOut = new ImmutableBytesWritable();
+
+        sem_rels = context.getConfiguration().getStrings("propagatetoorcid.semanticrelations", DEFAULT_RESULT_RELATION_SET);
+        trust = context.getConfiguration().get("propagatetoorcid.trust","0.85");
+
+    }
+
+    @Override
+    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
+        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
+        final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference
+
+
+        if (entity != null) {
+
+            if (type == TypeProtos.Type.result){
+                Set result_result = new HashSet<>();
+                //check whether the result has a semantic relation towards one or more other results.
+                //for each result linked through isSupplementTo or isSupplementedBy emit:
+                //     the id of the linked result as key,
+                //     the id of the result being mapped and the author list of the result being mapped as value
+                for(String sem : sem_rels){
+                    result_result.addAll(getRelationTarget(value, sem, context, COUNTER_PROPAGATION));
+                }
+                if(!result_result.isEmpty()){
+                    List authorlist = getAuthorList(entity.getResult().getMetadata().getAuthorList());
+                    Emit e = new Emit();
+                    e.setId(Bytes.toString(keyIn.get()));
+                    e.setAuthor_list(authorlist);
+                    valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class),
+                            trust,
+                            Type.fromsemrel).toJson());
+                    for (String result: result_result){
+                        keyOut.set(Bytes.toBytes(result));
+                        context.write(keyOut,valueOut);
+                        context.getCounter(COUNTER_PROPAGATION,"emit for sem_rel").increment(1);
+                    }
+
+                    //also emit the id of the mapped result itself as key and its author list as value
+                    e.setId(keyIn.toString());
+                    e.setAuthor_list(authorlist);
+                    valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class), trust, Type.fromresult).toJson());
+                    context.write(keyIn, valueOut);
+                    context.getCounter(COUNTER_PROPAGATION,"emit for result with orcid").increment(1);
+
+                }
+            }
+
+        }
+    }
+
+    private List getAuthorList(List author_list){
+
+        return author_list.stream().map(a -> new JsonFormat().printToString(a)).collect(Collectors.toList());
+
+    }
+
+
+
+}
+ */
\ No newline at end of file
diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
new file mode 100644
index 000000000..3632cf745
--- /dev/null
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java
@@ -0,0 +1,123 @@
+package eu.dnetlib.dhp.orcidtoresultfromsemrel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.TypedRow;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import
org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static eu.dnetlib.dhp.PropagationConstant.getResultResultSemRel; + +public class SparkOrcidToResultFromSemRelJob { + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkOrcidToResultFromSemRelJob.class.getResourceAsStream("/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcid" + + "toresult_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkOrcidToResultFromSemRelJob.class.getSimpleName()) + .master(parser.get("master")) + .enableHiveSupport() + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String inputPath = parser.get("sourcePath"); + final String outputPath = "/tmp/provision/propagation/orcidtoresult"; + + final List allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";")); + + File directory = new File(outputPath); + + if (!directory.exists()) { + directory.mkdirs(); + } + + JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache(); + + JavaPairRDD result_result = getResultResultSemRel(allowedsemrel, relations); + } +} +/* +public class PropagationOrcidToResultMapper extends TableMapper { + private static final Log log = LogFactory.getLog(PropagationOrcidToResultMapper.class); // NOPMD by marko on 11/24/08 5:02 PM + private Text valueOut; + private ImmutableBytesWritable keyOut; + private String[] sem_rels; + private String trust; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + valueOut = new Text(); + keyOut = new ImmutableBytesWritable(); + + sem_rels = context.getConfiguration().getStrings("propagatetoorcid.semanticrelations", DEFAULT_RESULT_RELATION_SET); + trust = context.getConfiguration().get("propagatetoorcid.trust","0.85"); + + } + + @Override + protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { + final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); + final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference + + + if (entity != null) { + + if (type == TypeProtos.Type.result){ + Set result_result = new HashSet<>(); + //verifico se il risultato ha una relazione semantica verso uno o piu' risultati. 
+ //per ogni risultato linkato con issupplementto o issupplementedby emetto: + // id risultato linkato come chiave, + // id risultato oggetto del mapping e lista degli autori del risultato oggetto del mapper come value + for(String sem : sem_rels){ + result_result.addAll(getRelationTarget(value, sem, context, COUNTER_PROPAGATION)); + } + if(!result_result.isEmpty()){ + List authorlist = getAuthorList(entity.getResult().getMetadata().getAuthorList()); + Emit e = new Emit(); + e.setId(Bytes.toString(keyIn.get())); + e.setAuthor_list(authorlist); + valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class), + trust, + Type.fromsemrel).toJson()); + for (String result: result_result){ + keyOut.set(Bytes.toBytes(result)); + context.write(keyOut,valueOut); + context.getCounter(COUNTER_PROPAGATION,"emit for sem_rel").increment(1); + } + + //emetto anche id dell'oggetto del mapper come chiave e lista degli autori come valore + e.setId(keyIn.toString()); + e.setAuthor_list(authorlist); + valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class), trust, Type.fromresult).toJson()); + context.write(keyIn, valueOut); + context.getCounter(COUNTER_PROPAGATION,"emit for result with orcid").increment(1); + + } + } + + } + } + + private List getAuthorList(List author_list){ + + return author_list.stream().map(a -> new JsonFormat().printToString(a)).collect(Collectors.toList()); + + } + + + +} + */ \ No newline at end of file
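Note on the unfinished part: SparkOrcidToResultFromSemRelJob currently stops right after computing result_result, while the commented-out PropagationOrcidToResultMapper above documents the intended behaviour (propagate the author/ORCID information of a result to the results reachable through the allowed semantic relations). The following is only a sketch of how the Spark job could continue, mirroring the join/leftOuterJoin pattern already used by SparkResultToCommunityThroughSemRelJob in this patch. It assumes the same imports and helpers used there (toPair(), TypedRow, the Publication model, result_result, sc, inputPath, outputPath); the step that actually merges the propagated ORCID information into the authors is left as a placeholder, since this patch does not define it.

        // Sketch only, not part of this commit. Assumes TypedRow can carry the author payload.
        JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
                .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)).cache();

        // one TypedRow per result, keyed by the result id (mirrors the "emit for result with orcid" branch of the old mapper)
        JavaPairRDD<String, TypedRow> resultWithOrcid = publications
                .map(p -> new TypedRow().setSourceId(p.getId())) // hypothetical: attach p.getAuthor() to the TypedRow here
                .mapToPair(toPair());

        // move the payload along the allowed semantic relations: the authors of the source reach the target result
        // (mirrors the "emit for sem_rel" branch of the old mapper and to_add_result_communities in the community job)
        JavaPairRDD<String, TypedRow> toUpdateResult = resultWithOrcid.join(result_result)
                .map(t -> t._2()._1().setSourceId(t._2()._2().getTargetId()))
                .mapToPair(toPair());

        // left outer join so that every result is written out, updated or not, exactly as updateResult does for contexts
        publications.mapToPair(p -> new Tuple2<>(p.getId(), p))
                .leftOuterJoin(toUpdateResult)
                .map(t -> {
                    Publication p = t._2()._1();
                    if (t._2()._2().isPresent()) {
                        // hypothetical helper: merge the propagated ORCIDs into p's author list,
                        // guarded by the propagation data info, as updateResult does for community contexts
                    }
                    return p;
                })
                .map(p -> new ObjectMapper().writeValueAsString(p))
                .saveAsTextFile(outputPath + "/publication");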