diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java index e9933c4e5..19e6127f6 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -1,13 +1,9 @@ + package eu.dnetlib.dhp.oa.dedup; -import com.google.common.collect.Lists; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import java.io.IOException; +import java.util.*; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -17,12 +13,19 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; import scala.Tuple3; -import java.io.IOException; -import java.util.*; - public class SparkPrepareOrgRels extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class); @@ -125,12 +128,11 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { List ids = sortIds(l); List> rels = new ArrayList<>(); - for (String source : ids) { - if (source.contains("openorgs____") || ids.indexOf(source) == 0) - for (String target : ids) { - rels.add(new Tuple3<>(source, target, groupId)); - } + String source = ids.get(0); + for (String target : ids) { + rels.add(new Tuple3<>(source, target, groupId)); } + return rels.iterator(); }) .rdd(), @@ -235,14 +237,14 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") .map( (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( - r._1()._1(), - r._2()._2().getOriginalId().get(0), - r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", - r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", - r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", - r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", - r._2()._2().getCollectedfrom().get(0).getValue(), - "group::" + r._1()._1()), + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue(), + "group::" + r._1()._1()), Encoders.bean(OrgSimRel.class)) .map( (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o),