forked from D-Net/dnet-hadoop
relations for openorgs: not it choose only one master
This commit is contained in:
parent
c4a59d1b9a
commit
1699d41d39
|
@ -1,13 +1,9 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.dedup;
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import java.io.IOException;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import java.util.*;
|
||||||
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
@ -17,12 +13,19 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
import scala.Tuple3;
|
import scala.Tuple3;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
public class SparkPrepareOrgRels extends AbstractSparkAction {
|
public class SparkPrepareOrgRels extends AbstractSparkAction {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
|
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
|
||||||
|
@ -125,12 +128,11 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
|
||||||
List<String> ids = sortIds(l);
|
List<String> ids = sortIds(l);
|
||||||
List<Tuple3<String, String, String>> rels = new ArrayList<>();
|
List<Tuple3<String, String, String>> rels = new ArrayList<>();
|
||||||
|
|
||||||
for (String source : ids) {
|
String source = ids.get(0);
|
||||||
if (source.contains("openorgs____") || ids.indexOf(source) == 0)
|
for (String target : ids) {
|
||||||
for (String target : ids) {
|
rels.add(new Tuple3<>(source, target, groupId));
|
||||||
rels.add(new Tuple3<>(source, target, groupId));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return rels.iterator();
|
return rels.iterator();
|
||||||
})
|
})
|
||||||
.rdd(),
|
.rdd(),
|
||||||
|
@ -235,14 +237,14 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
|
||||||
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
|
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
|
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
|
||||||
r._1()._1(),
|
r._1()._1(),
|
||||||
r._2()._2().getOriginalId().get(0),
|
r._2()._2().getOriginalId().get(0),
|
||||||
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
|
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
|
||||||
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
|
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
|
||||||
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
|
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
|
||||||
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
|
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
|
||||||
r._2()._2().getCollectedfrom().get(0).getValue(),
|
r._2()._2().getCollectedfrom().get(0).getValue(),
|
||||||
"group::" + r._1()._1()),
|
"group::" + r._1()._1()),
|
||||||
Encoders.bean(OrgSimRel.class))
|
Encoders.bean(OrgSimRel.class))
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
|
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
|
||||||
|
|
Loading…
Reference in New Issue