implementation of the openorgs wfs, implementation of the raw_all wf to migrate openorgs db entities

This commit is contained in:
miconis 2021-02-10 11:51:50 +01:00
parent c7e2d5a59a
commit 4b2124a18e
13 changed files with 858 additions and 385 deletions

View File

@ -1,12 +1,9 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
@ -18,83 +15,88 @@ import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class SparkCopyOpenorgs extends AbstractSparkAction{
private static final Logger log = LoggerFactory.getLogger(SparkCopyRels.class);
public class SparkCopyOpenorgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgs.class);
public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
parser.parseArgument(args);
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCopyOpenorgs(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
SparkConf conf = new SparkConf();
new SparkCopyOpenorgs(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
String subEntity = "organization";
log.info("Copying openorgs to the working dir");
String subEntity = "organization";
log.info("Copying openorgs to the working dir");
final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
removeOutputDir(spark, outputPath);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
filterEntities(spark, entityPath, clazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
filterEntities(spark, entityPath, clazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}
public static <T extends OafEntity> Dataset<T> filterEntities(
final SparkSession spark,
final String entitiesInputPath,
final Class<T> clazz) {
public static <T extends OafEntity> Dataset<T> filterEntities(
final SparkSession spark,
final String entitiesInputPath,
final Class<T> clazz) {
// <id, json_entity>
Dataset<T> entities = spark
.read()
.textFile(entitiesInputPath)
.map(
(MapFunction<String, T>) it -> {
T entity = OBJECT_MAPPER.readValue(it, clazz);
return entity;
},
Encoders.kryo(clazz));
// <id, json_entity>
Dataset<T> entities = spark
.read()
.textFile(entitiesInputPath)
.map(
(MapFunction<String, T>) it -> {
T entity = OBJECT_MAPPER.readValue(it, clazz);
return entity;
},
Encoders.kryo(clazz));
return entities.filter(entities.col("id").contains("openorgs____"));
}
return entities.filter(entities.col("id").contains("openorgs____"));
}
}

View File

@ -0,0 +1,181 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class);
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public SparkCopyOpenorgsMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCopyOpenorgsMergeRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("Copying OpenOrgs Merge Rels");
final String outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
removeOutputDir(spark, outputPath);
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
DedupConfig dedupConf = getConfigurations(isLookUpService, actionSetId).get(0);
JavaRDD<Relation> rawRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(this::isOpenorgs) //takes only relations coming from openorgs
.filter(this::filterOpenorgsRels) //takes only isSimilarTo relations between organizations from openorgs
.filter(this::excludeOpenorgsMesh) //excludes relations between an organization and an openorgsmesh
.filter(this::excludeNonOpenorgs); //excludes relations with no openorgs id involved
//turn openorgs isSimilarTo relations into mergerels
JavaRDD<Relation> mergeRels = rawRels.flatMap(rel -> {
List<Relation> mergerels = new ArrayList<>();
String openorgsId = rel.getSource().contains("openorgs____")? rel.getSource() : rel.getTarget();
String mergedId = rel.getSource().contains("openorgs____")? rel.getTarget() : rel.getSource();
mergerels.add(rel(openorgsId, mergedId, "merges", dedupConf));
mergerels.add(rel(mergedId, openorgsId, "isMergedIn", dedupConf));
return mergerels.iterator();
});
mergeRels.saveAsTextFile(outputPath);
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private boolean filterOpenorgsRels(Relation rel) {
if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup"))
return true;
return false;
}
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k: rel.getCollectedfrom()) {
if (k.getValue().equals("OpenOrgs Database")) {
return true;
}
}
}
return false;
}
private boolean excludeOpenorgsMesh(Relation rel) {
if (rel.getSource().equals("openorgsmesh") || rel.getTarget().equals("openorgsmesh")) {
return false;
}
return true;
}
private boolean excludeNonOpenorgs(Relation rel) {
if (rel.getSource().equals("openorgs____") || rel.getTarget().equals("openorgs____")) {
return true;
}
return false;
}
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
String entityType = dedupConf.getWf().getEntityType();
Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass(relClass);
r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
r.setSubRelType("dedup");
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenanceAction = new Qualifier();
provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenanceAction);
// TODO calculate the trust value based on the similarity score of the elements in the CC
// info.setTrust();
r.setDataInfo(info);
return r;
}
}

View File

@ -1,29 +1,37 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
public class SparkCopyRels extends AbstractSparkAction{
private static final Logger log = LoggerFactory.getLogger(SparkCopyRels.class);
public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class);
public SparkCopyRels(ArgumentApplicationParser parser, SparkSession spark) {
public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
@ -31,13 +39,13 @@ public class SparkCopyRels extends AbstractSparkAction{
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRels.class
SparkCopyOpenorgsSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCopyRels(parser, getSparkSession(conf))
new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@ -49,8 +57,6 @@ public class SparkCopyRels extends AbstractSparkAction{
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final String destination = parser.get("destination");
final String entity = parser.get("entityType");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
@ -60,30 +66,24 @@ public class SparkCopyRels extends AbstractSparkAction{
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("entity: '{}'", entity);
log.info("Copying " + destination + " for: '{}'", entity);
log.info("Copying OpenOrgs SimRels");
final String outputPath;
if (destination.contains("mergerel")) {
outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, entity);
}
else {
outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, entity);
}
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization");
removeOutputDir(spark, outputPath);
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
JavaRDD<Relation> simRels =
spark.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(r -> filterRels(r, entity));
JavaRDD<Relation> rawRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(this::isOpenorgs)
.filter(this::filterOpenorgsRels);
simRels.saveAsTextFile(outputPath);
save(spark.createDataset(rawRels.rdd(),Encoders.bean(Relation.class)), outputPath, SaveMode.Append);
}
private static MapFunction<String, Relation> patchRelFn() {
@ -96,20 +96,23 @@ public class SparkCopyRels extends AbstractSparkAction{
};
}
private boolean filterRels(Relation rel, String entityType) {
private boolean filterOpenorgsRels(Relation rel) {
switch(entityType) {
case "result":
if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("resultResult") && rel.getSubRelType().equals("dedup"))
if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup"))
return true;
return false;
}
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k: rel.getCollectedfrom()) {
if (k.getValue().equals("OpenOrgs Database")) {
return true;
break;
case "organization":
if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup"))
return true;
break;
default:
return false;
}
}
}
return false;
}
}

View File

@ -0,0 +1,110 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.util.Optional;
public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
private static final String IDJSONPATH = "$.id";
public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRelationsNoOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
new SparkUpdateEntity(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
public void run(ISLookUpService isLookUpService) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("workingPath: '{}'", workingPath);
log.info("dedupGraphPath: '{}'", dedupGraphPath);
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
removeOutputDir(spark, outputPath);
JavaRDD<Relation> simRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(this::excludeOpenorgsRels);
simRels.saveAsTextFile(outputPath);
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private boolean excludeOpenorgsRels(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k: rel.getCollectedfrom()) {
if (k.getValue().equals("OpenOrgs Database")) {
return false;
}
}
}
return true;
}
}

View File

@ -1,18 +1,15 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.oa.dedup.model.Block;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.PROVENANCE_ACTION_CLASS;
import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.hash;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@ -29,205 +26,215 @@ import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.oa.dedup.model.Block;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.DNET_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.PROVENANCE_ACTION_CLASS;
import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.hash;
public class SparkRemoveDiffRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkRemoveDiffRels.class);
private static final Logger log = LoggerFactory.getLogger(SparkRemoveDiffRels.class);
public SparkRemoveDiffRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public SparkRemoveDiffRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser.parseArgument(args);
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
SparkConf conf = new SparkConf();
new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
// for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
// for each dedup configuration
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
log.info("Removing diffrels for: '{}'", subEntity);
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
log.info("Removing diffrels for: '{}'", subEntity);
final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final int maxIterations = dedupConf.getWf().getMaxIterations();
log.info("Max iterations {}", maxIterations);
final int maxIterations = dedupConf.getWf().getMaxIterations();
log.info("Max iterations {}", maxIterations);
JavaRDD<Relation> mergeRelsRDD = spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD();
JavaRDD<Relation> mergeRelsRDD = spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD();
JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRelsRDD = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD().filter(r -> filterRels(r, entity))
.map(rel -> {
if (rel.getSource().compareTo(rel.getTarget()) < 0)
return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
else
return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
});
JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRelsRDD = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(r -> filterRels(r, entity))
.map(rel -> {
if (rel.getSource().compareTo(rel.getTarget()) < 0)
return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
else
return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
});
JavaRDD<Tuple2<Tuple2<String, String>, String>> flatMergeRels = mergeRelsRDD
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget()))
.groupByKey()
.flatMap(g -> {
List<Tuple2<Tuple2<String,String>, String>> rels = new ArrayList<>();
JavaRDD<Tuple2<Tuple2<String, String>, String>> flatMergeRels = mergeRelsRDD
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget()))
.groupByKey()
.flatMap(g -> {
List<Tuple2<Tuple2<String, String>, String>> rels = new ArrayList<>();
List<String> ids = StreamSupport
.stream(g._2().spliterator(), false)
.collect(Collectors.toList());
List<String> ids = StreamSupport
.stream(g._2().spliterator(), false)
.collect(Collectors.toList());
for (int i = 0; i < ids.size(); i++){
for (int j = i+1; j < ids.size(); j++){
if (ids.get(i).compareTo(ids.get(j)) < 0)
rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1()));
else
rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1()));
}
}
return rels.iterator();
for (int i = 0; i < ids.size(); i++) {
for (int j = i + 1; j < ids.size(); j++) {
if (ids.get(i).compareTo(ids.get(j)) < 0)
rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1()));
else
rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1()));
}
}
return rels.iterator();
});
});
JavaRDD<Relation> purgedMergeRels = flatMergeRels.union(diffRelsRDD)
.mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2())))
.reduceByKey((a, b) -> {
List<String> list = new ArrayList<String>();
list.addAll(a);
list.addAll(b);
return list;
})
.filter(rel -> rel._2().size() == 1)
.mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1()))
.flatMap(rel -> {
List<Tuple2<String, String>> rels = new ArrayList<>();
String source = rel._1();
rels.add(new Tuple2<>(source, rel._2()._1()));
rels.add(new Tuple2<>(source, rel._2()._2()));
return rels.iterator();
})
.distinct()
.flatMap(rel -> tupleToMergeRel(rel, dedupConf));
JavaRDD<Relation> purgedMergeRels = flatMergeRels
.union(diffRelsRDD)
.mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2())))
.reduceByKey((a, b) -> {
List<String> list = new ArrayList<String>();
list.addAll(a);
list.addAll(b);
return list;
})
.filter(rel -> rel._2().size() == 1)
.mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1()))
.flatMap(rel -> {
List<Tuple2<String, String>> rels = new ArrayList<>();
String source = rel._1();
rels.add(new Tuple2<>(source, rel._2()._1()));
rels.add(new Tuple2<>(source, rel._2()._2()));
return rels.iterator();
})
.distinct()
.flatMap(rel -> tupleToMergeRel(rel, dedupConf));
spark
.createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite).parquet(mergeRelsPath);
}
}
spark
.createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.json(mergeRelsPath);
}
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private boolean filterRels(Relation rel, String entityType) {
private boolean filterRels(Relation rel, String entityType) {
switch(entityType) {
case "result":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult") && rel.getSubRelType().equals("dedup"))
return true;
break;
case "organization":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup"))
return true;
break;
default:
return false;
}
return false;
}
switch (entityType) {
case "result":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
&& rel.getSubRelType().equals("dedup"))
return true;
break;
case "organization":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
&& rel.getSubRelType().equals("dedup"))
return true;
break;
default:
return false;
}
return false;
}
public Iterator<Relation> tupleToMergeRel(Tuple2<String, String> rel, DedupConfig dedupConf) {
public Iterator<Relation> tupleToMergeRel(Tuple2<String, String> rel, DedupConfig dedupConf) {
List<Relation> rels = new ArrayList<>();
List<Relation> rels = new ArrayList<>();
rels.add(rel(rel._1(), rel._2(), "merges", dedupConf));
rels.add(rel(rel._2(), rel._1(), "isMergedIn", dedupConf));
rels.add(rel(rel._1(), rel._2(), "merges", dedupConf));
rels.add(rel(rel._2(), rel._1(), "isMergedIn", dedupConf));
return rels.iterator();
}
return rels.iterator();
}
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
String entityType = dedupConf.getWf().getEntityType();
String entityType = dedupConf.getWf().getEntityType();
Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass(relClass);
r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
r.setSubRelType("dedup");
Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass(relClass);
r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
r.setSubRelType("dedup");
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenanceAction = new Qualifier();
provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenanceAction);
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenanceAction = new Qualifier();
provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenanceAction);
// TODO calculate the trust value based on the similarity score of the elements in the CC
// info.setTrust();
// TODO calculate the trust value based on the similarity score of the elements in the CC
// info.setTrust();
r.setDataInfo(info);
return r;
}
r.setDataInfo(info);
return r;
}
}

View File

@ -18,9 +18,9 @@
"paramRequired": true
},
{
"paramName": "e",
"paramLongName": "entityType",
"paramDescription": "type of the entity for the merge relations",
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
{

View File

@ -85,9 +85,6 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<!--TODO-->
<!-- facendo le query su openaire e openorgs deve creare l'input set, solo di organizzazioni -->
<action name="resetOrgSimRels">
<fs>
<delete path="${workingPath}/${actionSetId}/organization_simrel"/>
@ -96,34 +93,6 @@
<error to="Kill"/>
</action>
<action name="CopySimRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--entityType</arg><arg>organization</arg>
<arg>--destination</arg><arg>simrel</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateSimRels"/>
<error to="Kill"/>
</action>
<action name="CreateSimRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -147,6 +116,33 @@
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CopyOpenorgsSimRels"/>
<error to="Kill"/>
</action>
<!-- copy similarity relations coming from openorgs in order to improve dedup quality -->
<action name="CopyOpenorgsSimRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy OpenOrgs Sim Rels</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRels"/>
<error to="Kill"/>
</action>

View File

@ -12,6 +12,10 @@
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>actionSetIdOpenorgs</name>
<description>id of the actionSet for OpenOrgs dedup</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
@ -169,17 +173,17 @@
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
</spark>
<ok to="CopyMergeRels"/>
<ok to="CopyOpenorgsMergeRels"/>
<error to="Kill"/>
</action>
<!-- copy organization relations in the working dir-->
<action name="CopyMergeRels">
<!-- copy organization relations in the working dir (in the organization_mergerel dir)-->
<action name="CopyOpenorgsMergeRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRels</class>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -193,15 +197,15 @@
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--entityType</arg><arg>organization</arg>
<arg>--destination</arg><arg>mergerel</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CopyEntities"/>
<error to="Kill"/>
</action>
<!-- copy openorgs to the working dir (in the organization_deduprecord dir)-->
<action name="CopyOpenorgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -222,7 +226,7 @@
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
</spark>
<ok to="UpdateEntity"/>
<error to="Kill"/>
@ -253,15 +257,28 @@
<error to="Kill"/>
</action>
<!-- copy all relations without openorgs relations to the dedupgraph -->
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/relation"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>${dedupGraphPath}/relation</arg>
</distcp>
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Update Entity</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--dedupGraphPath</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>

View File

@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.MigrateAction;
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
@ -76,6 +77,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
private static final String ORG_ORG_RELTYPE = "organizationOrganization";
private static final String ORG_ORG_SUBRELTYPE = "dedup";
private final DbClient dbClient;
private final long lastUpdateTimestamp;
@ -114,35 +118,53 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final Predicate<Oaf> verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist);
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
log.info("processClaims: {}", processClaims);
final MigrateAction process = parser.get("action") != null ? MigrateAction.valueOf(parser.get("action"))
: MigrateAction.openaire;
log.info("migrateAction: {}", process);
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
if (processClaims) {
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
} else {
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
}
switch (process) {
case claims:
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
break;
case openaire:
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
}
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe
.execute(
"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix);
log.info("Processing Organizations...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe
.execute(
"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization,
verifyNamespacePrefix);
log.info("Processing projects <-> orgs ...");
smdbe
.execute(
"queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
break;
case openorgs:
log.info("Processing Openorgs...");
smdbe
.execute(
"queryOrganizationsFromOpenOrgsDB.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing Openorgs Merge Rels...");
smdbe.execute("querySimilarityFromOpenOrgsDB.sql", smdbe::processOrgOrgSimRels);
break;
}
log.info("All done.");
}
@ -585,6 +607,43 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
}
}
public List<Oaf> processOrgOrgSimRels(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs); // TODO
final String orgId1 = createOpenaireId(20, rs.getString("id1"), true);
final String orgId2 = createOpenaireId(40, rs.getString("id2"), true);
final String relClass = rs.getString("relclass");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(ORG_ORG_RELTYPE);
r1.setSubRelType(ORG_ORG_SUBRELTYPE);
r1.setRelClass(relClass);
r1.setSource(orgId1);
r1.setTarget(orgId2);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
final Relation r2 = new Relation();
r2.setRelType(ORG_ORG_RELTYPE);
r2.setSubRelType(ORG_ORG_SUBRELTYPE);
r2.setRelClass(relClass);
r2.setSource(orgId2);
r2.setTarget(orgId1);
r2.setCollectedfrom(collectedFrom);
r2.setDataInfo(info);
r2.setLastupdatetimestamp(lastUpdateTimestamp);
return Arrays.asList(r1, r2);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
super.close();

View File

@ -0,0 +1,9 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
//enum to specify the different actions available for the MigrateDbEntitiesApplication job
public enum MigrateAction {
claims, // migrate claims to the raw graph
openorgs, // migrate organizations from openorgs to the raw graph
openaire // migrate openaire entities to the raw graph
}

View File

@ -25,6 +25,18 @@
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access to the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
@ -178,14 +190,34 @@
<action name="ImportDB">
<java>
<prepare>
<delete path="${contentPath}/db_records"/>
<delete path="${contentPath}/db_openaire"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_records</arg>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openaire</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>openaire</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportDB_openorgs"/>
<error to="Kill"/>
</action>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>openorgs</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
@ -314,7 +346,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>

View File

@ -4,6 +4,8 @@ SELECT
o.name AS legalname,
array_agg(DISTINCT n.name) AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
o.creation_date AS dateofcollection,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
@ -13,7 +15,17 @@ SELECT
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
null AS eclegalbody,
null AS eclegalperson,
null AS ecnonprofit,
null AS ecresearchorganization,
null AS echighereducation,
null AS ecinternationalorganizationeurinterests,
null AS ecinternationalorganization,
null AS ecenterprise,
null AS ecsmevalidated,
null AS ecnutscode
FROM organizations o
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
@ -22,6 +34,7 @@ FROM organizations o
GROUP BY
o.id,
o.name,
o.creation_date,
o.modification_date,
o.country
@ -33,6 +46,8 @@ SELECT
n.name AS legalname,
ARRAY[]::text[] AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
o.creation_date AS dateofcollection,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
@ -42,12 +57,24 @@ SELECT
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
null AS eclegalbody,
null AS eclegalperson,
null AS ecnonprofit,
null AS ecresearchorganization,
null AS echighereducation,
null AS ecinternationalorganizationeurinterests,
null AS ecinternationalorganization,
null AS ecenterprise,
null AS ecsmevalidated,
null AS ecnutscode
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
GROUP BY
o.id, o.modification_date, o.country, n.name
o.id,
o.creation_date,
o.modification_date,
o.country,
n.name;

View File

@ -1,17 +1,47 @@
SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar'
-- relations approved by the user
SELECT
local_id AS id1,
oa_original_id AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isSimilarTo' AS relclass
FROM oa_duplicates WHERE reltype = 'is_similar'
UNION ALL
-- relations between openorgs and mesh (alternative names)
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2
FROM acronyms a
LEFT OUTER JOIN organizations o ON (a.id = o.id)
UNION ALL
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isSimilarTo' AS relclass
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
UNION ALL
-- diff relations approved by the user
SELECT
local_id AS id1,
oa_original_id AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isDifferentFrom' AS relclass
FROM oa_duplicates WHERE reltype = 'is_different'
--TODO ???
--Creare relazioni isDifferentFrom anche tra i suggerimenti: (A is_similar B) and (A is_different C) => (B is_different C)