forked from D-Net/dnet-hadoop
Compare commits
34 Commits
master
...
deduptesti
@ -1,14 +1,14 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.provision;
|
||||
package eu.dnetlib.dhp.schema.oaf;
|
||||
|
||||
public class ProvisionConstants {
|
||||
public class ModelHardLimits {
|
||||
|
||||
public static final int MAX_EXTERNAL_ENTITIES = 50;
|
||||
public static final int MAX_AUTHORS = 200;
|
||||
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
|
||||
public static final int MAX_TITLE_LENGTH = 5000;
|
||||
public static final int MAX_TITLES = 10;
|
||||
public static final int MAX_ABSTRACT_LENGTH = 100000;
|
||||
public static final int MAX_ABSTRACT_LENGTH = 150000;
|
||||
public static final int MAX_INSTANCES = 10;
|
||||
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
|
||||
public class ResultTypeComparator implements Comparator<Result> {
|
||||
|
||||
@Override
|
||||
public int compare(Result left, Result right) {
|
||||
|
||||
if (left == null && right == null)
|
||||
return 0;
|
||||
if (left == null)
|
||||
return 1;
|
||||
if (right == null)
|
||||
return -1;
|
||||
|
||||
String lClass = left.getResulttype().getClassid();
|
||||
String rClass = right.getResulttype().getClassid();
|
||||
|
||||
if (lClass.equals(rClass))
|
||||
return 0;
|
||||
|
||||
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
// Else (but unlikely), lexicographical ordering will do.
|
||||
return lClass.compareTo(rClass);
|
||||
}
|
||||
}
|
@ -0,0 +1,102 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.CleaningFunctions;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
/**
|
||||
* Factory class for OpenAIRE identifiers in the Graph
|
||||
*/
|
||||
public class IdentifierFactory implements Serializable {
|
||||
|
||||
public static final String ID_SEPARATOR = "::";
|
||||
public static final String ID_PREFIX_SEPARATOR = "|";
|
||||
public final static String ID_REGEX = "^[0-9][0-9]\\" + ID_PREFIX_SEPARATOR + ".{12}" + ID_SEPARATOR
|
||||
+ "[a-zA-Z0-9]{32}$";
|
||||
|
||||
public final static String DOI_REGEX = "(^10\\.[0-9]{4,9}\\/[-._;()\\/:a-zA-Z0-9]+$)|" +
|
||||
"(^10\\.1002\\/[^\\s]+$)|" +
|
||||
"(^10\\.1021\\/[a-zA-Z0-9_][a-zA-Z0-9_][0-9]++$)|" +
|
||||
"(^10\\.1207\\/[a-zA-Z0-9_]+\\&[0-9]+_[0-9]+$)";
|
||||
|
||||
public static final int ID_PREFIX_LEN = 12;
|
||||
public static final String NONE = "none";
|
||||
|
||||
/**
|
||||
* Creates an identifier from the most relevant PID (if available) in the given entity T. Returns entity.id
|
||||
* when no PID is available
|
||||
* @param entity the entity providing PIDs and a default ID.
|
||||
* @param <T> the specific entity type. Currently Organization and Result subclasses are supported.
|
||||
* @return an identifier from the most relevant PID, entity.id otherwise
|
||||
*/
|
||||
public static <T extends OafEntity> String createIdentifier(T entity) {
|
||||
|
||||
if (Objects.isNull(entity.getPid()) || entity.getPid().isEmpty()) {
|
||||
return entity.getId();
|
||||
}
|
||||
|
||||
return entity
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(s -> pidFilter(s))
|
||||
.min(new PidComparator<>(entity))
|
||||
.map(s -> idFromPid(entity, s))
|
||||
.map(IdentifierFactory::verifyIdSyntax)
|
||||
.orElseGet(entity::getId);
|
||||
}
|
||||
|
||||
protected static boolean pidFilter(StructuredProperty s) {
|
||||
if (Objects.isNull(s.getQualifier()) ||
|
||||
StringUtils.isBlank(StringUtils.trim(s.getValue()))) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
switch (PidType.valueOf(s.getQualifier().getClassid())) {
|
||||
case doi:
|
||||
final String doi = StringUtils.trim(StringUtils.lowerCase(s.getValue()));
|
||||
return doi.matches(DOI_REGEX);
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static String verifyIdSyntax(String s) {
|
||||
if (StringUtils.isBlank(s) || !s.matches(ID_REGEX)) {
|
||||
throw new RuntimeException(String.format("malformed id: '%s'", s));
|
||||
} else {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends OafEntity> String idFromPid(T entity, StructuredProperty s) {
|
||||
return new StringBuilder()
|
||||
.append(StringUtils.substringBefore(entity.getId(), ID_PREFIX_SEPARATOR))
|
||||
.append(ID_PREFIX_SEPARATOR)
|
||||
.append(createPrefix(s.getQualifier().getClassid()))
|
||||
.append(ID_SEPARATOR)
|
||||
.append(DHPUtils.md5(CleaningFunctions.normalizePidValue(s).getValue()))
|
||||
.toString();
|
||||
}
|
||||
|
||||
// create the prefix (length = 12)
|
||||
private static String createPrefix(String pidType) {
|
||||
StringBuilder prefix = new StringBuilder(StringUtils.left(pidType, ID_PREFIX_LEN));
|
||||
while (prefix.length() < ID_PREFIX_LEN) {
|
||||
prefix.append("_");
|
||||
}
|
||||
return prefix.substring(0, ID_PREFIX_LEN);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
public class OrganizationPidComparator implements Comparator<PidType> {
|
||||
|
||||
@Override
|
||||
public int compare(PidType pLeft, PidType pRight) {
|
||||
if (pLeft.equals(PidType.GRID))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.GRID))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.mag_id))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.mag_id))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.urn))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.urn))
|
||||
return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class PidComparator<T extends OafEntity> implements Comparator<StructuredProperty> {
|
||||
|
||||
private T entity;
|
||||
|
||||
public PidComparator(T entity) {
|
||||
this.entity = entity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(StructuredProperty left, StructuredProperty right) {
|
||||
|
||||
if (left == null && right == null)
|
||||
return 0;
|
||||
if (left == null)
|
||||
return 1;
|
||||
if (right == null)
|
||||
return -1;
|
||||
|
||||
PidType lClass = PidType.valueOf(left.getQualifier().getClassid());
|
||||
PidType rClass = PidType.valueOf(right.getQualifier().getClassid());
|
||||
|
||||
if (lClass.equals(rClass))
|
||||
return 0;
|
||||
|
||||
if (ModelSupport.isSubClass(entity, Result.class)) {
|
||||
return compareResultPids(lClass, rClass);
|
||||
}
|
||||
if (ModelSupport.isSubClass(entity, Organization.class)) {
|
||||
return compareOrganizationtPids(lClass, rClass);
|
||||
}
|
||||
|
||||
// Else (but unlikely), lexicographical ordering will do.
|
||||
return lClass.compareTo(rClass);
|
||||
}
|
||||
|
||||
private int compareResultPids(PidType lClass, PidType rClass) {
|
||||
return new ResultPidComparator().compare(lClass, rClass);
|
||||
}
|
||||
|
||||
private int compareOrganizationtPids(PidType lClass, PidType rClass) {
|
||||
return new OrganizationPidComparator().compare(lClass, rClass);
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
|
||||
public enum PidType {
|
||||
|
||||
// Result
|
||||
doi, pmid, pmc, handle, arXiv, NCID, GBIF, nct, pdb,
|
||||
|
||||
// Organization
|
||||
GRID, mag_id, urn,
|
||||
|
||||
// Used by dedup
|
||||
undefined, original;
|
||||
|
||||
public static boolean isValid(String type) {
|
||||
return EnumUtils.isValidEnum(PidType.class, type);
|
||||
}
|
||||
|
||||
public static PidType tryValueOf(String s) {
|
||||
try {
|
||||
return PidType.valueOf(s);
|
||||
} catch (Exception e) {
|
||||
return PidType.original;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
public class ResultPidComparator implements Comparator<PidType> {
|
||||
|
||||
@Override
|
||||
public int compare(PidType pLeft, PidType pRight) {
|
||||
if (pLeft.equals(PidType.doi))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.doi))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.pmid))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.pmid))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.pmc))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.pmc))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.handle))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.handle))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.arXiv))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.arXiv))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.NCID))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.NCID))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.GBIF))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.GBIF))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.nct))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.nct))
|
||||
return 1;
|
||||
|
||||
if (pLeft.equals(PidType.urn))
|
||||
return -1;
|
||||
if (pRight.equals(PidType.urn))
|
||||
return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
public class IdentifierFactoryTest {
|
||||
|
||||
private static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
@Test
|
||||
public void testCreateIdentifierForPublication() throws IOException {
|
||||
|
||||
verifyIdentifier("publication_doi.json", "50|doi_________::" + DHPUtils.md5("10.1016/j.cmet.2011.03.013"));
|
||||
verifyIdentifier("publication_pmc.json", "50|pmc_________::" + DHPUtils.md5("21459329"));
|
||||
verifyIdentifier(
|
||||
"publication_urn.json",
|
||||
"50|urn_________::" + DHPUtils.md5("urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"));
|
||||
|
||||
final String defaultID = "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f";
|
||||
verifyIdentifier("publication_3.json", defaultID);
|
||||
verifyIdentifier("publication_4.json", defaultID);
|
||||
verifyIdentifier("publication_5.json", defaultID);
|
||||
}
|
||||
|
||||
protected void verifyIdentifier(String filename, String expectedID) throws IOException {
|
||||
final String json = IOUtils.toString(getClass().getResourceAsStream(filename));
|
||||
final Publication pub = OBJECT_MAPPER.readValue(json, Publication.class);
|
||||
|
||||
String id = IdentifierFactory.createIdentifier(pub);
|
||||
|
||||
assertNotNull(id);
|
||||
assertEquals(expectedID, id);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"scp-number"},"value":"79953761260"}]}
|
@ -0,0 +1 @@
|
||||
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[]}
|
@ -0,0 +1 @@
|
||||
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f"}
|
@ -0,0 +1 @@
|
||||
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"doi"},"value":"10.1016/j.cmet.2011.03.013"},{"qualifier":{"classid":"urn"},"value":"urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"},{"qualifier":{"classid":"scp-number"},"value":"79953761260"},{"qualifier":{"classid":"pmc"},"value":"21459329"}]}
|
@ -0,0 +1 @@
|
||||
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"urn"},"value":"urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"},{"qualifier":{"classid":"scp-number"},"value":"79953761260"},{"qualifier":{"classid":"pmc"},"value":"21459329"}]}
|
@ -0,0 +1 @@
|
||||
{"id":"50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f","pid":[{"qualifier":{"classid":"urn"},"value":"urn:nbn:nl:ui:29-f3ed5f9e-edf6-457e-8848-61b58a4075e2"},{"qualifier":{"classid":"scp-number"},"value":"79953761260"},{"qualifier":{"classid":"pmcid"},"value":"21459329"}]}
|
@ -0,0 +1,46 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.substringAfter;
|
||||
import static org.apache.commons.lang3.StringUtils.substringBefore;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
|
||||
public class IdGenerator implements Serializable {
|
||||
|
||||
// pick the best pid from the list (consider date and pidtype)
|
||||
public static <T extends OafEntity> String generate(List<Identifier<T>> pids, String defaultID) {
|
||||
if (pids == null || pids.size() == 0)
|
||||
return defaultID;
|
||||
|
||||
Identifier<T> bp = pids
|
||||
.stream()
|
||||
.min(Identifier::compareTo)
|
||||
.get();
|
||||
|
||||
String prefix = substringBefore(bp.getOriginalID(), "|");
|
||||
String ns = substringBefore(substringAfter(bp.getOriginalID(), "|"), "::");
|
||||
String suffix = substringAfter(bp.getOriginalID(), "::");
|
||||
|
||||
final String pidType = substringBefore(ns, "_");
|
||||
if (PidType.isValid(pidType)) {
|
||||
return prefix + "|" + dedupify(ns) + "::" + suffix;
|
||||
} else {
|
||||
return prefix + "|dedup_wf_001::" + suffix;
|
||||
}
|
||||
}
|
||||
|
||||
private static String dedupify(String ns) {
|
||||
StringBuilder prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
|
||||
while (prefix.length() < 12) {
|
||||
prefix.append("_");
|
||||
}
|
||||
return prefix.substring(0, 12);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,184 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class SparkCollectSimRels extends AbstractSparkAction {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
|
||||
|
||||
Dataset<Row> simGroupsDS;
|
||||
Dataset<Row> groupsDS;
|
||||
|
||||
public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS,
|
||||
Dataset<Row> groupsDS) {
|
||||
super(parser, spark);
|
||||
this.simGroupsDS = simGroupsDS;
|
||||
this.groupsDS = groupsDS;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkBlockStats.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
final String dbUrl = parser.get("postgresUrl");
|
||||
final String dbUser = parser.get("postgresUser");
|
||||
final String dbPassword = parser.get("postgresPassword");
|
||||
|
||||
SparkSession spark = getSparkSession(conf);
|
||||
|
||||
DataFrameReader readOptions = spark
|
||||
.read()
|
||||
.format("jdbc")
|
||||
.option("url", dbUrl)
|
||||
.option("user", dbUser)
|
||||
.option("password", dbPassword);
|
||||
|
||||
new SparkCollectSimRels(
|
||||
parser,
|
||||
spark,
|
||||
readOptions.option("dbtable", "similarity_groups").load(),
|
||||
readOptions.option("dbtable", "groups").load())
|
||||
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
|
||||
}
|
||||
|
||||
@Override
|
||||
void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
|
||||
|
||||
// read oozie parameters
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final int numPartitions = Optional
|
||||
.ofNullable(parser.get("numPartitions"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_PARTITIONS);
|
||||
final String dbUrl = parser.get("postgresUrl");
|
||||
final String dbUser = parser.get("postgresUser");
|
||||
|
||||
log.info("numPartitions: '{}'", numPartitions);
|
||||
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||
log.info("actionSetId: '{}'", actionSetId);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("postgresUser: {}", dbUser);
|
||||
log.info("postgresUrl: {}", dbUrl);
|
||||
log.info("postgresPassword: xxx");
|
||||
|
||||
JavaPairRDD<String, List<String>> similarityGroup = simGroupsDS
|
||||
.toJavaRDD()
|
||||
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
|
||||
.groupByKey()
|
||||
.mapToPair(
|
||||
i -> new Tuple2<>(i._1(), StreamSupport
|
||||
.stream(i._2().spliterator(), false)
|
||||
.collect(Collectors.toList())));
|
||||
|
||||
JavaPairRDD<String, String> groupIds = groupsDS
|
||||
.toJavaRDD()
|
||||
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
|
||||
|
||||
JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
|
||||
.leftOuterJoin(groupIds)
|
||||
.filter(g -> g._2()._2().isPresent())
|
||||
.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
|
||||
|
||||
JavaRDD<Relation> relations = groups.flatMap(g -> {
|
||||
String firstId = g._2().get(0);
|
||||
List<Relation> rels = new ArrayList<>();
|
||||
|
||||
for (String id : g._2()) {
|
||||
if (!firstId.equals(id))
|
||||
rels.add(createSimRel(firstId, id, g._1()._2()));
|
||||
}
|
||||
|
||||
return rels.iterator();
|
||||
});
|
||||
|
||||
Dataset<Relation> resultRelations = spark
|
||||
.createDataset(
|
||||
relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
|
||||
Encoders.bean(Relation.class))
|
||||
.repartition(numPartitions);
|
||||
|
||||
Dataset<Relation> organizationRelations = spark
|
||||
.createDataset(
|
||||
relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
|
||||
Encoders.bean(Relation.class))
|
||||
.repartition(numPartitions);
|
||||
|
||||
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
|
||||
switch (dedupConf.getWf().getSubEntityValue()) {
|
||||
case "organization":
|
||||
savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
|
||||
break;
|
||||
default:
|
||||
savePostgresRelation(
|
||||
resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Relation createSimRel(String source, String target, String entity) {
|
||||
final Relation r = new Relation();
|
||||
r.setSubRelType("dedupSimilarity");
|
||||
r.setRelClass("isSimilarTo");
|
||||
r.setDataInfo(new DataInfo());
|
||||
|
||||
switch (entity) {
|
||||
case "result":
|
||||
r.setSource("50|" + source);
|
||||
r.setTarget("50|" + target);
|
||||
r.setRelType("resultResult");
|
||||
break;
|
||||
case "organization":
|
||||
r.setSource("20|" + source);
|
||||
r.setTarget("20|" + target);
|
||||
r.setRelType("organizationOrganization");
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("unmanaged entity type: " + entity);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId,
|
||||
String entityType) {
|
||||
newRelations
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,170 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class SparkPrepareNewOrgs extends AbstractSparkAction {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
|
||||
|
||||
public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkPrepareNewOrgs.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
new SparkPrepareNewOrgs(parser, getSparkSession(conf))
|
||||
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ISLookUpService isLookUpService) throws IOException {
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final int numConnections = Optional
|
||||
.ofNullable(parser.get("numConnections"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_CONNECTIONS);
|
||||
|
||||
final String apiUrl = Optional
|
||||
.ofNullable(parser.get("apiUrl"))
|
||||
.orElse("");
|
||||
|
||||
final String dbUrl = parser.get("dbUrl");
|
||||
final String dbTable = parser.get("dbTable");
|
||||
final String dbUser = parser.get("dbUser");
|
||||
final String dbPwd = parser.get("dbPwd");
|
||||
|
||||
log.info("graphBasePath: '{}'", graphBasePath);
|
||||
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||
log.info("actionSetId: '{}'", actionSetId);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("numPartitions: '{}'", numConnections);
|
||||
log.info("apiUrl: '{}'", apiUrl);
|
||||
log.info("dbUrl: '{}'", dbUrl);
|
||||
log.info("dbUser: '{}'", dbUser);
|
||||
log.info("table: '{}'", dbTable);
|
||||
log.info("dbPwd: '{}'", "xxx");
|
||||
|
||||
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
|
||||
final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
|
||||
|
||||
Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, entityPath);
|
||||
|
||||
final Properties connectionProperties = new Properties();
|
||||
connectionProperties.put("user", dbUser);
|
||||
connectionProperties.put("password", dbPwd);
|
||||
|
||||
newOrgs
|
||||
.repartition(numConnections)
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.jdbc(dbUrl, dbTable, connectionProperties);
|
||||
|
||||
if (!apiUrl.isEmpty())
|
||||
updateSimRels(apiUrl);
|
||||
|
||||
}
|
||||
|
||||
public static Dataset<OrgSimRel> createNewOrgs(
|
||||
final SparkSession spark,
|
||||
final String mergeRelsPath,
|
||||
final String entitiesPath) {
|
||||
|
||||
// <id, json_entity>
|
||||
Dataset<Tuple2<String, Organization>> entities = spark
|
||||
.read()
|
||||
.textFile(entitiesPath)
|
||||
.map(
|
||||
(MapFunction<String, Tuple2<String, Organization>>) it -> {
|
||||
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
|
||||
return new Tuple2<>(entity.getId(), entity);
|
||||
},
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
|
||||
|
||||
Dataset<Tuple2<String, String>> mergerels = spark
|
||||
.createDataset(
|
||||
spark
|
||||
.read()
|
||||
.load(mergeRelsPath)
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.where("relClass == 'isMergedIn'")
|
||||
.toJavaRDD()
|
||||
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
|
||||
.rdd(),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
|
||||
|
||||
return entities
|
||||
.joinWith(mergerels, entities.col("_1").equalTo(mergerels.col("_1")), "left")
|
||||
.filter((FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> t._2() == null)
|
||||
.filter(
|
||||
(FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> !t
|
||||
._1()
|
||||
._1()
|
||||
.contains("openorgs"))
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>, OrgSimRel>) r -> new OrgSimRel(
|
||||
"",
|
||||
r._1()._2().getOriginalId().get(0),
|
||||
r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "",
|
||||
r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
|
||||
r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
|
||||
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
|
||||
r._1()._2().getCollectedfrom().get(0).getValue(), ""),
|
||||
Encoders.bean(OrgSimRel.class));
|
||||
|
||||
}
|
||||
|
||||
private static String updateSimRels(final String apiUrl) throws IOException {
|
||||
|
||||
log.info("Updating simrels on the portal");
|
||||
|
||||
final HttpGet req = new HttpGet(apiUrl);
|
||||
try (final CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
try (final CloseableHttpResponse response = client.execute(req)) {
|
||||
return IOUtils.toString(response.getEntity().getContent());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,265 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
import scala.Tuple3;
|
||||
|
||||
public class SparkPrepareOrgRels extends AbstractSparkAction {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
|
||||
|
||||
public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
SparkCreateSimRels.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
|
||||
new SparkPrepareOrgRels(parser, getSparkSession(conf))
|
||||
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ISLookUpService isLookUpService) throws IOException {
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
final String isLookUpUrl = parser.get("isLookUpUrl");
|
||||
final String actionSetId = parser.get("actionSetId");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
final int numConnections = Optional
|
||||
.ofNullable(parser.get("numConnections"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(NUM_CONNECTIONS);
|
||||
|
||||
final String dbUrl = parser.get("dbUrl");
|
||||
final String dbTable = parser.get("dbTable");
|
||||
final String dbUser = parser.get("dbUser");
|
||||
final String dbPwd = parser.get("dbPwd");
|
||||
|
||||
log.info("graphBasePath: '{}'", graphBasePath);
|
||||
log.info("isLookUpUrl: '{}'", isLookUpUrl);
|
||||
log.info("actionSetId: '{}'", actionSetId);
|
||||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("numPartitions: '{}'", numConnections);
|
||||
log.info("dbUrl: '{}'", dbUrl);
|
||||
log.info("dbUser: '{}'", dbUser);
|
||||
log.info("table: '{}'", dbTable);
|
||||
log.info("dbPwd: '{}'", "xxx");
|
||||
|
||||
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
|
||||
final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
|
||||
|
||||
Dataset<OrgSimRel> relations = createRelations(spark, mergeRelPath, entityPath);
|
||||
|
||||
final Properties connectionProperties = new Properties();
|
||||
connectionProperties.put("user", dbUser);
|
||||
connectionProperties.put("password", dbPwd);
|
||||
|
||||
relations
|
||||
.repartition(numConnections)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.jdbc(dbUrl, dbTable, connectionProperties);
|
||||
|
||||
}
|
||||
|
||||
public static Dataset<OrgSimRel> createRelations(
|
||||
final SparkSession spark,
|
||||
final String mergeRelsPath,
|
||||
final String entitiesPath) {
|
||||
|
||||
Dataset<Tuple2<String, Organization>> entities = spark
|
||||
.read()
|
||||
.textFile(entitiesPath)
|
||||
.map(
|
||||
(MapFunction<String, Tuple2<String, Organization>>) it -> {
|
||||
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
|
||||
return new Tuple2<>(entity.getId(), entity);
|
||||
},
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
|
||||
|
||||
Dataset<Tuple3<String, String, String>> relations = spark
|
||||
.createDataset(
|
||||
spark
|
||||
.read()
|
||||
.load(mergeRelsPath)
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.where("relClass == 'merges'")
|
||||
.toJavaRDD()
|
||||
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
|
||||
.filter(t -> !t._2().contains("openorgsmesh"))
|
||||
.groupByKey()
|
||||
.map(g -> Lists.newArrayList(g._2()))
|
||||
.filter(l -> l.size() > 1)
|
||||
.flatMap(l -> {
|
||||
String groupId = "group::" + UUID.randomUUID();
|
||||
List<String> ids = sortIds(l);
|
||||
List<Tuple3<String, String, String>> rels = new ArrayList<>();
|
||||
|
||||
String source = ids.get(0);
|
||||
for (String target : ids) {
|
||||
rels.add(new Tuple3<>(source, target, groupId));
|
||||
}
|
||||
|
||||
return rels.iterator();
|
||||
})
|
||||
.rdd(),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
|
||||
|
||||
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations // <openorgs, corda>
|
||||
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
|
||||
r._1()._1(),
|
||||
r._2()._2().getOriginalId().get(0),
|
||||
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
|
||||
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
|
||||
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
|
||||
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
|
||||
r._2()._2().getCollectedfrom().get(0).getValue(),
|
||||
r._1()._3()),
|
||||
Encoders.bean(OrgSimRel.class))
|
||||
.map(
|
||||
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));
|
||||
|
||||
return relations2
|
||||
.joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, OrgSimRel>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
|
||||
OrgSimRel orgSimRel = r._1()._2();
|
||||
orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0));
|
||||
return orgSimRel;
|
||||
},
|
||||
Encoders.bean(OrgSimRel.class));
|
||||
|
||||
}
|
||||
|
||||
// select best ids from the list. Priority: 1) openorgs, 2)corda, 3)alphabetic
|
||||
public static List<String> sortIds(List<String> ids) {
|
||||
|
||||
ids.sort((o1, o2) -> {
|
||||
|
||||
if (o1.contains("openorgs____") && o2.contains("openorgs____"))
|
||||
return o1.compareTo(o2);
|
||||
if (o1.contains("corda") && o2.contains("corda"))
|
||||
return o1.compareTo(o2);
|
||||
|
||||
if (o1.contains("openorgs____"))
|
||||
return -1;
|
||||
if (o2.contains("openorgs____"))
|
||||
return 1;
|
||||
|
||||
if (o1.contains("corda"))
|
||||
return -1;
|
||||
if (o2.contains("corda"))
|
||||
return 1;
|
||||
|
||||
return o1.compareTo(o2);
|
||||
});
|
||||
|
||||
return ids;
|
||||
}
|
||||
|
||||
public static Dataset<OrgSimRel> createRelationsFromScratch(
|
||||
final SparkSession spark,
|
||||
final String mergeRelsPath,
|
||||
final String entitiesPath) {
|
||||
|
||||
// <id, json_entity>
|
||||
Dataset<Tuple2<String, Organization>> entities = spark
|
||||
.read()
|
||||
.textFile(entitiesPath)
|
||||
.map(
|
||||
(MapFunction<String, Tuple2<String, Organization>>) it -> {
|
||||
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
|
||||
return new Tuple2<>(entity.getId(), entity);
|
||||
},
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
|
||||
|
||||
Dataset<Tuple2<String, String>> relations = spark
|
||||
.createDataset(
|
||||
spark
|
||||
.read()
|
||||
.load(mergeRelsPath)
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.where("relClass == 'merges'")
|
||||
.toJavaRDD()
|
||||
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
|
||||
.groupByKey()
|
||||
.flatMap(g -> {
|
||||
List<Tuple2<String, String>> rels = new ArrayList<>();
|
||||
for (String id1 : g._2()) {
|
||||
for (String id2 : g._2()) {
|
||||
if (!id1.equals(id2))
|
||||
if (id1.contains("openorgs____") && !id2.contains("openorgsmesh"))
|
||||
rels.add(new Tuple2<>(id1, id2));
|
||||
}
|
||||
}
|
||||
return rels.iterator();
|
||||
})
|
||||
.rdd(),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
|
||||
|
||||
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations // <openorgs, corda>
|
||||
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
|
||||
r._1()._1(),
|
||||
r._2()._2().getOriginalId().get(0),
|
||||
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
|
||||
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
|
||||
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
|
||||
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
|
||||
r._2()._2().getCollectedfrom().get(0).getValue(),
|
||||
"group::" + r._1()._1()),
|
||||
Encoders.bean(OrgSimRel.class))
|
||||
.map(
|
||||
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));
|
||||
|
||||
return relations2
|
||||
.joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, OrgSimRel>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
|
||||
OrgSimRel orgSimRel = r._1()._2();
|
||||
orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0));
|
||||
return orgSimRel;
|
||||
},
|
||||
Encoders.bean(OrgSimRel.class));
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import eu.dnetlib.dhp.oa.dedup.DatePicker;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
|
||||
|
||||
public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier> {
|
||||
|
||||
public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
|
||||
public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
|
||||
public static String BASE_DATE = "2000-01-01";
|
||||
|
||||
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||
|
||||
private T entity;
|
||||
|
||||
public static <T extends OafEntity> Identifier newInstance(T entity) {
|
||||
return new Identifier(entity);
|
||||
}
|
||||
|
||||
public Identifier(T entity) {
|
||||
this.entity = entity;
|
||||
}
|
||||
|
||||
public T getEntity() {
|
||||
return entity;
|
||||
}
|
||||
|
||||
public void setEntity(T entity) {
|
||||
this.entity = entity;
|
||||
}
|
||||
|
||||
public Date getDate() {
|
||||
String date = BASE_DATE;
|
||||
if (ModelSupport.isSubClass(getEntity(), Result.class)) {
|
||||
Result result = (Result) getEntity();
|
||||
if (isWellformed(result.getDateofacceptance())) {
|
||||
date = result.getDateofacceptance().getValue();
|
||||
}
|
||||
}
|
||||
try {
|
||||
return sdf.parse(date);
|
||||
} catch (ParseException e) {
|
||||
return new Date();
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isWellformed(Field<String> date) {
|
||||
return date != null && StringUtils.isNotBlank(date.getValue())
|
||||
&& date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
|
||||
}
|
||||
|
||||
public List<KeyValue> getCollectedFrom() {
|
||||
return entity.getCollectedfrom();
|
||||
}
|
||||
|
||||
public EntityType getEntityType() {
|
||||
return EntityType.fromClass(entity.getClass());
|
||||
}
|
||||
|
||||
public String getOriginalID() {
|
||||
return entity.getId();
|
||||
}
|
||||
|
||||
private PidType getPidType() {
|
||||
return PidType.tryValueOf(StringUtils.substringBefore(StringUtils.substringAfter(entity.getId(), "|"), "_"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Identifier i) {
|
||||
// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4)
|
||||
// alphabetical order of the originalID
|
||||
|
||||
Set<String> lKeys = Optional
|
||||
.ofNullable(getCollectedFrom())
|
||||
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
|
||||
.orElse(Sets.newHashSet());
|
||||
|
||||
final Optional<List<KeyValue>> cf = Optional.ofNullable(i.getCollectedFrom());
|
||||
Set<String> rKeys = cf
|
||||
.map(c -> c.stream().map(KeyValue::getKey).collect(Collectors.toSet()))
|
||||
.orElse(Sets.newHashSet());
|
||||
|
||||
if (this.getPidType().compareTo(i.getPidType()) == 0) { // same type
|
||||
if (getEntityType() == EntityType.publication) {
|
||||
if (isFromDatasourceID(lKeys, CROSSREF_ID)
|
||||
&& !isFromDatasourceID(rKeys, CROSSREF_ID))
|
||||
return -1;
|
||||
if (isFromDatasourceID(rKeys, CROSSREF_ID)
|
||||
&& !isFromDatasourceID(lKeys, CROSSREF_ID))
|
||||
return 1;
|
||||
}
|
||||
if (getEntityType() == EntityType.dataset) {
|
||||
if (isFromDatasourceID(lKeys, DATACITE_ID)
|
||||
&& !isFromDatasourceID(rKeys, DATACITE_ID))
|
||||
return -1;
|
||||
if (isFromDatasourceID(rKeys, DATACITE_ID)
|
||||
&& !isFromDatasourceID(lKeys, DATACITE_ID))
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (this.getDate().compareTo(i.getDate()) == 0) {// same date
|
||||
// the minus because we need to take the alphabetically lower id
|
||||
return this.getOriginalID().compareTo(i.getOriginalID());
|
||||
} else
|
||||
// the minus is because we need to take the elder date
|
||||
return this.getDate().compareTo(i.getDate());
|
||||
} else {
|
||||
return new PidComparator<>(getEntity()).compare(toSP(getPidType()), toSP(i.getPidType()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private StructuredProperty toSP(PidType pidType) {
|
||||
return OafMapperUtils.structuredProperty("", pidType.toString(), pidType.toString(), "", "", new DataInfo());
|
||||
}
|
||||
|
||||
public boolean isFromDatasourceID(Set<String> collectedFrom, String dsId) {
|
||||
return collectedFrom.contains(dsId);
|
||||
}
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup.model;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class OrgSimRel implements Serializable {
|
||||
|
||||
String local_id;
|
||||
String oa_original_id;
|
||||
String oa_name;
|
||||
String oa_acronym;
|
||||
String oa_country;
|
||||
String oa_url;
|
||||
String oa_collectedfrom;
|
||||
String group_id;
|
||||
|
||||
public OrgSimRel() {
|
||||
}
|
||||
|
||||
public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
|
||||
String oa_url, String oa_collectedfrom, String group_id) {
|
||||
this.local_id = local_id;
|
||||
this.oa_original_id = oa_original_id;
|
||||
this.oa_name = oa_name;
|
||||
this.oa_acronym = oa_acronym;
|
||||
this.oa_country = oa_country;
|
||||
this.oa_url = oa_url;
|
||||
this.oa_collectedfrom = oa_collectedfrom;
|
||||
this.group_id = group_id;
|
||||
}
|
||||
|
||||
public String getLocal_id() {
|
||||
return local_id;
|
||||
}
|
||||
|
||||
public void setLocal_id(String local_id) {
|
||||
this.local_id = local_id;
|
||||
}
|
||||
|
||||
public String getOa_original_id() {
|
||||
return oa_original_id;
|
||||
}
|
||||
|
||||
public void setOa_original_id(String oa_original_id) {
|
||||
this.oa_original_id = oa_original_id;
|
||||
}
|
||||
|
||||
public String getOa_name() {
|
||||
return oa_name;
|
||||
}
|
||||
|
||||
public void setOa_name(String oa_name) {
|
||||
this.oa_name = oa_name;
|
||||
}
|
||||
|
||||
public String getOa_acronym() {
|
||||
return oa_acronym;
|
||||
}
|
||||
|
||||
public void setOa_acronym(String oa_acronym) {
|
||||
this.oa_acronym = oa_acronym;
|
||||
}
|
||||
|
||||
public String getOa_country() {
|
||||
return oa_country;
|
||||
}
|
||||
|
||||
public void setOa_country(String oa_country) {
|
||||
this.oa_country = oa_country;
|
||||
}
|
||||
|
||||
public String getOa_url() {
|
||||
return oa_url;
|
||||
}
|
||||
|
||||
public void setOa_url(String oa_url) {
|
||||
this.oa_url = oa_url;
|
||||
}
|
||||
|
||||
public String getOa_collectedfrom() {
|
||||
return oa_collectedfrom;
|
||||
}
|
||||
|
||||
public void setOa_collectedfrom(String oa_collectedfrom) {
|
||||
this.oa_collectedfrom = oa_collectedfrom;
|
||||
}
|
||||
|
||||
public String getGroup_id() {
|
||||
return group_id;
|
||||
}
|
||||
|
||||
public void setGroup_id(String group_id) {
|
||||
this.group_id = group_id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OrgSimRel{" +
|
||||
"local_id='" + local_id + '\'' +
|
||||
", oa_original_id='" + oa_original_id + '\'' +
|
||||
", oa_name='" + oa_name + '\'' +
|
||||
", oa_acronym='" + oa_acronym + '\'' +
|
||||
", oa_country='" + oa_country + '\'' +
|
||||
", oa_url='" + oa_url + '\'' +
|
||||
", oa_collectedfrom='" + oa_collectedfrom + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
[
|
||||
{
|
||||
"paramName": "la",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescription": "address for the LookUp",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "asi",
|
||||
"paramLongName": "actionSetId",
|
||||
"paramDescription": "action set identifier (name of the orchestrator)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "path of the working directory",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "np",
|
||||
"paramLongName": "numPartitions",
|
||||
"paramDescription": "number of partitions for the similarity relations intermediate phases",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "purl",
|
||||
"paramLongName": "postgresUrl",
|
||||
"paramDescription": "the url of the postgres server",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pusr",
|
||||
"paramLongName": "postgresUser",
|
||||
"paramDescription": "the owner of the postgres database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ppwd",
|
||||
"paramLongName": "postgresPassword",
|
||||
"paramDescription": "the password for the postgres user",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
@ -0,0 +1,18 @@
|
||||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
@ -0,0 +1,208 @@
|
||||
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphBasePath</name>
|
||||
<description>the raw graph base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookUpUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>actionSetId</name>
|
||||
<description>id of the actionSet</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>path for the working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dedupGraphPath</name>
|
||||
<description>path for the output graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>cutConnectedComponent</name>
|
||||
<description>max number of elements in a connected component</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbUrl</name>
|
||||
<description>the url of the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbUser</name>
|
||||
<description>the user of the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbTable</name>
|
||||
<description>the name of the table in the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbPwd</name>
|
||||
<description>the passowrd of the user of the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="resetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="resetWorkingPath">
|
||||
<fs>
|
||||
<delete path="${workingPath}"/>
|
||||
</fs>
|
||||
<ok to="copyRelations"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copyRelations">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>-pb</arg>
|
||||
<arg>${graphBasePath}/relation</arg>
|
||||
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
|
||||
</distcp>
|
||||
<ok to="CreateSimRel"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateSimRel">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Similarity Relations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--numPartitions</arg><arg>8000</arg>
|
||||
</spark>
|
||||
<ok to="CreateMergeRel"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateMergeRel">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Merge Relations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
|
||||
</spark>
|
||||
<ok to="PrepareNewOrgs"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="PrepareNewOrgs">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Prepare New Organizations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
|
||||
<arg>--dbTable</arg><arg>${dbTable}</arg>
|
||||
<arg>--dbUser</arg><arg>${dbUser}</arg>
|
||||
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
|
||||
<arg>--numConnections</arg><arg>20</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
@ -0,0 +1,18 @@
|
||||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
@ -0,0 +1,240 @@
|
||||
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphBasePath</name>
|
||||
<description>the raw graph base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookUpUrl</name>
|
||||
<description>the address of the lookUp service</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>actionSetId</name>
|
||||
<description>id of the actionSet</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>path for the working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dedupGraphPath</name>
|
||||
<description>path for the output graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>cutConnectedComponent</name>
|
||||
<description>max number of elements in a connected component</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbUrl</name>
|
||||
<description>the url of the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbUser</name>
|
||||
<description>the user of the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbTable</name>
|
||||
<description>the name of the table in the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dbPwd</name>
|
||||
<description>the passowrd of the user of the database</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="resetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="resetWorkingPath">
|
||||
<fs>
|
||||
<delete path="${workingPath}"/>
|
||||
</fs>
|
||||
<ok to="copyRelations"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copyRelations">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>-pb</arg>
|
||||
<arg>/tmp/graph_openorgs_and_corda/relation</arg>
|
||||
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
|
||||
</distcp>
|
||||
<ok to="CreateSimRel"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateSimRel">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Similarity Relations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--numPartitions</arg><arg>8000</arg>
|
||||
</spark>
|
||||
<ok to="CreateMergeRel"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CreateMergeRel">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Create Merge Relations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
|
||||
</spark>
|
||||
<ok to="PrepareOrgRels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="PrepareOrgRels">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Prepare Organization Relations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
|
||||
<arg>--dbTable</arg><arg>${dbTable}</arg>
|
||||
<arg>--dbUser</arg><arg>${dbUser}</arg>
|
||||
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
|
||||
<arg>--numConnections</arg><arg>20</arg>
|
||||
</spark>
|
||||
<ok to="PrepareNewOrgs"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="PrepareNewOrgs">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Prepare New Organizations</name>
|
||||
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
|
||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
|
||||
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
|
||||
<arg>--apiUrl</arg><arg>${apiUrl}</arg>
|
||||
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
|
||||
<arg>--dbTable</arg><arg>${dbTable}</arg>
|
||||
<arg>--dbUser</arg><arg>${dbUser}</arg>
|
||||
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
|
||||
<arg>--numConnections</arg><arg>20</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
@ -0,0 +1,62 @@
|
||||
[
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "graphBasePath",
|
||||
"paramDescription": "the base path of raw graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the working directory path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "la",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescription": "the url of the lookup service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "asi",
|
||||
"paramLongName": "actionSetId",
|
||||
"paramDescription": "the id of the actionset (orchestrator)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "nc",
|
||||
"paramLongName": "numConnections",
|
||||
"paramDescription": "number of connections to the postgres db (for the write operation)",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "au",
|
||||
"paramLongName": "apiUrl",
|
||||
"paramDescription": "the url for the APIs of the openorgs service",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "du",
|
||||
"paramLongName": "dbUrl",
|
||||
"paramDescription": "the url of the database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dusr",
|
||||
"paramLongName": "dbUser",
|
||||
"paramDescription": "the user of the database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "dbTable",
|
||||
"paramDescription": "the name of the table in the database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dpwd",
|
||||
"paramLongName": "dbPwd",
|
||||
"paramDescription": "the password for the user of the database",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
@ -0,0 +1,56 @@
|
||||
[
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "graphBasePath",
|
||||
"paramDescription": "the base path of raw graph",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the working directory path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "la",
|
||||
"paramLongName": "isLookUpUrl",
|
||||
"paramDescription": "the url of the lookup service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "asi",
|
||||
"paramLongName": "actionSetId",
|
||||
"paramDescription": "the id of the actionset (orchestrator)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "nc",
|
||||
"paramLongName": "numConnections",
|
||||
"paramDescription": "number of connections to the postgres db (for the write operation)",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "du",
|
||||
"paramLongName": "dbUrl",
|
||||
"paramDescription": "the url of the database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dusr",
|
||||
"paramLongName": "dbUser",
|
||||
"paramDescription": "the user of the database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "dbTable",
|
||||
"paramDescription": "the name of the table in the database",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dpwd",
|
||||
"paramLongName": "dbPwd",
|
||||
"paramDescription": "the password for the user of the database",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
@ -0,0 +1,117 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import scala.Tuple2;
|
||||
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
public class IdGeneratorTest {
|
||||
|
||||
private static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
private static List<Identifier<Publication>> bestIds;
|
||||
private static List<Identifier<Publication>> bestIds2;
|
||||
private static List<Identifier<Publication>> bestIds3;
|
||||
|
||||
private static String testEntityBasePath;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUp() throws Exception {
|
||||
testEntityBasePath = Paths
|
||||
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
|
||||
.toFile()
|
||||
.getAbsolutePath();
|
||||
|
||||
bestIds = createBestIds(testEntityBasePath + "/publication_idgeneration.json", Publication.class);
|
||||
bestIds2 = createBestIds(testEntityBasePath + "/publication_idgeneration2.json", Publication.class);
|
||||
bestIds3 = createBestIds(testEntityBasePath + "/publication_idgeneration3.json", Publication.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void generateIdTest1() {
|
||||
String id1 = IdGenerator.generate(bestIds, "50|defaultID");
|
||||
|
||||
System.out
|
||||
.println("id list 1 = " + bestIds.stream().map(i -> i.getOriginalID()).collect(Collectors.toList()));
|
||||
|
||||
assertEquals("50|doi_dedup___::0968af610a356656706657e4f234b340", id1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void generateIdTest2() {
|
||||
String id1 = IdGenerator.generate(bestIds2, "50|defaultID");
|
||||
String id2 = IdGenerator.generate(bestIds3, "50|defaultID");
|
||||
|
||||
System.out
|
||||
.println("id list 2 = " + bestIds2.stream().map(i -> i.getOriginalID()).collect(Collectors.toList()));
|
||||
System.out.println("winner 2 = " + id1);
|
||||
System.out
|
||||
.println("id list 3 = " + bestIds3.stream().map(i -> i.getOriginalID()).collect(Collectors.toList()));
|
||||
System.out.println("winner 3 = " + id2);
|
||||
|
||||
assertEquals("50|doi_dedup___::1a77a3bba737f8b669dcf330ad3b37e2", id1);
|
||||
assertEquals("50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1g", id2);
|
||||
}
|
||||
|
||||
protected static <T extends OafEntity> List<Identifier<T>> createBestIds(String path, Class<T> clazz) {
|
||||
final Stream<Identifier<T>> ids = readSample(path, clazz)
|
||||
.stream()
|
||||
.map(Tuple2::_2)
|
||||
.map(Identifier::newInstance);
|
||||
return ids.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
|
||||
List<Tuple2<String, T>> res = new ArrayList<>();
|
||||
BufferedReader reader;
|
||||
try {
|
||||
reader = new BufferedReader(new FileReader(path));
|
||||
String line = reader.readLine();
|
||||
while (line != null) {
|
||||
res
|
||||
.add(
|
||||
new Tuple2<>(
|
||||
MapDocumentUtil.getJPathString("$.id", line),
|
||||
OBJECT_MAPPER.readValue(line, clazz)));
|
||||
// read next line
|
||||
line = reader.readLine();
|
||||
}
|
||||
reader.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
public static StructuredProperty pid(String pid, String classid, String classname) {
|
||||
return OafMapperUtils.structuredProperty(pid, classid, classname, "", "", new DataInfo());
|
||||
}
|
||||
|
||||
public static List<KeyValue> keyValue(String key, String value) {
|
||||
return Lists.newArrayList(OafMapperUtils.keyValue(key, value));
|
||||
}
|
||||
}
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
File diff suppressed because one or more lines are too long
@ -0,0 +1,3 @@
|
||||
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1g", "pid" : [ { "value" : "pid1", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
|
||||
{ "id" : "50|doi_________::1a77a3bba737f8b669dcf330ad3b37e2", "pid" : [ { "value" : "pid2", "qualifier" : { "classid" : "doi" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
|
||||
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f", "pid" : [ { "value" : "pid3", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
|
@ -0,0 +1,3 @@
|
||||
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1g", "pid" : [ { "value" : "pid1", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
|
||||
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1h", "pid" : [ { "value" : "pid2", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
|
||||
{ "id" : "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1i", "pid" : [ { "value" : "pid3", "qualifier" : { "classid" : "original" } } ], "dateofacceptance" : { "value" : "2000-01-01"}, "collectedfrom" : [ { "key" : "key", "value" : "value" } ] }
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -0,0 +1,99 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class GenerateEntitiesApplicationTest {
|
||||
|
||||
@Mock
|
||||
private ISLookUpService isLookUpService;
|
||||
|
||||
@Mock
|
||||
private VocabularyGroup vocs;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException, ISLookUpException {
|
||||
|
||||
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
|
||||
lenient()
|
||||
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
|
||||
.thenReturn(synonyms());
|
||||
|
||||
vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeResult() throws IOException {
|
||||
Result publication = getResult("oaf_record.xml", Publication.class);
|
||||
Result dataset = getResult("odf_dataset.xml", Dataset.class);
|
||||
Result software = getResult("odf_software.xml", Software.class);
|
||||
Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class);
|
||||
|
||||
verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
|
||||
verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
|
||||
verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
|
||||
verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
|
||||
verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
|
||||
|
||||
verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
|
||||
verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
|
||||
|
||||
verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
|
||||
verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
|
||||
}
|
||||
|
||||
protected <T extends Result> void verifyMerge(Result publication, Result dataset, Class<T> clazz,
|
||||
String resultType) {
|
||||
final Result merge = GenerateEntitiesApplication.mergeResults(publication, dataset);
|
||||
assertTrue(clazz.isAssignableFrom(merge.getClass()));
|
||||
assertEquals(resultType, merge.getResulttype().getClassid());
|
||||
}
|
||||
|
||||
protected <T extends Result> Result getResult(String xmlFileName, Class<T> clazz) throws IOException {
|
||||
final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName));
|
||||
return new OdfToOafMapper(vocs, false)
|
||||
.processMdRecord(xml)
|
||||
.stream()
|
||||
.filter(s -> clazz.isAssignableFrom(s.getClass()))
|
||||
.map(s -> (Result) s)
|
||||
.findFirst()
|
||||
.get();
|
||||
}
|
||||
|
||||
private List<String> vocs() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
|
||||
}
|
||||
|
||||
private List<String> synonyms() throws IOException {
|
||||
return IOUtils
|
||||
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<record xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
|
||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||
xmlns:oaf="http://namespace.openaire.eu/oaf"
|
||||
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<header xmlns="http://namespace.openaire.eu/">
|
||||
<dri:objIdentifier>pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2</dri:objIdentifier>
|
||||
<dri:recordIdentifier>10.3897/oneeco.2.e13718</dri:recordIdentifier>
|
||||
<dri:dateOfCollection/>
|
||||
<dri:mdFormat/>
|
||||
<dri:mdFormatInterpretation/>
|
||||
<dri:repositoryId/>
|
||||
<dr:objectIdentifier/>
|
||||
<dr:dateOfCollection>2020-03-23T00:20:51.392Z</dr:dateOfCollection>
|
||||
<dr:dateOfTransformation>2020-03-23T00:26:59.078Z</dr:dateOfTransformation>
|
||||
<oaf:datasourceprefix>pensoft_____</oaf:datasourceprefix>
|
||||
</header>
|
||||
<metadata xmlns="http://namespace.openaire.eu/">
|
||||
<dc:title>Ecosystem Service capacity is higher in areas of multiple designation types</dc:title>
|
||||
<dc:creator>Nikolaidou,Charitini</dc:creator>
|
||||
<dc:creator nameIdentifier="0000-0001-6651-1178" nameIdentifierScheme="ORCID">Votsi,Nefta</dc:creator>
|
||||
<dc:creator>Sgardelis,Steanos</dc:creator>
|
||||
<dc:creator>Halley,John</dc:creator>
|
||||
<dc:creator>Pantis,John</dc:creator>
|
||||
<dc:creator>Tsiafouli,Maria</dc:creator>
|
||||
<dc:date>2017</dc:date>
|
||||
<dc:description>The implementation of the Ecosystem Service (ES) concept into practice might be a challenging task as it has to take into account previous “traditional” policies and approaches that have evaluated nature and biodiversity differently. Among them the Habitat (92/43/EC) and Bird Directives (79/409/EC), the Water Framework Directive (2000/60/EC), and the Noise Directive (2002/49/EC) have led to the evaluation/designation of areas in Europe with different criteria. In this study our goal was to understand how the ES capacity of an area is related to its designation and if areas with multiple designations have higher capacity in providing ES. We selected four catchments in Greece with a great variety of characteristics covering over 25% of the national territory. Inside the catchments we assessed the ES capacity (following the methodology of Burkhard et al. 2009) of areas designated as Natura 2000 sites, Quiet areas and Wetlands or Water bodies and found those areas that have multiple designations. Data were analyzed by GLM to reveal differences regarding the ES capacity among the different types of areas. We also investigated by PCA synergies and trade-offs among different kinds of ES and tested for correlations among landscape properties, such as elevation, aspect and slope and the ES potential. Our results show that areas with different types or multiple designations have a different capacity in providing ES. Areas of one designation type (Protected or Quiet Areas) had in general intermediate scores in most ES but scores were higher compared to areas with no designation, which displayed stronger capacity in provisioning services. Among Protected Areas and Quiet Areas the latter scored better in general. Areas that combined both designation types (Protected and Quiet Areas) showed the highest capacity in 13 out of 29 ES, that were mostly linked with natural and forest ecosystems. We found significant synergies among most regulating, supporting and cultural ES which in turn display trade-offs with provisioning services. The different ES are spatially related and display strong correlation with landscape properties, such as elevation and slope. We suggest that the designation status of an area can be used as an alternative tool for environmental policy, indicating the capacity for ES provision. Multiple designations of areas can be used as proxies for locating ES “hotspots”. This integration of “traditional” evaluation and designation and the “newer” ES concept forms a time- and cost-effective way to be adopted by stakeholders and policy-makers in order to start complying with new standards and demands for nature conservation and environmental management.</dc:description>
|
||||
<dc:format>text/html</dc:format>
|
||||
<dc:identifier>https://doi.org/10.3897/oneeco.2.e13718</dc:identifier>
|
||||
<dc:identifier>https://oneecosystem.pensoft.net/article/13718/</dc:identifier>
|
||||
<dc:language>eng</dc:language>
|
||||
<dc:publisher>Pensoft Publishers</dc:publisher>
|
||||
<dc:relation>info:eu-repo/semantics/altIdentifier/eissn/2367-8194</dc:relation>
|
||||
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/226852</dc:relation>
|
||||
<dc:source>One Ecosystem 2: e13718</dc:source>
|
||||
<dc:source>One Ecosystem 2: e13718</dc:source>
|
||||
<dc:source>One Ecosystem 2: e13718</dc:source>
|
||||
<dc:subject>Ecosystem Services hotspots</dc:subject>
|
||||
<dc:subject>Natura 2000</dc:subject>
|
||||
<dc:subject>Quiet Protected Areas</dc:subject>
|
||||
<dc:subject>Biodiversity</dc:subject>
|
||||
<dc:subject>Agriculture</dc:subject>
|
||||
<dc:subject>Elevation</dc:subject>
|
||||
<dc:subject>Slope</dc:subject>
|
||||
<dc:subject>Ecosystem Service trade-offs and synergies</dc:subject>
|
||||
<dc:subject> cultural services</dc:subject>
|
||||
<dc:subject>provisioning services</dc:subject>
|
||||
<dc:subject>regulating services</dc:subject>
|
||||
<dc:subject>supporting services</dc:subject>
|
||||
<dc:type>Research Artefact</dc:type>
|
||||
<dr:CobjCategory type="other">0020</dr:CobjCategory>
|
||||
<oaf:dateAccepted>2017-01-01</oaf:dateAccepted>
|
||||
<oaf:projectid>corda_______::226852</oaf:projectid>
|
||||
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||
<oaf:hostedBy id="openaire____::issn226852" name="One Ecosystem"/>
|
||||
<oaf:collectedFrom
|
||||
id="openaire____::45e3c7b69bcee6cc5fa945c9e183deb9" name="Pensoft"/>
|
||||
<oaf:identifier identifierType="doi">10.3897/oneeco.2.e13718</oaf:identifier>
|
||||
<oaf:fulltext>https://oneecosystem.pensoft.net/article/13718/</oaf:fulltext>
|
||||
<oaf:journal eissn="2367-8194" issn="">One Ecosystem</oaf:journal>
|
||||
<oaf:refereed>0001</oaf:refereed>
|
||||
</metadata>
|
||||
<about xmlns:oai="http://www.openarchives.org/OAI/2.0/">
|
||||
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||
<originDescription altered="true" harvestDate="2020-03-23T00:20:51.392Z">
|
||||
<baseURL>http%3A%2F%2Fzookeys.pensoft.net%2Foai.php</baseURL>
|
||||
<identifier>10.3897/oneeco.2.e13718</identifier>
|
||||
<datestamp>2017-09-08</datestamp>
|
||||
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
|
||||
</originDescription>
|
||||
</provenance>
|
||||
<oaf:datainfo>
|
||||
<oaf:inferred>false</oaf:inferred>
|
||||
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||
<oaf:trust>0.9</oaf:trust>
|
||||
<oaf:inferenceprovenance/>
|
||||
<oaf:provenanceaction classid="sysimport:crosswalk:repository"
|
||||
classname="sysimport:crosswalk:repository"
|
||||
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
||||
</oaf:datainfo>
|
||||
</about>
|
||||
</record>
|
Loading…
Reference in New Issue