forked from D-Net/dnet-hadoop
addition of pids in the query for the export of openorgs for the provision, addition of ec_fields in the openorgs model
This commit is contained in:
parent
eaaefb8b4c
commit
bf685d849f
|
@ -142,4 +142,12 @@ abstract class AbstractSparkAction implements Serializable {
|
||||||
.isPresent())
|
.isPresent())
|
||||||
.orElse(false);
|
.orElse(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static Boolean parseECField(Field<String> field) {
|
||||||
|
if (field == null)
|
||||||
|
return null;
|
||||||
|
if (StringUtils.isBlank(field.getValue()) || field.getValue().equalsIgnoreCase("null"))
|
||||||
|
return null;
|
||||||
|
return field.getValue().equalsIgnoreCase("true");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,14 +78,14 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
|
||||||
|
|
||||||
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
|
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
|
||||||
|
|
||||||
//collect organization merge relations from openorgs database
|
// collect organization merge relations from openorgs database
|
||||||
JavaRDD<Relation> mergeRelsRDD = spark
|
JavaRDD<Relation> mergeRelsRDD = spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(relationPath)
|
.textFile(relationPath)
|
||||||
.map(patchRelFn(), Encoders.bean(Relation.class))
|
.map(patchRelFn(), Encoders.bean(Relation.class))
|
||||||
.toJavaRDD()
|
.toJavaRDD()
|
||||||
.filter(this::isOpenorgs) //take only openorgs relations
|
.filter(this::isOpenorgs) // take only openorgs relations
|
||||||
.filter(this::isMergeRel); //take merges and isMergedIn relations
|
.filter(this::isMergeRel); // take merges and isMergedIn relations
|
||||||
|
|
||||||
log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
|
log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
|
||||||
|
|
||||||
|
@ -99,7 +99,8 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isMergeRel(Relation rel) {
|
private boolean isMergeRel(Relation rel) {
|
||||||
return (rel.getRelClass().equals(ModelConstants.MERGES) || rel.getRelClass().equals(ModelConstants.IS_MERGED_IN))
|
return (rel.getRelClass().equals(ModelConstants.MERGES)
|
||||||
|
|| rel.getRelClass().equals(ModelConstants.IS_MERGED_IN))
|
||||||
&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
|
&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
|
||||||
&& rel.getSubRelType().equals(ModelConstants.DEDUP);
|
&& rel.getSubRelType().equals(ModelConstants.DEDUP);
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
@ -23,6 +22,7 @@ import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
@ -90,13 +90,14 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
|
||||||
|
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
JavaPairRDD<String, Organization> entities = sc.textFile(entitiesInputPath)
|
JavaPairRDD<String, Organization> entities = sc
|
||||||
|
.textFile(entitiesInputPath)
|
||||||
.map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
|
.map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
|
||||||
.mapToPair(o -> new Tuple2<>(o.getId(), o));
|
.mapToPair(o -> new Tuple2<>(o.getId(), o));
|
||||||
|
|
||||||
log.info("Number of organization entities processed: {}", entities.count());
|
log.info("Number of organization entities processed: {}", entities.count());
|
||||||
|
|
||||||
//collect root ids (ids in the source of 'merges' relations
|
// collect root ids (ids in the source of 'merges' relations
|
||||||
JavaPairRDD<String, String> roots = spark
|
JavaPairRDD<String, String> roots = spark
|
||||||
.read()
|
.read()
|
||||||
.load(mergeRelsPath)
|
.load(mergeRelsPath)
|
||||||
|
@ -109,10 +110,11 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
|
||||||
.mapToPair(t -> t)
|
.mapToPair(t -> t)
|
||||||
.distinct();
|
.distinct();
|
||||||
|
|
||||||
Dataset<Organization> rootOrgs = spark.createDataset(
|
Dataset<Organization> rootOrgs = spark
|
||||||
|
.createDataset(
|
||||||
entities
|
entities
|
||||||
.leftOuterJoin(roots)
|
.leftOuterJoin(roots)
|
||||||
.filter(e -> e._2()._2().isPresent()) //if it has been joined with 'root' then it's a root record
|
.filter(e -> e._2()._2().isPresent()) // if it has been joined with 'root' then it's a root record
|
||||||
.map(e -> e._2()._1())
|
.map(e -> e._2()._1())
|
||||||
.rdd(),
|
.rdd(),
|
||||||
Encoders.bean(Organization.class));
|
Encoders.bean(Organization.class));
|
||||||
|
|
|
@ -183,7 +183,17 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
|
||||||
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
|
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
|
||||||
r._1()._2().getCollectedfrom().get(0).getValue(),
|
r._1()._2().getCollectedfrom().get(0).getValue(),
|
||||||
"",
|
"",
|
||||||
structuredPropertyListToString(r._1()._2().getPid())),
|
structuredPropertyListToString(r._1()._2().getPid()),
|
||||||
|
parseECField(r._1()._2().getEclegalbody()),
|
||||||
|
parseECField(r._1()._2().getEclegalperson()),
|
||||||
|
parseECField(r._1()._2().getEcnonprofit()),
|
||||||
|
parseECField(r._1()._2().getEcresearchorganization()),
|
||||||
|
parseECField(r._1()._2().getEchighereducation()),
|
||||||
|
parseECField(r._1()._2().getEcinternationalorganizationeurinterests()),
|
||||||
|
parseECField(r._1()._2().getEcinternationalorganization()),
|
||||||
|
parseECField(r._1()._2().getEcenterprise()),
|
||||||
|
parseECField(r._1()._2().getEcsmevalidated()),
|
||||||
|
parseECField(r._1()._2().getEcnutscode())),
|
||||||
Encoders.bean(OrgSimRel.class));
|
Encoders.bean(OrgSimRel.class));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,7 +224,17 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
|
||||||
.map(c -> Optional.ofNullable(c.get(0)).map(KeyValue::getValue).orElse(""))
|
.map(c -> Optional.ofNullable(c.get(0)).map(KeyValue::getValue).orElse(""))
|
||||||
.orElse(""),
|
.orElse(""),
|
||||||
r._1()._3(),
|
r._1()._3(),
|
||||||
structuredPropertyListToString(o.getPid()));
|
structuredPropertyListToString(o.getPid()),
|
||||||
|
parseECField(o.getEclegalbody()),
|
||||||
|
parseECField(o.getEclegalperson()),
|
||||||
|
parseECField(o.getEcnonprofit()),
|
||||||
|
parseECField(o.getEcresearchorganization()),
|
||||||
|
parseECField(o.getEchighereducation()),
|
||||||
|
parseECField(o.getEcinternationalorganizationeurinterests()),
|
||||||
|
parseECField(o.getEcinternationalorganization()),
|
||||||
|
parseECField(o.getEcenterprise()),
|
||||||
|
parseECField(o.getEcsmevalidated()),
|
||||||
|
parseECField(o.getEcnutscode()));
|
||||||
},
|
},
|
||||||
Encoders.bean(OrgSimRel.class))
|
Encoders.bean(OrgSimRel.class))
|
||||||
.map(
|
.map(
|
||||||
|
@ -301,7 +311,17 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
|
||||||
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
|
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
|
||||||
r._2()._2().getCollectedfrom().get(0).getValue(),
|
r._2()._2().getCollectedfrom().get(0).getValue(),
|
||||||
"group::" + r._1()._1(),
|
"group::" + r._1()._1(),
|
||||||
structuredPropertyListToString(r._2()._2().getPid())),
|
structuredPropertyListToString(r._2()._2().getPid()),
|
||||||
|
parseECField(r._2()._2().getEclegalbody()),
|
||||||
|
parseECField(r._2()._2().getEclegalperson()),
|
||||||
|
parseECField(r._2()._2().getEcnonprofit()),
|
||||||
|
parseECField(r._2()._2().getEcresearchorganization()),
|
||||||
|
parseECField(r._2()._2().getEchighereducation()),
|
||||||
|
parseECField(r._2()._2().getEcinternationalorganizationeurinterests()),
|
||||||
|
parseECField(r._2()._2().getEcinternationalorganization()),
|
||||||
|
parseECField(r._2()._2().getEcenterprise()),
|
||||||
|
parseECField(r._2()._2().getEcsmevalidated()),
|
||||||
|
parseECField(r._2()._2().getEcnutscode())),
|
||||||
Encoders.bean(OrgSimRel.class))
|
Encoders.bean(OrgSimRel.class))
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
|
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class SparkUpdateEntity extends AbstractSparkAction {
|
||||||
.mapToPair(
|
.mapToPair(
|
||||||
(PairFunction<String, String, String>) s -> new Tuple2<>(
|
(PairFunction<String, String, String>) s -> new Tuple2<>(
|
||||||
MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
|
MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
|
||||||
if (type == EntityType.organization) //exclude root records from organizations
|
if (type == EntityType.organization) // exclude root records from organizations
|
||||||
entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
|
entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
|
||||||
|
|
||||||
JavaRDD<String> map = entitiesWithId
|
JavaRDD<String> map = entitiesWithId
|
||||||
|
@ -156,7 +156,8 @@ public class SparkUpdateEntity extends AbstractSparkAction {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JavaPairRDD<String, String> excludeRootOrgs(JavaPairRDD<String, String> entitiesWithId, Dataset<Relation> rel) {
|
private static JavaPairRDD<String, String> excludeRootOrgs(JavaPairRDD<String, String> entitiesWithId,
|
||||||
|
Dataset<Relation> rel) {
|
||||||
|
|
||||||
JavaPairRDD<String, String> roots = rel
|
JavaPairRDD<String, String> roots = rel
|
||||||
.where("relClass == 'merges'")
|
.where("relClass == 'merges'")
|
||||||
|
|
|
@ -14,12 +14,25 @@ public class OrgSimRel implements Serializable {
|
||||||
String oa_collectedfrom;
|
String oa_collectedfrom;
|
||||||
String group_id;
|
String group_id;
|
||||||
String pid_list; // separator for type-pid: "###"; separator for pids: "@@@"
|
String pid_list; // separator for type-pid: "###"; separator for pids: "@@@"
|
||||||
|
Boolean ec_legalbody;
|
||||||
|
Boolean ec_legalperson;
|
||||||
|
Boolean ec_nonprofit;
|
||||||
|
Boolean ec_researchorganization;
|
||||||
|
Boolean ec_highereducation;
|
||||||
|
Boolean ec_internationalorganizationeurinterests;
|
||||||
|
Boolean ec_internationalorganization;
|
||||||
|
Boolean ec_enterprise;
|
||||||
|
Boolean ec_smevalidated;
|
||||||
|
Boolean ec_nutscode;
|
||||||
|
|
||||||
public OrgSimRel() {
|
public OrgSimRel() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
|
public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
|
||||||
String oa_url, String oa_collectedfrom, String group_id, String pid_list) {
|
String oa_url, String oa_collectedfrom, String group_id, String pid_list, Boolean ec_legalbody,
|
||||||
|
Boolean ec_legalperson, Boolean ec_nonprofit, Boolean ec_researchorganization, Boolean ec_highereducation,
|
||||||
|
Boolean ec_internationalorganizationeurinterests, Boolean ec_internationalorganization, Boolean ec_enterprise,
|
||||||
|
Boolean ec_smevalidated, Boolean ec_nutscode) {
|
||||||
this.local_id = local_id;
|
this.local_id = local_id;
|
||||||
this.oa_original_id = oa_original_id;
|
this.oa_original_id = oa_original_id;
|
||||||
this.oa_name = oa_name;
|
this.oa_name = oa_name;
|
||||||
|
@ -29,6 +42,16 @@ public class OrgSimRel implements Serializable {
|
||||||
this.oa_collectedfrom = oa_collectedfrom;
|
this.oa_collectedfrom = oa_collectedfrom;
|
||||||
this.group_id = group_id;
|
this.group_id = group_id;
|
||||||
this.pid_list = pid_list;
|
this.pid_list = pid_list;
|
||||||
|
this.ec_legalbody = ec_legalbody;
|
||||||
|
this.ec_legalperson = ec_legalperson;
|
||||||
|
this.ec_nonprofit = ec_nonprofit;
|
||||||
|
this.ec_researchorganization = ec_researchorganization;
|
||||||
|
this.ec_highereducation = ec_highereducation;
|
||||||
|
this.ec_internationalorganizationeurinterests = ec_internationalorganizationeurinterests;
|
||||||
|
this.ec_internationalorganization = ec_internationalorganization;
|
||||||
|
this.ec_enterprise = ec_enterprise;
|
||||||
|
this.ec_smevalidated = ec_smevalidated;
|
||||||
|
this.ec_nutscode = ec_nutscode;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLocal_id() {
|
public String getLocal_id() {
|
||||||
|
@ -103,6 +126,86 @@ public class OrgSimRel implements Serializable {
|
||||||
this.pid_list = pid_list;
|
this.pid_list = pid_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_legalbody() {
|
||||||
|
return ec_legalbody;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_legalbody(Boolean ec_legalbody) {
|
||||||
|
this.ec_legalbody = ec_legalbody;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_legalperson() {
|
||||||
|
return ec_legalperson;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_legalperson(Boolean ec_legalperson) {
|
||||||
|
this.ec_legalperson = ec_legalperson;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_nonprofit() {
|
||||||
|
return ec_nonprofit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_nonprofit(Boolean ec_nonprofit) {
|
||||||
|
this.ec_nonprofit = ec_nonprofit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_researchorganization() {
|
||||||
|
return ec_researchorganization;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_researchorganization(Boolean ec_researchorganization) {
|
||||||
|
this.ec_researchorganization = ec_researchorganization;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_highereducation() {
|
||||||
|
return ec_highereducation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_highereducation(Boolean ec_highereducation) {
|
||||||
|
this.ec_highereducation = ec_highereducation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_internationalorganizationeurinterests() {
|
||||||
|
return ec_internationalorganizationeurinterests;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_internationalorganizationeurinterests(Boolean ec_internationalorganizationeurinterests) {
|
||||||
|
this.ec_internationalorganizationeurinterests = ec_internationalorganizationeurinterests;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_internationalorganization() {
|
||||||
|
return ec_internationalorganization;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_internationalorganization(Boolean ec_internationalorganization) {
|
||||||
|
this.ec_internationalorganization = ec_internationalorganization;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_enterprise() {
|
||||||
|
return ec_enterprise;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_enterprise(Boolean ec_enterprise) {
|
||||||
|
this.ec_enterprise = ec_enterprise;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_smevalidated() {
|
||||||
|
return ec_smevalidated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_smevalidated(Boolean ec_smevalidated) {
|
||||||
|
this.ec_smevalidated = ec_smevalidated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEc_nutscode() {
|
||||||
|
return ec_nutscode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEc_nutscode(Boolean ec_nutscode) {
|
||||||
|
this.ec_nutscode = ec_nutscode;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "OrgSimRel{" +
|
return "OrgSimRel{" +
|
||||||
|
@ -115,6 +218,17 @@ public class OrgSimRel implements Serializable {
|
||||||
", oa_collectedfrom='" + oa_collectedfrom + '\'' +
|
", oa_collectedfrom='" + oa_collectedfrom + '\'' +
|
||||||
", group_id='" + group_id + '\'' +
|
", group_id='" + group_id + '\'' +
|
||||||
", pid_list='" + pid_list + '\'' +
|
", pid_list='" + pid_list + '\'' +
|
||||||
|
", ec_legalbody=" + ec_legalbody +
|
||||||
|
", ec_legalperson=" + ec_legalperson +
|
||||||
|
", ec_nonprofit=" + ec_nonprofit +
|
||||||
|
", ec_researchorganization=" + ec_researchorganization +
|
||||||
|
", ec_highereducation=" + ec_highereducation +
|
||||||
|
", ec_internationalorganizationeurinterests=" + ec_internationalorganizationeurinterests +
|
||||||
|
", ec_internationalorganization=" + ec_internationalorganization +
|
||||||
|
", ec_enterprise=" + ec_enterprise +
|
||||||
|
", ec_smevalidated=" + ec_smevalidated +
|
||||||
|
", ec_nutscode=" + ec_nutscode +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,6 @@ import java.io.Serializable;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
@ -31,6 +30,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
|
@ -102,34 +102,6 @@ public class SparkOpenorgsTest implements Serializable {
|
||||||
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Disabled
|
|
||||||
@Test
|
|
||||||
public void copyOpenorgsTest() throws Exception {
|
|
||||||
|
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
IOUtils
|
|
||||||
.toString(
|
|
||||||
SparkCopyOpenorgs.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
|
|
||||||
parser
|
|
||||||
.parseArgument(
|
|
||||||
new String[] {
|
|
||||||
"-i", testGraphBasePath,
|
|
||||||
"-asi", testActionSetId,
|
|
||||||
"-w", testOutputBasePath,
|
|
||||||
"-np", "50"
|
|
||||||
});
|
|
||||||
|
|
||||||
new SparkCopyOpenorgs(parser, spark).run(isLookUpService);
|
|
||||||
|
|
||||||
long orgs_deduprecord = jsc
|
|
||||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
|
|
||||||
.count();
|
|
||||||
|
|
||||||
assertEquals(100, orgs_deduprecord);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void copyOpenorgsMergeRels() throws Exception {
|
public void copyOpenorgsMergeRels() throws Exception {
|
||||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
@ -155,9 +127,12 @@ public class SparkOpenorgsTest implements Serializable {
|
||||||
.load(DedupUtility.createMergeRelPath(testOutputBasePath, testActionSetId, "organization"))
|
.load(DedupUtility.createMergeRelPath(testOutputBasePath, testActionSetId, "organization"))
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
Dataset<Relation> orgrels = spark.read().load(DedupUtility.createMergeRelPath(testOutputBasePath, testActionSetId, "organization")).as(Encoders.bean(Relation.class));
|
Dataset<Relation> orgrels = spark
|
||||||
|
.read()
|
||||||
|
.load(DedupUtility.createMergeRelPath(testOutputBasePath, testActionSetId, "organization"))
|
||||||
|
.as(Encoders.bean(Relation.class));
|
||||||
|
|
||||||
for (Relation r: orgrels.toJavaRDD().collect())
|
for (Relation r : orgrels.toJavaRDD().collect())
|
||||||
System.out.println("r = " + r.getSource() + "---" + r.getTarget() + "---" + r.getRelClass());
|
System.out.println("r = " + r.getSource() + "---" + r.getTarget() + "---" + r.getRelClass());
|
||||||
|
|
||||||
assertEquals(384, orgs_mergerel);
|
assertEquals(384, orgs_mergerel);
|
||||||
|
|
|
@ -1,28 +1,7 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.raw;
|
package eu.dnetlib.dhp.oa.graph.raw;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
|
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.HAS_PARTICIPANT;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
|
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
|
|
||||||
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.asString;
|
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.asString;
|
||||||
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
|
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
|
||||||
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.dataInfo;
|
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.dataInfo;
|
||||||
|
@ -161,27 +140,24 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
.execute(
|
.execute(
|
||||||
"queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
|
"queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
|
||||||
break;
|
break;
|
||||||
case openorgs_dedup: //generates organization entities and relations for openorgs dedup
|
case openorgs_dedup: // generates organization entities and relations for openorgs dedup
|
||||||
log.info("Processing Openorgs...");
|
log.info("Processing Openorgs...");
|
||||||
smdbe
|
smdbe
|
||||||
.execute(
|
.execute(
|
||||||
"queryOpenOrgsForOrgsDedup.sql", smdbe::processOrganization, verifyNamespacePrefix);
|
"queryOpenOrgsForOrgsDedup.sql", smdbe::processOrganization, verifyNamespacePrefix);
|
||||||
|
|
||||||
log.info("Processing Openorgs Merge Rels...");
|
log.info("Processing Openorgs Sim Rels...");
|
||||||
smdbe.execute("queryOpenOrgsSimilarityForOrgsDedup.sql", smdbe::processOrgOrgSimRels);
|
smdbe.execute("queryOpenOrgsSimilarityForOrgsDedup.sql", smdbe::processOrgOrgSimRels);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case openorgs: //generates organization entities and relations for provision
|
case openorgs: // generates organization entities and relations for provision
|
||||||
log.info("Processing Openorgs For Provision...");
|
log.info("Processing Openorgs For Provision...");
|
||||||
smdbe
|
smdbe
|
||||||
.execute(
|
.execute(
|
||||||
"queryOpenOrgsForProvision.sql", smdbe::processOrganization, verifyNamespacePrefix);
|
"queryOpenOrgsForProvision.sql", smdbe::processOrganization, verifyNamespacePrefix);
|
||||||
|
|
||||||
log.info("Processing Openorgs Merge Rels...");
|
log.info("Processing Openorgs Merge Rels...");
|
||||||
smdbe.execute("queryOpenOrgsSimilarityForProvision.sql", smdbe::processOrgOrgSimRels);
|
smdbe.execute("queryOpenOrgsSimilarityForProvision.sql", smdbe::processOrgOrgMergeRels);
|
||||||
//TODO cambiare il mapping delle relazioni in modo che crei merges e isMergedIn
|
|
||||||
// TODO (specifico per questo caso, questa funzione di mapping verrà usata così com'è nel caso di openorgs dedup
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case openaire_organizations:
|
case openaire_organizations:
|
||||||
|
@ -635,6 +611,41 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<Oaf> processOrgOrgMergeRels(final ResultSet rs) {
|
||||||
|
try {
|
||||||
|
final DataInfo info = prepareDataInfo(rs); // TODO
|
||||||
|
|
||||||
|
final String orgId1 = createOpenaireId(20, rs.getString("id1"), true);
|
||||||
|
final String orgId2 = createOpenaireId(20, rs.getString("id2"), true);
|
||||||
|
|
||||||
|
final List<KeyValue> collectedFrom = listKeyValues(
|
||||||
|
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
|
||||||
|
|
||||||
|
final Relation r1 = new Relation();
|
||||||
|
r1.setRelType(ORG_ORG_RELTYPE);
|
||||||
|
r1.setSubRelType(ModelConstants.DEDUP);
|
||||||
|
r1.setRelClass(MERGES);
|
||||||
|
r1.setSource(orgId1);
|
||||||
|
r1.setTarget(orgId2);
|
||||||
|
r1.setCollectedfrom(collectedFrom);
|
||||||
|
r1.setDataInfo(info);
|
||||||
|
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||||
|
|
||||||
|
final Relation r2 = new Relation();
|
||||||
|
r2.setRelType(ORG_ORG_RELTYPE);
|
||||||
|
r2.setSubRelType(ModelConstants.DEDUP);
|
||||||
|
r2.setRelClass(IS_MERGED_IN);
|
||||||
|
r2.setSource(orgId2);
|
||||||
|
r2.setTarget(orgId1);
|
||||||
|
r2.setCollectedfrom(collectedFrom);
|
||||||
|
r2.setDataInfo(info);
|
||||||
|
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||||
|
return Arrays.asList(r1, r2);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public List<Oaf> processOrgOrgSimRels(final ResultSet rs) {
|
public List<Oaf> processOrgOrgSimRels(final ResultSet rs) {
|
||||||
try {
|
try {
|
||||||
final DataInfo info = prepareDataInfo(rs); // TODO
|
final DataInfo info = prepareDataInfo(rs); // TODO
|
||||||
|
@ -647,7 +658,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
|
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
|
||||||
|
|
||||||
final Relation r1 = new Relation();
|
final Relation r1 = new Relation();
|
||||||
r1.setRelType(ModelConstants.ORG_ORG_RELTYPE);
|
r1.setRelType(ORG_ORG_RELTYPE);
|
||||||
r1.setSubRelType(ModelConstants.DEDUP);
|
r1.setSubRelType(ModelConstants.DEDUP);
|
||||||
r1.setRelClass(relClass);
|
r1.setRelClass(relClass);
|
||||||
r1.setSource(orgId1);
|
r1.setSource(orgId1);
|
||||||
|
|
|
@ -60,20 +60,22 @@ SELECT
|
||||||
o.country || '@@@dnet:countries' AS country,
|
o.country || '@@@dnet:countries' AS country,
|
||||||
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
|
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
|
||||||
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
|
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
|
||||||
null AS eclegalbody,
|
(array_remove(array_cat(ARRAY[o.ec_legalbody], array_agg(od.ec_legalbody)), NULL))[1] AS eclegalbody,
|
||||||
null AS eclegalperson,
|
(array_remove(array_cat(ARRAY[o.ec_legalperson], array_agg(od.ec_legalperson)), NULL))[1] AS eclegalperson,
|
||||||
null AS ecnonprofit,
|
(array_remove(array_cat(ARRAY[o.ec_nonprofit], array_agg(od.ec_nonprofit)), NULL))[1] AS ecnonprofit,
|
||||||
null AS ecresearchorganization,
|
(array_remove(array_cat(ARRAY[o.ec_researchorganization], array_agg(od.ec_researchorganization)), NULL))[1] AS ecresearchorganization,
|
||||||
null AS echighereducation,
|
(array_remove(array_cat(ARRAY[o.ec_highereducation], array_agg(od.ec_highereducation)), NULL))[1] AS echighereducation,
|
||||||
null AS ecinternationalorganizationeurinterests,
|
(array_remove(array_cat(ARRAY[o.ec_internationalorganizationeurinterests], array_agg(od.ec_internationalorganizationeurinterests)), NULL))[1] AS ecinternationalorganizationeurinterests,
|
||||||
null AS ecinternationalorganization,
|
(array_remove(array_cat(ARRAY[o.ec_internationalorganization], array_agg(od.ec_internationalorganization)), NULL))[1] AS ecinternationalorganization,
|
||||||
null AS ecenterprise,
|
(array_remove(array_cat(ARRAY[o.ec_enterprise], array_agg(od.ec_enterprise)), NULL))[1] AS ecenterprise,
|
||||||
null AS ecsmevalidated,
|
(array_remove(array_cat(ARRAY[o.ec_smevalidated], array_agg(od.ec_smevalidated)), NULL))[1] AS ecsmevalidated,
|
||||||
null AS ecnutscode
|
(array_remove(array_cat(ARRAY[o.ec_nutscode], array_agg(od.ec_nutscode)), NULL))[1] AS ecnutscode
|
||||||
FROM other_names n
|
FROM other_names n
|
||||||
LEFT OUTER JOIN organizations o ON (n.id = o.id)
|
LEFT OUTER JOIN organizations o ON (n.id = o.id)
|
||||||
LEFT OUTER JOIN urls u ON (u.id = o.id)
|
LEFT OUTER JOIN urls u ON (u.id = o.id)
|
||||||
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
|
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
|
||||||
|
LEFT OUTER JOIN oa_duplicates d ON (o.id = d.local_id)
|
||||||
|
LEFT OUTER JOIN organizations od ON (d.oa_original_id = od.id)
|
||||||
WHERE
|
WHERE
|
||||||
o.status = 'approved'
|
o.status = 'approved'
|
||||||
GROUP BY
|
GROUP BY
|
||||||
|
|
|
@ -15,22 +15,25 @@ SELECT
|
||||||
'OpenOrgs Database' AS collectedfromname,
|
'OpenOrgs Database' AS collectedfromname,
|
||||||
o.country || '@@@dnet:countries' AS country,
|
o.country || '@@@dnet:countries' AS country,
|
||||||
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
|
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
|
||||||
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
|
array_remove(array_cat(array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types'), array_agg(DISTINCT idup.otherid || '###' || idup.type || '@@@dnet:pid_types')), NULL) AS pid,
|
||||||
null AS eclegalbody,
|
(array_remove(array_cat(ARRAY[o.ec_legalbody], array_agg(od.ec_legalbody)), NULL))[1] AS eclegalbody,
|
||||||
null AS eclegalperson,
|
(array_remove(array_cat(ARRAY[o.ec_legalperson], array_agg(od.ec_legalperson)), NULL))[1] AS eclegalperson,
|
||||||
null AS ecnonprofit,
|
(array_remove(array_cat(ARRAY[o.ec_nonprofit], array_agg(od.ec_nonprofit)), NULL))[1] AS ecnonprofit,
|
||||||
null AS ecresearchorganization,
|
(array_remove(array_cat(ARRAY[o.ec_researchorganization], array_agg(od.ec_researchorganization)), NULL))[1] AS ecresearchorganization,
|
||||||
null AS echighereducation,
|
(array_remove(array_cat(ARRAY[o.ec_highereducation], array_agg(od.ec_highereducation)), NULL))[1] AS echighereducation,
|
||||||
null AS ecinternationalorganizationeurinterests,
|
(array_remove(array_cat(ARRAY[o.ec_internationalorganizationeurinterests], array_agg(od.ec_internationalorganizationeurinterests)), NULL))[1] AS ecinternationalorganizationeurinterests,
|
||||||
null AS ecinternationalorganization,
|
(array_remove(array_cat(ARRAY[o.ec_internationalorganization], array_agg(od.ec_internationalorganization)), NULL))[1] AS ecinternationalorganization,
|
||||||
null AS ecenterprise,
|
(array_remove(array_cat(ARRAY[o.ec_enterprise], array_agg(od.ec_enterprise)), NULL))[1] AS ecenterprise,
|
||||||
null AS ecsmevalidated,
|
(array_remove(array_cat(ARRAY[o.ec_smevalidated], array_agg(od.ec_smevalidated)), NULL))[1] AS ecsmevalidated,
|
||||||
null AS ecnutscode
|
(array_remove(array_cat(ARRAY[o.ec_nutscode], array_agg(od.ec_nutscode)), NULL))[1] AS ecnutscode
|
||||||
FROM organizations o
|
FROM organizations o
|
||||||
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
|
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
|
||||||
LEFT OUTER JOIN urls u ON (u.id = o.id)
|
LEFT OUTER JOIN urls u ON (u.id = o.id)
|
||||||
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
|
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
|
||||||
LEFT OUTER JOIN other_names n ON (n.id = o.id)
|
LEFT OUTER JOIN other_names n ON (n.id = o.id)
|
||||||
|
LEFT OUTER JOIN oa_duplicates d ON (o.id = d.local_id AND d.reltype != 'is_different')
|
||||||
|
LEFT OUTER JOIN organizations od ON (d.oa_original_id = od.id)
|
||||||
|
LEFT OUTER JOIN other_ids idup ON (od.id = idup.id)
|
||||||
WHERE
|
WHERE
|
||||||
o.status = 'approved'
|
o.status = 'approved'
|
||||||
GROUP BY
|
GROUP BY
|
||||||
|
|
|
@ -7,6 +7,5 @@ SELECT
|
||||||
false AS inferred,
|
false AS inferred,
|
||||||
false AS deletedbyinference,
|
false AS deletedbyinference,
|
||||||
0.99 AS trust,
|
0.99 AS trust,
|
||||||
'' AS inferenceprovenance,
|
'' AS inferenceprovenance
|
||||||
'isSimilarTo' AS relclass
|
|
||||||
FROM oa_duplicates WHERE reltype = 'is_similar' OR reltype = 'suggested';
|
FROM oa_duplicates WHERE reltype = 'is_similar' OR reltype = 'suggested';
|
|
@ -51,6 +51,3 @@ GROUP BY
|
||||||
d.id,
|
d.id,
|
||||||
d.officialname,
|
d.officialname,
|
||||||
o.country;
|
o.country;
|
||||||
|
|
||||||
-- TODO modificare in modo da fare il merge dei pid di tutti i record mergiati (per gli openorgs, approvati)
|
|
||||||
-- TODO invece per tutti gli altri con dei duplicati non fare questa cosa
|
|
Loading…
Reference in New Issue