Merge pull request 'OpenOrgs dedup and Integration with OpenAIRE Provision' (#102) from openorgswf into stable_ids

Reviewed-on: #102
pull/104/head
Claudio Atzori 3 years ago
commit 940556f6d3

@ -13,6 +13,11 @@ public class OrganizationPidComparator implements Comparator<StructuredProperty>
PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
if (lClass.equals(PidType.openorgs))
return -1;
if (rClass.equals(PidType.openorgs))
return 1;
if (lClass.equals(PidType.GRID))
return -1;
if (rClass.equals(PidType.GRID))

@ -9,7 +9,7 @@ public enum PidType {
doi, pmid, pmc, handle, arXiv, nct, pdb,
// Organization
GRID, mag_id, urn,
openorgs, corda, corda_h2020, GRID, mag_id, urn,
// Used by dedup
undefined, original;

@ -94,7 +94,12 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
<scope>test</scope>
</dependency>
</dependencies>

@ -6,7 +6,9 @@ import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
@ -31,6 +33,9 @@ abstract class AbstractSparkAction implements Serializable {
protected static final int NUM_PARTITIONS = 1000;
protected static final int NUM_CONNECTIONS = 20;
protected static final String TYPE_VALUE_SEPARATOR = "###";
protected static final String SP_SEPARATOR = "@@@";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@ -94,7 +99,22 @@ abstract class AbstractSparkAction implements Serializable {
dataset.write().option("compression", "gzip").mode(mode).json(outPath);
}
protected static <T> void saveParquet(Dataset<T> dataset, String outPath, SaveMode mode) {
dataset.write().option("compression", "gzip").mode(mode).parquet(outPath);
}
protected static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
protected static String structuredPropertyListToString(List<StructuredProperty> list) {
return list
.stream()
.filter(p -> p.getQualifier() != null)
.filter(p -> StringUtils.isNotBlank(p.getQualifier().getClassid()))
.filter(p -> StringUtils.isNotBlank(p.getValue()))
.map(p -> p.getValue() + TYPE_VALUE_SEPARATOR + p.getQualifier().getClassid())
.collect(Collectors.joining(SP_SEPARATOR));
}
}

@ -89,7 +89,7 @@ public class DedupRecordFactory {
t -> {
T duplicate = t._2();
// prepare the list of pids to use for the id generation
// prepare the list of pids to be used for the id generation
bestPids.add(Identifier.newInstance(duplicate));
entity.mergeFrom(duplicate);

@ -84,6 +84,11 @@ public class DedupUtility {
return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
}
public static String createOpenorgsMergeRelsPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_openorgs_mergerels", basePath, actionSetId, entityType);
}
public static String createMergeRelPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);

@ -36,7 +36,14 @@ public class IdGenerator implements Serializable {
}
private static String dedupify(String ns) {
StringBuilder prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
StringBuilder prefix;
if (PidType.valueOf(substringBefore(ns, "_")) == PidType.openorgs) {
prefix = new StringBuilder(substringBefore(ns, "_"));
} else {
prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
}
while (prefix.length() < 12) {
prefix.append("_");
}

@ -1,184 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
public class SparkCollectSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
Dataset<Row> simGroupsDS;
Dataset<Row> groupsDS;
public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS,
Dataset<Row> groupsDS) {
super(parser, spark);
this.simGroupsDS = simGroupsDS;
this.groupsDS = groupsDS;
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkBlockStats.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
final String dbPassword = parser.get("postgresPassword");
SparkSession spark = getSparkSession(conf);
DataFrameReader readOptions = spark
.read()
.format("jdbc")
.option("url", dbUrl)
.option("user", dbUser)
.option("password", dbPassword);
new SparkCollectSimRels(
parser,
spark,
readOptions.option("dbtable", "similarity_groups").load(),
readOptions.option("dbtable", "groups").load())
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
// read oozie parameters
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
log.info("numPartitions: '{}'", numPartitions);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("postgresUser: {}", dbUser);
log.info("postgresUrl: {}", dbUrl);
log.info("postgresPassword: xxx");
JavaPairRDD<String, List<String>> similarityGroup = simGroupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
.groupByKey()
.mapToPair(
i -> new Tuple2<>(i._1(), StreamSupport
.stream(i._2().spliterator(), false)
.collect(Collectors.toList())));
JavaPairRDD<String, String> groupIds = groupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
.leftOuterJoin(groupIds)
.filter(g -> g._2()._2().isPresent())
.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
JavaRDD<Relation> relations = groups.flatMap(g -> {
String firstId = g._2().get(0);
List<Relation> rels = new ArrayList<>();
for (String id : g._2()) {
if (!firstId.equals(id))
rels.add(createSimRel(firstId, id, g._1()._2()));
}
return rels.iterator();
});
Dataset<Relation> resultRelations = spark
.createDataset(
relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
Encoders.bean(Relation.class))
.repartition(numPartitions);
Dataset<Relation> organizationRelations = spark
.createDataset(
relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
Encoders.bean(Relation.class))
.repartition(numPartitions);
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
switch (dedupConf.getWf().getSubEntityValue()) {
case "organization":
savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
break;
default:
savePostgresRelation(
resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
break;
}
}
}
private Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setSource("50|" + source);
r.setTarget("50|" + target);
r.setRelType("resultResult");
break;
case "organization":
r.setSource("20|" + source);
r.setTarget("20|" + target);
r.setRelType("organizationOrganization");
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId,
String entityType) {
newRelations
.write()
.mode(SaveMode.Append)
.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
}
}

@ -0,0 +1,104 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class SparkCopyOpenorgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgs.class);
public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCopyOpenorgs(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
String subEntity = "organization";
log.info("Copying openorgs to the working dir");
final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
filterOpenorgs(spark, entityPath)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
public static Dataset<Organization> filterOpenorgs(
final SparkSession spark,
final String entitiesInputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Organization> entities = spark
.createDataset(
sc
.textFile(entitiesInputPath)
.map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
.rdd(),
Encoders.bean(Organization.class));
log.info("Number of organization entities processed: {}", entities.count());
entities = entities.filter(entities.col("id").contains("openorgs____"));
log.info("Number of Openorgs organization entities: {}", entities.count());
return entities;
}
}

@ -0,0 +1,191 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import net.sf.saxon.ma.trie.Tuple2;
public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class);
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public SparkCopyOpenorgsMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCopyOpenorgsMergeRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("Copying OpenOrgs Merge Rels");
final String outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
DedupConfig dedupConf = getConfigurations(isLookUpService, actionSetId).get(0);
JavaRDD<Relation> rawRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(this::isOpenorgs)
.filter(this::filterOpenorgsRels);
JavaRDD<Relation> selfRawRels = rawRels
.map(r -> r.getSource())
.distinct()
.map(s -> rel(s, s, "isSimilarTo", dedupConf));
log.info("Number of raw Openorgs Relations collected: {}", rawRels.count());
// turn openorgs isSimilarTo relations into mergerels
JavaRDD<Relation> mergeRelsRDD = rawRels
.union(selfRawRels)
.map(r -> {
r.setSource(createDedupID(r.getSource())); // create the dedup_id to align it to the openaire dedup
// format
return r;
})
.flatMap(rel -> {
List<Relation> mergerels = new ArrayList<>();
mergerels.add(rel(rel.getSource(), rel.getTarget(), "merges", dedupConf));
mergerels.add(rel(rel.getTarget(), rel.getSource(), "isMergedIn", dedupConf));
return mergerels.iterator();
});
log.info("Number of Openorgs Merge Relations created: {}", mergeRelsRDD.count());
spark
.createDataset(
mergeRelsRDD.rdd(),
Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private boolean filterOpenorgsRels(Relation rel) {
if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
&& rel.getSubRelType().equals("dedup"))
return true;
return false;
}
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k : rel.getCollectedfrom()) {
if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
return true;
}
}
}
return false;
}
private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
String entityType = dedupConf.getWf().getEntityType();
Relation r = new Relation();
r.setSource(source);
r.setTarget(target);
r.setRelClass(relClass);
r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
r.setSubRelType("dedup");
DataInfo info = new DataInfo();
info.setDeletedbyinference(false);
info.setInferred(true);
info.setInvisible(false);
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
Qualifier provenanceAction = new Qualifier();
provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
info.setProvenanceaction(provenanceAction);
// TODO calculate the trust value based on the similarity score of the elements in the CC
// info.setTrust();
r.setDataInfo(info);
return r;
}
public String createDedupID(String id) {
String prefix = id.split("\\|")[0];
return prefix + "|dedup_wf_001::" + DHPUtils.md5(id);
}
}

@ -0,0 +1,120 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsSimRels.class);
public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService)
throws DocumentException, IOException, ISLookUpException {
// read oozie parameters
final String graphBasePath = parser.get("graphBasePath");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("Copying OpenOrgs SimRels");
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization");
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
Dataset<Relation> rawRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.filter(this::filterOpenorgsRels);
saveParquet(rawRels, outputPath, SaveMode.Append);
log.info("Copied " + rawRels.count() + " Similarity Relations");
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private boolean filterOpenorgsRels(Relation rel) {
if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
&& rel.getSubRelType().equals("dedup") && isOpenorgs(rel))
return true;
return false;
}
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k : rel.getCollectedfrom()) {
if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
return true;
}
}
}
return false;
}
}

@ -0,0 +1,110 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCopyRelationsNoOpenorgs.class);
public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRelationsNoOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
new SparkCopyRelationsNoOpenorgs(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
public void run(ISLookUpService isLookUpService) throws IOException {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("workingPath: '{}'", workingPath);
log.info("dedupGraphPath: '{}'", dedupGraphPath);
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
JavaRDD<Relation> simRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(x -> !isOpenorgs(x));
log.info("Number of non-Openorgs relations collected: {}", simRels.count());
spark
.createDataset(simRels.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.json(outputPath);
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
for (KeyValue k : rel.getCollectedfrom()) {
if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
return true;
}
}
}
return false;
}
}

@ -10,6 +10,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@ -99,18 +100,19 @@ public class SparkCreateSimRels extends AbstractSparkAction {
.createSortedBlocks(mapDocuments, dedupConf)
.repartition(numPartitions);
// create relations by comparing only elements in the same group
spark
Dataset<Relation> simRels = spark
.createDataset(
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(numPartitions)
.rdd(),
Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
Encoders.bean(Relation.class));
saveParquet(simRels, outputPath, SaveMode.Append);
log.info("Generated " + simRels.count() + " Similarity Relations");
}
}

@ -11,6 +11,7 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -31,7 +33,7 @@ import scala.Tuple2;
public class SparkPrepareNewOrgs extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
private static final Logger log = LoggerFactory.getLogger(SparkPrepareNewOrgs.class);
public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
@ -86,15 +88,18 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
log.info("table: '{}'", dbTable);
log.info("dbPwd: '{}'", "xxx");
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, entityPath);
Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, relationPath, entityPath);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
log.info("Number of New Organization created: '{}'", newOrgs.count());
newOrgs
.repartition(numConnections)
.write()
@ -109,9 +114,27 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
public static Dataset<OrgSimRel> createNewOrgs(
final SparkSession spark,
final String mergeRelsPath,
final String relationPath,
final String entitiesPath) {
// <id, json_entity>
// collect diffrels from the raw graph relations: <other id, "diffRel">
JavaPairRDD<String, String> diffRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(r -> filterRels(r, "organization"))
// take the worst id of the diffrel: <other id, "diffRel">
.mapToPair(rel -> {
if (compareIds(rel.getSource(), rel.getTarget()) > 0)
return new Tuple2<>(rel.getSource(), "diffRel");
else
return new Tuple2<>(rel.getTarget(), "diffRel");
})
.distinct();
log.info("Number of DiffRels collected: '{}'", diffRels.count());
// collect entities: <id, json_entity>
Dataset<Tuple2<String, Organization>> entities = spark
.read()
.textFile(entitiesPath)
@ -122,7 +145,8 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
Dataset<Tuple2<String, String>> mergerels = spark
// collect mergerels and remove ids in the diffrels
Dataset<Tuple2<String, String>> openorgsRels = spark
.createDataset(
spark
.read()
@ -130,18 +154,24 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
.as(Encoders.bean(Relation.class))
.where("relClass == 'isMergedIn'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) // <id, dedup_id>
.leftOuterJoin(diffRels) // <target, "diffRel">
.filter(rel -> !rel._2()._2().isPresent())
.mapToPair(rel -> new Tuple2<>(rel._1(), rel._2()._1()))
.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
log.info("Number of Openorgs Relations loaded: '{}'", openorgsRels.count());
return entities
.joinWith(mergerels, entities.col("_1").equalTo(mergerels.col("_1")), "left")
.joinWith(openorgsRels, entities.col("_1").equalTo(openorgsRels.col("_1")), "left")
.filter((FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> t._2() == null)
// take entities not in mergerels (they are single entities, therefore are new orgs)
.filter(
(FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> !t
._1()
._1()
.contains("openorgs"))
// exclude openorgs, don't need to propose them as new orgs
.map(
(MapFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>, OrgSimRel>) r -> new OrgSimRel(
"",
@ -150,7 +180,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
r._1()._2().getCollectedfrom().get(0).getValue(), ""),
r._1()._2().getCollectedfrom().get(0).getValue(),
"",
structuredPropertyListToString(r._1()._2().getPid())),
Encoders.bean(OrgSimRel.class));
}
@ -167,4 +199,52 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
}
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
public static int compareIds(String o1, String o2) {
if (o1.contains("openorgs____") && o2.contains("openorgs____"))
return o1.compareTo(o2);
if (o1.contains("corda") && o2.contains("corda"))
return o1.compareTo(o2);
if (o1.contains("openorgs____"))
return -1;
if (o2.contains("openorgs____"))
return 1;
if (o1.contains("corda"))
return -1;
if (o2.contains("corda"))
return 1;
return o1.compareTo(o2);
}
private static boolean filterRels(Relation rel, String entityType) {
switch (entityType) {
case "result":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
&& rel.getSubRelType().equals("dedup"))
return true;
break;
case "organization":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
&& rel.getSubRelType().equals("dedup"))
return true;
break;
default:
return false;
}
return false;
}
}

@ -3,14 +3,18 @@ package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -19,6 +23,7 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -28,7 +33,7 @@ import scala.Tuple3;
public class SparkPrepareOrgRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);
public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
@ -80,8 +85,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
Dataset<OrgSimRel> relations = createRelations(spark, mergeRelPath, entityPath);
Dataset<OrgSimRel> relations = createRelations(spark, mergeRelPath, relationPath, entityPath);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
@ -95,11 +101,50 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
}
private static boolean filterRels(Relation rel, String entityType) {
switch (entityType) {
case "result":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
&& rel.getSubRelType().equals("dedup"))
return true;
break;
case "organization":
if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
&& rel.getSubRelType().equals("dedup"))
return true;
break;
default:
return false;
}
return false;
}
// create openorgs simrels <best id, other id> starting from mergerels, remove the diffrels
public static Dataset<OrgSimRel> createRelations(
final SparkSession spark,
final String mergeRelsPath,
final String relationPath,
final String entitiesPath) {
// collect diffrels from the raw graph relations: <<best id, other id>, "diffRel">
JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRels = spark
.read()
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(r -> filterRels(r, "organization"))
// put the best id as source of the diffrel: <best id, other id>
.map(rel -> {
if (compareIds(rel.getSource(), rel.getTarget()) < 0)
return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
else
return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
})
.distinct();
log.info("Number of DiffRels collected: {}", diffRels.count());
// collect all the organizations
Dataset<Tuple2<String, Organization>> entities = spark
.read()
.textFile(entitiesPath)
@ -110,47 +155,74 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
// relations with their group (connected component id)
JavaRDD<Tuple2<Tuple2<String, String>, String>> rawOpenorgsRels = spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.filter(t -> !t._2().contains("openorgsmesh")) // remove openorgsmesh: they are only for dedup
.groupByKey()
.map(g -> Lists.newArrayList(g._2()))
.filter(l -> l.size() > 1)
.flatMap(l -> {
String groupId = "group::" + UUID.randomUUID();
List<String> ids = sortIds(l); // sort IDs by type
List<Tuple2<Tuple2<String, String>, String>> rels = new ArrayList<>();
String source = ids.get(0);
for (String target : ids) {
rels.add(new Tuple2<>(new Tuple2<>(source, target), groupId));
}
return rels.iterator();
});
log.info("Number of Raw Openorgs Relations created: {}", rawOpenorgsRels.count());
// filter out diffRels
JavaRDD<Tuple3<String, String, String>> openorgsRels = rawOpenorgsRels
.union(diffRels)
// concatenation of source and target: <source|||target, group id> or <source|||target, "diffRel">
.mapToPair(t -> new Tuple2<>(t._1()._1() + "@@@" + t._1()._2(), t._2()))
.groupByKey()
.map(
g -> new Tuple2<>(g._1(), StreamSupport
.stream(g._2().spliterator(), false)
.collect(Collectors.toList())))
// <source|||target, list(group_id, "diffRel")>: take only relations with only the group_id, it
// means they are correct. If the diffRel is present the relation has to be removed
.filter(g -> g._2().size() == 1 && g._2().get(0).contains("group::"))
.map(
t -> new Tuple3<>(
t._1().split("@@@")[0],
t._1().split("@@@")[1],
t._2().get(0)));
log.info("Number of Openorgs Relations created: '{}'", openorgsRels.count());
// <best ID basing on priority, ID, groupID>
Dataset<Tuple3<String, String, String>> relations = spark
.createDataset(
spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.filter(t -> !t._2().contains("openorgsmesh"))
.groupByKey()
.map(g -> Lists.newArrayList(g._2()))
.filter(l -> l.size() > 1)
.flatMap(l -> {
String groupId = "group::" + UUID.randomUUID();
List<String> ids = sortIds(l);
List<Tuple3<String, String, String>> rels = new ArrayList<>();
for (String source : ids) {
if (source.contains("openorgs____") || ids.indexOf(source) == 0)
for (String target : ids) {
rels.add(new Tuple3<>(source, target, groupId));
}
}
return rels.iterator();
})
.rdd(),
openorgsRels.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations // <openorgs, corda>
// create orgsimrels
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
r._1()._3()),
(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
return new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
r._1()._3(),
structuredPropertyListToString(r._2()._2().getPid()));
},
Encoders.bean(OrgSimRel.class))
.map(
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
@ -168,29 +240,28 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
}
// select best ids from the list. Priority: 1) openorgs, 2)corda, 3)alphabetic
public static List<String> sortIds(List<String> ids) {
ids.sort((o1, o2) -> {
if (o1.contains("openorgs____") && o2.contains("openorgs____"))
return o1.compareTo(o2);
if (o1.contains("corda") && o2.contains("corda"))
return o1.compareTo(o2);
public static int compareIds(String o1, String o2) {
if (o1.contains("openorgs____") && o2.contains("openorgs____"))
return o1.compareTo(o2);
if (o1.contains("corda") && o2.contains("corda"))
return o1.compareTo(o2);
if (o1.contains("openorgs____"))
return -1;
if (o2.contains("openorgs____"))
return 1;
if (o1.contains("openorgs____"))
return -1;
if (o2.contains("openorgs____"))
return 1;
if (o1.contains("corda"))
return -1;
if (o2.contains("corda"))
return 1;
if (o1.contains("corda"))
return -1;
if (o2.contains("corda"))
return 1;
return o1.compareTo(o2);
});
return o1.compareTo(o2);
}
// Sort IDs basing on the type. Priority: 1) openorgs, 2)corda, 3)alphabetic
public static List<String> sortIds(List<String> ids) {
ids.sort((o1, o2) -> compareIds(o1, o2));
return ids;
}
@ -245,7 +316,8 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
"group::" + r._1()._1()),
"group::" + r._1()._1(),
structuredPropertyListToString(r._2()._2().getPid())),
Encoders.bean(OrgSimRel.class))
.map(
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
@ -263,4 +335,13 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
}

@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -22,11 +23,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
@ -103,12 +103,22 @@ public class SparkUpdateEntity extends AbstractSparkAction {
MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
JavaRDD<String> map = entitiesWithId
.leftOuterJoin(mergedIds)
.map(
k -> k._2()._2().isPresent()
? updateDeletedByInference(k._2()._1(), clazz)
: k._2()._1());
.map(k -> {
if (k._2()._2().isPresent()) {
return updateDeletedByInference(k._2()._1(), clazz);
}
return k._2()._1();
});
if (type == EntityType.organization) // exclude openorgs with deletedbyinference=true
map = map.filter(it -> {
Organization org = OBJECT_MAPPER.readValue(it, Organization.class);
return !org.getId().contains("openorgs____") || (org.getId().contains("openorgs____")
&& !org.getDataInfo().getDeletedbyinference());
});
sourceEntity = map.union(sc.textFile(dedupRecordPath));
}
sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);

@ -13,12 +13,13 @@ public class OrgSimRel implements Serializable {
String oa_url;
String oa_collectedfrom;
String group_id;
String pid_list; // separator for type-pid: "###"; separator for pids: "@@@"
public OrgSimRel() {
}
public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
String oa_url, String oa_collectedfrom, String group_id) {
String oa_url, String oa_collectedfrom, String group_id, String pid_list) {
this.local_id = local_id;
this.oa_original_id = oa_original_id;
this.oa_name = oa_name;
@ -27,6 +28,7 @@ public class OrgSimRel implements Serializable {
this.oa_url = oa_url;
this.oa_collectedfrom = oa_collectedfrom;
this.group_id = group_id;
this.pid_list = pid_list;
}
public String getLocal_id() {
@ -93,6 +95,14 @@ public class OrgSimRel implements Serializable {
this.group_id = group_id;
}
public String getPid_list() {
return pid_list;
}
public void setPid_list(String pid_list) {
this.pid_list = pid_list;
}
@Override
public String toString() {
return "OrgSimRel{" +
@ -103,6 +113,8 @@ public class OrgSimRel implements Serializable {
", oa_country='" + oa_country + '\'' +
", oa_url='" + oa_url + '\'' +
", oa_collectedfrom='" + oa_collectedfrom + '\'' +
", group_id='" + group_id + '\'' +
", pid_list='" + pid_list + '\'' +
'}';
}
}

@ -1,44 +1,32 @@
[
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
},
{
"paramName": "purl",
"paramLongName": "postgresUrl",
"paramDescription": "the url of the postgres server",
"paramRequired": true
},
{
"paramName": "pusr",
"paramLongName": "postgresUser",
"paramDescription": "the owner of the postgres database",
"paramRequired": true
},
{
"paramName": "ppwd",
"paramLongName": "postgresPassword",
"paramDescription": "the password for the postgres user",
"paramRequired": true
}
]

@ -0,0 +1,26 @@
[
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
}
]

@ -1,208 +0,0 @@
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbTable</name>
<description>the name of the table in the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the passowrd of the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
</fs>
<ok to="copyRelations"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark>
<ok to="PrepareNewOrgs"/>
<error to="Kill"/>
</action>
<action name="PrepareNewOrgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare New Organizations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,4 +1,4 @@
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Openorgs Dedup" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
@ -24,22 +24,6 @@
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbTable</name>
<description>the name of the table in the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the passowrd of the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -95,36 +79,55 @@
</configuration>
</global>
<start to="resetWorkingPath"/>
<start to="resetOrgSimRels"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<action name="resetOrgSimRels">
<fs>
<delete path="${workingPath}"/>
<delete path="${workingPath}/${actionSetIdOpenorgs}/organization_simrel"/>
<delete path="${workingPath}/${actionSetIdOpenorgs}/organization_mergerel"/>
</fs>
<ok to="copyRelations"/>
<ok to="CreateSimRels"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>/tmp/graph_openorgs_and_corda/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
<action name="CreateSimRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CopyOpenorgsSimRels"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<!-- copy simrels relations coming from openorgs -->
<action name="CopyOpenorgsSimRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<name>Copy OpenOrgs Sim Rels</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -138,15 +141,15 @@
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRel"/>
<ok to="CreateMergeRels"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<action name="CreateMergeRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
@ -166,7 +169,7 @@
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark>
<ok to="PrepareOrgRels"/>
@ -193,7 +196,7 @@
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
@ -224,7 +227,7 @@
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--apiUrl</arg><arg>${apiUrl}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>

@ -12,6 +12,10 @@
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>actionSetIdOpenorgs</name>
<description>id of the actionSet for OpenOrgs dedup</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
@ -79,7 +83,7 @@
</configuration>
</global>
<start to="resetWorkingPath"/>
<start to="testOpenorgs"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -88,11 +92,21 @@
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
<delete path="${dedupGraphPath}"/>
</fs>
<ok to="CreateSimRel"/>
<error to="Kill"/>
</action>
<action name="testOpenorgs">
<fs>
<delete path="${workingPath}/${actionSetIdOpenorgs}"/>
<delete path="${dedupGraphPath}"/>
</fs>
<ok to="CopyOpenorgsMergeRels"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -169,10 +183,91 @@
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
</spark>
<ok to="CopyOpenorgsMergeRels"/>
<error to="Kill"/>
</action>
<!-- copy organization relations in the working dir (in the organization_mergerel dir)-->
<action name="CopyOpenorgsMergeRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy Openorgs Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateOrgsDedupRecord"/>
<error to="Kill"/>
</action>
<action name="CreateOrgsDedupRecord">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Organizations Dedup Records</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
</spark>
<ok to="UpdateEntity"/>
<error to="Kill"/>
</action>
<!--TODO replace with job for the creation of deduprecord for openorgs organizations -->
<!-- copy openorgs to the working dir (in the organization_deduprecord dir)-->
<!--<action name="CopyOpenorgs">-->
<!--<spark xmlns="uri:oozie:spark-action:0.2">-->
<!--<master>yarn</master>-->
<!--<mode>cluster</mode>-->
<!--<name>Copy Openorgs Entities</name>-->
<!--<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgs</class>-->
<!--<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>-->
<!--<spark-opts>-->
<!--&#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!--&#45;&#45;executor-cores=${sparkExecutorCores}-->
<!--&#45;&#45;driver-memory=${sparkDriverMemory}-->
<!--&#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!--&#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!--&#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!--&#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!--&#45;&#45;conf spark.sql.shuffle.partitions=3840-->
<!--</spark-opts>-->
<!--<arg>&#45;&#45;graphBasePath</arg><arg>${graphBasePath}</arg>-->
<!--<arg>&#45;&#45;workingPath</arg><arg>${workingPath}</arg>-->
<!--<arg>&#45;&#45;actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>-->
<!--</spark>-->
<!--<ok to="UpdateEntity"/>-->
<!--<error to="Kill"/>-->
<!--</action>-->
<action name="UpdateEntity">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -198,15 +293,28 @@
<error to="Kill"/>
</action>
<!-- copy all relations without openorgs relations to the dedupgraph -->
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/relation"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>${dedupGraphPath}/relation</arg>
</distcp>
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Copy Non-Openorgs Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--dedupGraphPath</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>

@ -112,7 +112,7 @@ public class EntityMergerTest implements Serializable {
assertEquals("2018-09-30", pub_merged.getDateofacceptance().getValue());
// verify authors
assertEquals(9, pub_merged.getAuthor().size());
assertEquals(13, pub_merged.getAuthor().size());
assertEquals(4, AuthorMerger.countAuthorsPids(pub_merged.getAuthor()));
// verify title

@ -36,6 +36,8 @@ public class IdGeneratorTest {
private static List<Identifier<Publication>> bestIds2;
private static List<Identifier<Publication>> bestIds3;
private static List<Identifier<Organization>> bestIdsOrg;
private static String testEntityBasePath;
@BeforeAll
@ -48,6 +50,8 @@ public class IdGeneratorTest {
bestIds = createBestIds(testEntityBasePath + "/publication_idgeneration.json", Publication.class);
bestIds2 = createBestIds(testEntityBasePath + "/publication_idgeneration2.json", Publication.class);
bestIds3 = createBestIds(testEntityBasePath + "/publication_idgeneration3.json", Publication.class);
bestIdsOrg = createBestIds(testEntityBasePath + "/organization_idgeneration.json", Organization.class);
}
@Test
@ -76,6 +80,13 @@ public class IdGeneratorTest {
assertEquals("50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1g", id2);
}
@Test
public void generateIdOrganizationTest() {
String id1 = IdGenerator.generate(bestIdsOrg, "20|defaultID");
assertEquals("20|openorgs____::599c15a70fcb03be6ba08f75f14d6076", id1);
}
protected static <T extends OafEntity> List<Identifier<T>> createBestIds(String path, Class<T> clazz) {
final Stream<Identifier<T>> ids = readSample(path, clazz)
.stream()

@ -174,27 +174,27 @@ public class SparkDedupTest implements Serializable {
long orgs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
long pubs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication"))
.count();
long sw_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"))
.count();
long ds_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset"))
.count();
long orp_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
.count();
assertEquals(3082, orgs_simrel);
@ -206,67 +206,6 @@ public class SparkDedupTest implements Serializable {
@Test
@Order(2)
public void collectSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCollectSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath,
"-np", "50",
"-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
"-pusr", "postgres_user",
"-ppwd", ""
});
new SparkCollectSimRels(
parser,
spark,
spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
spark.read().load(testDedupAssertionsBasePath + "/groups"))
.run(isLookUpService);
long orgs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long pubs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long sw_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(3672, orgs_simrel);
assertEquals(10459, pubs_simrel);
assertEquals(3767, sw_simrel);
assertEquals(3865, ds_simrel);
assertEquals(10173, orp_simrel);
}
@Test
@Order(3)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -362,7 +301,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(4)
@Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -417,7 +356,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(5)
@Order(4)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -456,7 +395,7 @@ public class SparkDedupTest implements Serializable {
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
.count();
assertEquals(84, orgs_deduprecord);
assertEquals(85, orgs_deduprecord);
assertEquals(65, pubs_deduprecord);
assertEquals(51, sw_deduprecord);
assertEquals(97, ds_deduprecord);
@ -464,7 +403,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(6)
@Order(5)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -540,7 +479,7 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(896, publications);
assertEquals(837, organizations);
assertEquals(838, organizations);
assertEquals(100, projects);
assertEquals(100, datasource);
assertEquals(200, softwares);
@ -580,7 +519,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(7)
@Order(6)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -630,7 +569,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(8)
@Order(7)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);

@ -0,0 +1,408 @@
package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionsUtils;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkOpenorgsDedupTest implements Serializable {
private static String dbUrl = "jdbc:h2:mem:openorgs_test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false";
private static String dbUser = "sa";
private static String dbTable = "tmp_dedup_events";
private static String dbPwd = "";
@Mock(serializable = true)
ISLookUpService isLookUpService;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath;
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator-openorgs";
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
testGraphBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs_dedup").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
}
@Test
@Order(1)
public void createSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath,
"-np", "50"
});
new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
assertEquals(288, orgs_simrel);
}
@Test
@Order(2)
public void copyOpenorgsSimRels() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-w", testOutputBasePath,
"-la", "lookupurl",
"-np", "50"
});
new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
assertEquals(324, orgs_simrel);
}
@Test
@Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.count();
assertEquals(132, orgs_mergerel);
// verify that a DiffRel is in the mergerels (to be sure that the job supposed to remove them has something to
// do)
List<String> diffRels = jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getTarget())
.collect();
assertEquals(18, diffRels.size());
List<String> mergeRels = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.toJavaRDD()
.map(r -> r.getTarget())
.collect();
assertFalse(Collections.disjoint(mergeRels, diffRels));
}
@Test
@Order(4)
public void prepareOrgRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-du",
dbUrl,
"-dusr",
dbUser,
"-t",
dbTable,
"-dpwd",
dbPwd
});
new SparkPrepareOrgRels(parser, spark).run(isLookUpService);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
Connection connection = DriverManager.getConnection(dbUrl, connectionProperties);
ResultSet resultSet = connection
.prepareStatement("SELECT COUNT(*) as total_rels FROM " + dbTable)
.executeQuery();
if (resultSet.next()) {
int total_rels = resultSet.getInt("total_rels");
assertEquals(32, total_rels);
} else
fail("No result in the sql DB");
resultSet.close();
// verify the number of organizations with duplicates
ResultSet resultSet2 = connection
.prepareStatement("SELECT COUNT(DISTINCT(local_id)) as total_orgs FROM " + dbTable)
.executeQuery();
if (resultSet2.next()) {
int total_orgs = resultSet2.getInt("total_orgs");
assertEquals(6, total_orgs);
} else
fail("No result in the sql DB");
resultSet2.close();
// verify that no DiffRel is in the DB
List<String> diffRels = jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getSource() + "@@@" + r.getTarget())
.collect();
List<String> dbRels = new ArrayList<>();
ResultSet resultSet3 = connection
.prepareStatement("SELECT local_id, oa_original_id FROM " + dbTable)
.executeQuery();
while (resultSet3.next()) {
String source = OafMapperUtils.createOpenaireId("organization", resultSet3.getString("local_id"), true);
String target = OafMapperUtils
.createOpenaireId("organization", resultSet3.getString("oa_original_id"), true);
dbRels.add(source + "@@@" + target);
}
resultSet3.close();
assertTrue(Collections.disjoint(dbRels, diffRels));
connection.close();
}
@Test
@Order(5)
public void prepareNewOrgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-du",
dbUrl,
"-dusr",
dbUser,
"-t",
dbTable,
"-dpwd",
dbPwd
});
new SparkPrepareNewOrgs(parser, spark).run(isLookUpService);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
long orgs_in_diffrel = jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getTarget())
.distinct()
.count();
Connection connection = DriverManager.getConnection(dbUrl, connectionProperties);
jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getTarget())
.distinct()
.foreach(s -> System.out.println("difforgs = " + s));
ResultSet resultSet0 = connection
.prepareStatement("SELECT oa_original_id FROM " + dbTable + " WHERE local_id = ''")
.executeQuery();
while (resultSet0.next())
System.out
.println(
"dborgs = " + OafMapperUtils.createOpenaireId(20, resultSet0.getString("oa_original_id"), true));
resultSet0.close();
ResultSet resultSet = connection
.prepareStatement("SELECT COUNT(*) as total_new_orgs FROM " + dbTable + " WHERE local_id = ''")
.executeQuery();
if (resultSet.next()) {
int total_new_orgs = resultSet.getInt("total_new_orgs");
assertEquals(orgs_in_diffrel + 1, total_new_orgs);
} else
fail("No result in the sql DB");
resultSet.close();
}
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
}

@ -0,0 +1,236 @@
package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import jdk.nashorn.internal.ir.annotations.Ignore;
@ExtendWith(MockitoExtension.class)
public class SparkOpenorgsTest implements Serializable {
@Mock(serializable = true)
ISLookUpService isLookUpService;
private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath;
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
testGraphBasePath = Paths
.get(SparkOpenorgsTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testDedupGraphBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
}
@Disabled
@Test
public void copyOpenorgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-w", testOutputBasePath,
"-np", "50"
});
new SparkCopyOpenorgs(parser, spark).run(isLookUpService);
long orgs_deduprecord = jsc
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
.count();
assertEquals(100, orgs_deduprecord);
}
@Test
public void copyOpenorgsMergeRels() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-w", testOutputBasePath,
"-la", "lookupurl",
"-np", "50"
});
new SparkCopyOpenorgsMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.count();
assertEquals(384, orgs_mergerel);
}
@Test
public void copyOpenorgsSimRels() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-w", testOutputBasePath,
"-la", "lookupurl",
"-np", "50"
});
new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
assertEquals(73, orgs_simrel);
}
@Test
public void copyRelationsNoOpenorgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyRelationsNoOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-w", testOutputBasePath,
"-o", testDedupGraphBasePath
});
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(400, relations);
}
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
private static MapFunction<String, Relation> patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
return rel;
};
}
}

@ -0,0 +1,24 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value=""/>
<RESOURCE_TYPE value="DedupOrchestrationDSResourceType"/>
<RESOURCE_KIND value="DedupOrchestrationDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2001-12-31T12:00:00"/>
</HEADER>
<BODY>
<CONFIGURATION enabled="true">
<DEDUPLICATION>
<ENTITY code="20" label="Organization" name="organization"/>
<ACTION_SET id="test-orchestrator"/>
<SCAN_SEQUENCE>
<SCAN id="organization"/>
</SCAN_SEQUENCE>
</DEDUPLICATION>
</CONFIGURATION>
<STATUS>
<LAST_UPDATE value="2001-12-31T12:00:00"/>
</STATUS>
<SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
</BODY>
</RESOURCE_PROFILE>

@ -57,6 +57,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.MigrateAction;
import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -84,6 +85,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
public static final String SOURCE_TYPE = "source_type";
public static final String TARGET_TYPE = "target_type";
private static final String ORG_ORG_RELTYPE = "organizationOrganization";
private static final String ORG_ORG_SUBRELTYPE = "dedup";
private final DbClient dbClient;
private final long lastUpdateTimestamp;
@ -122,35 +126,71 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final Predicate<Oaf> verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist);
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
log.info("processClaims: {}", processClaims);
final MigrateAction process = parser.get("action") != null ? MigrateAction.valueOf(parser.get("action"))
: MigrateAction.openaire;
log.info("migrateAction: {}", process);
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser,
dbPassword, isLookupUrl)) {
if (processClaims) {
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
} else {
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
}
log.info("Processing orgs...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe
.execute(
"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix);
log.info("Processing projects <-> orgs ...");
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
switch (process) {
case claims:
log.info("Processing claims...");
smdbe.execute("queryClaims.sql", smdbe::processClaims);
break;
case openaire:
log.info("Processing datasources...");
smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix);
log.info("Processing projects...");
if (dbSchema.equalsIgnoreCase("beta")) {
smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix);
} else {
smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix);
}
log.info("Processing Organizations...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing relationsNoRemoval ds <-> orgs ...");
smdbe
.execute(
"queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization,
verifyNamespacePrefix);
log.info("Processing projects <-> orgs ...");
smdbe
.execute(
"queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
break;
case openorgs_dedup:
log.info("Processing Openorgs...");
smdbe
.execute(
"queryOpenOrgsForOrgsDedup.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing Openorgs Merge Rels...");
smdbe.execute("queryOpenOrgsSimilarityForOrgsDedup.sql", smdbe::processOrgOrgSimRels);
break;
case openorgs:
log.info("Processing Openorgs For Provision...");
smdbe
.execute(
"queryOpenOrgsForProvision.sql", smdbe::processOrganization, verifyNamespacePrefix);
log.info("Processing Openorgs Merge Rels...");
smdbe.execute("queryOpenOrgsSimilarityForProvision.sql", smdbe::processOrgOrgSimRels);
break;
case openaire_organizations:
log.info("Processing Organizations...");
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix);
break;
}
log.info("All done.");
}
@ -597,6 +637,45 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
}
}
public List<Oaf> processOrgOrgSimRels(final ResultSet rs) {
try {
final DataInfo info = prepareDataInfo(rs); // TODO
final String orgId1 = createOpenaireId(20, rs.getString("id1"), true);
final String orgId2 = createOpenaireId(20, rs.getString("id2"), true);
final String relClass = rs.getString("relclass");
final List<KeyValue> collectedFrom = listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname"));
final Relation r1 = new Relation();
r1.setRelType(ORG_ORG_RELTYPE);
r1.setSubRelType(ORG_ORG_SUBRELTYPE);
r1.setRelClass(relClass);
r1.setSource(orgId1);
r1.setTarget(orgId2);
r1.setCollectedfrom(collectedFrom);
r1.setDataInfo(info);
r1.setLastupdatetimestamp(lastUpdateTimestamp);
// removed because there's no difference between two sides //TODO
// final Relation r2 = new Relation();
// r2.setRelType(ORG_ORG_RELTYPE);
// r2.setSubRelType(ORG_ORG_SUBRELTYPE);
// r2.setRelClass(relClass);
// r2.setSource(orgId2);
// r2.setTarget(orgId1);
// r2.setCollectedfrom(collectedFrom);
// r2.setDataInfo(info);
// r2.setLastupdatetimestamp(lastUpdateTimestamp);
// return Arrays.asList(r1, r2);
return Arrays.asList(r1);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
super.close();

@ -0,0 +1,11 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
//enum to specify the different actions available for the MigrateDbEntitiesApplication job
public enum MigrateAction {
claims, // migrate claims to the raw graph
openorgs_dedup, // migrate organizations from openorgs to the raw graph
openorgs, // migrate organization from openorgs to the raw graph for provision
openaire, // migrate openaire entities to the raw graph
openaire_organizations // migrate openaire organizations entities to the raw graph
}

@ -25,6 +25,18 @@
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access to the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
@ -178,14 +190,34 @@
<action name="ImportDB">
<java>
<prepare>
<delete path="${contentPath}/db_records"/>
<delete path="${contentPath}/db_openaire"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_records</arg>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openaire</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>openaire</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="ImportDB_openorgs"/>
<error to="Kill"/>
</action>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>openorgs</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
@ -315,7 +347,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--shouldHashId</arg><arg>${shouldHashId}</arg>

@ -0,0 +1,270 @@
<workflow-app name="create RAW Organizations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>reuseContent</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>postgresOpenOrgsURL</name>
<description>the postgres URL to access to the OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsUser</name>
<description>the user of OpenOrgs database</description>
</property>
<property>
<name>postgresOpenOrgsPassword</name>
<description>the password of OpenOrgs database</description>
</property>
<property>
<name>dbSchema</name>
<value>beta</value>
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>nsPrefixBlacklist</name>
<value></value>
<description>a blacklist of nsprefixes (comma separeted)</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reuse_aggregator_content"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="reuse_aggregator_content">
<switch>
<case to="ImportDB">${wf:conf('reuseContent') eq false}</case>
<case to="GenerateEntities">${wf:conf('reuseContent') eq true}</case>
<default to="ImportDB"/>
</switch>
</decision>
<fork name="ImportDB">
<path start="ImportDB_openaire_organizations"/>
<path start="ImportDB_openorgs"/>
</fork>
<action name="ImportDB_openaire_organizations">
<java>
<prepare>
<delete path="${contentPath}/db_openaire_organizations"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openaire_organizations</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>openaire_organizations</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB_openorgs">
<java>
<prepare>
<delete path="${contentPath}/db_openorgs"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>--hdfsPath</arg><arg>${contentPath}/db_openorgs</arg>
<arg>--postgresUrl</arg><arg>${postgresOpenOrgsURL}</arg>
<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--action</arg><arg>openorgs_dedup</arg>
<arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<join name="wait_import" to="GenerateEntities"/>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePaths</arg><arg>${contentPath}/db_openaire_organizations,${contentPath}/db_openorgs</arg>
<arg>--targetPath</arg><arg>${workingDir}/entities</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/entities</arg>
<arg>--graphRawPath</arg><arg>${graphOutputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<!--<action name="merge_claims_relation">-->
<!--<spark xmlns="uri:oozie:spark-action:0.2">-->
<!--<master>yarn</master>-->
<!--<mode>cluster</mode>-->
<!--<name>MergeClaims_relation</name>-->
<!--<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>-->
<!--<jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!--<spark-opts>-->
<!--&#45;&#45;executor-memory ${sparkExecutorMemory}-->
<!--&#45;&#45;executor-cores ${sparkExecutorCores}-->
<!--&#45;&#45;driver-memory=${sparkDriverMemory}-->
<!--&#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!--&#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!--&#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!--&#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!--&#45;&#45;conf spark.sql.shuffle.partitions=3840-->
<!--</spark-opts>-->
<!--<arg>&#45;&#45;rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>-->
<!--<arg>&#45;&#45;claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>-->
<!--<arg>&#45;&#45;outputRawGaphPath</arg><arg>${graphOutputPath}</arg>-->
<!--<arg>&#45;&#45;graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>-->
<!--</spark>-->
<!--<ok to="wait_merge"/>-->
<!--<error to="Kill"/>-->
<!--</action>-->
<!--<action name="merge_claims_organization">-->
<!--<spark xmlns="uri:oozie:spark-action:0.2">-->
<!--<master>yarn</master>-->
<!--<mode>cluster</mode>-->
<!--<name>MergeClaims_organization</name>-->
<!--<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>-->
<!--<jar>dhp-graph-mapper-${projectVersion}.jar</jar>-->
<!--<spark-opts>-->
<!--&#45;&#45;executor-memory ${sparkExecutorMemory}-->
<!--&#45;&#45;executor-cores ${sparkExecutorCores}-->
<!--&#45;&#45;driver-memory=${sparkDriverMemory}-->
<!--&#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!--&#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!--&#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!--&#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!--&#45;&#45;conf spark.sql.shuffle.partitions=200-->
<!--</spark-opts>-->
<!--<arg>&#45;&#45;rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>-->
<!--<arg>&#45;&#45;claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>-->
<!--<arg>&#45;&#45;outputRawGaphPath</arg><arg>${graphOutputPath}</arg>-->
<!--<arg>&#45;&#45;graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>-->
<!--</spark>-->
<!--<ok to="wait_merge"/>-->
<!--<error to="Kill"/>-->
<!--</action>-->
<end name="End"/>
</workflow-app>

@ -4,6 +4,8 @@ SELECT
o.name AS legalname,
array_agg(DISTINCT n.name) AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
o.creation_date AS dateofcollection,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
@ -13,26 +15,41 @@ SELECT
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
null AS eclegalbody,
null AS eclegalperson,
null AS ecnonprofit,
null AS ecresearchorganization,
null AS echighereducation,
null AS ecinternationalorganizationeurinterests,
null AS ecinternationalorganization,
null AS ecenterprise,
null AS ecsmevalidated,
null AS ecnutscode
FROM organizations o
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
LEFT OUTER JOIN other_names n ON (n.id = o.id)
WHERE
o.status = 'approved'
GROUP BY
o.id,
o.name,
o.creation_date,
o.modification_date,
o.country
UNION ALL
SELECT
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
n.name AS legalshortname,
n.name AS legalname,
ARRAY[]::text[] AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
o.creation_date AS dateofcollection,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
@ -42,12 +59,26 @@ SELECT
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
null AS eclegalbody,
null AS eclegalperson,
null AS ecnonprofit,
null AS ecresearchorganization,
null AS echighereducation,
null AS ecinternationalorganizationeurinterests,
null AS ecinternationalorganization,
null AS ecenterprise,
null AS ecsmevalidated,
null AS ecnutscode
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
WHERE
o.status = 'approved'
GROUP BY
o.id, o.modification_date, o.country, n.name
o.id,
o.creation_date,
o.modification_date,
o.country,
n.name;

@ -0,0 +1,41 @@
SELECT
o.id AS organizationid,
coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
o.name AS legalname,
array_agg(DISTINCT n.name) AS "alternativeNames",
(array_agg(u.url))[1] AS websiteurl,
'' AS logourl,
o.creation_date AS dateofcollection,
o.modification_date AS dateoftransformation,
false AS inferred,
false AS deletedbyinference,
0.95 AS trust,
'' AS inferenceprovenance,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
null AS eclegalbody,
null AS eclegalperson,
null AS ecnonprofit,
null AS ecresearchorganization,
null AS echighereducation,
null AS ecinternationalorganizationeurinterests,
null AS ecinternationalorganization,
null AS ecenterprise,
null AS ecsmevalidated,
null AS ecnutscode
FROM organizations o
LEFT OUTER JOIN acronyms a ON (a.id = o.id)
LEFT OUTER JOIN urls u ON (u.id = o.id)
LEFT OUTER JOIN other_ids i ON (i.id = o.id)
LEFT OUTER JOIN other_names n ON (n.id = o.id)
WHERE
o.status = 'approved'
GROUP BY
o.id,
o.name,
o.creation_date,
o.modification_date,
o.country;

@ -0,0 +1,43 @@
-- relations approved by the user
SELECT
local_id AS id1,
oa_original_id AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isSimilarTo' AS relclass
FROM oa_duplicates WHERE reltype = 'is_similar'
UNION ALL
-- relations between openorgs and mesh (alternative names)
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isSimilarTo' AS relclass
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)
UNION ALL
-- diff relations approved by the user
SELECT
local_id AS id1,
oa_original_id AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isDifferentFrom' AS relclass
FROM oa_duplicates WHERE reltype = 'is_different';

@ -0,0 +1,12 @@
-- relations approved by the user and suggested by the dedup
SELECT
local_id AS id1,
oa_original_id AS id2,
'openaire____::openorgs' AS collectedfromid,
'OpenOrgs Database' AS collectedfromname,
false AS inferred,
false AS deletedbyinference,
0.99 AS trust,
'' AS inferenceprovenance,
'isSimilarTo' AS relclass
FROM oa_duplicates WHERE reltype = 'is_similar' OR reltype = 'suggested';

@ -24,7 +24,7 @@ SELECT
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
array_remove(array_agg(DISTINCT i.pid || '###' || i.issuertype), NULL) AS pid
array_agg(DISTINCT i.pid || '###' || i.issuertype || '@@@dnet:pid_types') AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
LEFT OUTER JOIN dsm_organizationpids p ON (p.organization = o.id)
@ -50,4 +50,4 @@ GROUP BY
o.trust,
d.id,
d.officialname,
o.country
o.country;

@ -1,17 +0,0 @@
SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar'
UNION ALL
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2
FROM acronyms a
LEFT OUTER JOIN organizations o ON (a.id = o.id)
UNION ALL
SELECT
o.id AS id1,
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2
FROM other_names n
LEFT OUTER JOIN organizations o ON (n.id = o.id)

@ -114,9 +114,6 @@
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>

Loading…
Cancel
Save