// dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/DumpResult.java
package eu.dnetlib.dhp.oa.graph.dump.skgif;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.graph.dump.skgif.ResultMapper.map;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.*;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.skgif.model.AccessRight;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 06/02/24
*/
public class DumpResult implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DumpResult.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
DumpResult.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/skgif/dump_result_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, workingDir + "aggrelation");
mapResult(spark, inputPath, workingDir, outputPath);
});
}
// for each result we emit its id, the journal (if any), each instance and the hostedby of the instance
public static <R extends Result> void mapResult(SparkSession spark, String inputPath,
String workingDir, String outputPath) {
// emit the snippet of the entities to be included in other entities for the dematerialization
// emitMinEntities(spark, inputPath, workingDir);
// selection of the relevant relations from the result types to the other entities. Only the semantically
// relevant ones are considered
selectRelations(spark, inputPath, workingDir);
// merge of relations and manifestation for the same result
getRelationAndManifestation(spark, workingDir, inputPath);
// dump of the result and enrichment with relevant information for relations and manifestations
dumpResult(spark, inputPath, workingDir, outputPath);
}
// private static void emitMinEntities(SparkSession spark, String inputPath, String workingDir) {
//
// Utils.readPath(spark, inputPath + "organization", Organization.class)
// .filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference())
// .map((MapFunction<Organization, EncloseMinElement>) o -> {
// EncloseMinElement eme = new EncloseMinElement();
// eme.setEnclosedEntityId(o.getId());
// eme.setMinOrganization(Utils.getMinOrganization(o));
// return eme;
// }, Encoders.bean(EncloseMinElement.class) )
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression","gzip")
// .json(workingDir + "encloseMinEntity");
//
// Utils.readPath(spark, inputPath + "project", Project.class)
// .filter((FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference())
// .map((MapFunction<Project,EncloseMinElement>) p -> {
// EncloseMinElement eme = new EncloseMinElement();
// eme.setEnclosedEntityId(p.getId());
// eme.setMinGrant(Utils.getMinGrant(p));
// return eme;
// }, Encoders.bean(EncloseMinElement.class))
// .write()
// .mode(SaveMode.Append)
// .option("compression","gzip")
// .json(workingDir + "encloseMinEntity");
//
// getMinProduct(spark, inputPath + "publication" , Publication.class)
// .union(getMinProduct(spark, inputPath + "dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class))
// .union(getMinProduct(spark, inputPath + "software", Software.class))
// .union(getMinProduct(spark, inputPath + "otherresearchproduct", OtherResearchProduct.class))
// .write()
// .mode(SaveMode.Append)
// .option("compression","gzip")
// .json(workingDir + "encloseMinEntity");
//
//
// }
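/**
* For each result type, left-joins the manifestations emitted in the previous step with the datasources of
* type "Journal archive" and groups them per result id into a PartialResearchProduct; the partial products
* are then left-joined with the aggregated relations to attach related products, relevant organizations and
* funding, and written under workingDir/&lt;resultType&gt;/partialresearchproduct.
*/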
private static void getRelationAndManifestation(SparkSession spark, String workingDir, String inputPath) {
Dataset<RelationPerProduct> aggRelations = Utils
.readPath(spark, workingDir + "aggrelation", RelationPerProduct.class);
ModelSupport.entityTypes
.keySet()
.stream()
.filter(ModelSupport::isResult)
.forEach(e -> {
Utils.removeOutputDir(spark, workingDir + e.name() + "/partialresearchproduct");
Dataset<Datasource> datasource = Utils
.readPath(spark, inputPath + "/datasource", Datasource.class)
.filter(
(FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEoscdatasourcetype()).isPresent() &&
d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive"));
Dataset<EmitPerManifestation> man = Utils
.readPath(spark, workingDir + e.name() + "/manifestation", EmitPerManifestation.class);
Dataset<PartialResearchProduct> partialResearchProduct = man
.joinWith(datasource, man.col("instance.hostedby.key").equalTo(datasource.col("id")), "left")
.groupByKey(
(MapFunction<Tuple2<EmitPerManifestation, Datasource>, String>) t2 -> t2._1().getResultId(),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<EmitPerManifestation, Datasource>, PartialResearchProduct>) (
k, v) -> {
PartialResearchProduct prp = new PartialResearchProduct();
prp.setResultId(k);
List<Manifestation> manifestationList = new ArrayList<>();
while (v.hasNext())
manifestationList.add(getManifestation(v.next()));
prp.setManifestations(manifestationList);
return prp;
}, Encoders.bean(PartialResearchProduct.class));
partialResearchProduct
.joinWith(
aggRelations, partialResearchProduct.col("resultId").equalTo(aggRelations.col("resultId")),
"left")
.map(
(MapFunction<Tuple2<PartialResearchProduct, RelationPerProduct>, PartialResearchProduct>) t2 -> {
PartialResearchProduct prp = t2._1();
if (Optional.ofNullable(t2._2()).isPresent()) {
prp
.setRelated_products(
t2
._2()
.getRelatedProduct()
.keySet()
.stream()
.map(
key -> Relations.newInstance(key, t2._2().getRelatedProduct().get(key)))
.collect(Collectors.toList()));
prp.setRelevant_organizations(t2._2().getOrganizations());
prp.setFunding(t2._2().getFunding());
}
return prp;
}, Encoders.bean(PartialResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/partialresearchproduct");
});
}
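/**
* Maps an instance (and, when present, the joined hosting datasource) onto a SKG-IF Manifestation: product
* local type, dates, peer review and access right labels, licence, url, pid, hosting datasource and, when
* the datasource side of the join is present, the bibliographic info and the venue.
*/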
private static Manifestation getManifestation(Tuple2<EmitPerManifestation, Datasource> t2) {
// if the datasource (right side of the left join) is present, we also have the biblio and the venue
// otherwise only the other values are set
EmitPerManifestation epm = t2._1();
Manifestation manifestation = new Manifestation();
manifestation.setProduct_local_type(epm.getInstance().getInstancetype().getClassname());
manifestation.setProduct_local_type_schema(epm.getInstance().getInstancetype().getSchemename());
if (Optional.ofNullable(epm.getInstance().getDateofacceptance()).isPresent())
manifestation
.setDates(
Arrays
.asList(
Dates.newInstance(epm.getInstance().getDateofacceptance().getValue(), "publishing")));
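// map the refereed classid onto the SKG-IF peer review labels (0000 unavailable, 0001 peer reviewed, 0002 non peer reviewed)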
if (Optional.ofNullable(epm.getInstance().getRefereed()).isPresent())
switch (epm.getInstance().getRefereed().getClassid()) {
case "0000":
manifestation.setPeer_review(PeerReview.UNAVAILABLE.label);
break;
case "0001":
manifestation.setPeer_review(PeerReview.PEER_REVIEWED.label);
break;
case "0002":
manifestation.setPeer_review(PeerReview.NON_PEER_REVIEWED.label);
break;
}
manifestation.setMetadata_curation("unavailable");
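// map the OpenAIRE access right classid onto the closed set of SKG-IF access right labels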
if (Optional.ofNullable(epm.getInstance().getAccessright()).isPresent())
switch (epm.getInstance().getAccessright().getClassid()) {
case "OPEN":
case "OPEN DATA":
case "OPEN SOURCE":
manifestation.setAccess_right(AccessRight.OPEN.label);
break;
case "CLOSED":
manifestation.setAccess_right(AccessRight.CLOSED.label);
break;
case "RESTRICTED":
manifestation.setAccess_right(AccessRight.RESTRICTED.label);
break;
case "EMBARGO":
case "12MONTHS":
case "6MONTHS":
manifestation.setAccess_right(AccessRight.EMBARGO.label);
break;
default:
manifestation.setAccess_right(AccessRight.UNAVAILABLE.label);
}
manifestation
.setLicence(
Optional
.ofNullable(epm.getInstance().getLicense())
.map(value -> value.getValue())
.orElse(null));
if (Optional.ofNullable(epm.getInstance().getUrl()).isPresent() && epm.getInstance().getUrl().size() > 0)
manifestation
.setUrl(epm.getInstance().getUrl().get(0));
else
manifestation.setUrl(null);
if (Optional.ofNullable(epm.getInstance().getPid()).isPresent() && epm.getInstance().getPid().size() > 0) {
manifestation.setPid(epm.getInstance().getPid().get(0).getValue());
}
if (Optional.ofNullable(t2._2()).isPresent()) {
manifestation.setBiblio(getBiblio(epm));
if (Optional.ofNullable(t2._2().getJournal().getIssnPrinted()).isPresent())
manifestation
.setVenue(
MinVenue
.newInstance(
Utils.getIdentifier(Prefixes.VENUE, t2._2().getJournal().getIssnPrinted()),
t2._1().getJournal().getName()));
else if (Optional.ofNullable(t2._2().getJournal().getIssnOnline()).isPresent())
manifestation
.setVenue(
MinVenue
.newInstance(
Utils.getIdentifier(Prefixes.VENUE, t2._2().getJournal().getIssnOnline()),
t2._1().getJournal().getName()));
}
manifestation
.setHosting_datasource(
MinVenue
.newInstance(
// Utils.getIdentifier(Prefixes.DATASOURCE, epm.getInstance().getHostedby().getKey()),
epm.getInstance().getHostedby().getKey(),
epm.getInstance().getHostedby().getValue()));
return manifestation;
}
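/** Builds the bibliographic information (edition, issue, publisher, volume, start/end page) from the manifestation's journal. */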
private static Biblio getBiblio(EmitPerManifestation epm) {
Biblio biblio = new Biblio();
biblio.setEdition(epm.getJournal().getEdition());
biblio.setIssue(epm.getJournal().getIss());
biblio.setPublisher(epm.getPublisher());
biblio.setVolume(epm.getJournal().getVol());
biblio.setEnd_page(epm.getJournal().getEp());
biblio.setStart_page(epm.getJournal().getSp());
return biblio;
}
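/**
* For each result type, left-joins the graph results with the corresponding PartialResearchProducts, maps
* them to SKG-IF ResearchProducts (via ResultMapper.map) enriched with relations and manifestations, and
* finally unions all the per-type outputs into outputPath + "ResearchProduct".
*/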
private static <R extends Result> void dumpResult(SparkSession spark, String inputPath, String workingDir,
String outputPath) {
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(ModelSupport::isResult)
.forEach(e -> {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils.removeOutputDir(spark, workingDir + e.name() + "/researchproduct");
Dataset<R> results = Utils.readPath(spark, inputPath + e.name(), resultClazz);
Dataset<PartialResearchProduct> prr = Utils
.readPath(spark, workingDir + e.name() + "/partialresearchproduct", PartialResearchProduct.class);
results
.joinWith(prr, results.col("id").equalTo(prr.col("resultId")), "left")
.map((MapFunction<Tuple2<R, PartialResearchProduct>, ResearchProduct>) t2 -> {
ResearchProduct rp = map(t2._1());
if (Optional.ofNullable(t2._2()).isPresent()) {
if (Optional.ofNullable(t2._2().getRelated_products()).isPresent())
rp.setRelated_products(t2._2().getRelated_products());
if (Optional.ofNullable(t2._2().getFunding()).isPresent())
rp.setFunding(t2._2().getFunding());
if (Optional.ofNullable(t2._2().getRelevant_organizations()).isPresent())
rp.setRelevant_organizations(t2._2().getRelevant_organizations());
if (Optional.ofNullable(t2._2().getManifestations()).isPresent())
rp.setManifestations(t2._2().getManifestations());
}
return rp;
}, Encoders.bean(ResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/researchproduct");
});
Dataset<ResearchProduct> researchProducts = spark.emptyDataset(Encoders.bean(ResearchProduct.class));
for (EntityType e : ModelSupport.entityTypes.keySet()) {
if (ModelSupport.isResult(e))
researchProducts = researchProducts
.union(Utils.readPath(spark, workingDir + e.name() + "/researchproduct", ResearchProduct.class));
}
researchProducts
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "ResearchProduct");
}
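/**
* Selects the semantically relevant relations. The list of product-to-product semantics is defined here, but
* at the moment only the result-to-grant relations are materialised (see the commented-out calls below);
* the per-relation aggregations are then reduced by result id and written under workingDir/aggrelation.
*/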
private static void selectRelations(SparkSession spark, String inputPath, String workingDir) {
List<String> relationsProducts = Arrays
.asList(
RelationType.CITATION.label,
// RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label,
RelationType.SUPPLEMENT.label,
// RelationType.RESULT_OUTCOME_FUNDING.label,
RelationType.DOCUMENTS.label,
RelationType.PART.label,
RelationType.VERSION.label);
// relationsProducts
// .stream()
// .forEach(r -> buildRelationPerProducts(spark, inputPath, workingDir, r));
// buildRelationPerAffiliation(
// spark, inputPath, workingDir, RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label);
buildRelationPerGrant(spark, inputPath, workingDir, RelationType.RESULT_OUTCOME_FUNDING.label);
RDD<RelationPerProduct> temp = spark
.read()
.schema(Encoders.bean(RelationPerProduct.class).schema())
.json(workingDir + "aggrelation_temp")
.as(Encoders.bean(RelationPerProduct.class))
.toJavaRDD()
.mapToPair(v -> new Tuple2<>(v.getResultId(), v))
.reduceByKey((a, b) -> {
mergeRelationPerProduct(a, b);
return a;
})
.map(v -> v._2())
.rdd();
spark
.createDataset(temp, Encoders.bean(RelationPerProduct.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/aggrelation");
}
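/**
* Joins the relations of the given semantics (not deleted by inference) with the minimal grant
* representations read from workingDir/minGrant (presumably produced by a previous step of the dump
* workflow), aggregates them per source result and appends the result to workingDir/aggrelation_temp.
*/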
private static void buildRelationPerGrant(SparkSession spark, String inputPath, String workingDir,
String relationType) {
log.info("Relation: {}", relationType);
final StructType relationstructureSchema = new StructType()
.add(
"dataInfo", new StructType()
.add("deletedbyinference", DataTypes.BooleanType))
.add("source", DataTypes.StringType)
.add("target", DataTypes.StringType)
.add("relClass", DataTypes.StringType);
final StructType grantSchema = new StructType()
.add(
"local_identifier", DataTypes.StringType)
.add("funder", DataTypes.StringType)
.add("code", DataTypes.StringType)
.add("title", DataTypes.StringType)
;
Dataset<Row> relation = spark
.read()
.schema(relationstructureSchema)
.json(inputPath + "relation")
.filter(
"datainfo.deletedbyinference != true and " +
"relclass == '" + relationType + "'")
.drop("dataInfo");
Dataset<Row> minProduct = spark
.read()
.schema(grantSchema)
.json(workingDir + "minGrant");
relation
.joinWith(
minProduct, relation.col("target").equalTo(minProduct.col("local_identifier")))
.selectExpr("_1.source as sourceResult", "_1.relClass as relClass", "_2.*")
.groupByKey((MapFunction<Row, String>) r -> r.getAs("sourceResult"), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Row, RelationPerProduct>) (k, it) -> {
RelationPerProduct rpp = new RelationPerProduct();
rpp.setResultId(k);
rpp.setRelatedProduct(new HashMap<>());
updateRelevantGrant(rpp, it.next());
it.forEachRemaining(r -> updateRelevantGrant(rpp, r));
return rpp;
}, Encoders.bean(RelationPerProduct.class))
// .show(false);
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(workingDir + "aggrelation_temp");
}
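/** Builds a MinGrant from the joined row and adds it to the funding list of the RelationPerProduct, initialising the list if needed. */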
private static void updateRelevantGrant(RelationPerProduct rpp, Row next) {
if (!Optional.ofNullable(rpp.getFunding()).isPresent())
rpp.setFunding(new ArrayList<>());
MinGrant mo = new MinGrant();
mo.setLocal_identifier(next.getAs("local_identifier"));
mo.setTitle(next.getAs("title"));
mo.setFunder(next.getAs("fundet"));
mo.setCode(next.getAs("code"));
rpp.getFunding().add(mo);
}
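/**
* Same pattern as {@link #buildRelationPerGrant}: joins the affiliation relations with the minimal
* organization representations read from workingDir/minOrganization and appends the per-result aggregation
* to workingDir/aggrelation_temp. Currently not invoked: the call in selectRelations is commented out.
*/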
private static void buildRelationPerAffiliation(SparkSession spark, String inputPath, String workingDir,
String relationType) {
log.info("Relation: {}", relationType);
final StructType relationstructureSchema = new StructType()
.add(
"dataInfo", new StructType()
.add("deletedbyinference", DataTypes.BooleanType))
.add("source", DataTypes.StringType)
.add("target", DataTypes.StringType)
.add("relClass", DataTypes.StringType);
final StructType orgSchema = new StructType()
.add(
"local_identifier", DataTypes.StringType)
.add("name", DataTypes.StringType)
.add("ror", DataTypes.StringType)
.add("isni", DataTypes.StringType)
.add("fundRef", DataTypes.StringType)
.add("rinGold", DataTypes.StringType)
.add("wikidata", DataTypes.StringType);
Dataset<Row> relation = spark
.read()
.schema(relationstructureSchema)
.json(inputPath + "relation")
.filter(
"datainfo.deletedbyinference != true and " +
"relclass == '" + relationType + "'")
.drop("dataInfo");
Dataset<Row> minOrganization = spark
.read()
.schema(orgSchema)
.json(workingDir + "minOrganization");
relation
.joinWith(
minOrganization, relation.col("target").equalTo(minOrganization.col("local_identifier")))
.selectExpr("_1.source as sourceResult", "_1.relClass as relClass", "_2.*")
.groupByKey((MapFunction<Row, String>) r -> r.getAs("sourceResult"), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Row, RelationPerProduct>) (k, it) -> {
RelationPerProduct rpp = new RelationPerProduct();
rpp.setResultId(k);
rpp.setRelatedProduct(new HashMap<>());
updateRelevantOrganization(rpp, it.next());
it.forEachRemaining(r -> updateRelevantOrganization(rpp, r));
return rpp;
}, Encoders.bean(RelationPerProduct.class))
// .show(false);
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(workingDir + "aggrelation_temp");
}
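/** Builds a MinOrganization from the joined row and adds it to the organizations of the RelationPerProduct, initialising the list if needed. */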
private static void updateRelevantOrganization(RelationPerProduct rpp, Row next) {
if (!Optional.ofNullable(rpp.getOrganizations()).isPresent())
rpp.setOrganizations(new ArrayList<>());
MinOrganization mo = new MinOrganization();
mo.setLocal_identifier(next.getAs("local_identifier"));
mo.setIsni(next.getAs("isni"));
mo.setRor(next.getAs("ror"));
mo.setName(next.getAs("name"));
mo.setWikidata(next.getAs("wikidata"));
mo.setFundRef(next.getAs("fundRef"));
mo.setRinGold(next.getAs("rinGold"));
rpp.getOrganizations().add(mo);
}
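/**
* Joins the result-to-result relations of the given semantics with the minimal product representations read
* from workingDir/minProduct and appends the per-result aggregation, keyed by relation semantics, to
* workingDir/aggrelation_temp. Currently not invoked: the loop over relationsProducts is commented out.
*/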
private static void buildRelationPerProducts(SparkSession spark, String inputPath, String workingDir,
String relationType) {
log.info("Relation: {}", relationType);
final StructType relationstructureSchema = new StructType()
.add(
"dataInfo", new StructType()
.add("deletedbyinference", DataTypes.BooleanType))
.add("source", DataTypes.StringType)
.add("target", DataTypes.StringType)
.add("relClass", DataTypes.StringType);
final StructType productSchema = new StructType()
.add(
"local_identifier", DataTypes.StringType)
.add("title", DataTypes.StringType)
.add("doi", DataTypes.StringType)
.add("pmcid", DataTypes.StringType)
.add("arxivid", DataTypes.StringType)
.add("pmid", DataTypes.StringType);
Dataset<Row> relation = spark
.read()
.schema(relationstructureSchema)
.json(inputPath + "relation")
.filter(
"datainfo.deletedbyinference != true and " +
"relclass == '" + relationType + "'")
.drop("dataInfo");
Dataset<Row> minProduct = spark
.read()
.schema(productSchema)
.json(workingDir + "minProduct");
relation
.joinWith(
minProduct, relation.col("target").equalTo(minProduct.col("local_identifier")))
.selectExpr("_1.source as sourceResult", "_1.relClass as relClass", "_2.*")
.groupByKey((MapFunction<Row, String>) r -> r.getAs("sourceResult"), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Row, RelationPerProduct>) (k, it) -> {
RelationPerProduct rpp = new RelationPerProduct();
rpp.setResultId(k);
rpp.setRelatedProduct(new HashMap<>());
updateRelatedProduct(rpp, it.next());
it.forEachRemaining(r -> updateRelatedProduct(rpp, r));
return rpp;
}, Encoders.bean(RelationPerProduct.class))
// .show(false);
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(workingDir + "aggrelation_temp");
// .map((MapFunction<Tuple2<Row, EncloseMinElement>, RelationPerProduct>) t2 -> {
// RelationPerProduct rpp = new RelationPerProduct();
// t2._2().setResultId(t2._1().getAs("source"));
// t2._2().setSemantics(t2._1().getAs("relClass"));
// insertEnclosedElement(rpp, t2._2());
// rpp.setResultId(t2._1().getAs("source"));
// return rpp;
// }, Encoders.bean(RelationPerProduct.class))
// .filter(Objects::nonNull)
// .toJavaRDD()
// .mapToPair(value -> new Tuple2<>(value.getResultId(), value))
// .reduceByKey((a, b) -> {
// mergeRelationPerProduct(a, b);
// return a;
// })
//
// .map(value -> value._2)
// .rdd(),
// Encoders.bean(RelationPerProduct.class))
//// .saveAsTextFile(workingDir + "/aggrelation", GzipCodec.class);
//// .groupByKey((MapFunction<EncloseMinElement, String>) eme -> eme.getResultId(), Encoders.STRING())
//// .mapGroups((MapGroupsFunction<String, EncloseMinElement, RelationPerProduct>) (k, v) -> {
//// RelationPerProduct rpp = new RelationPerProduct();
//// rpp.setResultId(k);
//// insertEnclosedElement(rpp, v.next());
//// v.forEachRemaining(e -> insertEnclosedElement(rpp, e));
//// return rpp;
//// }, Encoders.bean(RelationPerProduct.class))
// .write()
// .mode(SaveMode.Append)
// .option("compression", "gzip")
// .json(workingDir + "/aggrelation_temp");
}
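/** Builds a MinProduct from the joined row and adds it to the relatedProduct map of the RelationPerProduct under the relClass key. */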
private static void updateRelatedProduct(RelationPerProduct rpp, Row next) {
String key = next.getAs("relClass");
if (!rpp.getRelatedProduct().containsKey(key))
rpp.getRelatedProduct().put(key, new ArrayList<>());
MinProduct mp = new MinProduct();
mp.setLocal_identifier(next.getAs("local_identifier"));
mp.setTitle(next.getAs("title"));
mp.setPmid(next.getAs("pmid"));
mp.setArxivid(next.getAs("arxivid"));
mp.setPmcid(next.getAs("pmcid"));
mp.setDoi(next.getAs("doi"));
rpp.getRelatedProduct().get(key).add(mp);
}
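/** Adds the minimal organization, grant or product enclosed in the element to the corresponding collection of the RelationPerProduct (referenced only by the commented-out code above). */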
private static void insertEnclosedElement(RelationPerProduct rpp, EncloseMinElement element) {
if (Optional.ofNullable(element.getMinOrganization()).isPresent())
rpp.getOrganizations().add(element.getMinOrganization());
if (Optional.ofNullable(element.getMinGrant()).isPresent())
rpp.getFunding().add(element.getMinGrant());
if (Optional.ofNullable(element.getMinProduct()).isPresent()) {
String sem = element.getSemantics();
if (!rpp.getRelatedProduct().containsKey(sem))
rpp.getRelatedProduct().put(sem, new ArrayList<>());
rpp.getRelatedProduct().get(sem).add(element.getMinProduct());
}
}
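/** Merges the organizations, funding and related products of rpp2 into rpp1. */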
private static void mergeRelationPerProduct(RelationPerProduct rpp1, RelationPerProduct rpp2) {
if (Optional.ofNullable(rpp2.getOrganizations()).isPresent())
rpp1.getOrganizations().addAll(rpp2.getOrganizations());
if (Optional.ofNullable(rpp2.getFunding()).isPresent())
rpp1.getFunding().addAll(rpp2.getFunding());
if (Optional.ofNullable(rpp2.getRelatedProduct()).isPresent()) {
Map<String, List<MinProduct>> temp = rpp2.getRelatedProduct();
for (String key : temp.keySet()) {
if (!rpp1.getRelatedProduct().containsKey(key))
rpp1.getRelatedProduct().put(key, new ArrayList<>());
for (MinProduct mp : rpp2.getRelatedProduct().get(key))
rpp1.getRelatedProduct().get(key).add(mp);
}
}
}
}